Skip to content

Commit 0ac0f3f

Browse files
committed
ZOOKEEPER-4882: Fix data loss after rejoin and restart of a node experienced temporary disk error
The cause is multifold: 1. Leader will commit a proposal once quorum acked. 2. Proposal is able to be committed in node's memory even if it has not been written to that node's disk. 3. In case of disk error, the txn log could lag behind memory database. This way, node experienced temporary disk error will have hole in its txn log after re-join. Once restarted, data will loss. This commit complains the lag so to reload disk database to memory. This way, the node will not be able to become leader and sync missing txns from leader. Refs: ZOOKEEPER-4882, ZOOKEEPER-4925
1 parent e5dd60b commit 0ac0f3f

File tree

8 files changed

+318
-13
lines changed

8 files changed

+318
-13
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,16 @@ public long loadDataBase() throws IOException {
299299
/**
300300
* Fast-forward the database adding transactions from the committed log into memory.
301301
* @return the last valid zxid.
302-
* @throws IOException
302+
* @throws IOException IO or inconsistent database error
303303
*/
304304
public long fastForwardDataBase() throws IOException {
305+
long lastLoggedZxid = snapLog.getLastLoggedZxid();
306+
if (lastLoggedZxid < dataTree.lastProcessedZxid) {
307+
String msg = String.format("memory database(zxid: 0x%s) is ahead of disk(zxid: 0x%s)",
308+
Long.toHexString(dataTree.lastProcessedZxid),
309+
Long.toHexString(lastLoggedZxid));
310+
throw new IOException(msg);
311+
}
305312
long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
306313
initialized = true;
307314
return zxid;

zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,7 @@ public final synchronized void shutdown(boolean fullyShutDown) {
935935
// This will fast-forward the database to the last recorded transaction
936936
zkDb.fastForwardDataBase();
937937
} catch (IOException e) {
938-
LOG.error("Error updating DB", e);
938+
LOG.error("Failed to update memory database, will clear it to avoid inconsistency", e);
939939
fullyShutDown = true;
940940
}
941941
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public class FileTxnLog implements TxnLog, Closeable {
147147
}
148148

149149
long lastZxidSeen;
150+
long lastZxidFlushed;
150151
volatile BufferedOutputStream logStream = null;
151152
volatile OutputArchive oa;
152153
volatile FileOutputStream fos = null;
@@ -366,7 +367,13 @@ public static File[] getLogFiles(File[] logDirList, long snapshotZxid) {
366367
* get the last zxid that was logged in the transaction logs
367368
* @return the last zxid logged in the transaction logs
368369
*/
369-
public long getLastLoggedZxid() {
370+
@Override
371+
public long getLastLoggedZxid() throws IOException {
372+
long lastFlushedZxid = getLastFlushedZxid();
373+
if (lastFlushedZxid > 0) {
374+
return lastFlushedZxid;
375+
}
376+
370377
File[] files = getLogFiles(logDir.listFiles(), 0);
371378
long maxLog = files.length > 0 ? Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1;
372379

@@ -381,8 +388,6 @@ public long getLastLoggedZxid() {
381388
TxnHeader hdr = itr.getHeader();
382389
zxid = hdr.getZxid();
383390
}
384-
} catch (IOException e) {
385-
LOG.warn("Unexpected exception", e);
386391
}
387392
return zxid;
388393
}
@@ -427,6 +432,7 @@ public synchronized void commit() throws IOException {
427432
ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
428433
}
429434
}
435+
lastZxidFlushed = lastZxidSeen;
430436
while (streamsToFlush.size() > 1) {
431437
streamsToFlush.poll().close();
432438
}
@@ -442,6 +448,10 @@ public synchronized void commit() throws IOException {
442448
}
443449
}
444450

451+
private synchronized long getLastFlushedZxid() {
452+
return lastZxidFlushed;
453+
}
454+
445455
/**
446456
*
447457
* @return elapsed sync time of transaction log in milliseconds
@@ -494,8 +504,13 @@ public boolean truncate(long zxid) throws IOException {
494504
while (itr.goToNextLog()) {
495505
if (!itr.logFile.delete()) {
496506
LOG.warn("Unable to truncate {}", itr.logFile);
507+
throw new IOException("Unable to truncate " + itr.logFile);
497508
}
498509
}
510+
synchronized (this) {
511+
lastZxidSeen = zxid;
512+
lastZxidFlushed = zxid;
513+
}
499514
}
500515
return true;
501516
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,10 @@ public void processTransaction(
457457
* the last logged zxid on the transaction logs
458458
* @return the last logged zxid
459459
*/
460-
public long getLastLoggedZxid() {
461-
FileTxnLog txnLog = new FileTxnLog(dataDir);
462-
return txnLog.getLastLoggedZxid();
460+
public long getLastLoggedZxid() throws IOException {
461+
SnapshotInfo snapshotInfo = snapLog.getLastSnapshotInfo();
462+
long lastSnapZxid = snapshotInfo == null ? -1 : snapshotInfo.zxid;
463+
return Long.max(lastSnapZxid, txnLog.getLastLoggedZxid());
463464
}
464465

465466
/**

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2263,7 +2263,8 @@ public void setZKDatabase(ZKDatabase database) {
22632263
this.zkDb = database;
22642264
}
22652265

2266-
protected ZKDatabase getZkDb() {
2266+
// @VisibleForTesting
2267+
public ZKDatabase getZkDb() {
22672268
return zkDb;
22682269
}
22692270

zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public void commit(final Request request) {
293293
private void logEpochsAndLastLoggedTxnForAllServers() throws Exception {
294294
for (int i = 0; i < SERVER_COUNT; i++) {
295295
final QuorumPeer qp = mt[i].getQuorumPeer();
296-
if (qp != null) {
296+
if (qp != null && qp.getZkDb().isInitialized()) {
297297
LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s",
298298
qp.getMyId(), qp.getAcceptedEpoch(),
299299
qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid())));

0 commit comments

Comments
 (0)