Skip to content

Commit 0138b6e

Browse files
committed
Change abstraction point for transport protocol
The previous implementation had a transport switch point in InboundPipeline when the bytes were initially pulled off the wire. There was no implementation for any other protocol as the `canHandleBytes` method was hardcoded to return true. I believe this is the wrong point to switch on the protocol. This change makes NativeInboundBytesHandler protocol agnostic beyond the header. With this change, a complete message is parsed from the stream of bytes, with the header schema being unchanged from what exists today. The protocol switch point will now be at `InboundHandler::inboundMessage`. The header will indicate what protocol was used to serialize the the non-header bytes of the message and then invoke the appropriate handler based on that field. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent 46a269e commit 0138b6e

File tree

11 files changed

+158
-126
lines changed

11 files changed

+158
-126
lines changed

server/src/main/java/org/opensearch/transport/Header.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class Header {
5555

5656
private static final String RESPONSE_NAME = "NO_ACTION_NAME_FOR_RESPONSES";
5757

58+
private final TransportProtocol protocol;
5859
private final int networkMessageSize;
5960
private final Version version;
6061
private final long requestId;
@@ -64,13 +65,18 @@ public class Header {
6465
Tuple<Map<String, String>, Map<String, Set<String>>> headers;
6566
Set<String> features;
6667

67-
Header(int networkMessageSize, long requestId, byte status, Version version) {
68+
Header(TransportProtocol protocol, int networkMessageSize, long requestId, byte status, Version version) {
69+
this.protocol = protocol;
6870
this.networkMessageSize = networkMessageSize;
6971
this.version = version;
7072
this.requestId = requestId;
7173
this.status = status;
7274
}
7375

76+
TransportProtocol getTransportProtocol() {
77+
return protocol;
78+
}
79+
7480
public int getNetworkMessageSize() {
7581
return networkMessageSize;
7682
}
@@ -142,6 +148,8 @@ void finishParsingHeader(StreamInput input) throws IOException {
142148
@Override
143149
public String toString() {
144150
return "Header{"
151+
+ protocol
152+
+ "}{"
145153
+ networkMessageSize
146154
+ "}{"
147155
+ version

server/src/main/java/org/opensearch/transport/InboundAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) {
114114
}
115115
}
116116

117-
public NativeInboundMessage finishAggregation() throws IOException {
117+
public ProtocolInboundMessage finishAggregation() throws IOException {
118118
ensureOpen();
119119
final ReleasableBytesReference releasableContent;
120120
if (isFirstContent()) {

server/src/main/java/org/opensearch/transport/InboundDecoder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,12 @@ private int headerBytesToRead(BytesReference reference) {
187187
// exposed for use in tests
188188
static Header readHeader(Version version, int networkMessageSize, BytesReference bytesReference) throws IOException {
189189
try (StreamInput streamInput = bytesReference.streamInput()) {
190-
streamInput.skip(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
190+
TransportProtocol protocol = TransportProtocol.fromBytes(streamInput.readByte(), streamInput.readByte());
191+
streamInput.skip(TcpHeader.MESSAGE_LENGTH_SIZE);
191192
long requestId = streamInput.readLong();
192193
byte status = streamInput.readByte();
193194
Version remoteVersion = Version.fromId(streamInput.readInt());
194-
Header header = new Header(networkMessageSize, requestId, status, remoteVersion);
195+
Header header = new Header(protocol, networkMessageSize, requestId, status, remoteVersion);
195196
final IllegalStateException invalidVersion = ensureVersionCompatibility(remoteVersion, version, header.isHandshake());
196197
if (invalidVersion != null) {
197198
throw invalidVersion;

server/src/main/java/org/opensearch/transport/InboundHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class InboundHandler {
5656

5757
private volatile long slowLogThresholdMs = Long.MAX_VALUE;
5858

59-
private final Map<String, ProtocolMessageHandler> protocolMessageHandlers;
59+
private final Map<TransportProtocol, ProtocolMessageHandler> protocolMessageHandlers;
6060

6161
InboundHandler(
6262
String nodeName,
@@ -75,7 +75,7 @@ public class InboundHandler {
7575
) {
7676
this.threadPool = threadPool;
7777
this.protocolMessageHandlers = Map.of(
78-
NativeInboundMessage.NATIVE_PROTOCOL,
78+
TransportProtocol.NATIVE,
7979
new NativeMessageHandler(
8080
nodeName,
8181
version,
@@ -114,9 +114,9 @@ void inboundMessage(TcpChannel channel, ProtocolInboundMessage message) throws E
114114
}
115115

116116
private void messageReceivedFromPipeline(TcpChannel channel, ProtocolInboundMessage message, long startTime) throws IOException {
117-
ProtocolMessageHandler protocolMessageHandler = protocolMessageHandlers.get(message.getProtocol());
117+
ProtocolMessageHandler protocolMessageHandler = protocolMessageHandlers.get(message.getTransportProtocol());
118118
if (protocolMessageHandler == null) {
119-
throw new IllegalStateException("No protocol message handler found for protocol: " + message.getProtocol());
119+
throw new IllegalStateException("No protocol message handler found for protocol: " + message.getTransportProtocol());
120120
}
121121
protocolMessageHandler.messageReceived(channel, message, startTime, slowLogThresholdMs, messageListener);
122122
}

server/src/main/java/org/opensearch/transport/InboundPipeline.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public class InboundPipeline implements Releasable {
6363
private final ArrayDeque<ReleasableBytesReference> pending = new ArrayDeque<>(2);
6464
private boolean isClosed = false;
6565
private final BiConsumer<TcpChannel, ProtocolInboundMessage> messageHandler;
66-
private final List<InboundBytesHandler> protocolBytesHandlers;
67-
private InboundBytesHandler currentHandler;
66+
private final InboundBytesHandler bytesHandler;
6867

6968
public InboundPipeline(
7069
Version version,
@@ -95,17 +94,14 @@ public InboundPipeline(
9594
this.statsTracker = statsTracker;
9695
this.decoder = decoder;
9796
this.aggregator = aggregator;
98-
this.protocolBytesHandlers = List.of(new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker));
97+
this.bytesHandler = new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker);
9998
this.messageHandler = messageHandler;
10099
}
101100

102101
@Override
103102
public void close() {
104103
isClosed = true;
105-
if (currentHandler != null) {
106-
currentHandler.close();
107-
currentHandler = null;
108-
}
104+
bytesHandler.close();
109105
Releasables.closeWhileHandlingException(decoder, aggregator);
110106
Releasables.closeWhileHandlingException(pending);
111107
pending.clear();
@@ -127,22 +123,6 @@ public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference
127123
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
128124
statsTracker.markBytesRead(reference.length());
129125
pending.add(reference.retain());
130-
131-
// If we don't have a current handler, we should try to find one based on the protocol of the incoming bytes.
132-
if (currentHandler == null) {
133-
for (InboundBytesHandler handler : protocolBytesHandlers) {
134-
if (handler.canHandleBytes(reference)) {
135-
currentHandler = handler;
136-
break;
137-
}
138-
}
139-
}
140-
141-
// If we have a current handler determined based on protocol, we should continue to use it for the fragmented bytes.
142-
if (currentHandler != null) {
143-
currentHandler.doHandleBytes(channel, reference, messageHandler);
144-
} else {
145-
throw new IllegalStateException("No bytes handler found for the incoming transport protocol");
146-
}
126+
bytesHandler.doHandleBytes(channel, reference, messageHandler);
147127
}
148128
}

server/src/main/java/org/opensearch/transport/ProtocolInboundMessage.java

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.opensearch.transport;
1010

1111
import org.opensearch.common.annotation.PublicApi;
12+
import org.opensearch.common.bytes.ReleasableBytesReference;
13+
import org.opensearch.common.lease.Releasable;
14+
import org.opensearch.common.lease.Releasables;
1215

1316
/**
1417
* Base class for inbound data as a message.
@@ -17,11 +20,89 @@
1720
* @opensearch.internal
1821
*/
1922
@PublicApi(since = "2.14.0")
20-
public interface ProtocolInboundMessage {
23+
public abstract class ProtocolInboundMessage implements Releasable {
2124

22-
/**
23-
* @return the protocol used to encode this message
24-
*/
25-
public String getProtocol();
25+
protected final Header header;
26+
protected final ReleasableBytesReference content;
27+
protected final Exception exception;
28+
protected final boolean isPing;
29+
private Releasable breakerRelease;
2630

31+
public ProtocolInboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
32+
this.header = header;
33+
this.content = content;
34+
this.breakerRelease = breakerRelease;
35+
this.exception = null;
36+
this.isPing = false;
37+
}
38+
39+
public ProtocolInboundMessage(Header header, Exception exception) {
40+
this.header = header;
41+
this.content = null;
42+
this.breakerRelease = null;
43+
this.exception = exception;
44+
this.isPing = false;
45+
}
46+
47+
public ProtocolInboundMessage(Header header, boolean isPing) {
48+
this.header = header;
49+
this.content = null;
50+
this.breakerRelease = null;
51+
this.exception = null;
52+
this.isPing = isPing;
53+
}
54+
55+
TransportProtocol getTransportProtocol() {
56+
return header.getTransportProtocol();
57+
}
58+
59+
public String getProtocol() {
60+
return header.getTransportProtocol().toString();
61+
}
62+
63+
public Header getHeader() {
64+
return header;
65+
}
66+
67+
public int getContentLength() {
68+
if (content == null) {
69+
return 0;
70+
} else {
71+
return content.length();
72+
}
73+
}
74+
75+
public Exception getException() {
76+
return exception;
77+
}
78+
79+
public boolean isPing() {
80+
return isPing;
81+
}
82+
83+
public boolean isShortCircuit() {
84+
return exception != null;
85+
}
86+
87+
public Releasable takeBreakerReleaseControl() {
88+
final Releasable toReturn = breakerRelease;
89+
breakerRelease = null;
90+
if (toReturn != null) {
91+
return toReturn;
92+
} else {
93+
return () -> {};
94+
}
95+
}
96+
97+
98+
99+
@Override
100+
public void close() {
101+
Releasables.closeWhileHandlingException(content, breakerRelease);
102+
}
103+
104+
@Override
105+
public String toString() {
106+
return "InboundMessage{" + header + "}";
107+
}
27108
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
enum TransportProtocol {
12+
NATIVE;
13+
14+
public static TransportProtocol fromBytes(byte b1, byte b2) {
15+
if (b1 == 'E' && b2 == 'S') {
16+
return NATIVE;
17+
}
18+
19+
throw new IllegalArgumentException("Unknown transport protocol: [" + b1 + ", " + b2 + "]");
20+
}
21+
}

server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private void forwardFragments(
151151
messageHandler.accept(channel, PING_MESSAGE);
152152
} else if (fragment == InboundDecoder.END_CONTENT) {
153153
assert aggregator.isAggregating();
154-
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
154+
try (ProtocolInboundMessage aggregated = aggregator.finishAggregation()) {
155155
statsTracker.markMessageReceived();
156156
messageHandler.accept(channel, aggregated);
157157
}

server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundMessage.java

Lines changed: 5 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -49,81 +49,25 @@
4949
* @opensearch.api
5050
*/
5151
@PublicApi(since = "2.14.0")
52-
public class NativeInboundMessage implements Releasable, ProtocolInboundMessage {
52+
public class NativeInboundMessage extends ProtocolInboundMessage {
5353

5454
/**
5555
* The protocol used to encode this message
5656
*/
5757
public static String NATIVE_PROTOCOL = "native";
5858

59-
private final Header header;
60-
private final ReleasableBytesReference content;
61-
private final Exception exception;
62-
private final boolean isPing;
63-
private Releasable breakerRelease;
6459
private StreamInput streamInput;
6560

6661
public NativeInboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
67-
this.header = header;
68-
this.content = content;
69-
this.breakerRelease = breakerRelease;
70-
this.exception = null;
71-
this.isPing = false;
62+
super(header, content, breakerRelease);
7263
}
7364

7465
public NativeInboundMessage(Header header, Exception exception) {
75-
this.header = header;
76-
this.content = null;
77-
this.breakerRelease = null;
78-
this.exception = exception;
79-
this.isPing = false;
66+
super(header, exception);
8067
}
8168

8269
public NativeInboundMessage(Header header, boolean isPing) {
83-
this.header = header;
84-
this.content = null;
85-
this.breakerRelease = null;
86-
this.exception = null;
87-
this.isPing = isPing;
88-
}
89-
90-
@Override
91-
public String getProtocol() {
92-
return NATIVE_PROTOCOL;
93-
}
94-
95-
public Header getHeader() {
96-
return header;
97-
}
98-
99-
public int getContentLength() {
100-
if (content == null) {
101-
return 0;
102-
} else {
103-
return content.length();
104-
}
105-
}
106-
107-
public Exception getException() {
108-
return exception;
109-
}
110-
111-
public boolean isPing() {
112-
return isPing;
113-
}
114-
115-
public boolean isShortCircuit() {
116-
return exception != null;
117-
}
118-
119-
public Releasable takeBreakerReleaseControl() {
120-
final Releasable toReturn = breakerRelease;
121-
breakerRelease = null;
122-
if (toReturn != null) {
123-
return toReturn;
124-
} else {
125-
return () -> {};
126-
}
70+
super(header, isPing);
12771
}
12872

12973
public StreamInput openOrGetStreamInput() throws IOException {
@@ -138,12 +82,6 @@ public StreamInput openOrGetStreamInput() throws IOException {
13882
@Override
13983
public void close() {
14084
IOUtils.closeWhileHandlingException(streamInput);
141-
Releasables.closeWhileHandlingException(content, breakerRelease);
85+
super.close();
14286
}
143-
144-
@Override
145-
public String toString() {
146-
return "InboundMessage{" + header + "}";
147-
}
148-
14987
}

0 commit comments

Comments
 (0)