@@ -112,8 +112,6 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
112
112
cmd .cancel ();
113
113
}
114
114
115
- protected volatile @ Nonnull ContextualChannel channel = DummyContextualChannelInstances .CHANNEL_CONNECTING ;
116
-
117
115
private final Reliability reliability ;
118
116
119
117
private final ClientOptions clientOptions ;
@@ -123,12 +121,13 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
123
121
private final boolean boundedQueues ;
124
122
125
123
// access via QUEUE_SIZE
126
- @ SuppressWarnings ("unused" )
127
124
private volatile int queueSize = 0 ;
128
125
129
126
// access via STATUS
130
- @ SuppressWarnings ("unused" )
131
127
private volatile int status = ST_OPEN ;
128
+ // access via CHANNEL
129
+
130
+ protected volatile @ Nonnull ContextualChannel channel = DummyContextualChannelInstances .CHANNEL_CONNECTING ;
132
131
133
132
private final Consumer <RedisCommand <?, ?, ?>> callbackOnClose ;
134
133
@@ -152,10 +151,6 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
152
151
153
152
private volatile Throwable connectionError ;
154
153
155
- // // access via QUEUE_SIZE
156
- // @SuppressWarnings("unused")
157
- // private volatile int queueSize = 0;
158
-
159
154
private final String cachedEndpointId ;
160
155
161
156
protected final UnboundedMpscOfferFirstQueue <RedisCommand <?, ?, ?>> taskQueue ;
@@ -308,11 +303,8 @@ public void notifyChannelActive(Channel channel) {
308
303
this .connectionError = null ;
309
304
310
305
if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_CONNECTING , contextualChannel )) {
311
- logger .error ("[unexpected] {} failed to set to CHANNEL_CONNECTING because current state is '{}'" , logPrefix (),
312
- CHANNEL .get (this ));
313
306
channel .close ();
314
- onUnexpectedState ("notifyChannelActive" , ConnectionContext .State .CONNECTING ,
315
- this .channel .getContext ().getInitialState ());
307
+ onUnexpectedState ("notifyChannelActive" , ConnectionContext .State .CONNECTING );
316
308
return ;
317
309
}
318
310
@@ -357,21 +349,13 @@ public void notifyChannelActive(Channel channel) {
357
349
}
358
350
}
359
351
360
- private void onUnexpectedState (String caller , ConnectionContext .State exp , ConnectionContext .State actual ) {
361
- logger .error ("[{}][unexpected] {}: unexpected state: exp '{}' got '{}'" , caller , logPrefix (), exp , actual );
362
- cancelCommands (String .format ("%s: state not match: expect '%s', got '%s'" , caller , exp , actual ));
363
- }
364
-
365
352
@ Override
366
353
public void notifyReconnectFailed (Throwable t ) {
367
354
this .failedToReconnectReason = t ;
368
355
369
356
if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_CONNECTING ,
370
357
DummyContextualChannelInstances .CHANNEL_RECONNECT_FAILED )) {
371
- logger .error ("[unexpected] {} failed to set to CHANNEL_CONNECTING because current state is '{}'" , logPrefix (),
372
- CHANNEL .get (this ));
373
- onUnexpectedState ("notifyReconnectFailed" , ConnectionContext .State .CONNECTING ,
374
- this .channel .getContext ().getInitialState ());
358
+ onUnexpectedState ("notifyReconnectFailed" , ConnectionContext .State .CONNECTING );
375
359
return ;
376
360
}
377
361
@@ -396,13 +380,13 @@ public void notifyChannelInactive(Channel channel) {
396
380
@ Override
397
381
public void notifyChannelInactiveAfterWatchdogDecision (Channel channel ,
398
382
Deque <RedisCommand <?, ?, ?>> retryableQueuedCommands ) {
399
- final ContextualChannel prevChan = this .channel ;
400
- if (!prevChan . getContext (). getInitialState ().isConnected () || prevChan .getDelegate () != channel ) {
383
+ final ContextualChannel inactiveChan = this .channel ;
384
+ if (!inactiveChan . getInitialState ().isConnected () || inactiveChan .getDelegate () != channel ) {
401
385
logger .error ("[unexpected][{}] notifyChannelInactive: channel not match" , logPrefix ());
402
386
return ;
403
387
}
404
388
405
- if (prevChan .getContext ().isChannelInactiveEventFired ()) {
389
+ if (inactiveChan .getContext ().isChannelInactiveEventFired ()) {
406
390
logger .error ("[unexpected][{}] notifyChannelInactive: already fired" , logPrefix ());
407
391
return ;
408
392
}
@@ -431,9 +415,9 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
431
415
if (!willReconnect ) {
432
416
CHANNEL .set (this , DummyContextualChannelInstances .CHANNEL_ENDPOINT_CLOSED );
433
417
}
434
- prevChan .getContext ()
418
+ inactiveChan .getContext ()
435
419
.setCloseStatus (new ConnectionContext .CloseStatus (willReconnect , retryableQueuedCommands , exception ));
436
- trySetEndpointQuiescence (prevChan );
420
+ trySetEndpointQuiescence (inactiveChan );
437
421
}
438
422
439
423
@ Override
@@ -444,7 +428,7 @@ public void notifyException(Throwable t) {
444
428
}
445
429
446
430
final ContextualChannel curr = this .channel ;
447
- if (!curr .getContext (). getInitialState ().isConnected () || !curr .isActive ()) {
431
+ if (!curr .getInitialState ().isConnected () || !curr .isActive ()) {
448
432
connectionError = t ;
449
433
}
450
434
}
@@ -457,7 +441,7 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
457
441
@ Override
458
442
public void flushCommands () {
459
443
final ContextualChannel chan = this .channel ;
460
- switch (chan .getContext (). getInitialState ()) {
444
+ switch (chan .getInitialState ()) {
461
445
case ENDPOINT_CLOSED :
462
446
syncAfterTerminated (() -> {
463
447
if (isClosed ()) {
@@ -486,7 +470,7 @@ public void flushCommands() {
486
470
scheduleSendJobIfNeeded (chan );
487
471
return ;
488
472
default :
489
- throw new IllegalStateException ("unexpected state: " + chan .getContext (). getInitialState ());
473
+ throw new IllegalStateException ("unexpected state: " + chan .getInitialState ());
490
474
}
491
475
}
492
476
@@ -519,8 +503,8 @@ public CompletableFuture<Void> closeAsync() {
519
503
connectionWatchdog .prepareClose ();
520
504
}
521
505
522
- final Channel chan = channel ;
523
- if (channel . getContext () .getInitialState ().isConnected ()) {
506
+ final ContextualChannel chan = channel ;
507
+ if (chan .getInitialState ().isConnected ()) {
524
508
// 1. STATUS.compareAndSet(this, ST_OPEN, ST_CLOSED) synchronize-before channel == CONNECTED
525
509
// 2. channel == CONNECTED synchronize-before setting channel to WILL_RECONNECT/ENDPOINT_CLOSED
526
510
// 3. setting channel to WILL_RECONNECT synchronize-before `isClosed()`, which will cancel all the commands.
@@ -545,7 +529,7 @@ public void disconnect() {
545
529
546
530
ContextualChannel chan = this .channel ;
547
531
548
- if (chan .getContext (). getInitialState ().isConnected () && chan .isOpen ()) {
532
+ if (chan .getInitialState ().isConnected () && chan .isOpen ()) {
549
533
chan .disconnect ();
550
534
}
551
535
}
@@ -561,9 +545,9 @@ public void reset() {
561
545
logger .debug ("{} reset()" , logPrefix ());
562
546
}
563
547
564
- final ContextualChannel curr = channel ;
565
- if (curr . getContext () .getInitialState ().isConnected ()) {
566
- curr .pipeline ().fireUserEventTriggered (new ConnectionEvents .Reset ());
548
+ final ContextualChannel chan = channel ;
549
+ if (chan .getInitialState ().isConnected ()) {
550
+ chan .pipeline ().fireUserEventTriggered (new ConnectionEvents .Reset ());
567
551
}
568
552
// Unsafe to call cancelBufferedCommands() here.
569
553
// cancelBufferedCommands("Reset");
@@ -575,9 +559,9 @@ private void resetInternal() {
575
559
logger .debug ("{} reset()" , logPrefix ());
576
560
}
577
561
578
- ContextualChannel curr = channel ;
579
- if (curr . getContext () .getInitialState ().isConnected ()) {
580
- curr .pipeline ().fireUserEventTriggered (new ConnectionEvents .Reset ());
562
+ ContextualChannel chan = channel ;
563
+ if (chan .getInitialState ().isConnected ()) {
564
+ chan .pipeline ().fireUserEventTriggered (new ConnectionEvents .Reset ());
581
565
}
582
566
// Unsafe to call cancelBufferedCommands() here.
583
567
cancelCommands ("Reset" );
@@ -593,7 +577,7 @@ public void initialState() {
593
577
cancelCommands ("initialState" );
594
578
595
579
ContextualChannel currentChannel = this .channel ;
596
- if (currentChannel .getContext (). getInitialState ().isConnected ()) {
580
+ if (currentChannel .getInitialState ().isConnected ()) {
597
581
ChannelFuture close = currentChannel .close ();
598
582
if (currentChannel .isOpen ()) {
599
583
close .syncUninterruptibly ();
@@ -602,7 +586,7 @@ public void initialState() {
602
586
}
603
587
604
588
private boolean isClosed () {
605
- return STATUS . get ( this ) == ST_CLOSED ;
589
+ return status == ST_CLOSED ;
606
590
}
607
591
608
592
protected String logPrefix () {
@@ -799,16 +783,14 @@ private void trySetEndpointQuiescence(ContextualChannel chan) {
799
783
}
800
784
801
785
private void onEndpointQuiescence () {
802
- if (channel .getContext (). getInitialState () == ConnectionContext .State .ENDPOINT_CLOSED ) {
786
+ if (channel .getInitialState () == ConnectionContext .State .ENDPOINT_CLOSED ) {
803
787
return ;
804
788
}
805
789
806
790
// Create happens-before with channelActive()
807
791
if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_WILL_RECONNECT ,
808
792
DummyContextualChannelInstances .CHANNEL_CONNECTING )) {
809
-
810
- onUnexpectedState ("onEndpointQuiescence" , ConnectionContext .State .WILL_RECONNECT ,
811
- this .channel .getContext ().getInitialState ());
793
+ onUnexpectedState ("onEndpointQuiescence" , ConnectionContext .State .WILL_RECONNECT );
812
794
return ;
813
795
}
814
796
@@ -824,17 +806,18 @@ private void onWillReconnect(@Nonnull final ConnectionContext.CloseStatus closeS
824
806
// Save retryable failed tasks
825
807
logger .info (
826
808
"[onWillReconnect][{}] compensate {} retryableFailedToSendTasks (write failure) for retrying on reconnecting, first write error: {}" ,
827
- retryableFailedToSendTasks . size (), batchFlushEndPointContext . getFirstDiscontinueReason (). getMessage (),
828
- logPrefix ());
809
+ logPrefix (), retryableFailedToSendTasks . size (),
810
+ batchFlushEndPointContext . getFirstDiscontinueReason (). getMessage ());
829
811
offerFirstAll (retryableFailedToSendTasks );
830
812
}
831
813
832
814
LettuceAssert .assertState (reliability != Reliability .AT_MOST_ONCE , "unexpected: reliability is AT_MOST_ONCE" );
833
815
final Deque <RedisCommand <?, ?, ?>> retryablePendingCommands = closeStatus .getAndClearRetryablePendingCommands ();
834
816
if (retryablePendingCommands != null ) {
835
817
// Save uncompletedTasks for later retry.
836
- logger .info ("[onWillReconnect][{}] compensate {} pendingCommands (write success) for retrying on reconnecting" ,
837
- retryablePendingCommands .size (), logPrefix ());
818
+ logger .info (
819
+ "[onWillReconnect][{}] compensate {} retryable pending commands (write success) for retrying on reconnecting" ,
820
+ logPrefix (), retryablePendingCommands .size ());
838
821
offerFirstAll (retryablePendingCommands );
839
822
}
840
823
@@ -961,13 +944,13 @@ private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
961
944
return localConnectionErr ;
962
945
}
963
946
964
- if (boundedQueues && QUEUE_SIZE . get ( this ) + commands > clientOptions .getRequestQueueSize ()) {
947
+ if (boundedQueues && queueSize + commands > clientOptions .getRequestQueueSize ()) {
965
948
return new RedisException ("Request queue size exceeded: " + clientOptions .getRequestQueueSize ()
966
949
+ ". Commands are not accepted until the queue size drops." );
967
950
}
968
951
969
952
final ContextualChannel chan = this .channel ;
970
- switch (chan .getContext (). getInitialState ()) {
953
+ switch (chan .getInitialState ()) {
971
954
case ENDPOINT_CLOSED :
972
955
return new RedisException ("Connection is closed" );
973
956
case RECONNECT_FAILED :
@@ -979,10 +962,16 @@ private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
979
962
case CONNECTED :
980
963
return !chan .isActive () && rejectCommandsWhileDisconnected ? new RedisException ("Connection is closed" ) : null ;
981
964
default :
982
- throw new IllegalStateException ("unexpected state: " + chan .getContext (). getInitialState ());
965
+ throw new IllegalStateException ("unexpected state: " + chan .getInitialState ());
983
966
}
984
967
}
985
968
969
+ private void onUnexpectedState (String caller , ConnectionContext .State exp ) {
970
+ final ConnectionContext .State actual = this .channel .getInitialState ();
971
+ logger .error ("[{}][unexpected] {}: unexpected state: exp '{}' got '{}'" , caller , logPrefix (), exp , actual );
972
+ cancelCommands (String .format ("%s: state not match: expect '%s', got '%s'" , caller , exp , actual ));
973
+ }
974
+
986
975
private void channelFlush (Channel channel ) {
987
976
if (debugEnabled ) {
988
977
logger .debug ("{} write() channelFlush" , logPrefix ());
0 commit comments