Skip to content

Commit 5ad6ebb

Browse files
authored
Merge pull request #1328 from ajax-surovskyi-y/auth_violation_reproduce_program
Accepting - no production code, just testing code. Can always be removed later.
2 parents 5fb9f42 + 5fdc912 commit 5ad6ebb

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class NatsConnection implements Connection {
5555
private boolean disconnecting; // you can only disconnect in one thread
5656
private boolean closing; // respect a close call regardless
5757
private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
58-
private final ReentrantLock closeSocketLock;
58+
final ReentrantLock closeSocketLock;
5959

6060
private Status status;
6161
private final ReentrantLock statusLock;
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package io.nats.client.impl;
2+
3+
import io.nats.client.*;
4+
5+
import java.io.IOException;
6+
import java.time.Duration;
7+
import java.util.concurrent.*;
8+
import java.util.concurrent.atomic.AtomicReference;
9+
10+
/* Program to reproduce #1320 */
11+
public class AuthViolationDuringReconnect {
12+
private static final ConcurrentHashMap.KeySetView<String, Boolean> subscriptions = ConcurrentHashMap.newKeySet();
13+
private static final ScheduledExecutorService serverRestarter = Executors.newSingleThreadScheduledExecutor();
14+
private static final ExecutorService unsubThreadpool = Executors.newFixedThreadPool(2);
15+
private static final AtomicReference<NatsTestServer> ts = new AtomicReference<>();
16+
private static final ErrorListener AUTHORIZATION_VIOLATION_LISTENER = new ErrorListener() {
17+
@Override
18+
public void errorOccurred(Connection conn, String error) {
19+
if (error.contains("Authorization Violation")) {
20+
System.out.println("Authorization Violation, Stopping server");
21+
try {
22+
Thread.sleep(1000);
23+
ts.get().shutdown();
24+
} catch (InterruptedException e) {
25+
throw new RuntimeException(e);
26+
}
27+
System.exit(-1);
28+
}
29+
}
30+
};
31+
private static volatile CountDownLatch latch;
32+
33+
public static void main(String[] args) throws IOException, InterruptedException {
34+
int port = NatsTestServer.nextPort();
35+
ts.set(new NatsTestServer(new String[]{"--auth", "1234", "-m", "8222"}, port, false));
36+
37+
ReconnectedHandler reconnectedHandler = new ReconnectedHandler();
38+
NatsConnection nc = new MockPausingNatsConnection(buildOptions(port, reconnectedHandler));
39+
nc.connect(true);
40+
Dispatcher d = nc.createDispatcher();
41+
42+
reconnectedHandler.setConsumer((ignored) -> subscribe(d));
43+
subscribe(d);
44+
45+
serverRestarter.scheduleWithFixedDelay(() -> restartServer(ts, port), 2000, 3000, TimeUnit.MILLISECONDS);
46+
47+
new Thread(waitCloseSocket(nc)).start();
48+
}
49+
50+
private static Runnable waitCloseSocket(NatsConnection nc) {
51+
return () -> {
52+
try {
53+
Thread.sleep(200);
54+
} catch (InterruptedException e) {
55+
throw new RuntimeException(e);
56+
}
57+
while (true) {
58+
if (nc.closeSocketLock.isLocked()) {
59+
System.out.printf("Unsubscribing all subscriptions due to disconnection %d \n", subscriptions.size());
60+
latch.countDown();
61+
while (nc.closeSocketLock.isLocked()) {
62+
}
63+
}
64+
}
65+
};
66+
}
67+
68+
private static void restartServer(AtomicReference<NatsTestServer> ts, int port) {
69+
try {
70+
System.out.println("Restarting server");
71+
ts.get().shutdown();
72+
ts.set(new NatsTestServer(new String[]{"--auth", "1234", "-m", "8222"}, port, false));
73+
} catch (Exception e) {
74+
throw new RuntimeException(e);
75+
}
76+
}
77+
78+
private static void subscribe(Dispatcher d) {
79+
latch = new CountDownLatch(1);
80+
for (int i = 0; i < 1_000; i++) {
81+
String subject = "test_" + i;
82+
subscriptions.add(subject);
83+
d.subscribe(subject);
84+
unsubThreadpool.execute(() -> {
85+
try {
86+
latch.await();
87+
d.unsubscribe(subject);
88+
} catch (InterruptedException e) {
89+
throw new RuntimeException(e);
90+
}
91+
});
92+
}
93+
}
94+
95+
private static Options buildOptions(int port, ReconnectedHandler reconnectedHandler) {
96+
Options.Builder natsOptions = new Options.Builder()
97+
.servers(new String[]{"nats://incorrect:1111", "nats://localhost:" + port})
98+
.noRandomize()
99+
.token(new char[]{'1', '2', '3', '4'})
100+
.maxReconnects(-1)
101+
.reconnectWait(Duration.ofMillis(2000))
102+
.connectionTimeout(Duration.ofMillis(500))
103+
.connectionListener(reconnectedHandler)
104+
.errorListener(AUTHORIZATION_VIOLATION_LISTENER);
105+
106+
return natsOptions.build();
107+
}
108+
109+
private static class ReconnectedHandler implements ConnectionListener {
110+
111+
private java.util.function.Consumer<Void> consumer;
112+
113+
public void setConsumer(java.util.function.Consumer<Void> consumer) {
114+
this.consumer = consumer;
115+
}
116+
117+
@Override
118+
public void connectionEvent(Connection conn, Events type) {
119+
if (type == Events.RECONNECTED) {
120+
consumer.accept(null);
121+
}
122+
}
123+
}
124+
125+
static class MockPausingNatsConnection extends NatsConnection {
126+
MockPausingNatsConnection(Options options) {
127+
super(options);
128+
}
129+
130+
@Override
131+
void closeSocketImpl(boolean forceClose) {
132+
try {
133+
Thread.sleep(500);
134+
} catch (InterruptedException e) {
135+
throw new RuntimeException(e);
136+
}
137+
super.closeSocketImpl(forceClose);
138+
}
139+
140+
@Override
141+
void sendUnsub(NatsSubscription sub, int after) {
142+
try {
143+
Thread.sleep(1000);
144+
} catch (InterruptedException e) {
145+
throw new RuntimeException(e);
146+
}
147+
super.sendUnsub(sub, after);
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)