Skip to content

WIP: Release transport messages incrementally while reading them #127435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public BytesReference slice(int from, int length) {
return CompositeBytesReference.ofMultiple(inSlice);
}

public BytesReference[] components() {
return references;
}

private int getOffsetIndex(int offset) {
final int i = Arrays.binarySearch(offsets, offset);
return i < 0 ? (-(i + 1)) - 1 : i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,128 @@ public long ramBytesUsed() {
return delegate.ramBytesUsed();
}

public static StreamInput consumingStreamInput(ReleasableBytesReference... references) throws IOException {
final BytesReference bytesReference;
final RefCounted[] refs = new RefCounted[references.length];
if (references.length == 1) {
final var ref = references[0];
bytesReference = ref;
refs[0] = ref.refCounted;
} else {
bytesReference = CompositeBytesReference.of(references);
for (int i = 0; i < references.length; i++) {
refs[i] = references[i].refCounted;
}
}
return new BytesReferenceStreamInput(bytesReference) {
private ReleasableBytesReference retainAndSkip(int len) throws IOException {
if (len == 0) {
return ReleasableBytesReference.empty();
}

int offset = offset();
skip(len);
// move the stream manually since creating the slice didn't move it
if (bytesReference instanceof ReleasableBytesReference releasable) {
ReleasableBytesReference res = releasable.retainedSlice(offset, len);
if (markEnd == 0 && available() == 0) {
close();
}
return res;
}
assert bytesReference instanceof CompositeBytesReference;
final CompositeBytesReference composite = (CompositeBytesReference) bytesReference;
// instead of reading the bytes from a stream we just create a slice of the underlying bytes
final BytesReference result = composite.slice(offset, len);
if (result instanceof ReleasableBytesReference releasable) {
return releasable.retain();
}
assert result instanceof CompositeBytesReference;
var compositeSlice = (CompositeBytesReference) result;
var components = compositeSlice.components();
final RefCounted[] refCounteds = new RefCounted[components.length];
for (int i = 0; i < components.length; i++) {
refCounteds[i] = ((ReleasableBytesReference) components[i]).retain();
}
if (markEnd == 0) {
maybeDiscardReadBytes(composite.components());
}
return new ReleasableBytesReference(compositeSlice, () -> {
for (int i = 0; i < refCounteds.length; i++) {
refCounteds[i].decRef();
refCounteds[i] = null;
}
});
}

private void maybeDiscardReadBytes(BytesReference[] components) {
int offset = offset();
int p = 0;
for (int i = 0; i < components.length; i++) {
p += components[i].length();
if (p >= offset) {
return;
}
var r = refs[i];
if (r != null) {
r.decRef();
refs[i] = null;
}
}
}

@Override
public ReleasableBytesReference readReleasableBytesReference() throws IOException {
final int len = readVInt();
return retainAndSkip(len);
}

@Override
public ReleasableBytesReference readReleasableBytesReference(int len) throws IOException {
return retainAndSkip(len);
}

@Override
public ReleasableBytesReference readAllToReleasableBytesReference() throws IOException {
return retainAndSkip(bytesReference.length() - offset());
}

@Override
public boolean supportReadAllToReleasableBytesReference() {
return true;
}

@Override
public void close() {
for (int i = 0; i < refs.length; i++) {
RefCounted ref = refs[i];
if (ref != null) {
refs[i] = null;
ref.decRef();
}
}
}

public void tryDiscard() {
if (markEnd == 0) {
if (bytesReference instanceof CompositeBytesReference c) {
maybeDiscardReadBytes(c.components());
} else if (available() == 0) {
close();
}
}
}

private int markEnd = 0;

@Override
public void mark(int readLimit) {
super.mark(readLimit);
markEnd = offset() + readLimit;
}
};
}

@Override
public StreamInput streamInput() throws IOException {
assert hasReferences();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public int read(byte[] b, int off, int len) throws IOException {
return delegate.read(b, off, len);
}

@Override
public void tryDiscard() {
delegate.tryDiscard();
}

@Override
public void close() throws IOException {
delegate.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,17 @@ public void setTransportVersion(TransportVersion version) {
@Override
public abstract int read(byte[] b, int off, int len) throws IOException;

public void tryDiscard() {}

/**
* Reads a bytes reference from this stream, copying any bytes read to a new {@code byte[]}. Use {@link #readReleasableBytesReference()}
* when reading large bytes references where possible top avoid needless allocations and copying.
*/
public BytesReference readBytesReference() throws IOException {
int length = readArraySize();
return readBytesReference(length);
var res = readBytesReference(length);
tryDiscard();
return res;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand Down Expand Up @@ -95,19 +93,30 @@ public void aggregate(ReleasableBytesReference content) {

public InboundMessage finishAggregation() throws IOException {
ensureOpen();
final ReleasableBytesReference releasableContent;
final ReleasableBytesReference[] releasableContent;
final int len;
if (isFirstContent()) {
releasableContent = ReleasableBytesReference.empty();
releasableContent = new ReleasableBytesReference[] { ReleasableBytesReference.empty() };
len = 0;
} else if (contentAggregation == null) {
releasableContent = firstContent;
releasableContent = new ReleasableBytesReference[] { firstContent };
len = firstContent.length();
} else {
final ReleasableBytesReference[] references = contentAggregation.toArray(new ReleasableBytesReference[0]);
final BytesReference content = CompositeBytesReference.of(references);
releasableContent = new ReleasableBytesReference(content, () -> Releasables.close(references));
releasableContent = contentAggregation.toArray(new ReleasableBytesReference[0]);
int l = 0;
for (ReleasableBytesReference releasableBytesReference : releasableContent) {
l += releasableBytesReference.length();
}
len = l;
}

final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
final InboundMessage aggregated = new InboundMessage(
currentHeader,
ReleasableBytesReference.consumingStreamInput(releasableContent),
len,
breakerControl
);
boolean success = false;
try {
if (aggregated.getHeader().needsToReadVariableHeader()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.transport;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
Expand All @@ -23,7 +22,7 @@
public class InboundMessage implements Releasable {

private final Header header;
private final ReleasableBytesReference content;
private final int contentLength;
private final Exception exception;
private final boolean isPing;
private Releasable breakerRelease;
Expand All @@ -42,25 +41,27 @@ public class InboundMessage implements Releasable {
}
}

public InboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
public InboundMessage(Header header, StreamInput streamInput, int contentLength, Releasable breakerRelease) {
this.header = header;
this.content = content;
this.streamInput = streamInput;
streamInput.setTransportVersion(header.getVersion());
this.breakerRelease = breakerRelease;
this.exception = null;
this.isPing = false;
this.contentLength = contentLength;
}

public InboundMessage(Header header, Exception exception) {
this.header = header;
this.content = null;
this.contentLength = 0;
this.breakerRelease = null;
this.exception = exception;
this.isPing = false;
}

public InboundMessage(Header header, boolean isPing) {
this.header = header;
this.content = null;
this.contentLength = 0;
this.breakerRelease = null;
this.exception = null;
this.isPing = isPing;
Expand All @@ -71,11 +72,7 @@ public Header getHeader() {
}

public int getContentLength() {
if (content == null) {
return 0;
} else {
return content.length();
}
return contentLength;
}

public Exception getException() {
Expand All @@ -97,12 +94,6 @@ public Releasable takeBreakerReleaseControl() {
}

public StreamInput openOrGetStreamInput() throws IOException {
assert isPing == false && content != null;
assert (boolean) CLOSED.getAcquire(this) == false;
if (streamInput == null) {
streamInput = content.streamInput();
streamInput.setTransportVersion(header.getVersion());
}
return streamInput;
}

Expand All @@ -117,7 +108,7 @@ public void close() {
return;
}
try {
IOUtils.close(streamInput, content, breakerRelease);
IOUtils.close(streamInput, breakerRelease);
} catch (Exception e) {
assert false : e;
throw new ElasticsearchException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,35 +118,24 @@ private static String format(TcpChannel channel, InboundMessage message, String
if (message.isPing()) {
sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B');
} else {
boolean success = false;
Header header = message.getHeader();
int networkMessageSize = header.getNetworkMessageSize();
int messageLengthWithHeader = HEADER_SIZE + networkMessageSize;
StreamInput streamInput = message.openOrGetStreamInput();
try {
final long requestId = header.getRequestId();
final boolean isRequest = header.isRequest();
final String type = isRequest ? "request" : "response";
final String version = header.getVersion().toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);
final long requestId = header.getRequestId();
final boolean isRequest = header.isRequest();
final String type = isRequest ? "request" : "response";
final String version = header.getVersion().toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);

// TODO: Maybe Fix for BWC
if (header.needsToReadVariableHeader() == false && isRequest) {
sb.append(", action: ").append(header.getActionName());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
success = true;
} finally {
if (success) {
IOUtils.close(streamInput);
} else {
IOUtils.closeWhileHandlingException(streamInput);
}
// TODO: Maybe Fix for BWC
if (header.needsToReadVariableHeader() == false && isRequest) {
sb.append(", action: ").append(header.getActionName());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
}
return sb.toString();
}
Expand Down
Loading