-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Streaming Indexing] Enhance RestAction with request / response streaming support #13772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.http.HttpChunk; | ||
import org.opensearch.transport.reactor.netty4.Netty4Utils; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
|
||
class ReactorNetty4HttpChunk implements HttpChunk { | ||
private final AtomicBoolean released; | ||
private final boolean pooled; | ||
private final ByteBuf content; | ||
private final boolean last; | ||
|
||
ReactorNetty4HttpChunk(ByteBuf content, boolean last) { | ||
this.content = content; | ||
this.pooled = true; | ||
this.released = new AtomicBoolean(false); | ||
this.last = last; | ||
} | ||
|
||
@Override | ||
public BytesReference content() { | ||
assert released.get() == false; | ||
andrross marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return Netty4Utils.toBytesReference(content); | ||
Check warning on line 35 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java
|
||
} | ||
|
||
@Override | ||
public void close() { | ||
if (pooled && released.compareAndSet(false, true)) { | ||
content.release(); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean isLast() { | ||
return last; | ||
Check warning on line 47 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.http.reactor.netty4; | ||
|
||
import org.opensearch.common.concurrent.CompletableContext; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.http.HttpChunk; | ||
import org.opensearch.http.HttpResponse; | ||
import org.opensearch.http.StreamingHttpChannel; | ||
import org.opensearch.transport.reactor.netty4.Netty4Utils; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import io.netty.buffer.Unpooled; | ||
import io.netty.handler.codec.http.DefaultHttpContent; | ||
import io.netty.handler.codec.http.FullHttpResponse; | ||
import io.netty.handler.codec.http.HttpContent; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.FluxSink; | ||
import reactor.netty.http.server.HttpServerRequest; | ||
import reactor.netty.http.server.HttpServerResponse; | ||
|
||
class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel { | ||
private final HttpServerRequest request; | ||
private final HttpServerResponse response; | ||
private final CompletableContext<Void> closeContext = new CompletableContext<>(); | ||
private final Publisher<HttpChunk> receiver; | ||
private final StreamingHttpContentSender sender; | ||
private volatile FluxSink<HttpChunk> producer; | ||
private volatile boolean lastChunkReceived = false; | ||
|
||
ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, StreamingHttpContentSender sender) { | ||
this.request = request; | ||
this.response = response; | ||
this.sender = sender; | ||
this.receiver = Flux.create(producer -> this.producer = producer); | ||
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); | ||
} | ||
|
||
@Override | ||
public boolean isOpen() { | ||
return true; | ||
Check warning on line 53 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
} | ||
|
||
@Override | ||
public void close() { | ||
request.withConnection(connection -> connection.channel().close()); | ||
} | ||
Check warning on line 59 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
|
||
@Override | ||
public void addCloseListener(ActionListener<Void> listener) { | ||
closeContext.addListener(ActionListener.toBiConsumer(listener)); | ||
} | ||
Check warning on line 64 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
|
||
@Override | ||
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) { | ||
sender.send(createContent(chunk), listener, chunk.isLast()); | ||
} | ||
|
||
@Override | ||
public void sendResponse(HttpResponse response, ActionListener<Void> listener) { | ||
sender.send(createContent(response), listener, true); | ||
} | ||
Check warning on line 74 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
|
||
@Override | ||
public void prepareResponse(int status, Map<String, List<String>> headers) { | ||
this.response.status(status); | ||
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v))); | ||
} | ||
Check warning on line 80 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
|
||
@Override | ||
public InetSocketAddress getRemoteAddress() { | ||
return (InetSocketAddress) response.remoteAddress(); | ||
Check warning on line 84 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
} | ||
|
||
@Override | ||
public InetSocketAddress getLocalAddress() { | ||
return (InetSocketAddress) response.hostAddress(); | ||
Check warning on line 89 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
} | ||
|
||
@Override | ||
public void receiveChunk(HttpChunk message) { | ||
try { | ||
if (lastChunkReceived) { | ||
return; | ||
Check warning on line 96 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
} | ||
|
||
producer.next(message); | ||
Check warning on line 99 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
if (message.isLast()) { | ||
lastChunkReceived = true; | ||
producer.complete(); | ||
Check warning on line 102 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
} | ||
} finally { | ||
message.close(); | ||
} | ||
} | ||
Check warning on line 107 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
|
||
@Override | ||
public boolean isReadable() { | ||
return producer != null; | ||
} | ||
|
||
@Override | ||
public boolean isWritable() { | ||
return sender.isReady(); | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super HttpChunk> subscriber) { | ||
receiver.subscribe(subscriber); | ||
} | ||
Check warning on line 122 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
|
||
private static HttpContent createContent(HttpResponse response) { | ||
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response; | ||
return new DefaultHttpContent(fullHttpResponse.content()); | ||
Check warning on line 126 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java
|
||
} | ||
|
||
private static HttpContent createContent(HttpChunk chunk) { | ||
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toByteBuffers(chunk.content()))); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.