Skip to content

Commit f879508

Browse files
Avoid building large CompositeByteBuf when sending transport messages (#105137)
We can avoid building composite byte buf instances on the transport layer (they have quite a bit of overhead and make heap dumps more complicated to read). There's no need to add another round of references to the BytesReference components here. Just write these out as they come in. This would allow for some efficiency improving follow-ups where we can essentially release the pages that have passed the write pipeline. To avoid having this explode the size of the queue for writes per channel, I moved that to a linked list. The slowdown from a linked list is irrelevant I believe. Mostly the queue is empty so it doesn't matter or if it isn't empty, operations other than dequeuing are much more important to performance in this logic anyway (+ Netty internally uses a LL down the line anyway). I would regard this as step-1 in making the serialisation here more lazy like on the REST layer to avoid copying bytes to the outbound buffer that we already have as `byte[]`.
1 parent fabcf70 commit f879508

File tree

4 files changed

+115
-22
lines changed

4 files changed

+115
-22
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
165165
// We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to
166166
// use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved.
167167
var wrapped = ActionListener.notifyOnce(listener);
168-
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(wrapped, channel));
168+
channel.writeAndFlush(reference, addPromise(wrapped, channel));
169169
if (channel.eventLoop().isShutdown()) {
170170
wrapped.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
171171
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,13 @@ public static void setAvailableProcessors(final int availableProcessors) {
7272
* pages of the BytesReference. Don't free the bytes of reference before the ByteBuf goes out of scope.
7373
*/
7474
public static ByteBuf toByteBuf(final BytesReference reference) {
75-
if (reference.length() == 0) {
76-
return Unpooled.EMPTY_BUFFER;
75+
if (reference.hasArray()) {
76+
return Unpooled.wrappedBuffer(reference.array(), reference.arrayOffset(), reference.length());
7777
}
78+
return compositeReferenceToByteBuf(reference);
79+
}
80+
81+
private static ByteBuf compositeReferenceToByteBuf(BytesReference reference) {
7882
final BytesRefIterator iterator = reference.iterator();
7983
// usually we have one, two, or three components from the header, the message, and a buffer
8084
final List<ByteBuf> buffers = new ArrayList<>(3);

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,25 @@
99
package org.elasticsearch.transport.netty4;
1010

1111
import io.netty.buffer.ByteBuf;
12+
import io.netty.buffer.Unpooled;
1213
import io.netty.channel.Channel;
1314
import io.netty.channel.ChannelDuplexHandler;
1415
import io.netty.channel.ChannelFuture;
1516
import io.netty.channel.ChannelHandlerContext;
1617
import io.netty.channel.ChannelPromise;
1718
import io.netty.util.concurrent.Future;
1819
import io.netty.util.concurrent.GenericFutureListener;
20+
import io.netty.util.concurrent.PromiseCombiner;
1921

22+
import org.apache.lucene.util.BytesRef;
23+
import org.apache.lucene.util.BytesRefIterator;
24+
import org.elasticsearch.common.bytes.BytesReference;
2025
import org.elasticsearch.common.util.concurrent.ThreadContext;
2126
import org.elasticsearch.transport.Transports;
2227

28+
import java.io.IOException;
2329
import java.nio.channels.ClosedChannelException;
24-
import java.util.ArrayDeque;
30+
import java.util.LinkedList;
2531
import java.util.Queue;
2632

2733
/**
@@ -32,7 +38,7 @@
3238
public final class Netty4WriteThrottlingHandler extends ChannelDuplexHandler {
3339

3440
public static final int MAX_BYTES_PER_WRITE = 1 << 18;
35-
private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();
41+
private final Queue<WriteOperation> queuedWrites = new LinkedList<>();
3642

3743
private final ThreadContext threadContext;
3844
private WriteOperation currentWrite;
@@ -42,17 +48,36 @@ public Netty4WriteThrottlingHandler(ThreadContext threadContext) {
4248
}
4349

4450
@Override
45-
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
46-
assert msg instanceof ByteBuf;
51+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
52+
if (msg instanceof BytesReference reference) {
53+
if (reference.hasArray()) {
54+
writeSingleByteBuf(ctx, Unpooled.wrappedBuffer(reference.array(), reference.arrayOffset(), reference.length()), promise);
55+
} else {
56+
BytesRefIterator iter = reference.iterator();
57+
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
58+
BytesRef next;
59+
while ((next = iter.next()) != null) {
60+
final ChannelPromise chunkPromise = ctx.newPromise();
61+
combiner.add((Future<Void>) chunkPromise);
62+
writeSingleByteBuf(ctx, Unpooled.wrappedBuffer(next.bytes, next.offset, next.length), chunkPromise);
63+
}
64+
combiner.finish(promise);
65+
}
66+
} else {
67+
assert msg instanceof ByteBuf;
68+
writeSingleByteBuf(ctx, (ByteBuf) msg, promise);
69+
}
70+
}
71+
72+
private void writeSingleByteBuf(ChannelHandlerContext ctx, ByteBuf buf, ChannelPromise promise) {
4773
assert Transports.assertDefaultThreadContext(threadContext);
4874
assert Transports.assertTransportThread();
49-
final ByteBuf buf = (ByteBuf) msg;
5075
if (ctx.channel().isWritable() && currentWrite == null && queuedWrites.isEmpty()) {
5176
// nothing is queued for writing and the channel is writable, just pass the write down the pipeline directly
5277
if (buf.readableBytes() > MAX_BYTES_PER_WRITE) {
5378
writeInSlices(ctx, promise, buf);
5479
} else {
55-
ctx.write(msg, promise);
80+
ctx.write(buf, promise);
5681
}
5782
} else {
5883
queueWrite(buf, promise);

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandlerTests.java

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
import io.netty.channel.ChannelPromise;
1515
import io.netty.channel.embedded.EmbeddedChannel;
1616

17+
import org.elasticsearch.common.bytes.BytesArray;
18+
import org.elasticsearch.common.bytes.BytesReference;
19+
import org.elasticsearch.common.bytes.CompositeBytesReference;
1720
import org.elasticsearch.common.settings.Settings;
1821
import org.elasticsearch.common.util.concurrent.ThreadContext;
1922
import org.elasticsearch.test.ESTestCase;
@@ -28,6 +31,7 @@
2831
import static org.hamcrest.Matchers.instanceOf;
2932
import static org.hamcrest.Matchers.lessThan;
3033
import static org.hamcrest.Matchers.lessThanOrEqualTo;
34+
import static org.hamcrest.Matchers.oneOf;
3135

3236
public class Netty4WriteThrottlingHandlerTests extends ESTestCase {
3337

@@ -56,42 +60,76 @@ public void testThrottlesLargeMessage() throws ExecutionException, InterruptedEx
5660
assertThat(writeableBytes, lessThan(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE));
5761
final int fullSizeChunks = randomIntBetween(2, 10);
5862
final int extraChunkSize = randomIntBetween(0, 10);
59-
final ByteBuf message = Unpooled.wrappedBuffer(
60-
randomByteArrayOfLength(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE * fullSizeChunks + extraChunkSize)
63+
final byte[] messageBytes = randomByteArrayOfLength(
64+
Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE * fullSizeChunks + extraChunkSize
6165
);
66+
final Object message = wrapAsNettyOrEsBuffer(messageBytes);
6267
final ChannelPromise promise = embeddedChannel.newPromise();
6368
transportGroup.getLowLevelGroup().submit(() -> embeddedChannel.write(message, promise)).get();
6469
assertThat(seen, hasSize(1));
65-
assertEquals(message.slice(0, Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE), seen.get(0));
70+
assertSliceEquals(seen.get(0), message, 0, Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE);
6671
assertFalse(promise.isDone());
6772
transportGroup.getLowLevelGroup().submit(embeddedChannel::flush).get();
6873
assertTrue(promise.isDone());
6974
assertThat(seen, hasSize(fullSizeChunks + (extraChunkSize == 0 ? 0 : 1)));
7075
assertTrue(capturingHandler.didWriteAfterThrottled);
7176
if (extraChunkSize != 0) {
72-
assertEquals(
73-
message.slice(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE * fullSizeChunks, extraChunkSize),
74-
seen.get(seen.size() - 1)
77+
assertSliceEquals(
78+
seen.get(seen.size() - 1),
79+
message,
80+
Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE * fullSizeChunks,
81+
extraChunkSize
7582
);
7683
}
7784
}
7885

79-
public void testPassesSmallMessageDirectly() throws ExecutionException, InterruptedException {
86+
public void testThrottleLargeCompositeMessage() throws ExecutionException, InterruptedException {
8087
final List<ByteBuf> seen = new CopyOnWriteArrayList<>();
8188
final CapturingHandler capturingHandler = new CapturingHandler(seen);
8289
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(
8390
capturingHandler,
8491
new Netty4WriteThrottlingHandler(new ThreadContext(Settings.EMPTY))
8592
);
93+
// we assume that the channel outbound buffer is smaller than Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE
8694
final int writeableBytes = Math.toIntExact(embeddedChannel.bytesBeforeUnwritable());
8795
assertThat(writeableBytes, lessThan(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE));
88-
final ByteBuf message = Unpooled.wrappedBuffer(
89-
randomByteArrayOfLength(randomIntBetween(0, Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE))
96+
final int fullSizeChunks = randomIntBetween(2, 10);
97+
final int extraChunkSize = randomIntBetween(0, 10);
98+
final byte[] messageBytes = randomByteArrayOfLength(
99+
Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE * fullSizeChunks + extraChunkSize
100+
);
101+
int splitOffset = randomIntBetween(0, messageBytes.length);
102+
final BytesReference message = CompositeBytesReference.of(
103+
new BytesArray(messageBytes, 0, splitOffset),
104+
new BytesArray(messageBytes, splitOffset, messageBytes.length - splitOffset)
105+
);
106+
final ChannelPromise promise = embeddedChannel.newPromise();
107+
transportGroup.getLowLevelGroup().submit(() -> embeddedChannel.write(message, promise)).get();
108+
assertThat(seen, hasSize(oneOf(1, 2)));
109+
assertSliceEquals(seen.get(0), message, 0, seen.get(0).readableBytes());
110+
assertFalse(promise.isDone());
111+
transportGroup.getLowLevelGroup().submit(embeddedChannel::flush).get();
112+
assertTrue(promise.isDone());
113+
assertThat(seen, hasSize(oneOf(fullSizeChunks, fullSizeChunks + 1)));
114+
assertTrue(capturingHandler.didWriteAfterThrottled);
115+
assertBufferEquals(Unpooled.compositeBuffer().addComponents(true, seen), message);
116+
}
117+
118+
public void testPassesSmallMessageDirectly() throws ExecutionException, InterruptedException {
119+
final List<ByteBuf> seen = new CopyOnWriteArrayList<>();
120+
final CapturingHandler capturingHandler = new CapturingHandler(seen);
121+
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(
122+
capturingHandler,
123+
new Netty4WriteThrottlingHandler(new ThreadContext(Settings.EMPTY))
90124
);
125+
final int writeableBytes = Math.toIntExact(embeddedChannel.bytesBeforeUnwritable());
126+
assertThat(writeableBytes, lessThan(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE));
127+
final byte[] messageBytes = randomByteArrayOfLength(randomIntBetween(0, Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE));
128+
final Object message = wrapAsNettyOrEsBuffer(messageBytes);
91129
final ChannelPromise promise = embeddedChannel.newPromise();
92130
transportGroup.getLowLevelGroup().submit(() -> embeddedChannel.write(message, promise)).get();
93131
assertThat(seen, hasSize(1)); // first message should be passed through straight away
94-
assertSame(message, seen.get(0));
132+
assertBufferEquals(seen.get(0), message);
95133
assertFalse(promise.isDone());
96134
transportGroup.getLowLevelGroup().submit(embeddedChannel::flush).get();
97135
assertTrue(promise.isDone());
@@ -107,13 +145,14 @@ public void testThrottlesOnUnwritable() throws ExecutionException, InterruptedEx
107145
);
108146
final int writeableBytes = Math.toIntExact(embeddedChannel.bytesBeforeUnwritable());
109147
assertThat(writeableBytes, lessThan(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE));
110-
final ByteBuf message = Unpooled.wrappedBuffer(randomByteArrayOfLength(writeableBytes + randomIntBetween(0, 10)));
148+
final byte[] messageBytes = randomByteArrayOfLength(writeableBytes + randomIntBetween(0, 10));
149+
final Object message = wrapAsNettyOrEsBuffer(messageBytes);
111150
final ChannelPromise promise = embeddedChannel.newPromise();
112151
transportGroup.getLowLevelGroup().submit(() -> embeddedChannel.write(message, promise)).get();
113152
assertThat(seen, hasSize(1)); // first message should be passed through straight away
114-
assertSame(message, seen.get(0));
153+
assertBufferEquals(seen.get(0), message);
115154
assertFalse(promise.isDone());
116-
final ByteBuf messageToQueue = Unpooled.wrappedBuffer(
155+
final Object messageToQueue = wrapAsNettyOrEsBuffer(
117156
randomByteArrayOfLength(randomIntBetween(0, Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE))
118157
);
119158
final ChannelPromise promiseForQueued = embeddedChannel.newPromise();
@@ -126,6 +165,31 @@ public void testThrottlesOnUnwritable() throws ExecutionException, InterruptedEx
126165
assertTrue(promiseForQueued.isDone());
127166
}
128167

168+
private static void assertBufferEquals(ByteBuf expected, Object message) {
169+
if (message instanceof ByteBuf buf) {
170+
assertSame(expected, buf);
171+
} else {
172+
assertEquals(expected, Netty4Utils.toByteBuf(asInstanceOf(BytesReference.class, message)));
173+
}
174+
}
175+
176+
private static void assertSliceEquals(ByteBuf expected, Object message, int index, int length) {
177+
assertEquals(
178+
(message instanceof ByteBuf buf ? buf : Netty4Utils.toByteBuf(asInstanceOf(BytesReference.class, message))).slice(
179+
index,
180+
length
181+
),
182+
expected
183+
);
184+
}
185+
186+
private static Object wrapAsNettyOrEsBuffer(byte[] messageBytes) {
187+
if (randomBoolean()) {
188+
return Unpooled.wrappedBuffer(messageBytes);
189+
}
190+
return new BytesArray(messageBytes);
191+
}
192+
129193
private static class CapturingHandler extends ChannelOutboundHandlerAdapter {
130194
private final List<ByteBuf> seen;
131195

0 commit comments

Comments
 (0)