Skip to content

Commit 7ba1155

Browse files
committed
Fix QueueResizableOpenSearchThreadPoolExecutorTests
There was a race condition in testResizeQueueDown() where depending on random parameters we could submit up to 1002 tasks into an executor with a queue size of 900. That introduced a race condition where if the tasks didn't execute fast enough then a rejected execution exception could happen and fail the test. The fix is to resize down to a queue size of 1500 to ensure there is enough capacity even if all tasks are submitted before any can be executed. And finally I refactored the tests to reduce duplication of code and ensure the executor gets shutdown properly even in case of a test failure. This will avoid the spurious thread leak failure if a test case exits because of a failure. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent 6719b1f commit 7ba1155

File tree

1 file changed

+35
-107
lines changed

1 file changed

+35
-107
lines changed

server/src/test/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutorTests.java

+35-107
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import org.opensearch.common.settings.Settings;
1212
import org.opensearch.test.OpenSearchTestCase;
13+
import org.opensearch.threadpool.ThreadPool;
14+
import org.junit.After;
1315

1416
import java.util.concurrent.TimeUnit;
1517
import java.util.function.Function;
@@ -23,158 +25,84 @@
2325
* based on the time taken for each event.
2426
*/
2527
public class QueueResizableOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase {
26-
public void testResizeQueueSameSize() throws Exception {
27-
ThreadContext context = new ThreadContext(Settings.EMPTY);
28-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
28+
private QueueResizableOpenSearchThreadPoolExecutor executor;
29+
private ResizableBlockingQueue<Runnable> queue;
30+
private int measureWindow;
2931

32+
private void createExecutor(int queueSize, Function<Runnable, WrappedRunnable> runnableWrapper) {
3033
int threads = randomIntBetween(1, 10);
31-
int measureWindow = randomIntBetween(100, 200);
32-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
33-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
34+
measureWindow = randomIntBetween(100, 200);
35+
ThreadContext context = new ThreadContext(Settings.EMPTY);
36+
this.queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueSize);
37+
this.executor = new QueueResizableOpenSearchThreadPoolExecutor(
3438
"test-threadpool",
3539
threads,
3640
threads,
3741
1000,
3842
TimeUnit.MILLISECONDS,
3943
queue,
40-
fastWrapper(),
44+
runnableWrapper,
4145
OpenSearchExecutors.daemonThreadFactory("queuetest"),
4246
new OpenSearchAbortPolicy(),
4347
context
4448
);
4549
executor.prestartAllCoreThreads();
4650
logger.info("--> executor: {}", executor);
51+
}
52+
53+
@After
54+
public void stopExecutor() {
55+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
56+
}
57+
58+
public void testResizeQueueSameSize() throws Exception {
59+
createExecutor(2000, fastWrapper());
4760

4861
// Execute a task multiple times that takes 1ms
4962
assertThat(executor.resize(1000), equalTo(1000));
5063
executeTask(executor, (measureWindow * 5) + 2);
5164

52-
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(1000)); });
53-
executor.shutdown();
54-
executor.awaitTermination(10, TimeUnit.SECONDS);
65+
assertBusy(() -> assertThat(queue.capacity(), lessThanOrEqualTo(1000)));
5566
}
5667

5768
public void testResizeQueueUp() throws Exception {
58-
ThreadContext context = new ThreadContext(Settings.EMPTY);
59-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
60-
61-
int threads = randomIntBetween(1, 10);
62-
int measureWindow = randomIntBetween(100, 200);
63-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
64-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
65-
"test-threadpool",
66-
threads,
67-
threads,
68-
1000,
69-
TimeUnit.MILLISECONDS,
70-
queue,
71-
fastWrapper(),
72-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
73-
new OpenSearchAbortPolicy(),
74-
context
75-
);
76-
executor.prestartAllCoreThreads();
77-
logger.info("--> executor: {}", executor);
78-
69+
createExecutor(2000, fastWrapper());
7970
// Execute a task multiple times that takes 1ms
8071
assertThat(executor.resize(3000), equalTo(3000));
8172
executeTask(executor, (measureWindow * 5) + 2);
8273

