6
6
import java .time .Duration ;
7
7
import java .util .concurrent .*;
8
8
import java .util .concurrent .atomic .AtomicReference ;
9
- import java .util .concurrent .locks .LockSupport ;
10
9
11
10
/* Program to reproduce #1320 */
12
11
public class AuthViolationDuringReconnect {
13
12
private static final ConcurrentHashMap .KeySetView <String , Boolean > subscriptions = ConcurrentHashMap .newKeySet ();
14
13
private static final ScheduledExecutorService serverRestarter = Executors .newSingleThreadScheduledExecutor ();
15
- private static final ExecutorService unsubThreadpool = Executors .newFixedThreadPool (64 );
14
+ private static final ExecutorService unsubThreadpool = Executors .newFixedThreadPool (512 );
16
15
private static final AtomicReference <NatsTestServer > ts = new AtomicReference <>();
17
16
private static final ErrorListener AUTHORIZATION_VIOLATION_LISTENER = new ErrorListener () {
18
17
@ Override
@@ -42,25 +41,25 @@ public static void main(String[] args) throws IOException, InterruptedException
42
41
reconnectedHandler .setConsumer ((ignored ) -> subscribe (d ));
43
42
subscribe (d );
44
43
45
- serverRestarter .scheduleWithFixedDelay (() -> restartServer (ts , port ), 2 , 1 , TimeUnit .SECONDS );
44
+ serverRestarter .scheduleWithFixedDelay (() -> restartServer (ts , port ), 2000 , 3000 , TimeUnit .MILLISECONDS );
46
45
47
46
new Thread (waitCloseSocket (nc )).start ();
48
47
}
49
48
50
49
private static Runnable waitCloseSocket (NatsConnection nc ) {
51
50
return () -> {
51
+ try {
52
+ Thread .sleep (1000 );
53
+ } catch (InterruptedException e ) {
54
+ throw new RuntimeException (e );
55
+ }
52
56
while (true ) {
53
57
if (nc .closeSocketLock .isLocked ()) {
54
- try {
55
- System .out .printf ("Unsubscribing all subscriptions due to disconnection %d \n " , subscriptions .size ());
56
- latch .countDown ();
57
- Thread .sleep (500 );
58
- } catch (InterruptedException e ) {
59
- throw new RuntimeException (e );
58
+ System .out .printf ("Unsubscribing all subscriptions due to disconnection %d \n " , subscriptions .size ());
59
+ latch .countDown ();
60
+ while (nc .closeSocketLock .isLocked ()) {
60
61
}
61
-
62
62
}
63
- LockSupport .parkNanos (5 );
64
63
}
65
64
};
66
65
}
@@ -76,7 +75,7 @@ private static void restartServer(AtomicReference<NatsTestServer> ts, int port)
76
75
77
76
private static void subscribe (Dispatcher d ) {
78
77
latch = new CountDownLatch (1 );
79
- for (int i = 0 ; i < 300_000 ; i ++) {
78
+ for (int i = 0 ; i < 500_000 ; i ++) {
80
79
String subject = "test_" + i ;
81
80
subscriptions .add (subject );
82
81
d .subscribe (subject );
@@ -96,7 +95,7 @@ private static Options buildOptions(int port, ReconnectedHandler reconnectedHand
96
95
.servers (new String []{"nats://localhost:" + port })
97
96
.token (new char []{'1' , '2' , '3' , '4' })
98
97
.maxReconnects (-1 )
99
- .reconnectWait (Duration .ofMillis (200 ))
98
+ .reconnectWait (Duration .ofMillis (2000 ))
100
99
.connectionTimeout (Duration .ofMillis (500 ))
101
100
.connectionListener (reconnectedHandler )
102
101
.errorListener (AUTHORIZATION_VIOLATION_LISTENER );
0 commit comments