10
10
import org .apache .http .impl .nio .conn .PoolingNHttpClientConnectionManager ;
11
11
import org .apache .http .impl .nio .reactor .DefaultConnectingIOReactor ;
12
12
import org .apache .http .nio .reactor .IOReactorException ;
13
+ import org .elasticsearch .common .util .concurrent .DeterministicTaskQueue ;
13
14
import org .elasticsearch .core .TimeValue ;
14
15
import org .elasticsearch .test .ESTestCase ;
15
16
import org .elasticsearch .threadpool .Scheduler ;
16
17
import org .elasticsearch .threadpool .ThreadPool ;
17
- import org .junit .After ;
18
18
import org .junit .Before ;
19
19
20
20
import java .util .concurrent .CountDownLatch ;
21
21
import java .util .concurrent .TimeUnit ;
22
22
23
- import static org .elasticsearch .xpack .inference .Utils .inferenceUtilityPool ;
24
23
import static org .mockito .ArgumentMatchers .any ;
25
24
import static org .mockito .ArgumentMatchers .anyLong ;
26
25
import static org .mockito .Mockito .doAnswer ;
32
31
public class IdleConnectionEvictorTests extends ESTestCase {
33
32
34
33
private static final TimeValue TIMEOUT = new TimeValue (30 , TimeUnit .SECONDS );
35
- private ThreadPool threadPool ;
34
+ private DeterministicTaskQueue taskQueue ;
36
35
37
36
@ Before
38
37
public void init () {
39
- threadPool = createThreadPool (inferenceUtilityPool ());
40
- }
41
-
42
- @ After
43
- public void shutdown () {
44
- terminate (threadPool );
38
+ taskQueue = new DeterministicTaskQueue ();
45
39
}
46
40
47
41
public void testStart_CallsExecutorSubmit () throws IOReactorException {
@@ -87,7 +81,7 @@ public void testCloseExpiredConnections_IsCalled() throws InterruptedException {
87
81
var manager = mock (PoolingNHttpClientConnectionManager .class );
88
82
89
83
var evictor = new IdleConnectionEvictor (
90
- threadPool ,
84
+ taskQueue . getThreadPool () ,
91
85
manager ,
92
86
new TimeValue (1 , TimeUnit .NANOSECONDS ),
93
87
new TimeValue (1 , TimeUnit .NANOSECONDS )
@@ -100,7 +94,8 @@ public void testCloseExpiredConnections_IsCalled() throws InterruptedException {
100
94
return Void .TYPE ;
101
95
}).when (manager ).closeExpiredConnections ();
102
96
103
- evictor .start ();
97
+ startEvictor (evictor );
98
+
104
99
runLatch .await (TIMEOUT .getSeconds (), TimeUnit .SECONDS );
105
100
106
101
verify (manager , times (1 )).closeExpiredConnections ();
@@ -110,7 +105,7 @@ public void testCloseIdleConnections_IsCalled() throws InterruptedException {
110
105
var manager = mock (PoolingNHttpClientConnectionManager .class );
111
106
112
107
var evictor = new IdleConnectionEvictor (
113
- threadPool ,
108
+ taskQueue . getThreadPool () ,
114
109
manager ,
115
110
new TimeValue (1 , TimeUnit .NANOSECONDS ),
116
111
new TimeValue (1 , TimeUnit .NANOSECONDS )
@@ -123,40 +118,47 @@ public void testCloseIdleConnections_IsCalled() throws InterruptedException {
123
118
return Void .TYPE ;
124
119
}).when (manager ).closeIdleConnections (anyLong (), any ());
125
120
126
- evictor .start ();
121
+ startEvictor (evictor );
122
+
127
123
runLatch .await (TIMEOUT .getSeconds (), TimeUnit .SECONDS );
128
124
129
125
verify (manager , times (1 )).closeIdleConnections (anyLong (), any ());
130
126
}
131
127
132
128
public void testIsRunning_ReturnsTrue () throws IOReactorException {
133
129
var evictor = new IdleConnectionEvictor (
134
- threadPool ,
130
+ taskQueue . getThreadPool () ,
135
131
createConnectionManager (),
136
132
new TimeValue (1 , TimeUnit .SECONDS ),
137
133
new TimeValue (1 , TimeUnit .SECONDS )
138
134
);
139
135
140
- evictor .start ();
136
+ startEvictor (evictor );
137
+
141
138
assertTrue (evictor .isRunning ());
142
139
evictor .close ();
143
140
}
144
141
145
142
public void testIsRunning_ReturnsFalse () throws IOReactorException {
146
143
var evictor = new IdleConnectionEvictor (
147
- threadPool ,
144
+ taskQueue . getThreadPool () ,
148
145
createConnectionManager (),
149
146
new TimeValue (1 , TimeUnit .SECONDS ),
150
147
new TimeValue (1 , TimeUnit .SECONDS )
151
148
);
152
149
153
- evictor . start ( );
150
+ startEvictor ( evictor );
154
151
assertTrue (evictor .isRunning ());
155
152
156
153
evictor .close ();
157
154
assertFalse (evictor .isRunning ());
158
155
}
159
156
157
+ private void startEvictor (IdleConnectionEvictor evictor ) {
158
+ taskQueue .scheduleNow (evictor ::start );
159
+ taskQueue .runAllRunnableTasks ();
160
+ }
161
+
160
162
private static PoolingNHttpClientConnectionManager createConnectionManager () throws IOReactorException {
161
163
return new PoolingNHttpClientConnectionManager (new DefaultConnectingIOReactor ());
162
164
}
0 commit comments