Skip to content

Commit c540cb1

Browse files
Fix ThreadPoolMergeExecutorServiceTests testIORateIsAdjustedForAllRunningMergeTasks (elastic#130545) (elastic#130563)
The test submits merge tasks that support IO throttling, and asserts that all the currently running merge tasks are indeed IO throttled after the new one was submitted. The test erroneously tried to assert a property on the set of currently running merge tasks, which is very difficult to do since all merge tasks are possibly backlogged and re-enqueued asynchronously multiple times before they are run or aborted (so looking at the threadpool merge task queue there's no telling which merge task will execute first). Fixes elastic#129531
1 parent b17d583 commit c540cb1

File tree

1 file changed

+1
-13
lines changed

1 file changed

+1
-13
lines changed

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import static org.hamcrest.Matchers.equalTo;
5252
import static org.hamcrest.Matchers.greaterThan;
5353
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
54-
import static org.hamcrest.Matchers.hasItem;
5554
import static org.hamcrest.Matchers.is;
5655
import static org.hamcrest.Matchers.lessThan;
5756
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -327,7 +326,6 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
327326
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
328327
Semaphore runMergeSemaphore = new Semaphore(0);
329328
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
330-
Set<MergeTask> currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet();
331329
while (mergesStillToComplete > 0) {
332330
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
333331
MergeTask mergeTask = mock(MergeTask.class);
@@ -345,27 +343,17 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
345343
}).when(mergeTask).schedule();
346344
doAnswer(mock -> {
347345
currentlyRunningMergeTasksSet.add(mergeTask);
348-
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
349346
// wait to be signalled before completing
350347
runMergeSemaphore.acquire();
351-
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
352348
currentlyRunningMergeTasksSet.remove(mergeTask);
353349
return null;
354350
}).when(mergeTask).run();
355351
doAnswer(mock -> {
356-
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
357352
// wait to be signalled before completing
358353
runMergeSemaphore.acquire();
359-
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
360354
return null;
361355
}).when(mergeTask).abort();
362-
assertThat(runMergeSemaphore.availablePermits(), is(0));
363-
boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount;
364-
boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask);
365-
assertTrue(mergeTaskSubmitted);
366-
if (isAnyExecutorAvailable) {
367-
assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)));
368-
}
356+
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
369357
long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
370358
// all currently running merge tasks must be IO throttled to the latest IO Rate
371359
assertBusy(() -> {

0 commit comments

Comments
 (0)