83-
assertBusy(() -> { assertThat(queue.capacity(), greaterThanOrEqualTo(2000)); });
84-
executor.shutdown();
85-
executor.awaitTermination(10, TimeUnit.SECONDS);
74+
assertBusy(() -> assertThat(queue.capacity(), greaterThanOrEqualTo(2000)));
8675
}
8776

8877
public void testResizeQueueDown() throws Exception {
89-
ThreadContext context = new ThreadContext(Settings.EMPTY);
90-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
91-
92-
int threads = randomIntBetween(1, 10);
93-
int measureWindow = randomIntBetween(100, 200);
94-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
95-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
96-
"test-threadpool",
97-
threads,
98-
threads,
99-
1000,
100-
TimeUnit.MILLISECONDS,
101-
queue,
102-
fastWrapper(),
103-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
104-
new OpenSearchAbortPolicy(),
105-
context
106-
);
107-
executor.prestartAllCoreThreads();
108-
logger.info("--> executor: {}", executor);
109-
78+
createExecutor(2000, fastWrapper());
11079
// Execute a task multiple times that takes 1ms
111-
assertThat(executor.resize(900), equalTo(900));
80+
assertThat(executor.resize(1500), equalTo(1500));
11281
executeTask(executor, (measureWindow * 5) + 2);
11382

114-
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(900)); });
115-
executor.shutdown();
116-
executor.awaitTermination(10, TimeUnit.SECONDS);
83+
assertBusy(() -> assertThat(queue.capacity(), lessThanOrEqualTo(1500)));
11784
}
11885

11986
public void testExecutionEWMACalculation() throws Exception {
120-
ThreadContext context = new ThreadContext(Settings.EMPTY);
121-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
122-
123-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
124-
"test-threadpool",
125-
1,
126-
1,
127-
1000,
128-
TimeUnit.MILLISECONDS,
129-
queue,
130-
fastWrapper(),
131-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
132-
new OpenSearchAbortPolicy(),
133-
context
134-
);
135-
executor.prestartAllCoreThreads();
136-
logger.info("--> executor: {}", executor);
137-
87+
createExecutor(100, fastWrapper());
13888
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
13989
executeTask(executor, 1);
140-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); });
90+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)));
14191
executeTask(executor, 1);
142-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); });
92+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)));
14393
executeTask(executor, 1);
144-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); });
94+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)));
14595
executeTask(executor, 1);
146-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); });
96+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)));
14797
executeTask(executor, 1);
148-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); });
149-
150-
executor.shutdown();
151-
executor.awaitTermination(10, TimeUnit.SECONDS);
98+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)));
15299
}
153100

154101
/** Use a runnable wrapper that simulates a task with unknown failures. */
155-
public void testExceptionThrowingTask() throws Exception {
156-
ThreadContext context = new ThreadContext(Settings.EMPTY);
157-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
158-
159-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
160-
"test-threadpool",
161-
1,
162-
1,
163-
1000,
164-
TimeUnit.MILLISECONDS,
165-
queue,
166-
exceptionalWrapper(),
167-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
168-
new OpenSearchAbortPolicy(),
169-
context
170-
);
171-
executor.prestartAllCoreThreads();
172-
logger.info("--> executor: {}", executor);
173-
102+
public void testExceptionThrowingTask() {
103+
createExecutor(100, exceptionalWrapper());
174104
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
175105
executeTask(executor, 1);
176-
executor.shutdown();
177-
executor.awaitTermination(10, TimeUnit.SECONDS);
178106
}
179107

180108
private Function<Runnable, WrappedRunnable> fastWrapper() {
@@ -198,7 +126,7 @@ private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, in
198126
}
199127
}
200128

201-
public class SettableTimedRunnable extends TimedRunnable {
129+
private static class SettableTimedRunnable extends TimedRunnable {
202130
private final long timeTaken;
203131
private final boolean testFailedOrRejected;
204132

0 commit comments

Comments
 (0)