Skip to content

Commit 68e0a29

Browse files
committed
Fixing Flaky ReactorNetty4StreamingStressIT test case (another attempt)
Signed-off-by: Andriy Redko <drreta@gmail.com>
1 parent 99ac67e commit 68e0a29

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

client/rest/src/main/java/org/opensearch/client/RestClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,14 @@ public boolean isRunning() {
312312
return client.getStatus() == IOReactorStatus.ACTIVE;
313313
}
314314

315+
/**
316+
* check client shutdown (closed) status
317+
* @return client shutdown (closed) status
318+
*/
319+
public boolean isClosed() {
320+
return client.getStatus() == IOReactorStatus.SHUT_DOWN;
321+
}
322+
315323
/**
316324
* Sends a streaming request to the OpenSearch cluster that the client points to and returns streaming response. <strong>This is an experimental API</strong>.
317325
* @param request streaming request

plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.ByteBuffer;
2323
import java.nio.charset.StandardCharsets;
2424
import java.time.Duration;
25+
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.atomic.AtomicInteger;
2627
import java.util.stream.Stream;
2728

@@ -44,7 +45,6 @@ public void tearDown() throws Exception {
4445
super.tearDown();
4546
}
4647

47-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15840")
4848
public void testCloseClientStreamingRequest() throws Exception {
4949
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);
5050

@@ -72,12 +72,25 @@ public void testCloseClientStreamingRequest() throws Exception {
7272
.then(() -> {
7373
try {
7474
client().close();
75+
76+
final long await = TimeUnit.SECONDS.toNanos(5);
77+
final long started = System.nanoTime();
78+
while (client().isClosed() == false) {
79+
Thread.onSpinWait();
80+
if ((System.nanoTime() - started) > await) {
81+
throw new InterruptedIOException("The client is still shutting down");
82+
}
83+
}
7584
} catch (final IOException ex) {
7685
throw new UncheckedIOException(ex);
7786
}
7887
})
7988
.then(() -> scheduler.advanceTimeBy(delay))
80-
.expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException)
89+
.then(() -> scheduler.advanceTimeBy(delay))
90+
.then(() -> scheduler.advanceTimeBy(delay))
91+
.expectErrorMatches(
92+
t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException || t instanceof UncheckedIOException
93+
)
8194
.verify(Duration.ofSeconds(10));
8295
}
8396
}

0 commit comments

Comments
 (0)