Skip to content

Commit 8d17c8d

Browse files
authored
[Streaming Indexing] Introduce bulk HTTP API streaming flavor (opensearch-project#15381)
* [Streaming Indexing] Introduce bulk HTTP API streaming flavor Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Address code review comments Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Add more test cases Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Add more test cases Signed-off-by: Andriy Redko <andriy.redko@aiven.io> --------- Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent 23cba28 commit 8d17c8d

File tree

13 files changed

+271
-30
lines changed

13 files changed

+271
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
2727
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
2828
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
29+
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
2930
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
3031
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
3132

buildSrc/version.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ reactor_netty = 1.1.22
3737
reactor = 3.5.20
3838

3939
# client dependencies
40-
httpclient5 = 5.2.3
40+
httpclient5 = 5.3.1
4141
httpcore5 = 5.2.5
4242
httpclient = 4.5.14
4343
httpcore = 4.4.16

client/rest/licenses/httpclient5-5.2.3.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
56b53c8f4bcdaada801d311cf2ff8a24d6d96883

client/rest/src/main/java/org/opensearch/client/RestClient.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import java.util.zip.GZIPOutputStream;
115115

116116
import org.reactivestreams.Publisher;
117+
import reactor.core.publisher.Flux;
117118
import reactor.core.publisher.Mono;
118119
import reactor.core.publisher.MonoSink;
119120

@@ -416,7 +417,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
416417
try {
417418
final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
418419
if (responseOrResponseException.responseException == null) {
419-
return Mono.just(message);
420+
return Mono.just(
421+
new Message<>(
422+
message.getHead(),
423+
Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))
424+
)
425+
);
420426
} else {
421427
if (nodeTuple.nodes.hasNext()) {
422428
return Mono.from(streamRequest(nodeTuple, request));
@@ -431,6 +437,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
431437
});
432438
}
433439

440+
/**
441+
* Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence.
442+
* @param b {@link ByteBuffer} to split
443+
* @return individual chunks
444+
*/
445+
private static Collection<ByteBuffer> frame(ByteBuffer b) {
446+
final Collection<ByteBuffer> buffers = new ArrayList<>();
447+
448+
int position = b.position();
449+
while (b.hasRemaining()) {
450+
// Skip the chunk separator when it comes right at the beginning
451+
if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) {
452+
if (b.get() == '\n') {
453+
final byte[] chunk = new byte[b.position() - position];
454+
455+
b.position(position);
456+
b.get(chunk);
457+
458+
// Do not copy the '\r\n' sequence
459+
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
460+
position = b.position();
461+
}
462+
}
463+
}
464+
465+
if (buffers.isEmpty()) {
466+
return Collections.singleton(b);
467+
}
468+
469+
// Copy last chunk
470+
if (position != b.position()) {
471+
final byte[] chunk = new byte[b.position() - position];
472+
473+
b.position(position);
474+
b.get(chunk);
475+
476+
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));
477+
}
478+
479+
return buffers;
480+
}
481+
434482
private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
435483
throws IOException {
436484
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);

client/sniffer/licenses/httpclient5-5.2.3.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
56b53c8f4bcdaada801d311cf2ff8a24d6d96883

plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void tearDown() throws Exception {
4444
super.tearDown();
4545
}
4646

