Skip to content

Commit 1c67d0b

Browse files
committed
[Refactor] LuceneChangesSnapshot to use accurate ops history
Improves the LuceneChangesSnapshot to get an accurate count of recovery operations using sort by sequence number optimization. Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
1 parent bdcaec5 commit 1c67d0b

File tree

14 files changed

+167
-66
lines changed

14 files changed

+167
-66
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ public void testShardChangesWithDefaultDocType() throws Exception {
764764
}
765765
IndexShard shard = indexService.getShard(0);
766766
try (
767-
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
767+
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean());
768768
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
769769
) {
770770
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);

server/src/main/java/org/opensearch/index/engine/Engine.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -735,8 +735,22 @@ public enum SearcherScope {
735735
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
736736
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
737737
*/
738-
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
739-
throws IOException;
738+
public abstract Translog.Snapshot newChangesSnapshot(
739+
String source,
740+
long fromSeqNo,
741+
long toSeqNo,
742+
boolean requiredFullRange,
743+
boolean accurateCount
744+
) throws IOException;
745+
746+
/**
747+
* Counts the number of history operations in the given sequence number range
748+
* @param source source of the request
749+
* @param fromSeqNo from sequence number; included
750+
* @param toSeqNumber to sequence number; included
751+
* @return number of history operations
752+
*/
753+
public abstract int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException;
740754

741755
public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);
742756

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2772,7 +2772,13 @@ long getNumDocUpdates() {
27722772
}
27732773

27742774
@Override
2775-
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
2775+
public Translog.Snapshot newChangesSnapshot(
2776+
String source,
2777+
long fromSeqNo,
2778+
long toSeqNo,
2779+
boolean requiredFullRange,
2780+
boolean accurateCount
2781+
) throws IOException {
27762782
ensureOpen();
27772783
refreshIfNeeded(source, toSeqNo);
27782784
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
@@ -2782,7 +2788,8 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
27822788
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
27832789
fromSeqNo,
27842790
toSeqNo,
2785-
requiredFullRange
2791+
requiredFullRange,
2792+
accurateCount
27862793
);
27872794
searcher = null;
27882795
return snapshot;
@@ -2798,6 +2805,21 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
27982805
}
27992806
}
28002807

2808+
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
2809+
ensureOpen();
2810+
refreshIfNeeded(source, toSeqNo);
2811+
try (Searcher s = acquireSearcher(source, SearcherScope.INTERNAL)) {
2812+
return LuceneChangesSnapshot.countNumberOfHistoryOperations(s, fromSeqNo, toSeqNo);
2813+
} catch (Exception e) {
2814+
try {
2815+
maybeFailEngine(source, e);
2816+
} catch (Exception innerException) {
2817+
e.addSuppressed(innerException);
2818+
}
2819+
throw e;
2820+
}
2821+
}
2822+
28012823
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
28022824
return getMinRetainedSeqNo() <= startingSeqNo;
28032825
}

server/src/main/java/org/opensearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,19 @@
3838
import org.apache.lucene.index.NumericDocValues;
3939
import org.apache.lucene.search.BooleanClause;
4040
import org.apache.lucene.search.BooleanQuery;
41-
import org.apache.lucene.search.DocValuesFieldExistsQuery;
41+
import org.apache.lucene.search.FieldDoc;
4242
import org.apache.lucene.search.IndexSearcher;
4343
import org.apache.lucene.search.Query;
4444
import org.apache.lucene.search.ScoreDoc;
4545
import org.apache.lucene.search.Sort;
4646
import org.apache.lucene.search.SortField;
4747
import org.apache.lucene.search.TopDocs;
48+
import org.apache.lucene.search.TopFieldCollector;
4849
import org.apache.lucene.util.ArrayUtil;
50+
import org.opensearch.Version;
4951
import org.opensearch.common.bytes.BytesReference;
5052
import org.opensearch.common.lucene.Lucene;
53+
import org.opensearch.common.lucene.search.Queries;
5154
import org.opensearch.core.internal.io.IOUtils;
5255
import org.opensearch.index.fieldvisitor.FieldsVisitor;
5356
import org.opensearch.index.mapper.SeqNoFieldMapper;
@@ -88,8 +91,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
8891
* @param toSeqNo the maximum requesting seq# - inclusive
8992
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
9093
*/
91-
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
92-
throws IOException {
94+
LuceneChangesSnapshot(
95+
Engine.Searcher engineSearcher,
96+
int searchBatchSize,
97+
long fromSeqNo,
98+
long toSeqNo,
99+
boolean requiredFullRange,
100+
boolean accurateCount
101+
) throws IOException {
93102
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
94103
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
95104
}
@@ -111,7 +120,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
111120
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
112121
this.indexSearcher.setQueryCache(null);
113122
this.parallelArray = new ParallelArray(this.searchBatchSize);
114-
final TopDocs topDocs = searchOperations(null);
123+
final TopDocs topDocs = searchOperations(null, accurateCount);
115124
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
116125
this.scoreDocs = topDocs.scoreDocs;
117126
fillParallelArray(scoreDocs, parallelArray);
@@ -187,7 +196,7 @@ private int nextDocIndex() throws IOException {
187196
// we have processed all docs in the current search - fetch the next batch
188197
if (docIndex == scoreDocs.length && docIndex > 0) {
189198
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
190-
scoreDocs = searchOperations(prev).scoreDocs;
199+
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
191200
fillParallelArray(scoreDocs, parallelArray);
192201
docIndex = 0;
193202
}
@@ -236,16 +245,31 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
236245
}
237246
}
238247

