Skip to content

Commit f5b1042

Browse files
authored
Merge pull request #1336 from nats-io/fix-1320
[BUG] Fix UNSUBs after disconnect can cause auth violations
2 parents 5ad6ebb + 7e8333a commit f5b1042

File tree

7 files changed

+213
-172
lines changed

7 files changed

+213
-172
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ void sendUnsub(NatsSubscription sub, int after) {
10591059
if (after > 0) {
10601060
bab.append(SP).append(after);
10611061
}
1062-
queueInternalOutgoing(new ProtocolMessage(bab));
1062+
queueOutgoing(new ProtocolMessage(bab, true));
10631063
}
10641064

10651065
// Assumes the null/empty checks were handled elsewhere
@@ -1107,8 +1107,11 @@ void sendSubscriptionMessage(String sid, String subject, String queueName, boole
11071107
}
11081108
bab.append(SP).append(sid);
11091109

1110-
ProtocolMessage subMsg = new ProtocolMessage(bab);
1111-
1110+
// setting this to filter on stop.
1111+
// if it's an "internal" message, it won't be filtered
1112+
// if it's a normal message, the subscription will already be registered
1113+
// and therefore will be re-subscribed after a stop anyway
1114+
ProtocolMessage subMsg = new ProtocolMessage(bab, true);
11121115
if (treatAsInternal) {
11131116
queueInternalOutgoing(subMsg);
11141117
} else {
@@ -1499,7 +1502,7 @@ void sendConnect(NatsUri nuri) throws IOException {
14991502
ByteArrayBuilder bab =
15001503
new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
15011504
.append(CONNECT_SP_BYTES).append(connectOptions);
1502-
queueInternalOutgoing(new ProtocolMessage(bab));
1505+
queueInternalOutgoing(new ProtocolMessage(bab, false));
15031506
} catch (Exception exp) {
15041507
throw new IOException("Error sending connect string", exp);
15051508
}
@@ -1527,7 +1530,7 @@ public Duration RTT() throws IOException {
15271530
pongQueue.add(pongFuture);
15281531
try {
15291532
long time = NatsSystemClock.nanoTime();
1530-
writer.queueInternalMessage(new ProtocolMessage(OP_PING_BYTES));
1533+
writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
15311534
pongFuture.get(timeout, TimeUnit.MILLISECONDS);
15321535
return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
15331536
}

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
import java.util.concurrent.locks.ReentrantLock;
3131

3232
import static io.nats.client.support.BuilderBase.bufferAllocSize;
33-
import static io.nats.client.support.NatsConstants.*;
33+
import static io.nats.client.support.NatsConstants.CR;
34+
import static io.nats.client.support.NatsConstants.LF;
3435

3536
class NatsConnectionWriter implements Runnable {
3637
private static final int BUFFER_BLOCK_SIZE = 256;
@@ -105,10 +106,7 @@ Future<Boolean> stop() {
105106
try {
106107
this.outgoing.pause();
107108
this.reconnectOutgoing.pause();
108-
// Clear old ping/pong requests
109-
this.outgoing.filter((msg) ->
110-
msg.isProtocol() &&
111-
(msg.getProtocolBab().equals(OP_PING_BYTES) || msg.getProtocolBab().equals(OP_PONG_BYTES)));
109+
this.outgoing.filter(NatsMessage::isProtocolFilterOnStop);
112110
}
113111
finally {
114112
this.startStopLock.unlock();
@@ -151,7 +149,7 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st
151149
sendBuffer[sendPosition++] = CR;
152150
sendBuffer[sendPosition++] = LF;
153151

154-
if (!msg.isProtocol()) { // because a protocol message does not have headers
152+
if (!msg.isProtocol()) { // because a protocol message does not have headers or data
155153
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);
156154

157155
byte[] bytes = msg.getData(); // guaranteed to not be null

src/main/java/io/nats/client/impl/NatsMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ boolean isProtocol() {
107107
return false; // overridden in NatsMessage.ProtocolMessage
108108
}
109109

110+
boolean isProtocolFilterOnStop() {
111+
return false; // overridden in NatsMessage.ProtocolMessage
112+
}
113+
110114
private static final Headers EMPTY_READ_ONLY = new Headers(null, true, null);
111115

112116
protected void calculate() {

src/main/java/io/nats/client/impl/ProtocolMessage.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,39 @@
1919
// Protocol message is a special version of a NatsPublishableMessage extends NatsMessage
2020
// ----------------------------------------------------------------------------------------------------
2121
class ProtocolMessage extends NatsPublishableMessage {
22+
final boolean filterOnStop;
2223

23-
ProtocolMessage(ByteArrayBuilder babProtocol) {
24+
ProtocolMessage(ByteArrayBuilder babProtocol, boolean filterOnStop) {
2425
super(false);
2526
protocolBab = babProtocol;
2627
sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data
28+
this.filterOnStop = filterOnStop;
2729
}
2830

2931
ProtocolMessage(byte[] protocol) {
3032
super(false);
3133
protocolBab = new ByteArrayBuilder(protocol);
3234
sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data
35+
this.filterOnStop = true;
3336
}
3437

3538
ProtocolMessage(ProtocolMessage pm) {
3639
super(false);
3740
protocolBab = pm.protocolBab;
3841
sizeInBytes = controlLineLength = pm.sizeInBytes;
42+
filterOnStop = pm.filterOnStop;
3943
}
4044

4145
@Override
4246
boolean isProtocol() {
4347
return true;
4448
}
4549

50+
@Override
51+
boolean isProtocolFilterOnStop() {
52+
return filterOnStop;
53+
}
54+
4655
@Override
4756
int copyNotEmptyHeaders(int destPosition, byte[] dest) {
4857
return 0; // until a protocol messages gets headers, might as well shortcut this.

src/test/java/io/nats/client/impl/AuthViolationDuringReconnect.java

Lines changed: 0 additions & 150 deletions
This file was deleted.

0 commit comments

Comments
 (0)