@@ -315,8 +315,7 @@ public void notifyChannelActive(Channel channel) {
315
315
logger .info ("{} Closing channel because endpoint is already closed" , logPrefix ());
316
316
channel .close ();
317
317
318
- onEndpointClosed ();
319
- CHANNEL .set (this , DummyContextualChannelInstances .CHANNEL_ENDPOINT_CLOSED );
318
+ // Cleaning will be done later in notifyChannelInactiveAfterWatchdogDecision, we are happy so far.
320
319
return ;
321
320
}
322
321
@@ -383,12 +382,12 @@ public void notifyChannelInactive(Channel channel) {
383
382
public void notifyChannelInactiveAfterWatchdogDecision (Channel channel ,
384
383
Deque <RedisCommand <?, ?, ?>> retryableQueuedCommands ) {
385
384
final ContextualChannel inactiveChan = this .channel ;
386
- if (!inactiveChan .getInitialState () .isConnected () || inactiveChan .getDelegate () != channel ) {
385
+ if (!inactiveChan .context . initialState .isConnected () || inactiveChan .getDelegate () != channel ) {
387
386
logger .error ("[unexpected][{}] notifyChannelInactive: channel not match" , logPrefix ());
388
387
return ;
389
388
}
390
389
391
- if (inactiveChan .getContext () .isChannelInactiveEventFired ()) {
390
+ if (inactiveChan .context .isChannelInactiveEventFired ()) {
392
391
logger .error ("[unexpected][{}] notifyChannelInactive: already fired" , logPrefix ());
393
392
return ;
394
393
}
@@ -417,7 +416,7 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
417
416
if (!willReconnect ) {
418
417
CHANNEL .set (this , DummyContextualChannelInstances .CHANNEL_ENDPOINT_CLOSED );
419
418
}
420
- inactiveChan .getContext ()
419
+ inactiveChan .context
421
420
.setCloseStatus (new ConnectionContext .CloseStatus (willReconnect , retryableQueuedCommands , exception ));
422
421
trySetEndpointQuiescence (inactiveChan );
423
422
}
@@ -430,7 +429,7 @@ public void notifyException(Throwable t) {
430
429
}
431
430
432
431
final ContextualChannel curr = this .channel ;
433
- if (!curr .getInitialState () .isConnected () || !curr .isActive ()) {
432
+ if (!curr .context . initialState .isConnected () || !curr .isActive ()) {
434
433
connectionError = t ;
435
434
}
436
435
}
@@ -443,16 +442,9 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
443
442
@ Override
444
443
public void flushCommands () {
445
444
final ContextualChannel chan = this .channel ;
446
- switch (chan .getInitialState () ) {
445
+ switch (chan .context . initialState ) {
447
446
case ENDPOINT_CLOSED :
448
- syncAfterTerminated (() -> {
449
- if (isClosed ()) {
450
- onEndpointClosed ();
451
- } else {
452
- fulfillCommands ("Connection is closed" ,
453
- cmd -> cmd .completeExceptionally (new RedisException ("Connection is closed" )));
454
- }
455
- });
447
+ syncAfterTerminated (this ::onEndpointClosed );
456
448
return ;
457
449
case RECONNECT_FAILED :
458
450
syncAfterTerminated (() -> {
@@ -472,7 +464,7 @@ public void flushCommands() {
472
464
scheduleSendJobIfNeeded (chan );
473
465
return ;
474
466
default :
475
- throw new IllegalStateException ("unexpected state: " + chan .getInitialState () );
467
+ throw new IllegalStateException ("unexpected state: " + chan .context . initialState );
476
468
}
477
469
}
478
470
@@ -506,7 +498,7 @@ public CompletableFuture<Void> closeAsync() {
506
498
}
507
499
508
500
final ContextualChannel chan = channel ;
509
- if (chan .getInitialState () .isConnected ()) {
501
+ if (chan .context . initialState .isConnected ()) {
510
502
// 1. STATUS.compareAndSet(this, ST_OPEN, ST_CLOSED) synchronize-before channel == CONNECTED
511
503
// 2. channel == CONNECTED synchronize-before setting channel to WILL_RECONNECT/ENDPOINT_CLOSED
512
504
// 3. setting channel to WILL_RECONNECT synchronize-before `isClosed()`, which will cancel all the commands.
@@ -531,7 +523,7 @@ public void disconnect() {
531
523
532
524
ContextualChannel chan = this .channel ;
533
525
534
- if (chan .getInitialState () .isConnected () && chan .isOpen ()) {
526
+ if (chan .context . initialState .isConnected () && chan .isOpen ()) {
535
527
chan .disconnect ();
536
528
}
537
529
}
@@ -548,7 +540,7 @@ public void reset() {
548
540
}
549
541
550
542
final ContextualChannel chan = channel ;
551
- if (chan .getInitialState () .isConnected ()) {
543
+ if (chan .context . initialState .isConnected ()) {
552
544
chan .pipeline ().fireUserEventTriggered (new ConnectionEvents .Reset ());
553
545
}
554
546
// Unsafe to call cancelBufferedCommands() here.
@@ -562,7 +554,7 @@ private void resetInternal() {
562
554
}
563
555
564
556
ContextualChannel chan = channel ;
565
- if (chan .getInitialState () .isConnected ()) {
557
+ if (chan .context . initialState .isConnected ()) {
566
558
chan .pipeline ().fireUserEventTriggered (new ConnectionEvents .Reset ());
567
559
}
568
560
// Unsafe to call cancelBufferedCommands() here.
@@ -579,7 +571,7 @@ public void initialState() {
579
571
cancelCommands ("initialState" );
580
572
581
573
ContextualChannel currentChannel = this .channel ;
582
- if (currentChannel .getInitialState () .isConnected ()) {
574
+ if (currentChannel .context . initialState .isConnected ()) {
583
575
ChannelFuture close = currentChannel .close ();
584
576
if (currentChannel .isOpen ()) {
585
577
close .syncUninterruptibly ();
@@ -621,7 +613,14 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
621
613
return ;
622
614
}
623
615
624
- if (chan .getContext ().getBatchFlushEndPointContext ().getHasOngoingSendLoop ().tryEnterSafeGetVolatile ()) {
616
+ if (chan .context .batchFlushEndPointContext .hasOngoingSendLoop .tryEnterSafeGetVolatile ()) {
617
+ // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
618
+ // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
619
+ // Avg latency: 3.2956217278663s
620
+ // Avg QPS: 495238.50056392356/s
621
+ // 2. uses eventLoop.execute() directly
622
+ // Avg latency: 3.2677197021496998s
623
+ // Avg QPS: 476925.0751855796/s
625
624
eventLoop .execute (() -> scheduleSendJobInEventLoopIfNeeded (chan ));
626
625
}
627
626
@@ -635,14 +634,14 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
635
634
636
635
private void scheduleSendJobInEventLoopIfNeeded (final ContextualChannel chan ) {
637
636
// Guarantee only 1 send loop.
638
- if (chan .getContext (). getBatchFlushEndPointContext (). getHasOngoingSendLoop () .tryEnterUnsafe ()) {
637
+ if (chan .context . batchFlushEndPointContext . hasOngoingSendLoop .tryEnterUnsafe ()) {
639
638
loopSend (chan );
640
639
}
641
640
}
642
641
643
642
private void loopSend (final ContextualChannel chan ) {
644
- final ConnectionContext connectionContext = chan .getContext () ;
645
- final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .getBatchFlushEndPointContext () ;
643
+ final ConnectionContext connectionContext = chan .context ;
644
+ final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .batchFlushEndPointContext ;
646
645
if (connectionContext .isChannelInactiveEventFired () || batchFlushEndPointContext .hasRetryableFailedToSendTasks ()) {
647
646
return ;
648
647
}
@@ -654,37 +653,36 @@ private void loopSend(final ContextualChannel chan) {
654
653
private void loopSend0 (final BatchFlushEndPointContext batchFlushEndPointContext , final ContextualChannel chan ,
655
654
int remainingSpinnCount , final boolean firstCall ) {
656
655
do {
657
- final int count = pollBatch (batchFlushEndPointContext , batchSize , chan );
656
+ final int count = pollBatch (batchFlushEndPointContext , chan );
658
657
if (count < 0 ) {
659
658
return ;
660
659
}
661
- if (count == 0 || ( firstCall && count < batchSize ) ) {
660
+ if (count < batchSize ) {
662
661
// queue was empty
663
662
break ;
664
663
}
665
664
} while (--remainingSpinnCount > 0 );
666
665
667
666
if (remainingSpinnCount <= 0 ) {
667
+ // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
668
668
chan .eventLoop ().execute (() -> loopSend (chan ));
669
669
return ;
670
670
}
671
671
672
- // QPSPattern is low and we have drained all tasks.
673
672
if (firstCall ) {
674
- // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
675
- batchFlushEndPointContext .getHasOngoingSendLoop () .exitSafe ();
676
- // Guarantee thread-safety: no dangling tasks in the queue.
673
+ // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
674
+ batchFlushEndPointContext .hasOngoingSendLoop .exitSafe ();
675
+ // // Guarantee thread-safety: no dangling tasks in the queue.
677
676
loopSend0 (batchFlushEndPointContext , chan , remainingSpinnCount , false );
678
677
} else {
679
- // In low qps pattern, the send job will be triggered later when a new task is added,
680
- batchFlushEndPointContext .getHasOngoingSendLoop () .exitUnsafe ();
678
+ // The send loop will be triggered later when a new task is added,
679
+ batchFlushEndPointContext .hasOngoingSendLoop .exitUnsafe ();
681
680
}
682
681
}
683
682
684
- private int pollBatch (final BatchFlushEndPointContext batchFlushEndPointContext , final int maxBatchSize ,
685
- ContextualChannel chan ) {
683
+ private int pollBatch (final BatchFlushEndPointContext batchFlushEndPointContext , ContextualChannel chan ) {
686
684
int count = 0 ;
687
- for (; count < maxBatchSize ; count ++) {
685
+ for (; count < batchSize ; count ++) {
688
686
final RedisCommand <?, ?, ?> cmd = this .taskQueue .poll (); // relaxed poll is faster and we wil retry later anyway.
689
687
if (cmd == null ) {
690
688
break ;
@@ -707,17 +705,17 @@ private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext,
707
705
private void trySetEndpointQuiescence (ContextualChannel chan ) {
708
706
LettuceAssert .isTrue (chan .eventLoop ().inEventLoop (), "unexpected: not in event loop" );
709
707
710
- final ConnectionContext connectionContext = chan .getContext () ;
708
+ final ConnectionContext connectionContext = chan .context ;
711
709
final @ Nullable ConnectionContext .CloseStatus closeStatus = connectionContext .getCloseStatus ();
712
- final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .getBatchFlushEndPointContext () ;
710
+ final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .batchFlushEndPointContext ;
713
711
if (batchFlushEndPointContext .isDone () && closeStatus != null ) {
714
712
if (closeStatus .isWillReconnect ()) {
715
713
onWillReconnect (closeStatus , batchFlushEndPointContext );
716
714
} else {
717
715
onWontReconnect (closeStatus , batchFlushEndPointContext );
718
716
}
719
717
720
- if (chan .getContext () .setChannelQuiescentOnce ()) {
718
+ if (chan .context .setChannelQuiescentOnce ()) {
721
719
onEndpointQuiescence ();
722
720
} else {
723
721
ExceptionUtils .maybeFire (logger , canFire , "unexpected: setEndpointQuiescenceOncePerConnection() failed" );
@@ -726,7 +724,7 @@ private void trySetEndpointQuiescence(ContextualChannel chan) {
726
724
}
727
725
728
726
private void onEndpointQuiescence () {
729
- if (channel .getInitialState () == ConnectionContext .State .ENDPOINT_CLOSED ) {
727
+ if (channel .context . initialState == ConnectionContext .State .ENDPOINT_CLOSED ) {
730
728
return ;
731
729
}
732
730
@@ -879,7 +877,7 @@ private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
879
877
}
880
878
881
879
final ContextualChannel chan = this .channel ;
882
- switch (chan .getInitialState () ) {
880
+ switch (chan .context . initialState ) {
883
881
case ENDPOINT_CLOSED :
884
882
return new RedisException ("Connection is closed" );
885
883
case RECONNECT_FAILED :
@@ -891,12 +889,12 @@ private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
891
889
case CONNECTED :
892
890
return !chan .isActive () && rejectCommandsWhileDisconnected ? new RedisException ("Connection is closed" ) : null ;
893
891
default :
894
- throw new IllegalStateException ("unexpected state: " + chan .getInitialState () );
892
+ throw new IllegalStateException ("unexpected state: " + chan .context . initialState );
895
893
}
896
894
}
897
895
898
896
private void onUnexpectedState (String caller , ConnectionContext .State exp ) {
899
- final ConnectionContext .State actual = this .channel .getInitialState () ;
897
+ final ConnectionContext .State actual = this .channel .context . initialState ;
900
898
logger .error ("{}[{}][unexpected] : unexpected state: exp '{}' got '{}'" , logPrefix (), caller , exp , actual );
901
899
cancelCommands (String .format ("%s: state not match: expect '%s', got '%s'" , caller , exp , actual ));
902
900
}
@@ -984,7 +982,7 @@ static WrittenToChannel newInstance(DefaultBatchFlushEndpoint endpoint, Contextu
984
982
985
983
@ Override
986
984
public void operationComplete (Future <Void > future ) {
987
- final BatchFlushEndPointContext batchFlushEndPointContext = chan .getContext (). getBatchFlushEndPointContext () ;
985
+ final BatchFlushEndPointContext batchFlushEndPointContext = chan .context . batchFlushEndPointContext ;
988
986
try {
989
987
QUEUE_SIZE .decrementAndGet (endpoint );
990
988
batchFlushEndPointContext .done (1 );
@@ -1016,10 +1014,10 @@ private Throwable checkSendResult(Future<?> sendFuture, ContextualChannel contex
1016
1014
return null ;
1017
1015
}
1018
1016
1019
- final ConnectionContext .CloseStatus closeStatus = contextualChannel .getContext () .getCloseStatus ();
1017
+ final ConnectionContext .CloseStatus closeStatus = contextualChannel .context .getCloseStatus ();
1020
1018
if (closeStatus != null ) {
1021
1019
logger .warn ("[checkSendResult][interesting][{}] callback called after onClose() event, close status: {}" ,
1022
- endpoint .logPrefix (), contextualChannel .getContext () .getCloseStatus ());
1020
+ endpoint .logPrefix (), contextualChannel .context .getCloseStatus ());
1023
1021
final Throwable err = sendFuture .isSuccess () ? closeStatus .getErr () : sendFuture .cause ();
1024
1022
if (!closeStatus .isWillReconnect () || shouldNotRetry (err , cmd )) {
1025
1023
cmd .completeExceptionally (err );
@@ -1049,7 +1047,7 @@ private boolean shouldNotRetry(Throwable cause, RedisCommand<?, ?, ?> cmd) {
1049
1047
}
1050
1048
1051
1049
private void internalCloseConnectionIfNeeded (ContextualChannel toCloseChan , Throwable reason ) {
1052
- if (toCloseChan .getContext () .isChannelInactiveEventFired () || !toCloseChan .isActive ()) {
1050
+ if (toCloseChan .context .isChannelInactiveEventFired () || !toCloseChan .isActive ()) {
1053
1051
return ;
1054
1052
}
1055
1053
0 commit comments