Skip to content

Commit 8faecf0

Browse files
committed
Faster filtering
1 parent 903d68a commit 8faecf0

File tree

6 files changed

+56
-26
lines changed

6 files changed

+56
-26
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ void sendUnsub(NatsSubscription sub, int after) {
10591059
if (after > 0) {
10601060
bab.append(SP).append(after);
10611061
}
1062-
queueOutgoing(new ProtocolMessage(bab));
1062+
queueOutgoing(new ProtocolMessage(bab, true));
10631063
}
10641064

10651065
// Assumes the null/empty checks were handled elsewhere
@@ -1107,7 +1107,7 @@ void sendSubscriptionMessage(String sid, String subject, String queueName, boole
11071107
}
11081108
bab.append(SP).append(sid);
11091109

1110-
ProtocolMessage subMsg = new ProtocolMessage(bab);
1110+
ProtocolMessage subMsg = new ProtocolMessage(bab, false);
11111111

11121112
if (treatAsInternal) {
11131113
queueInternalOutgoing(subMsg);
@@ -1499,7 +1499,7 @@ void sendConnect(NatsUri nuri) throws IOException {
14991499
ByteArrayBuilder bab =
15001500
new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
15011501
.append(CONNECT_SP_BYTES).append(connectOptions);
1502-
queueInternalOutgoing(new ProtocolMessage(bab));
1502+
queueInternalOutgoing(new ProtocolMessage(bab, false));
15031503
} catch (Exception exp) {
15041504
throw new IOException("Error sending connect string", exp);
15051505
}
@@ -1527,7 +1527,7 @@ public Duration RTT() throws IOException {
15271527
pongQueue.add(pongFuture);
15281528
try {
15291529
long time = NatsSystemClock.nanoTime();
1530-
writer.queueInternalMessage(new ProtocolMessage(OP_PING_BYTES));
1530+
writer.queueInternalMessage(new ProtocolMessage(PONG_PROTO));
15311531
pongFuture.get(timeout, TimeUnit.MILLISECONDS);
15321532
return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
15331533
}

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
import java.util.concurrent.locks.ReentrantLock;
3131

3232
import static io.nats.client.support.BuilderBase.bufferAllocSize;
33-
import static io.nats.client.support.NatsConstants.*;
33+
import static io.nats.client.support.NatsConstants.CR;
34+
import static io.nats.client.support.NatsConstants.LF;
3435

3536
class NatsConnectionWriter implements Runnable {
3637
private static final int BUFFER_BLOCK_SIZE = 256;
@@ -105,14 +106,7 @@ Future<Boolean> stop() {
105106
try {
106107
this.outgoing.pause();
107108
this.reconnectOutgoing.pause();
108-
// Clear old ping/pong requests
109-
this.outgoing.filter((msg) ->{
110-
if (msg.isProtocol()) {
111-
ByteArrayBuilder bab = msg.getProtocolBab();
112-
return bab.equals(OP_PING_BYTES) || bab.equals(OP_PONG_BYTES) || bab.equals(OP_UNSUB_BYTES);
113-
}
114-
return false;
115-
});
109+
this.outgoing.filter(NatsMessage::isProtocolFilterOnStop);
116110
}
117111
finally {
118112
this.startStopLock.unlock();
@@ -155,7 +149,7 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st
155149
sendBuffer[sendPosition++] = CR;
156150
sendBuffer[sendPosition++] = LF;
157151

158-
if (!msg.isProtocol()) { // because a protocol message does not have headers
152+
if (!msg.isProtocol()) { // because a protocol message does not have headers or data
159153
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);
160154

161155
byte[] bytes = msg.getData(); // guaranteed to not be null

src/main/java/io/nats/client/impl/NatsMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ boolean isProtocol() {
107107
return false; // overridden in NatsMessage.ProtocolMessage
108108
}
109109

110+
boolean isProtocolFilterOnStop() {
111+
return false; // overridden in NatsMessage.ProtocolMessage
112+
}
113+
110114
private static final Headers EMPTY_READ_ONLY = new Headers(null, true, null);
111115

112116
protected void calculate() {

src/main/java/io/nats/client/impl/ProtocolMessage.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,39 @@
1919
// Protocol message is a special version of a NatsPublishableMessage extends NatsMessage
2020
// ----------------------------------------------------------------------------------------------------
2121
class ProtocolMessage extends NatsPublishableMessage {
22+
final boolean filterOnStop;
2223

23-
ProtocolMessage(ByteArrayBuilder babProtocol) {
24+
ProtocolMessage(ByteArrayBuilder babProtocol, boolean filterOnStop) {
2425
super(false);
2526
protocolBab = babProtocol;
2627
sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data
28+
this.filterOnStop = filterOnStop;
2729
}
2830

2931
ProtocolMessage(byte[] protocol) {
3032
super(false);
3133
protocolBab = new ByteArrayBuilder(protocol);
3234
sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data
35+
this.filterOnStop = true;
3336
}
3437

3538
ProtocolMessage(ProtocolMessage pm) {
3639
super(false);
3740
protocolBab = pm.protocolBab;
3841
sizeInBytes = controlLineLength = pm.sizeInBytes;
42+
filterOnStop = pm.filterOnStop;
3943
}
4044

4145
@Override
4246
boolean isProtocol() {
4347
return true;
4448
}
4549

50+
@Override
51+
boolean isProtocolFilterOnStop() {
52+
return filterOnStop;
53+
}
54+
4655
@Override
4756
int copyNotEmptyHeaders(int destPosition, byte[] dest) {
4857
return 0; // until a protocol messages gets headers, might as well shortcut this.

src/main/java/io/nats/client/support/NatsConstants.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public interface NatsConstants {
7070

7171
byte[] OP_PING_BYTES = OP_PING.getBytes();
7272
byte[] OP_PONG_BYTES = OP_PONG.getBytes();
73-
byte[] OP_UNSUB_BYTES = OP_UNSUB.getBytes();
7473

7574
byte[] PUB_SP_BYTES = (OP_PUB + SPACE).getBytes(US_ASCII);
7675
byte[] HPUB_SP_BYTES = (OP_HPUB + SPACE).getBytes(US_ASCII);

src/test/java/io/nats/client/impl/NatsMessageTests.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,18 @@
2222
import java.time.Duration;
2323
import java.util.List;
2424

25+
import static io.nats.client.support.NatsConstants.OP_PING;
26+
import static io.nats.client.support.NatsConstants.OP_PING_BYTES;
2527
import static io.nats.client.utils.ResourceUtils.dataAsLines;
2628
import static org.junit.jupiter.api.Assertions.*;
2729

2830
public class NatsMessageTests extends JetStreamTestBase {
2931
@Test
3032
public void testSizeOnProtocolMessage() {
31-
NatsMessage msg = new ProtocolMessage("PING".getBytes());
33+
NatsMessage msg = new ProtocolMessage(OP_PING_BYTES);
3234
assertEquals(msg.getProtocolBytes().length + 2, msg.getSizeInBytes(), "Size is set, with CRLF");
33-
assertEquals("PING".getBytes(StandardCharsets.UTF_8).length + 2, msg.getSizeInBytes(), "Size is correct");
34-
assertTrue(msg.toString().endsWith("PING")); // toString COVERAGE
35+
assertEquals(OP_PING_BYTES.length + 2, msg.getSizeInBytes(), "Size is correct");
36+
assertTrue(msg.toString().endsWith(OP_PING)); // toString COVERAGE
3537
}
3638

3739
@Test
@@ -192,6 +194,8 @@ public void miscCoverage() {
192194
assertFalse(m.isStatusMessage());
193195
assertNotNull(m.toString());
194196
assertNotNull(m.toDetailString());
197+
assertFalse(m.isProtocol());
198+
assertFalse(m.isProtocolFilterOnStop());
195199

196200
m = NatsMessage.builder()
197201
.subject("test").replyTo("reply")
@@ -230,8 +234,7 @@ public void miscCoverage() {
230234
m = testMessage();
231235
assertTrue(m.hasHeaders());
232236
assertNotNull(m.getHeaders());
233-
//noinspection deprecation
234-
assertFalse(m.isUtf8mode()); // coverage, ALWAYS FALSE SINCE DEPRECATED
237+
assertFalse(m.isUtf8mode()); // coverage, ALWAYS FALSE SINCE DISUSED
235238
assertFalse(m.getHeaders().isEmpty());
236239
assertNull(m.getSubscription());
237240
assertNull(m.getNatsSubscription());
@@ -244,17 +247,29 @@ public void miscCoverage() {
244247
assertNull(m.getHeaders());
245248
assertNotNull(m.toString()); // COVERAGE
246249

247-
ProtocolMessage pm = new ProtocolMessage(new byte[0]);
248-
assertNotNull(pm.getProtocolBab());
249-
assertEquals(0, pm.getProtocolBab().length());
250-
assertEquals(2, pm.getSizeInBytes());
251-
assertEquals(2, pm.getControlLineLength());
250+
ProtocolMessage pmFilterOnStop = new ProtocolMessage(new byte[0]);
251+
ProtocolMessage pmNotFilterOnStop = new ProtocolMessage(pmFilterOnStop.getProtocolBab(), false);
252+
253+
validateProto(pmFilterOnStop, true);
254+
validateProto(pmNotFilterOnStop, false);
255+
256+
// retains filter on stop
257+
validateProto(new ProtocolMessage(pmFilterOnStop), true);
258+
validateProto(new ProtocolMessage(pmNotFilterOnStop), false);
259+
260+
// sets filter on stop
261+
validateProto(new ProtocolMessage(pmFilterOnStop.getProtocolBab(), true), true);
262+
validateProto(new ProtocolMessage(pmFilterOnStop.getProtocolBab(), false), false);
263+
validateProto(new ProtocolMessage(pmNotFilterOnStop.getProtocolBab(), true), true);
264+
validateProto(new ProtocolMessage(pmNotFilterOnStop.getProtocolBab(), false), false);
252265

253266
IncomingMessage scm = new IncomingMessage() {};
254267
assertEquals(0, scm.getSizeInBytes());
255268
assertThrows(IllegalStateException.class, scm::getProtocolBab);
256269
assertThrows(IllegalStateException.class, scm::getProtocolBytes);
257270
assertThrows(IllegalStateException.class, scm::getControlLineLength);
271+
assertFalse(scm.isProtocol());
272+
assertFalse(scm.isProtocolFilterOnStop());
258273

259274
// coverage coverage coverage
260275
//noinspection deprecation
@@ -264,6 +279,15 @@ public void miscCoverage() {
264279
assertTrue(nmCov.toDetailString().contains("PUB sub reply 0"));
265280
}
266281

282+
private static void validateProto(ProtocolMessage pm, boolean isProtocolFilterOnStop) {
283+
assertNotNull(pm.getProtocolBab());
284+
assertEquals(0, pm.getProtocolBab().length());
285+
assertEquals(2, pm.getSizeInBytes());
286+
assertEquals(2, pm.getControlLineLength());
287+
assertTrue(pm.isProtocol());
288+
assertEquals(isProtocolFilterOnStop, pm.isProtocolFilterOnStop());
289+
}
290+
267291
@Test
268292
public void constructorWithMessage() {
269293
NatsMessage m = testMessage();

0 commit comments

Comments
 (0)