15
15
/* Test to reproduce #1320 */
16
16
public class AuthViolationDuringReconnectTest {
17
17
18
- static class Context {
18
+ static class Context implements AutoCloseable {
19
19
int port ;
20
20
NatsConnection nc ;
21
21
Dispatcher d ;
22
22
CountDownLatch latch ;
23
23
ConcurrentHashMap .KeySetView <String , Boolean > subscriptions = ConcurrentHashMap .newKeySet ();
24
24
ScheduledExecutorService serverRestarter = Executors .newSingleThreadScheduledExecutor ();
25
- ExecutorService unsubThreadpool = Executors .newFixedThreadPool (2 );
26
- AtomicReference <NatsTestServer > ts = new AtomicReference <>();
25
+ ExecutorService unsubThreadPool = Executors .newFixedThreadPool (2 );
26
+ AtomicReference <NatsTestServer > server = new AtomicReference <>();
27
27
AtomicBoolean violated = new AtomicBoolean (false );
28
28
AtomicInteger restartsLeft = new AtomicInteger (10 );
29
29
ErrorListener errorListener = new ErrorListener () {
30
30
@ Override
31
31
public void errorOccurred (Connection conn , String error ) {
32
32
if (error .contains ("Authorization Violation" )) {
33
- // System.out.println("Authorization Violation");
34
33
violated .set (true );
35
34
}
36
35
}
37
36
};
37
+
38
+ @ Override
39
+ public void close () throws Exception {
40
+ serverRestarter .shutdown ();
41
+ unsubThreadPool .shutdown ();
42
+ server .get ().shutdown ();
43
+ }
38
44
}
39
45
40
46
@ Test
41
47
public void testAuthViolationDuringReconnect () throws Exception {
42
- Context ctx = new Context ();
43
- ctx .port = NatsTestServer .nextPort ();
48
+ try (Context ctx = new Context ()) {
49
+ ctx .port = NatsTestServer .nextPort ();
50
+ startServer (ctx );
44
51
45
- startNatsServer (ctx );
46
-
47
- ctx .nc = new MockPausingNatsConnection (buildOptions (ctx ));
48
- ctx .nc .connect (true );
49
- ctx .d = ctx .nc .createDispatcher ();
50
- subscribe (ctx );
52
+ Options options = new Options .Builder ()
53
+ .servers (new String []{"nats://incorrect:1111" , "nats://localhost:" + ctx .port })
54
+ .noRandomize ()
55
+ .token (new char []{'1' , '2' , '3' , '4' })
56
+ .reconnectWait (Duration .ofMillis (2000 ))
57
+ .connectionTimeout (Duration .ofMillis (500 ))
58
+ .errorListener (ctx .errorListener )
59
+ .build ();
60
+
61
+ ctx .nc = new MockPausingNatsConnection (options );
62
+ ctx .nc .connect (true );
63
+ ctx .d = ctx .nc .createDispatcher ();
64
+
65
+ ctx .latch = new CountDownLatch (1 );
66
+ for (int i = 0 ; i < 1_000 ; i ++) {
67
+ String subject = "test_" + i ;
68
+ ctx .subscriptions .add (subject );
69
+ ctx .d .subscribe (subject );
70
+ ctx .unsubThreadPool .execute (() -> {
71
+ try {
72
+ ctx .latch .await ();
73
+ ctx .d .unsubscribe (subject );
74
+ }
75
+ catch (InterruptedException e ) {
76
+ throw new RuntimeException (e );
77
+ }
78
+ });
79
+ }
51
80
52
- ctx .serverRestarter .scheduleWithFixedDelay (() -> restartServer (ctx ), 2000 , 3000 , TimeUnit .MILLISECONDS );
81
+ ctx .serverRestarter .scheduleWithFixedDelay (() -> restartServer (ctx ), 2000 , 3000 , TimeUnit .MILLISECONDS );
53
82
54
- Thread t = new Thread (waitCloseSocket (ctx ));
55
- t .start ();
56
- t .join ();
83
+ Thread t = new Thread (waitCloseSocket (ctx ));
84
+ t .start ();
85
+ t .join ();
57
86
58
- assertFalse (ctx .violated .get ());
59
- ctx . ts . get (). shutdown ();
87
+ assertFalse (ctx .violated .get ());
88
+ }
60
89
}
61
90
62
91
private static Runnable waitCloseSocket (Context ctx ) {
@@ -82,66 +111,20 @@ private static Runnable waitCloseSocket(Context ctx) {
82
111
};
83
112
}
84
113
114
+ private static void startServer (Context ctx ) throws IOException {
115
+ ctx .server .set (new NatsTestServer (new String []{"--auth" , "1234" }, ctx .port , false ));
116
+ }
117
+
85
118
private static void restartServer (Context ctx ) {
86
119
try {
87
120
ctx .restartsLeft .decrementAndGet ();
88
- // System.out.println("Restarting server " + ctx.restartsLeft.get());
89
- ctx .ts .get ().shutdown ();
90
- startNatsServer (ctx );
121
+ ctx .server .get ().shutdown ();
122
+ startServer (ctx );
91
123
} catch (Exception e ) {
92
124
throw new RuntimeException (e );
93
125
}
94
126
}
95
127
96
- private static void startNatsServer (Context ctx ) throws IOException {
97
- ctx .ts .set (new NatsTestServer (new String []{"--auth" , "1234" }, ctx .port , false ));
98
- }
99
-
100
- private static void subscribe (Context ctx ) {
101
- ctx .latch = new CountDownLatch (1 );
102
- for (int i = 0 ; i < 1_000 ; i ++) {
103
- String subject = "test_" + i ;
104
- ctx .subscriptions .add (subject );
105
- ctx .d .subscribe (subject );
106
- ctx .unsubThreadpool .execute (() -> {
107
- try {
108
- ctx .latch .await ();
109
- ctx .d .unsubscribe (subject );
110
- } catch (InterruptedException e ) {
111
- throw new RuntimeException (e );
112
- }
113
- });
114
- }
115
- }
116
-
117
- private static Options buildOptions (Context ctx ) {
118
- Options .Builder natsOptions = new Options .Builder ()
119
- .servers (new String []{"nats://incorrect:1111" , "nats://localhost:" + ctx .port })
120
- .noRandomize ()
121
- .token (new char []{'1' , '2' , '3' , '4' })
122
- .reconnectWait (Duration .ofMillis (2000 ))
123
- .connectionTimeout (Duration .ofMillis (500 ))
124
- .errorListener (ctx .errorListener );
125
-
126
- return natsOptions .build ();
127
- }
128
-
129
- private static class ReconnectedHandler implements ConnectionListener {
130
-
131
- private java .util .function .Consumer <Void > consumer ;
132
-
133
- public void setConsumer (java .util .function .Consumer <Void > consumer ) {
134
- this .consumer = consumer ;
135
- }
136
-
137
- @ Override
138
- public void connectionEvent (Connection conn , Events type ) {
139
- if (type == Events .RECONNECTED ) {
140
- consumer .accept (null );
141
- }
142
- }
143
- }
144
-
145
128
static class MockPausingNatsConnection extends NatsConnection {
146
129
MockPausingNatsConnection (Options options ) {
147
130
super (options );
0 commit comments