diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index c8b404e7928..8b61091719d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -78,6 +78,19 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon this.authInfo = null; } + public Request(TxnHeader hdr, Record txn, TxnDigest digest) { + this.sessionId = hdr.getClientId(); + this.cxid = hdr.getCxid(); + this.type = hdr.getType(); + this.hdr = hdr; + this.txn = txn; + this.zxid = hdr.getZxid(); + this.request = null; + this.cnxn = null; + this.authInfo = null; + this.txnDigest = digest; + } + public final long sessionId; public final int cxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java index 352eb81da90..409fd21fa91 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java @@ -47,4 +47,8 @@ public TxnHeader getHeader() { public TxnDigest getDigest() { return digest; } + + public Request toRequest() { + return new Request(header, txn, digest); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index c27ac6aa83b..2fd67da5063 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -58,6 +58,7 @@ import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; @@ -82,6 +83,8 @@ public class ZKDatabase { protected FileTxnSnapLog snapLog; protected long minCommittedLog, maxCommittedLog; + private final boolean allowDiscontinuousProposals = Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals"); + /** * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot */ @@ -170,8 +173,6 @@ public boolean isInitialized() { * data structures in zkdatabase. */ public void clear() { - minCommittedLog = 0; - maxCommittedLog = 0; /* to be safe we just create a new * datatree. */ @@ -182,6 +183,8 @@ public void clear() { try { lock.lock(); committedLog.clear(); + minCommittedLog = 0; + maxCommittedLog = 0; } finally { lock.unlock(); } @@ -320,17 +323,30 @@ public void addCommittedProposal(Request request) { WriteLock wl = logLock.writeLock(); try { wl.lock(); - if (committedLog.size() > commitLogCount) { - committedLog.remove(); - minCommittedLog = committedLog.peek().getZxid(); - } if (committedLog.isEmpty()) { minCommittedLog = request.zxid; maxCommittedLog = request.zxid; + } else if (request.zxid <= maxCommittedLog) { + // This could happen if lastProcessedZxid is rewinded and database is re-synced. + // Currently, it only happens in test codes, but it should also be safe for production path. + return; + } else if (!allowDiscontinuousProposals + && request.zxid != maxCommittedLog + 1 + && ZxidUtils.getEpochFromZxid(request.zxid) <= ZxidUtils.getEpochFromZxid(maxCommittedLog)) { + String msg = String.format( + "Committed proposal cached out of order: 0x%s is not the next proposal of 0x%s", + ZxidUtils.zxidToString(request.zxid), + ZxidUtils.zxidToString(maxCommittedLog)); + LOG.error(msg); + throw new IllegalStateException(msg); } PureRequestProposal p = new PureRequestProposal(request); committedLog.add(p); maxCommittedLog = p.getZxid(); + if (committedLog.size() > commitLogCount) { + committedLog.remove(); + minCommittedLog = committedLog.peek().getZxid(); + } } finally { wl.unlock(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 1667efd3fbe..11ec1fb7413 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1846,13 +1846,6 @@ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader r cnxn.sendResponse(replyHeader, record, "response"); } - // entry point for quorum/Learner.java - public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { - processTxnForSessionEvents(null, hdr, txn); - return processTxnInDB(hdr, txn, null); - } - - // entry point for FinalRequestProcessor.java public ProcessTxnResult processTxn(Request request) { TxnHeader hdr = request.getHdr(); processTxnForSessionEvents(request, hdr, request.getTxn()); @@ -1864,8 +1857,10 @@ public ProcessTxnResult processTxn(Request request) { if (!writeRequest && !quorumRequest) { return new ProcessTxnResult(); } + + ProcessTxnResult rc; synchronized (outstandingChanges) { - ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); + rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); // request.hdr is set for write requests, which are the only ones // that add to outstandingChanges. @@ -1886,13 +1881,13 @@ public ProcessTxnResult processTxn(Request request) { } } } + } - // do not add non quorum packets to the queue. - if (quorumRequest) { - getZKDatabase().addCommittedProposal(request); - } - return rc; + // do not add non quorum packets to the queue. + if (quorumRequest) { + getZKDatabase().addCommittedProposal(request); } + return rc; } private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 0eff9d24837..ca99974cb52 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -35,7 +35,6 @@ import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.SetDataTxn; -import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; /** @@ -164,7 +163,6 @@ protected void processPacket(QuorumPacket qp) throws Exception { TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData()); TxnHeader hdr = logEntry.getHeader(); Record txn = logEntry.getTxn(); - TxnDigest digest = logEntry.getDigest(); if (hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", @@ -179,7 +177,7 @@ protected void processPacket(QuorumPacket qp) throws Exception { self.setLastSeenQuorumVerifier(qv, true); } - fzk.logRequest(hdr, txn, digest); + fzk.logRequest(logEntry.toRequest()); if (hdr != null) { /* * Request header is created only by the leader, so this is only set diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index b6766199988..1b0b5cd9276 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.management.JMException; -import org.apache.jute.Record; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.server.ExitCode; @@ -33,8 +32,6 @@ import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.txn.TxnDigest; -import org.apache.zookeeper.txn.TxnHeader; import org.apache.zookeeper.util.ServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,20 +76,17 @@ protected void setupRequestProcessors() { LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue<>(); - public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { - final Request request = buildRequestToProcess(hdr, txn, digest); + public void logRequest(Request request) { + if ((request.zxid & 0xffffffffL) != 0) { + pendingTxns.add(request); + } syncProcessor.processRequest(request); } /** - * Build a request for the txn and append it to the transaction log - * @param hdr the txn header - * @param txn the txn - * @param digest the digest of txn + * Append txn request to the transaction log directly without go through request processors. */ - public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException { - final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); - request.setTxnDigest(digest); + public void appendRequest(Request request) throws IOException { getZKDatabase().append(request); } @@ -188,20 +182,4 @@ protected void unregisterMetrics() { rootContext.unregisterGauge("synced_observers"); } - - /** - * Build a request for the txn - * @param hdr the txn header - * @param txn the txn - * @param digest the digest of txn - * @return a request moving through a chain of RequestProcessors - */ - private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) { - final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); - request.setTxnDigest(digest); - if ((request.zxid & 0xffffffffL) != 0) { - pendingTxns.add(request); - } - return request; - } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 1ef99e50aae..adf0ef6e510 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -82,6 +82,10 @@ static class PacketInFlight { Record rec; TxnDigest digest; + Request toRequest() { + return new Request(hdr, rec, digest); + } + } QuorumPeer self; @@ -535,6 +539,27 @@ protected long registerWithLeader(int pktType) throws IOException { } } + long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws Exception { + if (lastQueued == 0) { + LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid())); + } else if (pif.hdr.getZxid() != lastQueued + 1) { + if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { + String msg = String.format( + "DIFF sync got proposal 0x%s, last queued 0x%s, expected 0x%s", + Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued), + Long.toHexString(lastQueued + 1)); + LOG.error(msg); + throw new Exception(msg); + } + // We can't tell whether it is a data loss. Given that new epoch is rare, + // log at warn should not be too verbose. + LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, expected 0x{}", + Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued), + Long.toHexString(lastQueued + 1)); + } + return pif.hdr.getZxid(); + } + /** * Finally, synchronize our history with the Leader (if Follower) * or the LearnerMaster (if Observer). @@ -609,6 +634,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); + // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently + // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now. long lastQueued = 0; // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 @@ -630,13 +657,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { pif.hdr = logEntry.getHeader(); pif.rec = logEntry.getTxn(); pif.digest = logEntry.getDigest(); - if (pif.hdr.getZxid() != lastQueued + 1) { - LOG.warn( - "Got zxid 0x{} expected 0x{}", - Long.toHexString(pif.hdr.getZxid()), - Long.toHexString(lastQueued + 1)); - } - lastQueued = pif.hdr.getZxid(); + lastQueued = enforceContinuousProposal(lastQueued, pif); if (pif.hdr.getType() == OpCode.reconfig) { SetDataTxn setDataTxn = (SetDataTxn) pif.rec; @@ -666,7 +687,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { Long.toHexString(qp.getZxid()), Long.toHexString(pif.hdr.getZxid())); } else { - zk.processTxn(pif.hdr, pif.rec); + zk.processTxn(pif.toRequest()); packetsNotLogged.remove(); } } else { @@ -696,18 +717,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { packet.rec = logEntry.getTxn(); packet.hdr = logEntry.getHeader(); packet.digest = logEntry.getDigest(); - // Log warning message if txn comes out-of-order - if (packet.hdr.getZxid() != lastQueued + 1) { - LOG.warn( - "Got zxid 0x{} expected 0x{}", - Long.toHexString(packet.hdr.getZxid()), - Long.toHexString(lastQueued + 1)); - } - lastQueued = packet.hdr.getZxid(); + lastQueued = enforceContinuousProposal(lastQueued, packet); } if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot - zk.processTxn(packet.hdr, packet.rec); + zk.processTxn(packet.toRequest()); } else { packetsNotLogged.add(packet); packetsCommitted.add(qp.getZxid()); @@ -780,8 +794,9 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { continue; } packetsNotLogged.removeFirst(); - fzk.appendRequest(pif.hdr, pif.rec, pif.digest); - fzk.processTxn(pif.hdr, pif.rec); + Request request = pif.toRequest(); + fzk.appendRequest(request); + fzk.processTxn(request); } // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 @@ -823,7 +838,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); + fzk.logRequest(p.toRequest()); } LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size()); @@ -847,8 +862,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { continue; } packetsCommitted.remove(); - Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); - request.setTxnDigest(p.digest); + Request request = p.toRequest(); ozk.commitRequest(request); } } else { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index d3aa41b5fc7..334fa54c1bc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -202,12 +202,8 @@ protected void processPacket(QuorumPacket qp) throws Exception { case Leader.INFORM: ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1); logEntry = SerializeUtils.deserializeTxn(qp.getData()); - hdr = logEntry.getHeader(); - txn = logEntry.getTxn(); - digest = logEntry.getDigest(); - Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0); + Request request = logEntry.toRequest(); request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY); - request.setTxnDigest(digest); ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk; obs.commitRequest(request); break; @@ -219,13 +215,10 @@ protected void processPacket(QuorumPacket qp) throws Exception { byte[] remainingdata = new byte[buffer.remaining()]; buffer.get(remainingdata); logEntry = SerializeUtils.deserializeTxn(remainingdata); - hdr = logEntry.getHeader(); txn = logEntry.getTxn(); - digest = logEntry.getDigest(); QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8)); - request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0); - request.setTxnDigest(digest); + request = logEntry.toRequest(); obs = (ObserverZooKeeperServer) zk; boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java index 75d6fe68000..b52ea34182a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java @@ -60,6 +60,7 @@ public class TxnLogDigestTest extends ClientBase { @BeforeEach public void setUp() throws Exception { + System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); super.setUp(); server = serverFactory.getZooKeeperServer(); zk = createClient(); @@ -67,6 +68,7 @@ public void setUp() throws Exception { @AfterEach public void tearDown() throws Exception { + System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); // server will be closed in super.tearDown super.tearDown(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java index 031ccc2f7da..b23fd80a338 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java @@ -60,6 +60,7 @@ private ZooKeeper getClient(int idx) { @BeforeEach public void setUp() throws Exception { System.setProperty("zookeeper.admin.enableServer", "false"); + System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); // set the snap count to something low so that we force log rollover // and verify that is working as part of the epoch rollover. @@ -215,6 +216,7 @@ private void adjustEpochNearEnd() { @AfterEach public void tearDown() throws Exception { + System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); LOG.info("tearDown starting"); for (int i = 0; i < zkClients.length; i++) { zkClients[i].close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java new file mode 100644 index 00000000000..c4b7720cf92 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Comparator; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +public class QuorumSyncTest extends ZKTestCase { + private QuorumUtil qu; + + @AfterEach + public void tearDown() throws Exception { + if (qu != null) { + qu.shutdownAll(); + } + } + + @Test + public void testStaleDiffSync() throws Exception { + qu = new QuorumUtil(2); + qu.startAll(); + + int[] followerIds = qu.getFollowerQuorumPeers() + .stream() + .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed()) + .mapToInt(peer -> (int) peer.getMyId()).toArray(); + + int follower1 = followerIds[0]; + int follower2 = followerIds[1]; + + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + qu.shutdown(follower2); + + for (int i = 0; i < 10; i++) { + zk.create("/foo" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + qu.shutdown(follower1); + + for (int i = 0; i < 10; i++) { + zk.create("/bar" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + qu.restart(follower1); + } + + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower1))) { + for (int i = 0; i < 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + + for (int i = 0; i < 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + + qu.shutdown(qu.getLeaderServer()); + + qu.restart(follower2); + + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower2))) { + for (int i = 0; i < 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + + for (int i = 0; i < 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + } +}