Skip to content

Commit d153a7f

Browse files
Fix ThreadPoolMergeExecutorServiceTests testIORateIsAdjustedForAllRunningMergeTasks (#130545) (#130562)
The test submits merge tasks that support IO throttling, and asserts that all the currently running merge tasks are indeed IO throttled after the new one was submitted. The test erroneously tried to assert a property on the set of currently running merge tasks, which is very difficult to do since all merge tasks are possibly backlogged and re-enqueued asynchronously multiple times before they are run or aborted (so looking at the threadpool merge task queue there's no telling which merge task will execute first). Fixes #129531
1 parent dc7e696 commit d153a7f

File tree

2 files changed

+1
-16
lines changed

2 files changed

+1
-16
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,6 @@ tests:
276276
- class: org.elasticsearch.packaging.test.DockerTests
277277
method: test010Install
278278
issue: https://github.com/elastic/elasticsearch/issues/125680
279-
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceTests
280-
method: testIORateIsAdjustedForAllRunningMergeTasks
281-
issue: https://github.com/elastic/elasticsearch/issues/129531
282279
- class: org.elasticsearch.packaging.test.DockerTests
283280
method: test600Interrupt
284281
issue: https://github.com/elastic/elasticsearch/issues/128144

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import static org.hamcrest.Matchers.equalTo;
5252
import static org.hamcrest.Matchers.greaterThan;
5353
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
54-
import static org.hamcrest.Matchers.hasItem;
5554
import static org.hamcrest.Matchers.is;
5655
import static org.hamcrest.Matchers.lessThan;
5756
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -329,7 +328,6 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
329328
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
330329
Semaphore runMergeSemaphore = new Semaphore(0);
331330
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
332-
Set<MergeTask> currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet();
333331
while (mergesStillToComplete > 0) {
334332
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
335333
MergeTask mergeTask = mock(MergeTask.class);
@@ -347,27 +345,17 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
347345
}).when(mergeTask).schedule();
348346
doAnswer(mock -> {
349347
currentlyRunningMergeTasksSet.add(mergeTask);
350-
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
351348
// wait to be signalled before completing
352349
runMergeSemaphore.acquire();
353-
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
354350
currentlyRunningMergeTasksSet.remove(mergeTask);
355351
return null;
356352
}).when(mergeTask).run();
357353
doAnswer(mock -> {
358-
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
359354
// wait to be signalled before completing
360355
runMergeSemaphore.acquire();
361-
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
362356
return null;
363357
}).when(mergeTask).abort();
364-
assertThat(runMergeSemaphore.availablePermits(), is(0));
365-
boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount;
366-
boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask);
367-
assertTrue(mergeTaskSubmitted);
368-
if (isAnyExecutorAvailable) {
369-
assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)));
370-
}
358+
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
371359
long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
372360
// all currently running merge tasks must be IO throttled to the latest IO Rate
373361
assertBusy(() -> {

0 commit comments

Comments
 (0)