Skip to content

Commit db179a3

Browse files
committed
[Streaming Indexing] Enhance RestAction with request / response streaming support
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent 66df930 commit db179a3

15 files changed

+483
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
1010
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
1111
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
12+
- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772))
1213

1314
### Dependencies
1415
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
13+
import io.netty.handler.codec.http.HttpContent;
14+
import org.reactivestreams.Publisher;
15+
16+
/**
17+
* The generic interface for chunked {@link HttpContent} producers (response streaming).
18+
*/
19+
interface HttpContentSender extends Publisher<HttpContent> {
20+
void send(HttpContent content, ActionListener<Void> listener, boolean isLast);
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.core.common.bytes.BytesReference;
12+
import org.opensearch.http.HttpChunk;
13+
import org.opensearch.transport.reactor.netty4.Netty4Utils;
14+
15+
import java.util.concurrent.atomic.AtomicBoolean;
16+
17+
import io.netty.buffer.ByteBuf;
18+
19+
class ReactorNetty4HttpChunk implements HttpChunk {
20+
private final AtomicBoolean released;
21+
private final boolean pooled;
22+
private final ByteBuf content;
23+
private final boolean last;
24+
25+
ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
26+
this(new AtomicBoolean(false), true, content, last);
27+
}
28+
29+
private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) {
30+
this.content = content;
31+
this.pooled = pooled;
32+
this.released = released;
33+
this.last = last;
34+
}
35+
36+
@Override
37+
public BytesReference content() {
38+
assert released.get() == false;
39+
return Netty4Utils.toBytesReference(content);
40+
}
41+
42+
@Override
43+
public void release() {
44+
if (pooled && released.compareAndSet(false, true)) {
45+
content.release();
46+
}
47+
}
48+
49+
@Override
50+
public boolean isLast() {
51+
return last;
52+
}
53+
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.opensearch.http.HttpServerChannel;
2727
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
2828
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
29+
import org.opensearch.rest.RestHandler;
30+
import org.opensearch.rest.RestRequest.Method;
2931
import org.opensearch.telemetry.tracing.Tracer;
3032
import org.opensearch.threadpool.ThreadPool;
3133
import org.opensearch.transport.reactor.SharedGroupFactory;
@@ -351,24 +353,42 @@ public List<String> protocols() {
351353
* @return response publisher
352354
*/
353355
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
354-
final NonStreamingRequestConsumer<HttpContent> consumer = new NonStreamingRequestConsumer<>(
355-
this,
356-
request,
357-
response,
358-
maxCompositeBufferComponents
359-
);
356+
final Method method = HttpConversionUtil.convertMethod(request.method());
357+
if (dispatcher.dispatchHandler(request.uri(), request.fullPath(), method, request.params())
358+
.map(RestHandler::supportsStreaming)
359+
.orElse(false)) {
360+
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
361+
this,
362+
request,
363+
response
364+
);
360365

361-
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);
362-
363-
return Mono.from(consumer).flatMap(hc -> {
364-
final FullHttpResponse r = (FullHttpResponse) hc;
365-
response.status(r.status());
366-
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
367-
response.chunkedTransfer(false);
368-
response.compression(true);
369-
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
370-
return Mono.from(response.sendObject(r.content()));
371-
});
366+
request.receiveContent()
367+
.switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT))
368+
.subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT));
369+
consumer.start();
370+
371+
return response.sendObject(consumer);
372+
} else {
373+
final ReactorNetty4NonStreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4NonStreamingRequestConsumer<>(
374+
this,
375+
request,
376+
response,
377+
maxCompositeBufferComponents
378+
);
379+
380+
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);
381+
382+
return Mono.from(consumer).flatMap(hc -> {
383+
final FullHttpResponse r = (FullHttpResponse) hc;
384+
response.status(r.status());
385+
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
386+
response.chunkedTransfer(false);
387+
response.compression(true);
388+
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
389+
return Mono.from(response.sendObject(r.content()));
390+
});
391+
}
372392
}
373393

