Skip to content

Commit cf68dbd

Browse files
added pausing mock implementation
1 parent 4025ede commit cf68dbd

File tree

2 files changed

+33
-12
lines changed

2 files changed

+33
-12
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,6 @@ void reconnectImpl() throws InterruptedException {
379379
}
380380

381381
writer.setReconnectMode(true);
382-
Thread.sleep(2);
383382

384383
if (!isConnected() && !isClosed() && !this.isClosing()) {
385384
boolean keepGoing = true;
@@ -763,7 +762,6 @@ void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws Int
763762

764763
statusLock.lock();
765764
try {
766-
Thread.sleep(1);
767765
updateStatus(Status.DISCONNECTED);
768766
this.exceptionDuringConnectChange = null; // Ignore IOExceptions during closeSocketImpl()
769767
this.disconnecting = false;
@@ -1061,11 +1059,6 @@ void sendUnsub(NatsSubscription sub, int after) {
10611059
if (after > 0) {
10621060
bab.append(SP).append(after);
10631061
}
1064-
try {
1065-
Thread.sleep(1, 500_000);
1066-
} catch (InterruptedException e) {
1067-
throw new RuntimeException(e);
1068-
}
10691062
queueInternalOutgoing(new ProtocolMessage(bab));
10701063
}
10711064

src/test/java/io/nats/client/impl/AuthViolationDuringReconnect.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
public class AuthViolationDuringReconnect {
1212
private static final ConcurrentHashMap.KeySetView<String, Boolean> subscriptions = ConcurrentHashMap.newKeySet();
1313
private static final ScheduledExecutorService serverRestarter = Executors.newSingleThreadScheduledExecutor();
14-
private static final ExecutorService unsubThreadpool = Executors.newFixedThreadPool(512);
14+
private static final ExecutorService unsubThreadpool = Executors.newFixedThreadPool(2);
1515
private static final AtomicReference<NatsTestServer> ts = new AtomicReference<>();
1616
private static final ErrorListener AUTHORIZATION_VIOLATION_LISTENER = new ErrorListener() {
1717
@Override
@@ -35,7 +35,8 @@ public static void main(String[] args) throws IOException, InterruptedException
3535
ts.set(new NatsTestServer(new String[]{"--auth", "1234", "-m", "8222"}, port, false));
3636

3737
ReconnectedHandler reconnectedHandler = new ReconnectedHandler();
38-
NatsConnection nc = (NatsConnection) Nats.connect(buildOptions(port, reconnectedHandler));
38+
NatsConnection nc = new MockPausingNatsConnection(buildOptions(port, reconnectedHandler));
39+
nc.connect(true);
3940
Dispatcher d = nc.createDispatcher();
4041

4142
reconnectedHandler.setConsumer((ignored) -> subscribe(d));
@@ -49,7 +50,7 @@ public static void main(String[] args) throws IOException, InterruptedException
4950
private static Runnable waitCloseSocket(NatsConnection nc) {
5051
return () -> {
5152
try {
52-
Thread.sleep(1000);
53+
Thread.sleep(200);
5354
} catch (InterruptedException e) {
5455
throw new RuntimeException(e);
5556
}
@@ -76,7 +77,7 @@ private static void restartServer(AtomicReference<NatsTestServer> ts, int port)
7677

7778
private static void subscribe(Dispatcher d) {
7879
latch = new CountDownLatch(1);
79-
for (int i = 0; i < 10_000; i++) {
80+
for (int i = 0; i < 1_000; i++) {
8081
String subject = "test_" + i;
8182
subscriptions.add(subject);
8283
d.subscribe(subject);
@@ -93,7 +94,8 @@ private static void subscribe(Dispatcher d) {
9394

9495
private static Options buildOptions(int port, ReconnectedHandler reconnectedHandler) {
9596
Options.Builder natsOptions = new Options.Builder()
96-
.servers(new String[]{"nats://localhost:" + port})
97+
.servers(new String[]{"nats://incorrect:1111", "nats://localhost:" + port})
98+
.noRandomize()
9799
.token(new char[]{'1', '2', '3', '4'})
98100
.maxReconnects(-1)
99101
.reconnectWait(Duration.ofMillis(2000))
@@ -119,4 +121,30 @@ public void connectionEvent(Connection conn, Events type) {
119121
}
120122
}
121123
}
124+
125+
static class MockPausingNatsConnection extends NatsConnection {
126+
MockPausingNatsConnection(Options options) {
127+
super(options);
128+
}
129+
130+
@Override
131+
void closeSocketImpl(boolean forceClose) {
132+
try {
133+
Thread.sleep(500);
134+
} catch (InterruptedException e) {
135+
throw new RuntimeException(e);
136+
}
137+
super.closeSocketImpl(forceClose);
138+
}
139+
140+
@Override
141+
void sendUnsub(NatsSubscription sub, int after) {
142+
try {
143+
Thread.sleep(1000);
144+
} catch (InterruptedException e) {
145+
throw new RuntimeException(e);
146+
}
147+
super.sendUnsub(sub, after);
148+
}
149+
}
122150
}

0 commit comments

Comments
 (0)