13
13
import org .opensearch .client .Response ;
14
14
import org .opensearch .client .StreamingRequest ;
15
15
import org .opensearch .client .StreamingResponse ;
16
+ import org .opensearch .common .settings .Settings ;
16
17
import org .opensearch .test .rest .OpenSearchRestTestCase ;
17
18
import org .junit .After ;
18
19
22
23
import java .nio .ByteBuffer ;
23
24
import java .nio .charset .StandardCharsets ;
24
25
import java .time .Duration ;
26
+ import java .util .concurrent .TimeoutException ;
25
27
import java .util .concurrent .atomic .AtomicInteger ;
26
28
import java .util .stream .Stream ;
27
29
@@ -44,7 +46,11 @@ public void tearDown() throws Exception {
44
46
super .tearDown ();
45
47
}
46
48
47
- @ AwaitsFix (bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15840" )
49
+ @ Override
50
+ protected Settings restClientSettings () {
51
+ return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "5s" ).build ();
52
+ }
53
+
48
54
public void testCloseClientStreamingRequest () throws Exception {
49
55
final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
50
56
@@ -67,17 +73,19 @@ public void testCloseClientStreamingRequest() throws Exception {
67
73
final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
68
74
scheduler .advanceTimeBy (delay ); /* emit first element */
69
75
70
- StepVerifier .create (Flux . from ( streamingResponse . getBody ()). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 )))
71
- . expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " ))
72
- .then (() -> {
73
- try {
74
- client ().close ();
75
- } catch (final IOException ex ) {
76
- throw new UncheckedIOException (ex );
77
- }
78
- })
76
+ StepVerifier .create (
77
+ Flux . from ( streamingResponse . getBody ()). timeout ( Duration . ofSeconds ( 5 )). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 ))
78
+ ). expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " )) .then (() -> {
79
+ try {
80
+ client ().close ();
81
+ } catch (final IOException ex ) {
82
+ throw new UncheckedIOException (ex );
83
+ }
84
+ })
79
85
.then (() -> scheduler .advanceTimeBy (delay ))
80
- .expectErrorMatches (t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException )
86
+ .expectErrorMatches (
87
+ t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException || t instanceof TimeoutException
88
+ )
81
89
.verify (Duration .ofSeconds (10 ));
82
90
}
83
91
}
0 commit comments