374394
/**

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java renamed to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import reactor.netty.http.server.HttpServerRequest;
2424
import reactor.netty.http.server.HttpServerResponse;
2525

26-
class NonStreamingHttpChannel implements HttpChannel {
26+
class ReactorNetty4NonStreamingHttpChannel implements HttpChannel {
2727
private final HttpServerRequest request;
2828
private final HttpServerResponse response;
2929
private final CompletableContext<Void> closeContext = new CompletableContext<>();
3030
private final FluxSink<HttpContent> emitter;
3131

32-
NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
32+
ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
3333
this.request = request;
3434
this.response = response;
3535
this.emitter = emitter;
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import reactor.netty.http.server.HttpServerRequest;
2626
import reactor.netty.http.server.HttpServerResponse;
2727

28-
class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
28+
class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
2929
private final HttpServerRequest request;
3030
private final HttpServerResponse response;
3131
private final CompositeByteBuf content;
@@ -34,7 +34,7 @@ class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>,
3434
private final AtomicBoolean disposed = new AtomicBoolean(false);
3535
private volatile FluxSink<HttpContent> emitter;
3636

37-
NonStreamingRequestConsumer(
37+
ReactorNetty4NonStreamingRequestConsumer(
3838
AbstractHttpServerTransport transport,
3939
HttpServerRequest request,
4040
HttpServerResponse response,
@@ -64,12 +64,12 @@ public void accept(T message) {
6464
}
6565
}
6666

67-
public void process(HttpContent in, FluxSink<HttpContent> emitter) {
67+
void process(HttpContent in, FluxSink<HttpContent> emitter) {
6868
// Consume request body in full before dispatching it
6969
content.addComponent(true, in.content().retain());
7070

7171
if (in instanceof LastHttpContent) {
72-
final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter);
72+
final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter);
7373
final HttpRequest r = createRequest(request, content);
7474

7575
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.common.concurrent.CompletableContext;
12+
import org.opensearch.core.action.ActionListener;
13+
import org.opensearch.core.common.bytes.BytesReference;
14+
import org.opensearch.http.HttpChunk;
15+
import org.opensearch.http.HttpResponse;
16+
import org.opensearch.http.StreamingHttpChannel;
17+
import org.opensearch.transport.reactor.netty4.Netty4Utils;
18+
19+
import java.net.InetSocketAddress;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
import io.netty.buffer.Unpooled;
24+
import io.netty.handler.codec.http.DefaultHttpContent;
25+
import io.netty.handler.codec.http.FullHttpResponse;
26+
import io.netty.handler.codec.http.HttpContent;
27+
import org.reactivestreams.FlowAdapters;
28+
import org.reactivestreams.Publisher;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.FluxSink;
31+
import reactor.netty.http.server.HttpServerRequest;
32+
import reactor.netty.http.server.HttpServerResponse;
33+
34+
class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel {
35+
private final HttpServerRequest request;
36+
private final HttpServerResponse response;
37+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
38+
private final Publisher<HttpChunk> receiver;
39+
private final HttpContentSender sender;
40+
private volatile FluxSink<HttpChunk> producer;
41+
private volatile boolean lastChunkReceived = false;
42+
43+
ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) {
44+
this.request = request;
45+
this.response = response;
46+
this.sender = sender;
47+
this.receiver = Flux.create(producer -> this.producer = producer);
48+
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext));
49+
}
50+
51+
@Override
52+
public boolean isOpen() {
53+
return true;
54+
}
55+
56+
@Override
57+
public void close() {
58+
request.withConnection(connection -> connection.channel().close());
59+
}
60+
61+
@Override
62+
public void addCloseListener(ActionListener<Void> listener) {
63+
closeContext.addListener(ActionListener.toBiConsumer(listener));
64+
}
65+
66+
@Override
67+
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) {
68+
sender.send(createContent(chunk), listener, chunk.isLast());
69+
}
70+
71+
@Override
72+
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
73+
sender.send(createContent(response), listener, true);
74+
}
75+
76+
@Override
77+
public void prepareResponse(int status, Map<String, List<String>> headers) {
78+
this.response.status(status);
79+
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
80+
}
81+
82+
@Override
83+
public InetSocketAddress getRemoteAddress() {
84+
return (InetSocketAddress) response.remoteAddress();
85+
}
86+
87+
@Override
88+
public InetSocketAddress getLocalAddress() {
89+
return (InetSocketAddress) response.hostAddress();
90+
}
91+
92+
@Override
93+
public void receiveChunk(HttpChunk message) {
94+
if (lastChunkReceived) {
95+
return;
96+
}
97+
98+
producer.next(message);
99+
if (message.isLast()) {
100+
lastChunkReceived = true;
101+
producer.complete();
102+
}
103+
}
104+
105+
@Override
106+
public void subscribe(java.util.concurrent.Flow.Subscriber<? super HttpChunk> s) {
107+
receiver.subscribe(FlowAdapters.toSubscriber(s));
108+
}
109+
110+
private static HttpContent createContent(HttpResponse response) {
111+
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response;
112+
return new DefaultHttpContent(fullHttpResponse.content());
113+
}
114+
115+
private static HttpContent createContent(HttpChunk chunk) {
116+
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content())));
117+
}
118+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.reactor.netty4;
10+
11+
import org.opensearch.http.AbstractHttpServerTransport;
12+
import org.opensearch.http.HttpChunk;
13+
import org.opensearch.http.HttpRequest;
14+
import org.opensearch.http.StreamingHttpChannel;
15+
16+
import java.util.function.Consumer;
17+
18+
import io.netty.buffer.Unpooled;
19+
import io.netty.handler.codec.http.HttpContent;
20+
import io.netty.handler.codec.http.LastHttpContent;
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.Subscriber;
23+
import reactor.netty.http.server.HttpServerRequest;
24+
import reactor.netty.http.server.HttpServerResponse;
25+
26+
class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent> {
27+
private final AbstractHttpServerTransport transport;
28+
private final HttpServerRequest request;
29+
private final HttpContentSender sender;
30+
private final StreamingHttpChannel httpChannel;
31+
32+
ReactorNetty4StreamingRequestConsumer(AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response) {
33+
this.transport = transport;
34+
this.request = request;
35+
this.sender = new ReactorNetty4StreamingResponseProducer();
36+
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
37+
}
38+
39+
@Override
40+
public void accept(T message) {
41+
if (message instanceof LastHttpContent) {
42+
httpChannel.receiveChunk(createChunk(message, true));
43+
} else if (message instanceof HttpContent) {
44+
httpChannel.receiveChunk(createChunk(message, false));
45+
}
46+
}
47+
48+
@Override
49+
public void subscribe(Subscriber<? super HttpContent> s) {
50+
sender.subscribe(s);
51+
}
52+
53+
void start() {
54+
transport.incomingStream(createRequest(request), httpChannel);
55+
}
56+
57+
HttpRequest createRequest(HttpServerRequest request) {
58+
return new ReactorNetty4HttpRequest(request, Unpooled.EMPTY_BUFFER);
59+
}
60+
61+
HttpChunk createChunk(HttpContent chunk, boolean last) {
62+
return new ReactorNetty4HttpChunk(chunk.content(), last);
63+
}
64+
}

0 commit comments

Comments
 (0)