Skip to content

Commit acee2ae

Browse files
authored
Fix Netty's ByteBuf leak (#15475)
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent f5da8c8 commit acee2ae

File tree

3 files changed

+11
-18
lines changed

3 files changed

+11
-18
lines changed

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,31 @@
88

99
package org.opensearch.http.reactor.netty4;
1010

11+
import org.opensearch.core.common.bytes.BytesArray;
1112
import org.opensearch.core.common.bytes.BytesReference;
1213
import org.opensearch.http.HttpChunk;
13-
import org.opensearch.transport.reactor.netty4.Netty4Utils;
14-
15-
import java.util.concurrent.atomic.AtomicBoolean;
1614

1715
import io.netty.buffer.ByteBuf;
1816

1917
class ReactorNetty4HttpChunk implements HttpChunk {
20-
private final AtomicBoolean released;
21-
private final boolean pooled;
22-
private final ByteBuf content;
18+
private final BytesArray content;
2319
private final boolean last;
2420

25-
ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
26-
this.content = content;
27-
this.pooled = true;
28-
this.released = new AtomicBoolean(false);
21+
ReactorNetty4HttpChunk(ByteBuf buf, boolean last) {
22+
// Since the chunks could be batched and processing could be delayed, we are copying the content here
23+
final byte[] content = new byte[buf.readableBytes()];
24+
buf.readBytes(content);
25+
this.content = new BytesArray(content);
2926
this.last = last;
3027
}
3128

3229
@Override
3330
public BytesReference content() {
34-
assert released.get() == false;
35-
return Netty4Utils.toBytesReference(content);
31+
return content;
3632
}
3733

3834
@Override
39-
public void close() {
40-
if (pooled && released.compareAndSet(false, true)) {
41-
content.release();
42-
}
43-
}
35+
public void close() {}
4436

4537
@Override
4638
public boolean isLast() {

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public void receiveChunk(HttpChunk message) {
103103
}
104104
} catch (final Exception ex) {
105105
producer.error(ex);
106+
} finally {
106107
message.close();
107108
}
108109
}

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void subscribe(Subscriber<? super HttpContent> s) {
4444
}
4545

4646
HttpChunk createChunk(HttpContent chunk, boolean last) {
47-
return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
47+
return new ReactorNetty4HttpChunk(chunk.content(), last);
4848
}
4949

5050
StreamingHttpChannel httpChannel() {

0 commit comments

Comments
 (0)