Skip to content

Commit 38e668c

Browse files
committed
Replacing InboundMessage with NativeInboundMessage for deprecation
Signed-off-by: Vacha Shah <vachshah@amazon.com>
1 parent d202d90 commit 38e668c

File tree

8 files changed

+80
-38
lines changed

8 files changed

+80
-38
lines changed

server/src/main/java/org/opensearch/transport/InboundAggregator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.core.common.bytes.BytesArray;
4141
import org.opensearch.core.common.bytes.BytesReference;
4242
import org.opensearch.core.common.bytes.CompositeBytesReference;
43+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4344

4445
import java.io.IOException;
4546
import java.util.ArrayList;
@@ -113,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) {
113114
}
114115
}
115116

116-
public InboundMessage finishAggregation() throws IOException {
117+
public NativeInboundMessage finishAggregation() throws IOException {
117118
ensureOpen();
118119
final ReleasableBytesReference releasableContent;
119120
if (isFirstContent()) {
@@ -127,7 +128,7 @@ public InboundMessage finishAggregation() throws IOException {
127128
}
128129

129130
final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
130-
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
131+
final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl);
131132
boolean success = false;
132133
try {
133134
if (aggregated.getHeader().needsToReadVariableHeader()) {
@@ -142,7 +143,7 @@ public InboundMessage finishAggregation() throws IOException {
142143
if (isShortCircuited()) {
143144
aggregated.close();
144145
success = true;
145-
return new InboundMessage(aggregated.getHeader(), aggregationException);
146+
return new NativeInboundMessage(aggregated.getHeader(), aggregationException);
146147
} else {
147148
success = true;
148149
return aggregated;

server/src/main/java/org/opensearch/transport/NativeMessageHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.telemetry.tracing.Tracer;
5252
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
5353
import org.opensearch.threadpool.ThreadPool;
54+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
5455

5556
import java.io.EOFException;
5657
import java.io.IOException;
@@ -111,7 +112,7 @@ public void messageReceived(
111112
long slowLogThresholdMs,
112113
TransportMessageListener messageListener
113114
) throws IOException {
114-
InboundMessage inboundMessage = (InboundMessage) message;
115+
NativeInboundMessage inboundMessage = (NativeInboundMessage) message;
115116
TransportLogger.logInboundMessage(channel, inboundMessage);
116117
if (inboundMessage.isPing()) {
117118
keepAlive.receiveKeepAlive(channel);
@@ -122,7 +123,7 @@ public void messageReceived(
122123

123124
private void handleMessage(
124125
TcpChannel channel,
125-
InboundMessage message,
126+
NativeInboundMessage message,
126127
long startTime,
127128
long slowLogThresholdMs,
128129
TransportMessageListener messageListener
@@ -194,7 +195,7 @@ private Map<String, Collection<String>> extractHeaders(Map<String, String> heade
194195
private <T extends TransportRequest> void handleRequest(
195196
TcpChannel channel,
196197
Header header,
197-
InboundMessage message,
198+
NativeInboundMessage message,
198199
TransportMessageListener messageListener
199200
) throws IOException {
200201
final String action = header.getActionName();

server/src/main/java/org/opensearch/transport/TransportLogger.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
4141
import org.opensearch.core.common.io.stream.StreamInput;
4242
import org.opensearch.core.compress.CompressorRegistry;
43+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4344

4445
import java.io.IOException;
4546

@@ -64,7 +65,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
6465
}
6566
}
6667

67-
static void logInboundMessage(TcpChannel channel, InboundMessage message) {
68+
static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) {
6869
if (logger.isTraceEnabled()) {
6970
try {
7071
String logMessage = format(channel, message, "READ");
@@ -136,7 +137,7 @@ private static String format(TcpChannel channel, BytesReference message, String
136137
return sb.toString();
137138
}
138139

139-
private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
140+
private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException {
140141
final StringBuilder sb = new StringBuilder();
141142
sb.append(channel);
142143

server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.opensearch.transport.InboundAggregator;
1717
import org.opensearch.transport.InboundBytesHandler;
1818
import org.opensearch.transport.InboundDecoder;
19-
import org.opensearch.transport.InboundMessage;
2019
import org.opensearch.transport.ProtocolInboundMessage;
2120
import org.opensearch.transport.StatsTracker;
2221
import org.opensearch.transport.TcpChannel;
@@ -32,7 +31,7 @@
3231
public class NativeInboundBytesHandler implements InboundBytesHandler {
3332

3433
private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
35-
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
34+
private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true);
3635

3736
private final ArrayDeque<ReleasableBytesReference> pending;
3837
private final InboundDecoder decoder;
@@ -152,7 +151,7 @@ private void forwardFragments(
152151
messageHandler.accept(channel, PING_MESSAGE);
153152
} else if (fragment == InboundDecoder.END_CONTENT) {
154153
assert aggregator.isAggregating();
155-
try (InboundMessage aggregated = aggregator.finishAggregation()) {
154+
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
156155
statsTracker.markMessageReceived();
157156
messageHandler.accept(channel, aggregated);
158157
}

server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.core.common.breaker.CircuitBreakingException;
4343
import org.opensearch.core.common.bytes.BytesArray;
4444
import org.opensearch.test.OpenSearchTestCase;
45+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4546
import org.junit.Before;
4647

4748
import java.io.IOException;
@@ -107,7 +108,7 @@ public void testInboundAggregation() throws IOException {
107108
}
108109

109110
// Signal EOS
110-
InboundMessage aggregated = aggregator.finishAggregation();
111+
NativeInboundMessage aggregated = aggregator.finishAggregation();
111112

112113
assertThat(aggregated, notNullValue());
113114
assertFalse(aggregated.isPing());
@@ -138,7 +139,7 @@ public void testInboundUnknownAction() throws IOException {
138139
assertEquals(0, content.refCount());
139140

140141
// Signal EOS
141-
InboundMessage aggregated = aggregator.finishAggregation();
142+
NativeInboundMessage aggregated = aggregator.finishAggregation();
142143

143144
assertThat(aggregated, notNullValue());
144145
assertTrue(aggregated.isShortCircuit());
@@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException {
161162
content1.close();
162163

163164
// Signal EOS
164-
InboundMessage aggregated1 = aggregator.finishAggregation();
165+
NativeInboundMessage aggregated1 = aggregator.finishAggregation();
165166

166167
assertEquals(0, content1.refCount());
167168
assertThat(aggregated1, notNullValue());
@@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException {
180181
content2.close();
181182

182183
// Signal EOS
183-
InboundMessage aggregated2 = aggregator.finishAggregation();
184+
NativeInboundMessage aggregated2 = aggregator.finishAggregation();
184185

185186
assertEquals(1, content2.refCount());
186187
assertThat(aggregated2, notNullValue());
@@ -199,7 +200,7 @@ public void testCircuitBreak() throws IOException {
199200
content3.close();
200201

201202
// Signal EOS
202-
InboundMessage aggregated3 = aggregator.finishAggregation();
203+
NativeInboundMessage aggregated3 = aggregator.finishAggregation();
203204

204205
assertEquals(1, content3.refCount());
205206
assertThat(aggregated3, notNullValue());
@@ -263,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
263264
content.close();
264265

265266
// Signal EOS
266-
InboundMessage aggregated = aggregator.finishAggregation();
267+
NativeInboundMessage aggregated = aggregator.finishAggregation();
267268

268269
assertThat(aggregated, notNullValue());
269270
assertFalse(header.needsToReadVariableHeader());

server/src/test/java/org/opensearch/transport/InboundHandlerTests.java

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.opensearch.test.VersionUtils;
5757
import org.opensearch.threadpool.TestThreadPool;
5858
import org.opensearch.threadpool.ThreadPool;
59+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
5960
import org.junit.After;
6061
import org.junit.Before;
6162

@@ -142,7 +143,7 @@ public void testPing() throws Exception {
142143
);
143144
requestHandlers.registerHandler(registry);
144145

145-
handler.inboundMessage(channel, new InboundMessage(null, true));
146+
handler.inboundMessage(channel, new NativeInboundMessage(null, true));
146147
if (channel.isServerChannel()) {
147148
BytesReference ping = channel.getMessageCaptor().get();
148149
assertEquals('E', ping.get(0));
@@ -208,7 +209,11 @@ public TestResponse read(StreamInput in) throws IOException {
208209
BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput());
209210
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);
210211
Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version);
211-
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
212+
NativeInboundMessage requestMessage = new NativeInboundMessage(
213+
requestHeader,
214+
ReleasableBytesReference.wrap(requestContent),
215+
() -> {}
216+
);
212217
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
213218
handler.inboundMessage(channel, requestMessage);
214219

@@ -229,7 +234,11 @@ public TestResponse read(StreamInput in) throws IOException {
229234
BytesReference fullResponseBytes = channel.getMessageCaptor().get();
230235
BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize);
231236
Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version);
232-
InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {});
237+
NativeInboundMessage responseMessage = new NativeInboundMessage(
238+
responseHeader,
239+
ReleasableBytesReference.wrap(responseContent),
240+
() -> {}
241+
);
233242
responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput());
234243
handler.inboundMessage(channel, responseMessage);
235244

@@ -256,7 +265,7 @@ public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exce
256265
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)),
257266
remoteVersion
258267
);
259-
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
268+
final NativeInboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
260269
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
261270
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
262271
requestHeader.features = Set.of();
@@ -296,7 +305,7 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws
296305
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)),
297306
remoteVersion
298307
);
299-
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
308+
final NativeInboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
300309
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
301310
requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
302311
requestHeader.features = Set.of();
@@ -327,13 +336,17 @@ public void testLogsSlowInboundProcessing() throws Exception {
327336
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)),
328337
remoteVersion
329338
);
330-
final InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {
331-
try {
332-
TimeUnit.SECONDS.sleep(1L);
333-
} catch (InterruptedException e) {
334-
throw new AssertionError(e);
339+
final NativeInboundMessage requestMessage = new NativeInboundMessage(
340+
requestHeader,
341+
ReleasableBytesReference.wrap(BytesArray.EMPTY),
342+
() -> {
343+
try {
344+
TimeUnit.SECONDS.sleep(1L);
345+
} catch (InterruptedException e) {
346+
throw new AssertionError(e);
347+
}
335348
}
336-
});
349+
);
337350
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
338351
requestHeader.headers = Tuple.tuple(Collections.emptyMap(), Collections.emptyMap());
339352
requestHeader.features = Set.of();
@@ -407,7 +420,11 @@ public void onResponseSent(long requestId, String action, Exception error) {
407420
BytesReference fullRequestBytes = BytesReference.fromByteBuffer((ByteBuffer) buffer.flip());
408421
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);
409422
Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version);
410-
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
423+
NativeInboundMessage requestMessage = new NativeInboundMessage(
424+
requestHeader,
425+
ReleasableBytesReference.wrap(requestContent),
426+
() -> {}
427+
);
411428
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
412429
handler.inboundMessage(channel, requestMessage);
413430

@@ -474,7 +491,11 @@ public void onResponseSent(long requestId, String action, Exception error) {
474491
// Create the request payload by intentionally stripping 1 byte away
475492
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize - 1);
476493
Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version);
477-
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
494+
NativeInboundMessage requestMessage = new NativeInboundMessage(
495+
requestHeader,
496+
ReleasableBytesReference.wrap(requestContent),
497+
() -> {}
498+
);
478499
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
479500
handler.inboundMessage(channel, requestMessage);
480501

@@ -540,7 +561,11 @@ public TestResponse read(StreamInput in) throws IOException {
540561
BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput());
541562
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);
542563
Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version);
543-
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
564+
NativeInboundMessage requestMessage = new NativeInboundMessage(
565+
requestHeader,
566+
ReleasableBytesReference.wrap(requestContent),
567+
() -> {}
568+
);
544569
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
545570
handler.inboundMessage(channel, requestMessage);
546571

@@ -562,7 +587,11 @@ public TestResponse read(StreamInput in) throws IOException {
562587
BytesReference fullResponseBytes = BytesReference.fromByteBuffer((ByteBuffer) buffer.flip());
563588
BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize);
564589
Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version);
565-
InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {});
590+
NativeInboundMessage responseMessage = new NativeInboundMessage(
591+
responseHeader,
592+
ReleasableBytesReference.wrap(responseContent),
593+
() -> {}
594+
);
566595
responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput());
567596
handler.inboundMessage(channel, responseMessage);
568597