239-
private TopDocs searchOperations(ScoreDoc after) throws IOException {
240-
final Query rangeQuery = new BooleanQuery.Builder().add(
241-
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo),
242-
BooleanClause.Occur.MUST
243-
)
244-
// exclude non-root nested documents
245-
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
248+
private static Query operationsRangeQuery(long fromSeqNo, long toSeqNo) {
249+
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
250+
.add(Queries.newNonNestedFilter(Version.CURRENT), BooleanClause.Occur.MUST) // exclude non-root nested docs
246251
.build();
252+
}
253+
254+
static int countNumberOfHistoryOperations(Engine.Searcher searcher, long fromSeqNo, long toSeqNo) throws IOException {
255+
if (fromSeqNo > toSeqNo || fromSeqNo < 0 || toSeqNo < 0) {
256+
throw new IllegalArgumentException("Invalid sequence range; fromSeqNo [" + fromSeqNo + "] toSeqNo [" + toSeqNo + "]");
257+
}
258+
IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
259+
return indexSearcher.count(operationsRangeQuery(fromSeqNo, toSeqNo));
260+
}
261+
262+
private TopDocs searchOperations(FieldDoc after, boolean accurate) throws IOException {
263+
final Query rangeQuery = operationsRangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
247264
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
248-
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
265+
final TopFieldCollector topFieldCollector = TopFieldCollector.create(
266+
sortedBySeqNo,
267+
searchBatchSize,
268+
after,
269+
accurate ? Integer.MAX_VALUE : 0
270+
);
271+
indexSearcher.search(rangeQuery, topFieldCollector);
272+
return topFieldCollector.topDocs();
249273
}
250274

251275
private Translog.Operation readDocAsOp(int docIndex) throws IOException {

server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,23 @@ public Closeable acquireHistoryRetentionLock() {
325325
}
326326

327327
@Override
328-
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
328+
public Translog.Snapshot newChangesSnapshot(
329+
String source,
330+
long fromSeqNo,
331+
long toSeqNo,
332+
boolean requiredFullRange,
333+
boolean accurateCount
334+
) {
329335
return newEmptySnapshot();
330336
}
331337

338+
@Override
339+
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
340+
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) {
341+
return snapshot.totalOperations();
342+
}
343+
}
344+
332345
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
333346
// we can do operation-based recovery if we don't have to replay any operation.
334347
return startingSeqNo > seqNoStats.getMaxSeqNo();

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2230,16 +2230,6 @@ public Closeable acquireHistoryRetentionLock() {
22302230
return getEngine().acquireHistoryRetentionLock();
22312231
}
22322232

