13
13
import org .opensearch .client .Response ;
14
14
import org .opensearch .client .StreamingRequest ;
15
15
import org .opensearch .client .StreamingResponse ;
16
+ import org .opensearch .common .settings .Settings ;
16
17
import org .opensearch .test .rest .OpenSearchRestTestCase ;
17
18
import org .junit .After ;
18
19
22
23
import java .nio .ByteBuffer ;
23
24
import java .nio .charset .StandardCharsets ;
24
25
import java .time .Duration ;
26
+ import java .util .concurrent .TimeoutException ;
25
27
import java .util .concurrent .atomic .AtomicInteger ;
26
28
import java .util .stream .Stream ;
27
29
@@ -44,6 +46,11 @@ public void tearDown() throws Exception {
44
46
super .tearDown ();
45
47
}
46
48
49
+ @ Override
50
+ protected Settings restClientSettings () {
51
+ return Settings .builder ().put (super .restClientSettings ()).put (CLIENT_SOCKET_TIMEOUT , "5s" ).build ();
52
+ }
53
+
47
54
public void testCloseClientStreamingRequest () throws Exception {
48
55
final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
49
56
@@ -66,17 +73,19 @@ public void testCloseClientStreamingRequest() throws Exception {
66
73
final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
67
74
scheduler .advanceTimeBy (delay ); /* emit first element */
68
75
69
- StepVerifier .create (Flux . from ( streamingResponse . getBody ()). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 )))
70
- . expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " ))
71
- .then (() -> {
72
- try {
73
- client ().close ();
74
- } catch (final IOException ex ) {
75
- throw new UncheckedIOException (ex );
76
- }
77
- })
76
+ StepVerifier .create (
77
+ Flux . from ( streamingResponse . getBody ()). timeout ( Duration . ofSeconds ( 5 )). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 ))
78
+ ). expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " )) .then (() -> {
79
+ try {
80
+ client ().close ();
81
+ } catch (final IOException ex ) {
82
+ throw new UncheckedIOException (ex );
83
+ }
84
+ })
78
85
.then (() -> scheduler .advanceTimeBy (delay ))
79
- .expectErrorMatches (t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException )
86
+ .expectErrorMatches (
87
+ t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException || t instanceof TimeoutException
88
+ )
80
89
.verify (Duration .ofSeconds (10 ));
81
90
}
82
91
}
0 commit comments