Skip to content

Commit d7cbf15

Browse files
github-actions[bot]
authored and committed
Enable merge on refresh and merge on commit on Opensearch (#2535)
Enables merge on refresh and merge on commit in Opensearch by way of two new index options: index.merge_on_flush.max_full_flush_merge_wait_time and index.merge_on_flush.enabled. Default merge_on_flush is disabled and wait time is 10s. Signed-off-by: Andriy Redko <andriy.redko@aiven.io> (cherry picked from commit 908682d)
1 parent d4b67dc commit d7cbf15

File tree

4 files changed

+274
-0
lines changed

4 files changed

+274
-0
lines changed

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
187187
IndexSettings.FINAL_PIPELINE,
188188
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
189189
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
190+
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
191+
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
190192

191193
// validate that built-in similarities don't get redefined
192194
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,27 @@ public final class IndexSettings {
503503
Setting.Property.IndexScope
504504
);
505505

506+
/**
507+
* Expert: sets the amount of time to wait for merges (during {@link org.apache.lucene.index.IndexWriter#commit}
508+
* or {@link org.apache.lucene.index.IndexWriter#getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...).
509+
* If this time is reached, we proceed with the commit based on segments merged up to that point. The merges are not
510+
* aborted, and will still run to completion independent of the commit or getReader call, like natural segment merges.
511+
*/
512+
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
513+
"index.merge_on_flush.max_full_flush_merge_wait_time",
514+
new TimeValue(10, TimeUnit.SECONDS),
515+
new TimeValue(0, TimeUnit.MILLISECONDS),
516+
Property.Dynamic,
517+
Property.IndexScope
518+
);
519+
520+
public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
521+
"index.merge_on_flush.enabled",
522+
false,
523+
Property.IndexScope,
524+
Property.Dynamic
525+
);
526+
506527
private final Index index;
507528
private final Version version;
508529
private final Logger logger;
@@ -584,6 +605,15 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
584605
*/
585606
private volatile int maxRegexLength;
586607

608+
/**
609+
* The max amount of time to wait for merges
610+
*/
611+
private volatile TimeValue maxFullFlushMergeWaitTime;
612+
/**
613+
* Is merge on flush enabled or not
614+
*/
615+
private volatile boolean mergeOnFlushEnabled;
616+
587617
/**
588618
* Returns the default search fields for this index.
589619
*/
@@ -696,6 +726,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
696726
mappingTotalFieldsLimit = scopedSettings.get(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING);
697727
mappingDepthLimit = scopedSettings.get(INDEX_MAPPING_DEPTH_LIMIT_SETTING);
698728
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
729+
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
730+
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
699731

700732
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
701733
scopedSettings.addSettingsUpdateConsumer(
@@ -765,6 +797,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
765797
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING, this::setMappingTotalFieldsLimit);
766798
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
767799
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
800+
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
801+
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
768802
}
769803

770804
private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1328,4 +1362,20 @@ public long getMappingFieldNameLengthLimit() {
13281362
private void setMappingFieldNameLengthLimit(long value) {
13291363
this.mappingFieldNameLengthLimit = value;
13301364
}
1365+
1366+
private void setMaxFullFlushMergeWaitTime(TimeValue timeValue) {
1367+
this.maxFullFlushMergeWaitTime = timeValue;
1368+
}
1369+
1370+
private void setMergeOnFlushEnabled(boolean enabled) {
1371+
this.mergeOnFlushEnabled = enabled;
1372+
}
1373+
1374+
public TimeValue getMaxFullFlushMergeWaitTime() {
1375+
return this.maxFullFlushMergeWaitTime;
1376+
}
1377+
1378+
public boolean isMergeOnFlushEnabled() {
1379+
return mergeOnFlushEnabled;
1380+
}
13311381
}

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.lucene.index.ShuffleForcedMergePolicy;
5151
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
5252
import org.apache.lucene.index.Term;
53+
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
5354
import org.apache.lucene.search.BooleanClause;
5455
import org.apache.lucene.search.BooleanQuery;
5556
import org.apache.lucene.search.DocIdSetIterator;
@@ -2425,6 +2426,21 @@ private IndexWriterConfig getIndexWriterConfig() {
24252426
// to enable it.
24262427
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
24272428
}
2429+
2430+
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
2431+
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
2432+
if (maxFullFlushMergeWaitMillis > 0) {
2433+
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
2434+
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
2435+
} else {
2436+
logger.warn(
2437+
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
2438+
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
2439+
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
2440+
);
2441+
}
2442+
}
2443+
24282444
iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
24292445
iwc.setSimilarity(engineConfig.getSimilarity());
24302446
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,212 @@ public void testSegments() throws Exception {
494494
}
495495
}
496496

