Skip to content

Commit ef8ffa3

Browse files
committed
[Streaming Indexing] Enhance RestAction with request / response streaming support
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent 09c2c7d commit ef8ffa3

21 files changed

+810
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
1313
- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))
1414
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
15+
- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772))
1516

1617
### Dependencies
1718
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

libs/core/src/main/java/org/opensearch/core/xcontent/XContentBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
*/
6565
@PublicApi(since = "1.0.0")
6666
public final class XContentBuilder implements Closeable, Flushable {
67+
public static final XContentBuilder NO_CONTENT = new XContentBuilder();
6768

6869
/**
6970
* Create a new {@link XContentBuilder} using the given {@link XContent} content.
@@ -230,6 +231,14 @@ public XContentBuilder(XContent xContent, OutputStream os, Set<String> includes,
230231
this.generator = xContent.createGenerator(bos, includes, excludes);
231232
}
232233

234+
/**
235+
* Creates an instance of the {@link XContentBuilder} that has no content
236+
*/
237+
private XContentBuilder() {
238+
this.bos = null;
239+
this.generator = null;
240+
}
241+
233242
public MediaType contentType() {
234243
return generator.contentType();
235244
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
13+
import io.netty.handler.codec.http.HttpContent;
14+
import org.reactivestreams.Publisher;
15+
16+
/**
17+
* The generic interface for chunked {@link HttpContent} producers (response streaming).
18+
*/
19+
interface HttpContentSender extends Publisher<HttpContent> {
20+
void send(HttpContent content, ActionListener<Void> listener, boolean isLast);
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.core.common.bytes.BytesReference;
12+
import org.opensearch.http.HttpChunk;
13+
import org.opensearch.transport.reactor.netty4.Netty4Utils;
14+
15+
import java.util.concurrent.atomic.AtomicBoolean;
16+
17+
import io.netty.buffer.ByteBuf;
18+
19+
class ReactorNetty4HttpChunk implements HttpChunk {
20+
private final AtomicBoolean released;
21+
private final boolean pooled;
22+
private final ByteBuf content;
23+
private final boolean last;
24+
25+
ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
26+
this(new AtomicBoolean(false), true, content, last);
27+
}
28+
29+
private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) {
30+
this.content = content;
31+
this.pooled = pooled;
32+
this.released = released;
33+
this.last = last;
34+
}
35+
36+
@Override
37+
public BytesReference content() {
38+
assert released.get() == false;
39+
return Netty4Utils.toBytesReference(content);
40+
}
41+
42+
@Override
43+
public void release() {
44+
if (pooled && released.compareAndSet(false, true)) {
45+
content.release();
46+
}
47+
}
48+
49+
@Override
50+
public boolean isLast() {
51+
return last;
52+
}
53+
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.opensearch.http.HttpServerChannel;
2727
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
2828
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
29+
import org.opensearch.rest.RestHandler;
30+
import org.opensearch.rest.RestRequest.Method;
2931
import org.opensearch.telemetry.tracing.Tracer;
3032
import org.opensearch.threadpool.ThreadPool;
3133
import org.opensearch.transport.reactor.SharedGroupFactory;
@@ -40,6 +42,7 @@
4042
import java.time.Duration;
4143
import java.util.Arrays;
4244
import java.util.List;
45+
import java.util.Optional;
4346

4447
import io.netty.buffer.ByteBufAllocator;
4548
import io.netty.channel.ChannelOption;
@@ -351,24 +354,46 @@ public List<String> protocols() {
351354
* @return response publisher
352355
*/
353356
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
354-
final NonStreamingRequestConsumer<HttpContent> consumer = new NonStreamingRequestConsumer<>(
355-
this,
356-
request,
357-
response,
358-
maxCompositeBufferComponents
357+
final Method method = HttpConversionUtil.convertMethod(request.method());
358+
final Optional<RestHandler> dispatchHandlerOpt = dispatcher.dispatchHandler(
359+
request.uri(),
360+
request.fullPath(),
361+
method,
362+
request.params()
359363
);
364+
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
365+
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
366+
this,
367+
request,
368+
response
369+
);
370+
371+
request.receiveContent()
372+
.switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT))
373+
.subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT));
374+
consumer.start();
375+
376+
return response.sendObject(consumer);
377+
} else {
378+
final ReactorNetty4NonStreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4NonStreamingRequestConsumer<>(
379+
this,
380+
request,
381+
response,
382+
maxCompositeBufferComponents
383+
);
360384

361-
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);
362-
363-
return Mono.from(consumer).flatMap(hc -> {
364-
final FullHttpResponse r = (FullHttpResponse) hc;
365-
response.status(r.status());
366-
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
367-
response.chunkedTransfer(false);
368-
response.compression(true);
369-
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
370-
return Mono.from(response.sendObject(r.content()));
371-
});
385+
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);
386+
387+
return Mono.from(consumer).flatMap(hc -> {
388+
final FullHttpResponse r = (FullHttpResponse) hc;
389+
response.status(r.status());
390+
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
391+
response.chunkedTransfer(false);
392+
response.compression(true);
393+
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
394+
return Mono.from(response.sendObject(r.content()));
395+
});
396+
}
372397
}
373398