2233-
/**
2234-
*
2235-
* Creates a new history snapshot for reading operations since
2236-
* the provided starting seqno (inclusive) and ending seqno (inclusive)
2237-
* The returned snapshot can be retrieved from either Lucene index or translog files.
2238-
*/
2239-
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
2240-
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true);
2241-
}
2242-
22432233
/**
22442234
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
22452235
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
@@ -2257,6 +2247,17 @@ public long getMinRetainedSeqNo() {
22572247
return getEngine().getMinRetainedSeqNo();
22582248
}
22592249

2250+
/**
2251+
* Counts the number of history operations within the provided sequence numbers
2252+
* @param source source of the requester (e.g., peer-recovery)
2253+
* @param fromSeqNo from sequence number, included
2254+
* @param toSeqNo to sequence number, included
2255+
* @return number of history operations in the sequence number range
2256+
*/
2257+
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
2258+
return getEngine().countNumberOfHistoryOperations(source, fromSeqNo, toSeqNo);
2259+
}
2260+
22602261
/**
22612262
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
22622263
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
@@ -2268,8 +2269,14 @@ public long getMinRetainedSeqNo() {
22682269
* if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing.
22692270
* This parameter should be only enabled when the entire requesting range is below the global checkpoint.
22702271
*/
2271-
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
2272-
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange);
2272+
public Translog.Snapshot newChangesSnapshot(
2273+
String source,
2274+
long fromSeqNo,
2275+
long toSeqNo,
2276+
boolean requiredFullRange,
2277+
boolean accurateCount
2278+
) throws IOException {
2279+
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, accurateCount);
22732280
}
22742281

22752282
public List<Segment> segments(boolean verbose) {

server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
104104
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
105105
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
106106
// Also fail the resync early if the shard is shutting down
107-
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
107+
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true);
108108
final Translog.Snapshot originalSnapshot = snapshot;
109109
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
110110
@Override

server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public class RecoverySourceHandler {
132132
private final CancellableThreads cancellableThreads = new CancellableThreads();
133133
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
134134
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
135-
private static final String PEER_RECOVERY_NAME = "peer-recovery";
135+
public static final String PEER_RECOVERY_NAME = "peer-recovery";
136136

137137
public RecoverySourceHandler(
138138
IndexShard shard,
@@ -272,7 +272,7 @@ && isTargetSameHistory()
272272
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
273273

274274
try {
275-
final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
275+
final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo);
276276
final Releasable releaseStore = acquireStore(shard.store());
277277
resources.add(releaseStore);
278278
sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> {
@@ -319,7 +319,7 @@ && isTargetSameHistory()
319319
sendFileStep.whenComplete(r -> {
320320
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
321321
// For a sequence based recovery, the target can keep its local translog
322-
prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
322+
prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
323323
}, onFailure);
324324

325325
prepareEngineStep.whenComplete(prepareEngineTime -> {
@@ -340,9 +340,15 @@ && isTargetSameHistory()
340340

341341
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
342342
if (logger.isTraceEnabled()) {
343-
logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
343+
logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo));
344344
}
345-
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false);
345+
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
346+
PEER_RECOVERY_NAME,
347+
startingSeqNo,
348+
Long.MAX_VALUE,
349+
false,
350+
true
351+
);
346352
resources.add(phase2Snapshot);
347353
retentionLock.close();
348354

@@ -403,10 +409,13 @@ private boolean isTargetSameHistory() {
403409
return targetHistoryUUID.equals(shard.getHistoryUUID());
404410
}
405411

406-
private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
407-
try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) {
408-
return snapshot.totalOperations();
409-
}
412+
/**
413+
* Counts the number of history operations from the starting sequence number
414+
* @param startingSeqNo the starting sequence number to count; included
415+
* @return number of history operations
416+
*/
417+
private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException {
418+
return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE);
410419
}
411420

412421
static void runUnderPrimaryPermit(

server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,11 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe
344344

345345
private boolean hasUncommittedOperations() throws IOException {
346346
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
347-
try (
348-
Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false)
349-
) {
350-
return snapshot.totalOperations() > 0;
351-
}
347+
return indexShard.countNumberOfHistoryOperations(
348+
RecoverySourceHandler.PEER_RECOVERY_NAME,
349+
localCheckpointOfCommit + 1,
350+
Long.MAX_VALUE
351+
) > 0;
352352
}
353353

354354
@Override

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6362,8 +6362,12 @@ public void onFailure(Exception e) {
63626362
@Override
63636363
protected void doRun() throws Exception {
63646364
latch.await();
6365-
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true);
6366-
changes.close();
6365+
if (randomBoolean()) {
6366+
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean());
6367+
changes.close();
6368+
} else {
6369+
engine.countNumberOfHistoryOperations("test", min, max);
6370+
}
63676371
}
63686372
});
63696373
snapshotThreads[i].start();

0 commit comments

Comments
 (0)