497+
public void testMergeSegmentsOnCommitIsDisabled() throws Exception {
498+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
499+
500+
final Settings.Builder settings = Settings.builder()
501+
.put(defaultSettings.getSettings())
502+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
503+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
504+
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
505+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
506+
507+
try (
508+
Store store = createStore();
509+
InternalEngine engine = createEngine(
510+
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
511+
)
512+
) {
513+
assertThat(engine.segments(false), empty());
514+
int numDocsFirstSegment = randomIntBetween(5, 50);
515+
Set<String> liveDocsFirstSegment = new HashSet<>();
516+
for (int i = 0; i < numDocsFirstSegment; i++) {
517+
String id = Integer.toString(i);
518+
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
519+
engine.index(indexForDoc(doc));
520+
liveDocsFirstSegment.add(id);
521+
}
522+
engine.refresh("test");
523+
List<Segment> segments = engine.segments(randomBoolean());
524+
assertThat(segments, hasSize(1));
525+
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
526+
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
527+
assertFalse(segments.get(0).committed);
528+
int deletes = 0;
529+
int updates = 0;
530+
int appends = 0;
531+
int iterations = scaledRandomIntBetween(1, 50);
532+
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
533+
String idToUpdate = randomFrom(liveDocsFirstSegment);
534+
liveDocsFirstSegment.remove(idToUpdate);
535+
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
536+
if (randomBoolean()) {
537+
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
538+
deletes++;
539+
} else {
540+
engine.index(indexForDoc(doc));
541+
updates++;
542+
}
543+
if (randomBoolean()) {
544+
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
545+
appends++;
546+
}
547+
}
548+
549+
boolean committed = randomBoolean();
550+
if (committed) {
551+
engine.flush();
552+
}
553+
554+
engine.refresh("test");
555+
segments = engine.segments(randomBoolean());
556+
557+
assertThat(segments, hasSize(2));
558+
assertThat(segments, hasSize(2));
559+
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
560+
assertThat(segments.get(0).getDeletedDocs(), equalTo(updates + deletes));
561+
assertThat(segments.get(0).committed, equalTo(committed));
562+
563+
assertThat(segments.get(1).getNumDocs(), equalTo(updates + appends));
564+
assertThat(segments.get(1).getDeletedDocs(), equalTo(deletes)); // delete tombstones
565+
assertThat(segments.get(1).committed, equalTo(committed));
566+
}
567+
}
568+
569+
public void testMergeSegmentsOnCommit() throws Exception {
570+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
571+
572+
final Settings.Builder settings = Settings.builder()
573+
.put(defaultSettings.getSettings())
574+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
575+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
576+
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
577+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
578+
579+
try (
580+
Store store = createStore();
581+
InternalEngine engine = createEngine(
582+
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
583+
)
584+
) {
585+
assertThat(engine.segments(false), empty());
586+
int numDocsFirstSegment = randomIntBetween(5, 50);
587+
Set<String> liveDocsFirstSegment = new HashSet<>();
588+
for (int i = 0; i < numDocsFirstSegment; i++) {
589+
String id = Integer.toString(i);
590+
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
591+
engine.index(indexForDoc(doc));
592+
liveDocsFirstSegment.add(id);
593+
}
594+
engine.refresh("test");
595+
List<Segment> segments = engine.segments(randomBoolean());
596+
assertThat(segments, hasSize(1));
597+
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
598+
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
599+
assertFalse(segments.get(0).committed);
600+
int deletes = 0;
601+
int updates = 0;
602+
int appends = 0;
603+
int iterations = scaledRandomIntBetween(1, 50);
604+
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
605+
String idToUpdate = randomFrom(liveDocsFirstSegment);
606+
liveDocsFirstSegment.remove(idToUpdate);
607+
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
608+
if (randomBoolean()) {
609+
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
610+
deletes++;
611+
} else {
612+
engine.index(indexForDoc(doc));
613+
updates++;
614+
}
615+
if (randomBoolean()) {
616+
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
617+
appends++;
618+
}
619+
}
620+
621+
boolean committed = randomBoolean();
622+
if (committed) {
623+
engine.flush();
624+
}
625+
626+
engine.refresh("test");
627+
segments = engine.segments(randomBoolean());
628+
629+
// All segments have to be merged into one
630+
assertThat(segments, hasSize(1));
631+
assertThat(segments.get(0).getNumDocs(), equalTo(numDocsFirstSegment + appends - deletes));
632+
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
633+
assertThat(segments.get(0).committed, equalTo(committed));
634+
}
635+
}
636+
637+
// this test writes documents to the engine while concurrently flushing/commit
638+
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
639+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
640+
641+
final Settings.Builder settings = Settings.builder()
642+
.put(defaultSettings.getSettings())
643+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
644+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
645+
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
646+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
647+
648+
try (
649+
Store store = createStore();
650+
InternalEngine engine = createEngine(
651+
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
652+
)
653+
) {
654+
final int numIndexingThreads = scaledRandomIntBetween(3, 8);
655+
final int numDocsPerThread = randomIntBetween(500, 1000);
656+
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
657+
final List<Thread> indexingThreads = new ArrayList<>();
658+
final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
659+
// create N indexing threads to index documents simultaneously
660+
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
661+
final int threadIdx = threadNum;
662+
Thread indexingThread = new Thread(() -> {
663+
try {
664+
barrier.await(); // wait for all threads to start at the same time
665+
// index random number of docs
666+
for (int i = 0; i < numDocsPerThread; i++) {
667+
final String id = "thread" + threadIdx + "#" + i;
668+
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
669+
engine.index(indexForDoc(doc));
670+
}
671+
} catch (Exception e) {
672+
throw new RuntimeException(e);
673+
} finally {
674+
doneLatch.countDown();
675+
}
676+
677+
});
678+
indexingThreads.add(indexingThread);
679+
}
680+
681+
// start the indexing threads
682+
for (Thread thread : indexingThreads) {
683+
thread.start();
684+
}
685+
barrier.await(); // wait for indexing threads to all be ready to start
686+
assertThat(doneLatch.await(10, TimeUnit.SECONDS), is(true));
687+
688+
boolean committed = randomBoolean();
689+
if (committed) {
690+
engine.flush();
691+
}
692+
693+
engine.refresh("test");
694+
List<Segment> segments = engine.segments(randomBoolean());
695+
696+
// All segments have to be merged into one
697+
assertThat(segments, hasSize(1));
698+
assertThat(segments.get(0).getNumDocs(), equalTo(numIndexingThreads * numDocsPerThread));
699+
assertThat(segments.get(0).committed, equalTo(committed));
700+
}
701+
}
702+
497703
public void testCommitStats() throws IOException {
498704
final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
499705
final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

0 commit comments

Comments
 (0)