From 56085e159c5b4f21d5f14fe88e875be659089cd5 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sun, 4 May 2025 10:32:05 -0400 Subject: [PATCH] Fixing Flaky ReactorNetty4StreamingStressIT test case (another attempt) Signed-off-by: Andriy Redko --- .../rest/ReactorNetty4StreamingStressIT.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java index 2e1172a4c4380..013376cb96c69 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java @@ -13,6 +13,7 @@ import org.opensearch.client.Response; import org.opensearch.client.StreamingRequest; import org.opensearch.client.StreamingResponse; +import org.opensearch.common.settings.Settings; import org.opensearch.test.rest.OpenSearchRestTestCase; import org.junit.After; @@ -22,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -44,7 +46,11 @@ public void tearDown() throws Exception { super.tearDown(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15840") + @Override + protected Settings restClientSettings() { + return Settings.builder().put(super.restClientSettings()).put(CLIENT_SOCKET_TIMEOUT, "5s").build(); + } + public void testCloseClientStreamingRequest() throws Exception { final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true); @@ -67,17 +73,19 @@ public void testCloseClientStreamingRequest() throws Exception { final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); scheduler.advanceTimeBy(delay); /* emit first element */ - StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) - .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")) - .then(() -> { - try { - client().close(); - } catch (final IOException ex) { - throw new UncheckedIOException(ex); - } - }) + StepVerifier.create( + Flux.from(streamingResponse.getBody()).timeout(Duration.ofSeconds(5)).map(b -> new String(b.array(), StandardCharsets.UTF_8)) + ).expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")).then(() -> { + try { + client().close(); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }) .then(() -> scheduler.advanceTimeBy(delay)) - .expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException) + .expectErrorMatches( + t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException || t instanceof TimeoutException + ) .verify(Duration.ofSeconds(10)); } }