13
13
import org .opensearch .client .Response ;
14
14
import org .opensearch .client .StreamingRequest ;
15
15
import org .opensearch .client .StreamingResponse ;
16
+ import org .opensearch .common .settings .Settings ;
16
17
import org .opensearch .test .rest .OpenSearchRestTestCase ;
17
18
import org .junit .After ;
18
19
19
20
import java .io .IOException ;
20
21
import java .io .InterruptedIOException ;
21
22
import java .io .UncheckedIOException ;
23
+ import java .lang .management .ManagementFactory ;
24
+ import java .lang .management .ThreadMXBean ;
22
25
import java .nio .ByteBuffer ;
23
26
import java .nio .charset .StandardCharsets ;
24
27
import java .time .Duration ;
28
+ import java .util .Arrays ;
29
+ import java .util .Objects ;
30
+ import java .util .concurrent .TimeUnit ;
25
31
import java .util .concurrent .atomic .AtomicInteger ;
32
+ import java .util .function .Supplier ;
26
33
import java .util .stream .Stream ;
27
34
28
35
import reactor .core .publisher .Flux ;
@@ -44,7 +51,11 @@ public void tearDown() throws Exception {
44
51
super .tearDown ();
45
52
}
46
53
47
- @ AwaitsFix (bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15840" )
54
+ @ Override
55
+ protected Settings restClientSettings () {
56
+ return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "5s" ).build ();
57
+ }
58
+
48
59
public void testCloseClientStreamingRequest () throws Exception {
49
60
final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
50
61
@@ -67,11 +78,28 @@ public void testCloseClientStreamingRequest() throws Exception {
67
78
final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
68
79
scheduler .advanceTimeBy (delay ); /* emit first element */
69
80
81
+ final ThreadMXBean threadMXBean = ManagementFactory .getThreadMXBean ();
82
+ final Supplier <Long > captureHttpClientThreads = () -> Arrays .stream (threadMXBean .getAllThreadIds ())
83
+ .mapToObj (tid -> threadMXBean .getThreadInfo (tid ))
84
+ .filter (Objects ::nonNull )
85
+ .filter (t -> t .getThreadName ().startsWith ("httpclient-main-" ))
86
+ .count ();
87
+
70
88
StepVerifier .create (Flux .from (streamingResponse .getBody ()).map (b -> new String (b .array (), StandardCharsets .UTF_8 )))
71
89
.expectNextMatches (s -> s .contains ("\" result\" :\" created\" " ) && s .contains ("\" _id\" :\" 1\" " ))
72
90
.then (() -> {
73
91
try {
74
92
client ().close ();
93
+
94
+ // Await for the non-admin client to fully shutdown
95
+ final long await = TimeUnit .SECONDS .toNanos (5 );
96
+ final long started = System .nanoTime ();
97
+ while (captureHttpClientThreads .get () > 1 ) {
98
+ Thread .onSpinWait ();
99
+ if ((System .nanoTime () - started ) > await ) {
100
+ throw new InterruptedIOException ("The client is still shutting down" );
101
+ }
102
+ }
75
103
} catch (final IOException ex ) {
76
104
throw new UncheckedIOException (ex );
77
105
}
0 commit comments