374399
/**

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java renamed to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import reactor.netty.http.server.HttpServerRequest;
2424
import reactor.netty.http.server.HttpServerResponse;
2525

26-
class NonStreamingHttpChannel implements HttpChannel {
26+
class ReactorNetty4NonStreamingHttpChannel implements HttpChannel {
2727
private final HttpServerRequest request;
2828
private final HttpServerResponse response;
2929
private final CompletableContext<Void> closeContext = new CompletableContext<>();
3030
private final FluxSink<HttpContent> emitter;
3131

32-
NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
32+
ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
3333
this.request = request;
3434
this.response = response;
3535
this.emitter = emitter;
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import reactor.netty.http.server.HttpServerRequest;
2626
import reactor.netty.http.server.HttpServerResponse;
2727

28-
class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
28+
class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
2929
private final HttpServerRequest request;
3030
private final HttpServerResponse response;
3131
private final CompositeByteBuf content;
@@ -34,7 +34,7 @@ class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>,
3434
private final AtomicBoolean disposed = new AtomicBoolean(false);
3535
private volatile FluxSink<HttpContent> emitter;
3636

37-
NonStreamingRequestConsumer(
37+
ReactorNetty4NonStreamingRequestConsumer(
3838
AbstractHttpServerTransport transport,
3939
HttpServerRequest request,
4040
HttpServerResponse response,
@@ -64,12 +64,12 @@ public void accept(T message) {
6464
}
6565
}
6666

67-
public void process(HttpContent in, FluxSink<HttpContent> emitter) {
67+
void process(HttpContent in, FluxSink<HttpContent> emitter) {
6868
// Consume request body in full before dispatching it
6969
content.addComponent(true, in.content().retain());
7070

7171
if (in instanceof LastHttpContent) {
72-
final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter);
72+
final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter);
7373
final HttpRequest r = createRequest(request, content);
7474

7575
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.common.concurrent.CompletableContext;
12+
import org.opensearch.core.action.ActionListener;
13+
import org.opensearch.core.common.bytes.BytesReference;
14+
import org.opensearch.http.HttpChunk;
15+
import org.opensearch.http.HttpResponse;
16+
import org.opensearch.http.StreamingHttpChannel;
17+
import org.opensearch.transport.reactor.netty4.Netty4Utils;
18+
19+
import java.net.InetSocketAddress;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
import io.netty.buffer.Unpooled;
24+
import io.netty.handler.codec.http.DefaultHttpContent;
25+
import io.netty.handler.codec.http.FullHttpResponse;
26+
import io.netty.handler.codec.http.HttpContent;
27+
import org.reactivestreams.Publisher;
28+
import org.reactivestreams.Subscriber;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.FluxSink;
31+
import reactor.netty.http.server.HttpServerRequest;
32+
import reactor.netty.http.server.HttpServerResponse;
33+
34+
class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel {
35+
private final HttpServerRequest request;
36+
private final HttpServerResponse response;
37+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
38+
private final Publisher<HttpChunk> receiver;
39+
private final HttpContentSender sender;
40+
private volatile FluxSink<HttpChunk> producer;
41+
private volatile boolean lastChunkReceived = false;
42+
43+
ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) {
44+
this.request = request;
45+
this.response = response;
46+
this.sender = sender;
47+
this.receiver = Flux.create(producer -> this.producer = producer);
48+
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext));
49+
}
50+
51+
@Override
52+
public boolean isOpen() {
53+
return true;
54+
}
55+
56+
@Override
57+
public void close() {
58+
request.withConnection(connection -> connection.channel().close());
59+
}
60+
61+
@Override
62+
public void addCloseListener(ActionListener<Void> listener) {
63+
closeContext.addListener(ActionListener.toBiConsumer(listener));
64+
}
65+
66+
@Override
67+
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) {
68+
sender.send(createContent(chunk), listener, chunk.isLast());
69+
}
70+
71+
@Override
72+
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
73+
sender.send(createContent(response), listener, true);
74+
}
75+
76+
@Override
77+
public void prepareResponse(int status, Map<String, List<String>> headers) {
78+
this.response.status(status);
79+
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
80+
}
81+
82+
@Override
83+
public InetSocketAddress getRemoteAddress() {
84+
return (InetSocketAddress) response.remoteAddress();
85+
}
86+
87+
@Override
88+
public InetSocketAddress getLocalAddress() {
89+
return (InetSocketAddress) response.hostAddress();
90+
}
91+
92+
@Override
93+
public void receiveChunk(HttpChunk message) {
94+
if (lastChunkReceived) {
95+
return;
96+
}
97+
98+
producer.next(message);
99+
if (message.isLast()) {
100+
lastChunkReceived = true;
101+
producer.complete();
102+
}
103+
}
104+
105+
@Override
106+
public void subscribe(Subscriber<? super HttpChunk> subscriber) {
107+
receiver.subscribe(subscriber);
108+
}
109+
110+
private static HttpContent createContent(HttpResponse response) {
111+
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response;
112+
return new DefaultHttpContent(fullHttpResponse.content());
113+
}
114+
115+
private static HttpContent createContent(HttpChunk chunk) {
116+
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content())));
117+
}
118+
}

0 commit comments

Comments
 (0)