Skip to content

Commit e0f4cff

Browse files
TEST Fix ThreadPoolMergeSchedulerStressTestIT testMergingFallsBehindAndThenCatchesUp (#131636)
This addresses an unfounded test assumption that a merge following refreshes is a noop. Refreshes might trigger a merge, but segments can be part of a single merge task at a time so it's possible that after multiple independent merges finish, if the TieredMergePolicy is invoked again it might still find yet another merge to run. Fixes #131262
1 parent 7c8d580 commit e0f4cff

File tree

2 files changed

+8
-20
lines changed

2 files changed

+8
-20
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
1717
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
1818
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
19+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.util.CollectionUtils;
2122
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -44,6 +45,7 @@
4445
import java.util.concurrent.atomic.AtomicReference;
4546

4647
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
48+
import static org.hamcrest.Matchers.equalTo;
4749
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4850
import static org.hamcrest.Matchers.instanceOf;
4951
import static org.hamcrest.Matchers.is;
@@ -272,12 +274,11 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
272274
assertThat(testEnginePlugin.enqueuedMergesSet.size(), is(0));
273275
testEnginePlugin.mergeExecutorServiceReference.get().allDone();
274276
}, 1, TimeUnit.MINUTES);
275-
var segmentsCountAfterMergingCaughtUp = getSegmentsCountForAllShards("index");
276-
// force merge should be a noop after all available merging was done
277-
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").get());
278-
var segmentsCountAfterForceMerge = getSegmentsCountForAllShards("index");
279-
assertThat(segmentsCountAfterForceMerge, is(segmentsCountAfterMergingCaughtUp));
280-
// let's also run a force-merge to 1 segment
277+
// indices stats says that no merge is currently running (meaning merging did catch up)
278+
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("index").setMerge(true).get();
279+
long currentMergeCount = indicesStatsResponse.getIndices().get("index").getPrimaries().merge.getCurrent();
280+
assertThat(currentMergeCount, equalTo(0L));
281+
// run a force-merge to 1 segment to make sure nothing is broken
281282
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").setMaxNumSegments(1).get());
282283
assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
283284
// assert one segment per shard
@@ -292,20 +293,6 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
292293
}
293294
}
294295

295-
private int getSegmentsCountForAllShards(String indexName) {
296-
// refresh, otherwise we'd be still seeing the old merged-away segments
297-
assertAllSuccessful(indicesAdmin().prepareRefresh(indexName).get());
298-
int count = 0;
299-
IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments(indexName).get();
300-
Iterator<IndexShardSegments> indexShardSegmentsIterator = indicesSegmentResponse.getIndices().get(indexName).iterator();
301-
while (indexShardSegmentsIterator.hasNext()) {
302-
for (ShardSegments segments : indexShardSegmentsIterator.next()) {
303-
count += segments.getSegments().size();
304-
}
305-
}
306-
return count;
307-
}
308-
309296
private TestEnginePlugin getTestEnginePlugin() {
310297
return getInstanceFromNode(PluginsService.class).filterPlugins(TestEnginePlugin.class).toList().get(0);
311298
}

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
328328
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
329329
Semaphore runMergeSemaphore = new Semaphore(0);
330330
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
331+
331332
while (mergesStillToComplete > 0) {
332333
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
333334
MergeTask mergeTask = mock(MergeTask.class);

0 commit comments

Comments
 (0)