@@ -628,7 +657,11 @@ public TestResponse read(StreamInput in) throws IOException {
628657
BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput());
629658
BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);
630659
Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version);
631-
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
660+
NativeInboundMessage requestMessage = new NativeInboundMessage(
661+
requestHeader,
662+
ReleasableBytesReference.wrap(requestContent),
663+
() -> {}
664+
);
632665
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
633666
handler.inboundMessage(channel, requestMessage);
634667

@@ -645,7 +678,11 @@ public TestResponse read(StreamInput in) throws IOException {
645678
// Create the response payload by intentionally stripping 1 byte away
646679
BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize - 1);
647680
Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version);
648-
InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {});
681+
NativeInboundMessage responseMessage = new NativeInboundMessage(
682+
responseHeader,
683+
ReleasableBytesReference.wrap(responseContent),
684+
() -> {}
685+
);
649686
responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput());
650687
handler.inboundMessage(channel, responseMessage);
651688

@@ -654,8 +691,8 @@ public TestResponse read(StreamInput in) throws IOException {
654691
assertThat(exceptionCaptor.get().getMessage(), containsString("Failed to deserialize response from handler"));
655692
}
656693

657-
private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) {
658-
return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {}) {
694+
private static NativeInboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) {
695+
return new NativeInboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {}) {
659696
@Override
660697
public StreamInput openOrGetStreamInput() {
661698
final StreamInput streamInput = new InputStreamStreamInput(new InputStream() {

server/src/test/java/org/opensearch/transport/InboundPipelineTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.core.common.bytes.BytesArray;
5050
import org.opensearch.core.common.bytes.BytesReference;
5151
import org.opensearch.test.OpenSearchTestCase;
52+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
5253

5354
import java.io.IOException;
5455
import java.util.ArrayList;
@@ -74,7 +75,7 @@ public void testPipelineHandling() throws IOException {
7475
final List<ReleasableBytesReference> toRelease = new ArrayList<>();
7576
final BiConsumer<TcpChannel, ProtocolInboundMessage> messageHandler = (c, m) -> {
7677
try {
77-
InboundMessage message = (InboundMessage) m;
78+
NativeInboundMessage message = (NativeInboundMessage) m;
7879
final Header header = message.getHeader();
7980
final MessageData actualData;
8081
final Version version = header.getVersion();

0 commit comments

Comments
 (0)