51
51
import static org .hamcrest .Matchers .equalTo ;
52
52
import static org .hamcrest .Matchers .greaterThan ;
53
53
import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
54
- import static org .hamcrest .Matchers .hasItem ;
55
54
import static org .hamcrest .Matchers .is ;
56
55
import static org .hamcrest .Matchers .lessThan ;
57
56
import static org .hamcrest .Matchers .lessThanOrEqualTo ;
@@ -327,7 +326,6 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
327
326
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor ) testThreadPool .executor (ThreadPool .Names .MERGE );
328
327
Semaphore runMergeSemaphore = new Semaphore (0 );
329
328
Set <MergeTask > currentlyRunningMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
330
- Set <MergeTask > currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections .newConcurrentSet ();
331
329
while (mergesStillToComplete > 0 ) {
332
330
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet .isEmpty () || randomBoolean ())) {
333
331
MergeTask mergeTask = mock (MergeTask .class );
@@ -345,27 +343,17 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
345
343
}).when (mergeTask ).schedule ();
346
344
doAnswer (mock -> {
347
345
currentlyRunningMergeTasksSet .add (mergeTask );
348
- currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
349
346
// wait to be signalled before completing
350
347
runMergeSemaphore .acquire ();
351
- currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
352
348
currentlyRunningMergeTasksSet .remove (mergeTask );
353
349
return null ;
354
350
}).when (mergeTask ).run ();
355
351
doAnswer (mock -> {
356
- currentlyRunningOrAbortingMergeTasksSet .add (mergeTask );
357
352
// wait to be signalled before completing
358
353
runMergeSemaphore .acquire ();
359
- currentlyRunningOrAbortingMergeTasksSet .remove (mergeTask );
360
354
return null ;
361
355
}).when (mergeTask ).abort ();
362
- assertThat (runMergeSemaphore .availablePermits (), is (0 ));
363
- boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet .size () < mergeExecutorThreadCount ;
364
- boolean mergeTaskSubmitted = threadPoolMergeExecutorService .submitMergeTask (mergeTask );
365
- assertTrue (mergeTaskSubmitted );
366
- if (isAnyExecutorAvailable ) {
367
- assertBusy (() -> assertThat (currentlyRunningOrAbortingMergeTasksSet , hasItem (mergeTask )));
368
- }
356
+ assertTrue (threadPoolMergeExecutorService .submitMergeTask (mergeTask ));
369
357
long latestIORate = threadPoolMergeExecutorService .getTargetIORateBytesPerSec ();
370
358
// all currently running merge tasks must be IO throttled to the latest IO Rate
371
359
assertBusy (() -> {
0 commit comments