47-
public void testStreamingRequest() throws IOException {
47+
public void testStreamingRequestNoBatching() throws IOException {
4848
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);
4949

5050
final Stream<String> stream = IntStream.range(1, 6)
@@ -85,6 +85,167 @@ public void testStreamingRequest() throws IOException {
8585
assertThat(count, equalTo(5));
8686
}
8787

88+
public void testStreamingRequestOneBatchBySize() throws IOException, InterruptedException {
89+
final Stream<String> stream = IntStream.range(1, 6)
90+
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
91+
92+
final Duration delay = Duration.ofMillis(1);
93+
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
94+
"POST",
95+
"/_bulk/stream",
96+
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
97+
);
98+
streamingRequest.addParameter("refresh", "true");
99+
streamingRequest.addParameter("batch_size", "5");
100+
101+
final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
102+
103+
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
104+
.expectNextMatches(
105+
s -> s.contains("\"result\":\"created\"")
106+
&& s.contains("\"_id\":\"1\"")
107+
&& s.contains("\"result\":\"created\"")
108+
&& s.contains("\"_id\":\"2\"")
109+
&& s.contains("\"result\":\"created\"")
110+
&& s.contains("\"_id\":\"3\"")
111+
&& s.contains("\"result\":\"created\"")
112+
&& s.contains("\"_id\":\"4\"")
113+
&& s.contains("\"result\":\"created\"")
114+
&& s.contains("\"_id\":\"5\"")
115+
)
116+
.expectComplete()
117+
.verify();
118+
119+
assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
120+
assertThat(streamingResponse.getWarnings(), empty());
121+
122+
final Request request = new Request("GET", "/test-streaming/_count");
123+
final Response response = client().performRequest(request);
124+
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
125+
final Integer count = objectPath.evaluate("count");
126+
assertThat(count, equalTo(5));
127+
}
128+
129+
public void testStreamingRequestManyBatchesBySize() throws IOException {
130+
final Stream<String> stream = IntStream.range(1, 6)
131+
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
132+
133+
final Duration delay = Duration.ofMillis(1);
134+
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
135+
"POST",
136+
"/_bulk/stream",
137+
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
138+
);
139+
streamingRequest.addParameter("refresh", "true");
140+
streamingRequest.addParameter("batch_size", "3");
141+
142+
final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
143+
144+
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
145+
.expectNextMatches(
146+
s -> s.contains("\"result\":\"created\"")
147+
&& s.contains("\"_id\":\"1\"")
148+
&& s.contains("\"result\":\"created\"")
149+
&& s.contains("\"_id\":\"2\"")
150+
&& s.contains("\"result\":\"created\"")
151+
&& s.contains("\"_id\":\"3\"")
152+
)
153+
.expectNextMatches(
154+
s -> s.contains("\"result\":\"created\"")
155+
&& s.contains("\"_id\":\"4\"")
156+
&& s.contains("\"result\":\"created\"")
157+
&& s.contains("\"_id\":\"5\"")
158+
)
159+
.expectComplete()
160+
.verify();
161+
162+
assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
163+
assertThat(streamingResponse.getWarnings(), empty());
164+
165+
final Request request = new Request("GET", "/test-streaming/_count");
166+
final Response response = client().performRequest(request);
167+
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
168+
final Integer count = objectPath.evaluate("count");
169+
assertThat(count, equalTo(5));
170+
}
171+
172+
public void testStreamingRequestManyBatchesByInterval() throws IOException {
173+
final Stream<String> stream = IntStream.range(1, 6)
174+
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
175+
176+
final Duration delay = Duration.ofMillis(500);
177+
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
178+
"POST",
179+
"/_bulk/stream",
180+
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
181+
);
182+
streamingRequest.addParameter("refresh", "true");
183+
streamingRequest.addParameter("batch_interval", "5s");
184+
185+
final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
186+
187+
// We don't check for a other documents here since those may appear in any of the chunks (it is very
188+
// difficult to get the timing right). But at the end, the total number of the documents is being checked.
189+
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
190+
.expectNextMatches(
191+
s -> s.contains("\"result\":\"created\"")
192+
&& s.contains("\"_id\":\"1\"")
193+
&& s.contains("\"result\":\"created\"")
194+
&& s.contains("\"_id\":\"2\"")
195+
&& s.contains("\"result\":\"created\"")
196+
&& s.contains("\"_id\":\"3\"")
197+
&& s.contains("\"result\":\"created\"")
198+
&& s.contains("\"_id\":\"4\"")
199+
&& s.contains("\"result\":\"created\"")
200+
&& s.contains("\"_id\":\"5\"")
201+
)
202+
.expectComplete()
203+
.verify();
204+
205+
assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
206+
assertThat(streamingResponse.getWarnings(), empty());
207+
208+
final Request request = new Request("GET", "/test-streaming/_count");
209+
final Response response = client().performRequest(request);
210+
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
211+
final Integer count = objectPath.evaluate("count");
212+
assertThat(count, equalTo(5));
213+
}
214+
215+
public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException {
216+
final Stream<String> stream = IntStream.range(1, 6)
217+
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
218+
219+
final Duration delay = Duration.ofSeconds(1);
220+
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
221+
"POST",
222+
"/_bulk/stream",
223+
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
224+
);
225+
streamingRequest.addParameter("refresh", "true");
226+
streamingRequest.addParameter("batch_interval", "3s");
227+
streamingRequest.addParameter("batch_size", "5");
228+
229+
final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
230+
231+
// We don't check for a other documents here since those may appear in any of the chunks (it is very
232+
// difficult to get the timing right). But at the end, the total number of the documents is being checked.
233+
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
234+
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
235+
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"5\""))
236+
.expectComplete()
237+
.verify();
238+
239+
assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
240+
assertThat(streamingResponse.getWarnings(), empty());
241+
242+
final Request request = new Request("GET", "/test-streaming/_count");
243+
final Response response = client().performRequest(request);
244+
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
245+
final Integer count = objectPath.evaluate("count");
246+
assertThat(count, equalTo(5));
247+
}
248+
88249
public void testStreamingBadRequest() throws IOException {
89250
final Stream<String> stream = Stream.of(
90251
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ public void receiveChunk(HttpChunk message) {
103103
}
104104
} catch (final Exception ex) {
105105
producer.error(ex);
106-
} finally {
107106
message.close();
108107
}
109108
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void subscribe(Subscriber<? super HttpContent> s) {
4444
}
4545

4646
HttpChunk createChunk(HttpContent chunk, boolean last) {
47-
return new ReactorNetty4HttpChunk(chunk.copy().content(), last);
47+
return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
4848
}
4949

5050
StreamingHttpChannel httpChannel() {

0 commit comments

Comments
 (0)