From bbb34269e351f1f3eecfb4b0f0d7941c72a3a94c Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Wed, 30 Apr 2025 11:45:05 +0800 Subject: [PATCH 1/2] ZOOKEEPER-4925: Fix data loss due to propagation of discontinuous committedLog There are two variants of `ZooKeeperServer::processTxn`. Those two variants diverge significantly since ZOOKEEPER-3484. `processTxn(Request request)` pops outstanding change from `outstandingChanges` and adds txn to `committedLog` for follower to sync in addition to what `processTxn(TxnHeader hdr, Record txn)` does. The `Learner` uses `processTxn(TxnHeader hdr, Record txn)` to commit txn to memory after ZOOKEEPER-4394, which means it leaves `committedLog` untouched in `SYNCHRONIZATION` phase. This way, a stale follower will have hole in its `committedLog` after joining cluster. The stale follower will propagate the in memory hole to other stale nodes after becoming leader. This causes data loss. The test case fails on master and 3.9.3, and passes on 3.9.2. So only 3.9.3 is affected. This commit drops `processTxn(TxnHeader hdr, Record txn)` as `processTxn(Request request)` is capable in `SYNCHRONIZATION` phase too. Also, this commit rejects discontinuous proposals in `syncWithLeader` and `committedLog`, so to avoid possible data loss. Refs: ZOOKEEPER-4925, ZOOKEEPER-4394, ZOOKEEPER-3484 --- .../org/apache/zookeeper/server/Request.java | 13 +++ .../apache/zookeeper/server/TxnLogEntry.java | 4 + .../apache/zookeeper/server/ZKDatabase.java | 28 +++-- .../zookeeper/server/ZooKeeperServer.java | 21 ++-- .../zookeeper/server/quorum/Follower.java | 4 +- .../quorum/FollowerZooKeeperServer.java | 34 ++---- .../zookeeper/server/quorum/Learner.java | 56 +++++++--- .../zookeeper/server/quorum/Observer.java | 11 +- .../zookeeper/server/TxnLogDigestTest.java | 2 + .../zookeeper/server/ZxidRolloverTest.java | 2 + .../server/quorum/QuorumSyncTest.java | 100 ++++++++++++++++++ 11 files changed, 199 insertions(+), 76 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index c8b404e7928..8b61091719d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -78,6 +78,19 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon this.authInfo = null; } + public Request(TxnHeader hdr, Record txn, TxnDigest digest) { + this.sessionId = hdr.getClientId(); + this.cxid = hdr.getCxid(); + this.type = hdr.getType(); + this.hdr = hdr; + this.txn = txn; + this.zxid = hdr.getZxid(); + this.request = null; + this.cnxn = null; + this.authInfo = null; + this.txnDigest = digest; + } + public final long sessionId; public final int cxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java index 352eb81da90..409fd21fa91 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java @@ -47,4 +47,8 @@ public TxnHeader getHeader() { public TxnDigest getDigest() { return digest; } + + public Request toRequest() { + return new Request(header, txn, digest); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index c27ac6aa83b..2fd67da5063 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -58,6 +58,7 @@ import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; @@ -82,6 +83,8 @@ public class ZKDatabase { protected FileTxnSnapLog snapLog; protected long minCommittedLog, maxCommittedLog; + private final boolean allowDiscontinuousProposals = Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals"); + /** * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot */ @@ -170,8 +173,6 @@ public boolean isInitialized() { * data structures in zkdatabase. */ public void clear() { - minCommittedLog = 0; - maxCommittedLog = 0; /* to be safe we just create a new * datatree. */ @@ -182,6 +183,8 @@ public void clear() { try { lock.lock(); committedLog.clear(); + minCommittedLog = 0; + maxCommittedLog = 0; } finally { lock.unlock(); } @@ -320,17 +323,30 @@ public void addCommittedProposal(Request request) { WriteLock wl = logLock.writeLock(); try { wl.lock(); - if (committedLog.size() > commitLogCount) { - committedLog.remove(); - minCommittedLog = committedLog.peek().getZxid(); - } if (committedLog.isEmpty()) { minCommittedLog = request.zxid; maxCommittedLog = request.zxid; + } else if (request.zxid <= maxCommittedLog) { + // This could happen if lastProcessedZxid is rewinded and database is re-synced. + // Currently, it only happens in test codes, but it should also be safe for production path. + return; + } else if (!allowDiscontinuousProposals + && request.zxid != maxCommittedLog + 1 + && ZxidUtils.getEpochFromZxid(request.zxid) <= ZxidUtils.getEpochFromZxid(maxCommittedLog)) { + String msg = String.format( + "Committed proposal cached out of order: 0x%s is not the next proposal of 0x%s", + ZxidUtils.zxidToString(request.zxid), + ZxidUtils.zxidToString(maxCommittedLog)); + LOG.error(msg); + throw new IllegalStateException(msg); } PureRequestProposal p = new PureRequestProposal(request); committedLog.add(p); maxCommittedLog = p.getZxid(); + if (committedLog.size() > commitLogCount) { + committedLog.remove(); + minCommittedLog = committedLog.peek().getZxid(); + } } finally { wl.unlock(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 1667efd3fbe..11ec1fb7413 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1846,13 +1846,6 @@ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader r cnxn.sendResponse(replyHeader, record, "response"); } - // entry point for quorum/Learner.java - public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { - processTxnForSessionEvents(null, hdr, txn); - return processTxnInDB(hdr, txn, null); - } - - // entry point for FinalRequestProcessor.java public ProcessTxnResult processTxn(Request request) { TxnHeader hdr = request.getHdr(); processTxnForSessionEvents(request, hdr, request.getTxn()); @@ -1864,8 +1857,10 @@ public ProcessTxnResult processTxn(Request request) { if (!writeRequest && !quorumRequest) { return new ProcessTxnResult(); } + + ProcessTxnResult rc; synchronized (outstandingChanges) { - ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); + rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); // request.hdr is set for write requests, which are the only ones // that add to outstandingChanges. @@ -1886,13 +1881,13 @@ public ProcessTxnResult processTxn(Request request) { } } } + } - // do not add non quorum packets to the queue. - if (quorumRequest) { - getZKDatabase().addCommittedProposal(request); - } - return rc; + // do not add non quorum packets to the queue. + if (quorumRequest) { + getZKDatabase().addCommittedProposal(request); } + return rc; } private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 0eff9d24837..ca99974cb52 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -35,7 +35,6 @@ import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.SetDataTxn; -import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; /** @@ -164,7 +163,6 @@ protected void processPacket(QuorumPacket qp) throws Exception { TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData()); TxnHeader hdr = logEntry.getHeader(); Record txn = logEntry.getTxn(); - TxnDigest digest = logEntry.getDigest(); if (hdr.getZxid() != lastQueued + 1) { LOG.warn( "Got zxid 0x{} expected 0x{}", @@ -179,7 +177,7 @@ protected void processPacket(QuorumPacket qp) throws Exception { self.setLastSeenQuorumVerifier(qv, true); } - fzk.logRequest(hdr, txn, digest); + fzk.logRequest(logEntry.toRequest()); if (hdr != null) { /* * Request header is created only by the leader, so this is only set diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index b6766199988..1b0b5cd9276 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.management.JMException; -import org.apache.jute.Record; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.server.ExitCode; @@ -33,8 +32,6 @@ import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.txn.TxnDigest; -import org.apache.zookeeper.txn.TxnHeader; import org.apache.zookeeper.util.ServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,20 +76,17 @@ protected void setupRequestProcessors() { LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue<>(); - public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { - final Request request = buildRequestToProcess(hdr, txn, digest); + public void logRequest(Request request) { + if ((request.zxid & 0xffffffffL) != 0) { + pendingTxns.add(request); + } syncProcessor.processRequest(request); } /** - * Build a request for the txn and append it to the transaction log - * @param hdr the txn header - * @param txn the txn - * @param digest the digest of txn + * Append txn request to the transaction log directly without go through request processors. */ - public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException { - final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); - request.setTxnDigest(digest); + public void appendRequest(Request request) throws IOException { getZKDatabase().append(request); } @@ -188,20 +182,4 @@ protected void unregisterMetrics() { rootContext.unregisterGauge("synced_observers"); } - - /** - * Build a request for the txn - * @param hdr the txn header - * @param txn the txn - * @param digest the digest of txn - * @return a request moving through a chain of RequestProcessors - */ - private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) { - final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); - request.setTxnDigest(digest); - if ((request.zxid & 0xffffffffL) != 0) { - pendingTxns.add(request); - } - return request; - } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 1ef99e50aae..783dfbcc893 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -82,6 +82,10 @@ static class PacketInFlight { Record rec; TxnDigest digest; + Request toRequest() { + return new Request(hdr, rec, digest); + } + } QuorumPeer self; @@ -630,11 +634,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { pif.hdr = logEntry.getHeader(); pif.rec = logEntry.getTxn(); pif.digest = logEntry.getDigest(); - if (pif.hdr.getZxid() != lastQueued + 1) { - LOG.warn( - "Got zxid 0x{} expected 0x{}", + if (lastQueued == 0) { + LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid())); + } else if (pif.hdr.getZxid() != lastQueued + 1) { + if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { + String msg = String.format("DIFF sync got proposal 0x%s which is not next of last proposal 0x%s", + Long.toHexString(pif.hdr.getZxid()), + Long.toHexString(lastQueued)); + LOG.error(msg); + throw new Exception(msg); + } + // We can't tell whether it is a data loss. Given that new epoch is rare, + // log at warn should not be too verbose. + LOG.warn("DIFF sync got new epoch proposal 0x{}, last proposal 0x{}", Long.toHexString(pif.hdr.getZxid()), - Long.toHexString(lastQueued + 1)); + Long.toHexString(lastQueued)); } lastQueued = pif.hdr.getZxid(); @@ -666,7 +680,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { Long.toHexString(qp.getZxid()), Long.toHexString(pif.hdr.getZxid())); } else { - zk.processTxn(pif.hdr, pif.rec); + zk.processTxn(pif.toRequest()); packetsNotLogged.remove(); } } else { @@ -696,18 +710,27 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { packet.rec = logEntry.getTxn(); packet.hdr = logEntry.getHeader(); packet.digest = logEntry.getDigest(); - // Log warning message if txn comes out-of-order - if (packet.hdr.getZxid() != lastQueued + 1) { - LOG.warn( - "Got zxid 0x{} expected 0x{}", - Long.toHexString(packet.hdr.getZxid()), - Long.toHexString(lastQueued + 1)); + if (lastQueued == 0) { + LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(packet.hdr.getZxid())); + } else if (packet.hdr.getZxid() != lastQueued + 1) { + if (ZxidUtils.getEpochFromZxid(packet.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { + String msg = String.format("DIFF sync got proposal 0x%s which is not next of last proposal 0x%s", + Long.toHexString(packet.hdr.getZxid()), + Long.toHexString(lastQueued)); + LOG.error(msg); + throw new Exception(msg); + } + // We can't tell whether it is a data loss. Given that new epoch is rare, + // log at warn should not be too verbose. + LOG.warn("DIFF sync got new epoch proposal 0x{}, last proposal 0x{}", + Long.toHexString(packet.hdr.getZxid()), + Long.toHexString(lastQueued)); } lastQueued = packet.hdr.getZxid(); } if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot - zk.processTxn(packet.hdr, packet.rec); + zk.processTxn(packet.toRequest()); } else { packetsNotLogged.add(packet); packetsCommitted.add(qp.getZxid()); @@ -780,8 +803,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { continue; } packetsNotLogged.removeFirst(); - fzk.appendRequest(pif.hdr, pif.rec, pif.digest); - fzk.processTxn(pif.hdr, pif.rec); + fzk.appendRequest(pif.toRequest()); + fzk.processTxn(pif.toRequest()); } // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 @@ -823,7 +846,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); + fzk.logRequest(p.toRequest()); } LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size()); @@ -847,8 +870,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { continue; } packetsCommitted.remove(); - Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); - request.setTxnDigest(p.digest); + Request request = p.toRequest(); ozk.commitRequest(request); } } else { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index d3aa41b5fc7..334fa54c1bc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -202,12 +202,8 @@ protected void processPacket(QuorumPacket qp) throws Exception { case Leader.INFORM: ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1); logEntry = SerializeUtils.deserializeTxn(qp.getData()); - hdr = logEntry.getHeader(); - txn = logEntry.getTxn(); - digest = logEntry.getDigest(); - Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0); + Request request = logEntry.toRequest(); request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY); - request.setTxnDigest(digest); ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk; obs.commitRequest(request); break; @@ -219,13 +215,10 @@ protected void processPacket(QuorumPacket qp) throws Exception { byte[] remainingdata = new byte[buffer.remaining()]; buffer.get(remainingdata); logEntry = SerializeUtils.deserializeTxn(remainingdata); - hdr = logEntry.getHeader(); txn = logEntry.getTxn(); - digest = logEntry.getDigest(); QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8)); - request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0); - request.setTxnDigest(digest); + request = logEntry.toRequest(); obs = (ObserverZooKeeperServer) zk; boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java index 75d6fe68000..b52ea34182a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java @@ -60,6 +60,7 @@ public class TxnLogDigestTest extends ClientBase { @BeforeEach public void setUp() throws Exception { + System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); super.setUp(); server = serverFactory.getZooKeeperServer(); zk = createClient(); @@ -67,6 +68,7 @@ public void setUp() throws Exception { @AfterEach public void tearDown() throws Exception { + System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); // server will be closed in super.tearDown super.tearDown(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java index 031ccc2f7da..b23fd80a338 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java @@ -60,6 +60,7 @@ private ZooKeeper getClient(int idx) { @BeforeEach public void setUp() throws Exception { System.setProperty("zookeeper.admin.enableServer", "false"); + System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true"); // set the snap count to something low so that we force log rollover // and verify that is working as part of the epoch rollover. @@ -215,6 +216,7 @@ private void adjustEpochNearEnd() { @AfterEach public void tearDown() throws Exception { + System.clearProperty("zookeeper.test.allowDiscontinuousProposals"); LOG.info("tearDown starting"); for (int i = 0; i < zkClients.length; i++) { zkClients[i].close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java new file mode 100644 index 00000000000..c4b7720cf92 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Comparator; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +public class QuorumSyncTest extends ZKTestCase { + private QuorumUtil qu; + + @AfterEach + public void tearDown() throws Exception { + if (qu != null) { + qu.shutdownAll(); + } + } + + @Test + public void testStaleDiffSync() throws Exception { + qu = new QuorumUtil(2); + qu.startAll(); + + int[] followerIds = qu.getFollowerQuorumPeers() + .stream() + .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed()) + .mapToInt(peer -> (int) peer.getMyId()).toArray(); + + int follower1 = followerIds[0]; + int follower2 = followerIds[1]; + + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + qu.shutdown(follower2); + + for (int i = 0; i < 10; i++) { + zk.create("/foo" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + qu.shutdown(follower1); + + for (int i = 0; i < 10; i++) { + zk.create("/bar" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + qu.restart(follower1); + } + + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower1))) { + for (int i = 0; i < 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + + for (int i = 0; i < 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + + qu.shutdown(qu.getLeaderServer()); + + qu.restart(follower2); + + try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower2))) { + for (int i = 0; i < 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + + for (int i = 0; i < 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + } +} From 8c2f5dbcf58fa828c1e943d663f96b6fad9005da Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Tue, 3 Jun 2025 12:29:14 +0800 Subject: [PATCH 2/2] Add separated code to enforce continuous proposals --- .../zookeeper/server/quorum/Learner.java | 64 ++++++++----------- 1 file changed, 28 insertions(+), 36 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 783dfbcc893..adf0ef6e510 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -539,6 +539,27 @@ protected long registerWithLeader(int pktType) throws IOException { } } + long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws Exception { + if (lastQueued == 0) { + LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid())); + } else if (pif.hdr.getZxid() != lastQueued + 1) { + if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { + String msg = String.format( + "DIFF sync got proposal 0x%s, last queued 0x%s, expected 0x%s", + Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued), + Long.toHexString(lastQueued + 1)); + LOG.error(msg); + throw new Exception(msg); + } + // We can't tell whether it is a data loss. Given that new epoch is rare, + // log at warn should not be too verbose. + LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, expected 0x{}", + Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued), + Long.toHexString(lastQueued + 1)); + } + return pif.hdr.getZxid(); + } + /** * Finally, synchronize our history with the Leader (if Follower) * or the LearnerMaster (if Observer). @@ -613,6 +634,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); + // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently + // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now. long lastQueued = 0; // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 @@ -634,23 +657,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { pif.hdr = logEntry.getHeader(); pif.rec = logEntry.getTxn(); pif.digest = logEntry.getDigest(); - if (lastQueued == 0) { - LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid())); - } else if (pif.hdr.getZxid() != lastQueued + 1) { - if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { - String msg = String.format("DIFF sync got proposal 0x%s which is not next of last proposal 0x%s", - Long.toHexString(pif.hdr.getZxid()), - Long.toHexString(lastQueued)); - LOG.error(msg); - throw new Exception(msg); - } - // We can't tell whether it is a data loss. Given that new epoch is rare, - // log at warn should not be too verbose. - LOG.warn("DIFF sync got new epoch proposal 0x{}, last proposal 0x{}", - Long.toHexString(pif.hdr.getZxid()), - Long.toHexString(lastQueued)); - } - lastQueued = pif.hdr.getZxid(); + lastQueued = enforceContinuousProposal(lastQueued, pif); if (pif.hdr.getType() == OpCode.reconfig) { SetDataTxn setDataTxn = (SetDataTxn) pif.rec; @@ -710,23 +717,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { packet.rec = logEntry.getTxn(); packet.hdr = logEntry.getHeader(); packet.digest = logEntry.getDigest(); - if (lastQueued == 0) { - LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(packet.hdr.getZxid())); - } else if (packet.hdr.getZxid() != lastQueued + 1) { - if (ZxidUtils.getEpochFromZxid(packet.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) { - String msg = String.format("DIFF sync got proposal 0x%s which is not next of last proposal 0x%s", - Long.toHexString(packet.hdr.getZxid()), - Long.toHexString(lastQueued)); - LOG.error(msg); - throw new Exception(msg); - } - // We can't tell whether it is a data loss. Given that new epoch is rare, - // log at warn should not be too verbose. - LOG.warn("DIFF sync got new epoch proposal 0x{}, last proposal 0x{}", - Long.toHexString(packet.hdr.getZxid()), - Long.toHexString(lastQueued)); - } - lastQueued = packet.hdr.getZxid(); + lastQueued = enforceContinuousProposal(lastQueued, packet); } if (!writeToTxnLog) { // Apply to db directly if we haven't taken the snapshot @@ -803,8 +794,9 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { continue; } packetsNotLogged.removeFirst(); - fzk.appendRequest(pif.toRequest()); - fzk.processTxn(pif.toRequest()); + Request request = pif.toRequest(); + fzk.appendRequest(request); + fzk.processTxn(request); } // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646