Skip to content

Commit 60b9a96

Browse files
committed
[BUG] Fix #1320
1 parent 5ad6ebb commit 60b9a96

File tree

5 files changed

+179
-154
lines changed

5 files changed

+179
-154
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ void sendUnsub(NatsSubscription sub, int after) {
10591059
if (after > 0) {
10601060
bab.append(SP).append(after);
10611061
}
1062-
queueInternalOutgoing(new ProtocolMessage(bab));
1062+
queueOutgoing(new ProtocolMessage(bab));
10631063
}
10641064

10651065
// Assumes the null/empty checks were handled elsewhere

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,13 @@ Future<Boolean> stop() {
106106
this.outgoing.pause();
107107
this.reconnectOutgoing.pause();
108108
// Clear old ping/pong requests
109-
this.outgoing.filter((msg) ->
110-
msg.isProtocol() &&
111-
(msg.getProtocolBab().equals(OP_PING_BYTES) || msg.getProtocolBab().equals(OP_PONG_BYTES)));
109+
this.outgoing.filter((msg) ->{
110+
if (msg.isProtocol()) {
111+
ByteArrayBuilder bab = msg.getProtocolBab();
112+
return bab.equals(OP_PING_BYTES) || bab.equals(OP_PONG_BYTES) || bab.equals(OP_UNSUB_BYTES);
113+
}
114+
return false;
115+
});
112116
}
113117
finally {
114118
this.startStopLock.unlock();

src/main/java/io/nats/client/support/NatsConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public interface NatsConstants {
7070

7171
byte[] OP_PING_BYTES = OP_PING.getBytes();
7272
byte[] OP_PONG_BYTES = OP_PONG.getBytes();
73+
byte[] OP_UNSUB_BYTES = OP_UNSUB.getBytes();
7374

7475
byte[] PUB_SP_BYTES = (OP_PUB + SPACE).getBytes(US_ASCII);
7576
byte[] HPUB_SP_BYTES = (OP_HPUB + SPACE).getBytes(US_ASCII);

src/test/java/io/nats/client/impl/AuthViolationDuringReconnect.java

Lines changed: 0 additions & 150 deletions
This file was deleted.
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package io.nats.client.impl;
2+
3+
import io.nats.client.*;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.io.IOException;
7+
import java.time.Duration;
8+
import java.util.concurrent.*;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
13+
import static org.junit.jupiter.api.Assertions.assertFalse;
14+
15+
/* Test to reproduce #1320 */
16+
public class AuthViolationDuringReconnectTest {
17+
18+
static class Context {
19+
int port;
20+
NatsConnection nc;
21+
Dispatcher d;
22+
CountDownLatch latch;
23+
ConcurrentHashMap.KeySetView<String, Boolean> subscriptions = ConcurrentHashMap.newKeySet();
24+
ScheduledExecutorService serverRestarter = Executors.newSingleThreadScheduledExecutor();
25+
ExecutorService unsubThreadpool = Executors.newFixedThreadPool(2);
26+
AtomicReference<NatsTestServer> ts = new AtomicReference<>();
27+
AtomicBoolean violated = new AtomicBoolean(false);
28+
AtomicInteger restartsLeft = new AtomicInteger(10);
29+
ErrorListener errorListener = new ErrorListener() {
30+
@Override
31+
public void errorOccurred(Connection conn, String error) {
32+
if (error.contains("Authorization Violation")) {
33+
// System.out.println("Authorization Violation");
34+
violated.set(true);
35+
}
36+
}
37+
};
38+
}
39+
40+
@Test
41+
public void testAuthViolationDuringReconnect() throws Exception {
42+
Context ctx = new Context();
43+
ctx.port = NatsTestServer.nextPort();
44+
45+
startNatsServer(ctx);
46+
47+
ctx.nc = new MockPausingNatsConnection(buildOptions(ctx));
48+
ctx.nc.connect(true);
49+
ctx.d = ctx.nc.createDispatcher();
50+
subscribe(ctx);
51+
52+
ctx.serverRestarter.scheduleWithFixedDelay(() -> restartServer(ctx), 2000, 3000, TimeUnit.MILLISECONDS);
53+
54+
Thread t = new Thread(waitCloseSocket(ctx));
55+
t.start();
56+
t.join();
57+
58+
assertFalse(ctx.violated.get());
59+
ctx.ts.get().shutdown();
60+
}
61+
62+
private static Runnable waitCloseSocket(Context ctx) {
63+
return () -> {
64+
try {
65+
Thread.sleep(200);
66+
} catch (InterruptedException e) {
67+
throw new RuntimeException(e);
68+
}
69+
while (!ctx.violated.get() && ctx.restartsLeft.get() > 0) {
70+
if (ctx.nc.closeSocketLock.isLocked()) {
71+
// System.out.printf("Unsubscribing all subscriptions due to disconnection %d \n", ctx.subscriptions.size());
72+
ctx.latch.countDown();
73+
// just acquire the lock and release it
74+
try {
75+
ctx.nc.closeSocketLock.lock();
76+
}
77+
finally {
78+
ctx.nc.closeSocketLock.unlock();
79+
}
80+
}
81+
}
82+
};
83+
}
84+
85+
private static void restartServer(Context ctx) {
86+
try {
87+
ctx.restartsLeft.decrementAndGet();
88+
// System.out.println("Restarting server " + ctx.restartsLeft.get());
89+
ctx.ts.get().shutdown();
90+
startNatsServer(ctx);
91+
} catch (Exception e) {
92+
throw new RuntimeException(e);
93+
}
94+
}
95+
96+
private static void startNatsServer(Context ctx) throws IOException {
97+
ctx.ts.set(new NatsTestServer(new String[]{"--auth", "1234"}, ctx.port, false));
98+
}
99+
100+
private static void subscribe(Context ctx) {
101+
ctx.latch = new CountDownLatch(1);
102+
for (int i = 0; i < 1_000; i++) {
103+
String subject = "test_" + i;
104+
ctx.subscriptions.add(subject);
105+
ctx.d.subscribe(subject);
106+
ctx.unsubThreadpool.execute(() -> {
107+
try {
108+
ctx.latch.await();
109+
ctx.d.unsubscribe(subject);
110+
} catch (InterruptedException e) {
111+
throw new RuntimeException(e);
112+
}
113+
});
114+
}
115+
}
116+
117+
private static Options buildOptions(Context ctx) {
118+
Options.Builder natsOptions = new Options.Builder()
119+
.servers(new String[]{"nats://incorrect:1111", "nats://localhost:" + ctx.port})
120+
.noRandomize()
121+
.token(new char[]{'1', '2', '3', '4'})
122+
.reconnectWait(Duration.ofMillis(2000))
123+
.connectionTimeout(Duration.ofMillis(500))
124+
.errorListener(ctx.errorListener);
125+
126+
return natsOptions.build();
127+
}
128+
129+
private static class ReconnectedHandler implements ConnectionListener {
130+
131+
private java.util.function.Consumer<Void> consumer;
132+
133+
public void setConsumer(java.util.function.Consumer<Void> consumer) {
134+
this.consumer = consumer;
135+
}
136+
137+
@Override
138+
public void connectionEvent(Connection conn, Events type) {
139+
if (type == Events.RECONNECTED) {
140+
consumer.accept(null);
141+
}
142+
}
143+
}
144+
145+
static class MockPausingNatsConnection extends NatsConnection {
146+
MockPausingNatsConnection(Options options) {
147+
super(options);
148+
}
149+
150+
@Override
151+
void closeSocketImpl(boolean forceClose) {
152+
try {
153+
Thread.sleep(500);
154+
} catch (InterruptedException e) {
155+
throw new RuntimeException(e);
156+
}
157+
super.closeSocketImpl(forceClose);
158+
}
159+
160+
@Override
161+
void sendUnsub(NatsSubscription sub, int after) {
162+
try {
163+
Thread.sleep(1000);
164+
} catch (InterruptedException e) {
165+
throw new RuntimeException(e);
166+
}
167+
super.sendUnsub(sub, after);
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)