Skip to content

Commit 07883e7

Browse files
Add test for auth violations during reconnect
Introduce a new test case to reproduce issue #1320, focusing on handling authorization violations during reconnections. It also makes the `closeSocketLock` in `NatsConnection` package-private to facilitate testing.
1 parent 51c6aa1 commit 07883e7

File tree

2 files changed

+123
-1
lines changed

2 files changed

+123
-1
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class NatsConnection implements Connection {
5555
private boolean disconnecting; // you can only disconnect in one thread
5656
private boolean closing; // respect a close call regardless
5757
private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
58-
private final ReentrantLock closeSocketLock;
58+
final ReentrantLock closeSocketLock;
5959

6060
private Status status;
6161
private final ReentrantLock statusLock;
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package io.nats.client.impl;
2+
3+
import io.nats.client.*;
4+
5+
import java.io.IOException;
6+
import java.time.Duration;
7+
import java.util.concurrent.*;
8+
import java.util.concurrent.atomic.AtomicReference;
9+
import java.util.concurrent.locks.LockSupport;
10+
11+
/* Program to reproduce #1320 */
12+
public class AuthViolationDuringReconnect {
13+
private static final ConcurrentHashMap.KeySetView<String, Boolean> subscriptions = ConcurrentHashMap.newKeySet();
14+
private static final ScheduledExecutorService serverRestarter = Executors.newSingleThreadScheduledExecutor();
15+
private static final ExecutorService unsubThreadpool = Executors.newFixedThreadPool(64);
16+
private static final AtomicReference<NatsTestServer> ts = new AtomicReference<>();
17+
private static final ErrorListener AUTHORIZATION_VIOLATION_LISTENER = new ErrorListener() {
18+
@Override
19+
public void errorOccurred(Connection conn, String error) {
20+
if (error.contains("Authorization Violation")) {
21+
System.out.println("Authorization Violation, Stopping server");
22+
try {
23+
Thread.sleep(1000);
24+
ts.get().shutdown();
25+
} catch (InterruptedException e) {
26+
throw new RuntimeException(e);
27+
}
28+
System.exit(-1);
29+
}
30+
}
31+
};
32+
private static volatile CountDownLatch latch;
33+
34+
public static void main(String[] args) throws IOException, InterruptedException {
35+
int port = NatsTestServer.nextPort();
36+
ts.set(new NatsTestServer(new String[]{"--auth", "1234", "-m", "8222"}, port, false));
37+
38+
ReconnectedHandler reconnectedHandler = new ReconnectedHandler();
39+
NatsConnection nc = (NatsConnection) Nats.connect(buildOptions(port, reconnectedHandler));
40+
Dispatcher d = nc.createDispatcher();
41+
42+
reconnectedHandler.setConsumer((ignored) -> subscribe(d));
43+
subscribe(d);
44+
45+
serverRestarter.scheduleWithFixedDelay(() -> restartServer(ts, port), 2, 1, TimeUnit.SECONDS);
46+
47+
new Thread(waitCloseSocket(nc)).start();
48+
}
49+
50+
private static Runnable waitCloseSocket(NatsConnection nc) {
51+
return () -> {
52+
while (true) {
53+
if (nc.closeSocketLock.isLocked()) {
54+
try {
55+
System.out.printf("Unsubscribing all subscriptions due to disconnection %d \n", subscriptions.size());
56+
latch.countDown();
57+
Thread.sleep(500);
58+
} catch (InterruptedException e) {
59+
throw new RuntimeException(e);
60+
}
61+
62+
}
63+
LockSupport.parkNanos(5);
64+
}
65+
};
66+
}
67+
68+
private static void restartServer(AtomicReference<NatsTestServer> ts, int port) {
69+
try {
70+
ts.get().shutdown();
71+
ts.set(new NatsTestServer(new String[]{"--auth", "1234", "-m", "8222"}, port, false));
72+
} catch (Exception e) {
73+
throw new RuntimeException(e);
74+
}
75+
}
76+
77+
private static void subscribe(Dispatcher d) {
78+
latch = new CountDownLatch(1);
79+
for (int i = 0; i < 300_000; i++) {
80+
String subject = "test_" + i;
81+
subscriptions.add(subject);
82+
d.subscribe(subject);
83+
unsubThreadpool.execute(() -> {
84+
try {
85+
latch.await();
86+
d.unsubscribe(subject);
87+
} catch (InterruptedException e) {
88+
throw new RuntimeException(e);
89+
}
90+
});
91+
}
92+
}
93+
94+
private static Options buildOptions(int port, ReconnectedHandler reconnectedHandler) {
95+
Options.Builder natsOptions = new Options.Builder()
96+
.servers(new String[]{"nats://localhost:" + port})
97+
.token(new char[]{'1', '2', '3', '4'})
98+
.maxReconnects(-1)
99+
.reconnectWait(Duration.ofMillis(200))
100+
.connectionTimeout(Duration.ofMillis(500))
101+
.connectionListener(reconnectedHandler)
102+
.errorListener(AUTHORIZATION_VIOLATION_LISTENER);
103+
104+
return natsOptions.build();
105+
}
106+
107+
private static class ReconnectedHandler implements ConnectionListener {
108+
109+
private java.util.function.Consumer<Void> consumer;
110+
111+
public void setConsumer(java.util.function.Consumer<Void> consumer) {
112+
this.consumer = consumer;
113+
}
114+
115+
@Override
116+
public void connectionEvent(Connection conn, Events type) {
117+
if (type == Events.RECONNECTED) {
118+
consumer.accept(null);
119+
}
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)