53
53
import io .netty .channel .ChannelFuture ;
54
54
import io .netty .channel .EventLoop ;
55
55
import io .netty .handler .codec .EncoderException ;
56
+ import io .netty .util .Recycler ;
56
57
import io .netty .util .concurrent .Future ;
58
+ import io .netty .util .concurrent .GenericFutureListener ;
57
59
import io .netty .util .internal .logging .InternalLogger ;
58
60
import io .netty .util .internal .logging .InternalLoggerFactory ;
59
61
@@ -619,7 +621,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
619
621
return ;
620
622
}
621
623
622
- if (chan .getContext ().getFairEndPointContext ().getHasOngoingSendLoop ().tryEnterSafeGetVolatile ()) {
624
+ if (chan .getContext ().getBatchFlushEndPointContext ().getHasOngoingSendLoop ().tryEnterSafeGetVolatile ()) {
623
625
eventLoop .execute (() -> scheduleSendJobInEventLoopIfNeeded (chan ));
624
626
}
625
627
@@ -633,14 +635,14 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
633
635
634
636
private void scheduleSendJobInEventLoopIfNeeded (final ContextualChannel chan ) {
635
637
// Guarantee only 1 send loop.
636
- if (chan .getContext ().getFairEndPointContext ().getHasOngoingSendLoop ().tryEnterUnsafe ()) {
638
+ if (chan .getContext ().getBatchFlushEndPointContext ().getHasOngoingSendLoop ().tryEnterUnsafe ()) {
637
639
loopSend (chan );
638
640
}
639
641
}
640
642
641
643
private void loopSend (final ContextualChannel chan ) {
642
644
final ConnectionContext connectionContext = chan .getContext ();
643
- final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .getFairEndPointContext ();
645
+ final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .getBatchFlushEndPointContext ();
644
646
if (connectionContext .isChannelInactiveEventFired () || batchFlushEndPointContext .hasRetryableFailedToSendTasks ()) {
645
647
return ;
646
648
}
@@ -687,18 +689,7 @@ private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext,
687
689
if (cmd == null ) {
688
690
break ;
689
691
}
690
- channelWrite (chan , cmd ).addListener (future -> {
691
- QUEUE_SIZE .decrementAndGet (this );
692
- batchFlushEndPointContext .done (1 );
693
-
694
- final Throwable retryableErr = checkSendResult (future , chan , cmd );
695
- if (retryableErr != null && batchFlushEndPointContext .addRetryableFailedToSendTask (cmd , retryableErr )) {
696
- // Close connection on first transient write failure
697
- internalCloseConnectionIfNeeded (chan , retryableErr );
698
- }
699
-
700
- trySetEndpointQuiescence (chan );
701
- });
692
+ channelWrite (chan , cmd ).addListener (WrittenToChannel .newInstance (this , chan , cmd ));
702
693
}
703
694
704
695
if (count > 0 ) {
@@ -713,60 +704,12 @@ private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext,
713
704
return count ;
714
705
}
715
706
716
- /**
717
- * Check write result.
718
- *
719
- * @param sendFuture The future to check.
720
- * @param contextualChannel The channel instance associated with the future.
721
- * @param cmd The task.
722
- * @return The cause of the failure if is a retryable failed task, otherwise null.
723
- */
724
- private Throwable checkSendResult (Future <?> sendFuture , ContextualChannel contextualChannel , RedisCommand <?, ?, ?> cmd ) {
725
- if (cmd .isDone ()) {
726
- ExceptionUtils .logUnexpectedDone (logger , logPrefix (), cmd );
727
- return null ;
728
- }
729
-
730
- final ConnectionContext .CloseStatus closeStatus = contextualChannel .getContext ().getCloseStatus ();
731
- if (closeStatus != null ) {
732
- logger .warn ("[checkSendResult][interesting][{}] callback called after onClose() event, close status: {}" ,
733
- logPrefix (), contextualChannel .getContext ().getCloseStatus ());
734
- final Throwable err = sendFuture .isSuccess () ? closeStatus .getErr () : sendFuture .cause ();
735
- if (!closeStatus .isWillReconnect () || shouldNotRetry (err , cmd )) {
736
- cmd .completeExceptionally (err );
737
- return null ;
738
- } else {
739
- return err ;
740
- }
741
- }
742
-
743
- if (sendFuture .isSuccess ()) {
744
- return null ;
745
- }
746
-
747
- final Throwable cause = sendFuture .cause ();
748
- ExceptionUtils .maybeLogSendError (logger , cause );
749
- if (shouldNotRetry (cause , cmd )) {
750
- cmd .completeExceptionally (cause );
751
- return null ;
752
- }
753
-
754
- return cause ;
755
- }
756
-
757
- private boolean shouldNotRetry (Throwable cause , RedisCommand <?, ?, ?> cmd ) {
758
- return reliability == Reliability .AT_MOST_ONCE || ActivationCommand .isActivationCommand (cmd )
759
- || ExceptionUtils .oneOf (cause , SHOULD_NOT_RETRY_EXCEPTION_TYPES );
760
- }
761
-
762
707
private void trySetEndpointQuiescence (ContextualChannel chan ) {
763
- final EventLoop chanEventLoop = chan .eventLoop ();
764
- LettuceAssert .isTrue (chanEventLoop .inEventLoop (), "unexpected: not in event loop" );
765
- LettuceAssert .isTrue (chanEventLoop == lastEventLoop , "unexpected: event loop not match" );
708
+ LettuceAssert .isTrue (chan .eventLoop ().inEventLoop (), "unexpected: not in event loop" );
766
709
767
710
final ConnectionContext connectionContext = chan .getContext ();
768
711
final @ Nullable ConnectionContext .CloseStatus closeStatus = connectionContext .getCloseStatus ();
769
- final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .getFairEndPointContext ();
712
+ final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .getBatchFlushEndPointContext ();
770
713
if (batchFlushEndPointContext .isDone () && closeStatus != null ) {
771
714
if (closeStatus .isWillReconnect ()) {
772
715
onWillReconnect (closeStatus , batchFlushEndPointContext );
@@ -849,20 +792,6 @@ private void offerFirstAll(Deque<RedisCommand<?, ?, ?>> commands) {
849
792
this .taskQueue .offerFirstAll (commands );
850
793
}
851
794
852
- private void internalCloseConnectionIfNeeded (ContextualChannel toCloseChan , Throwable reason ) {
853
- if (toCloseChan .getContext ().isChannelInactiveEventFired () || !toCloseChan .isActive ()) {
854
- return ;
855
- }
856
-
857
- logger .error ("[internalCloseConnectionIfNeeded][attention][{}] close the connection due to write error, reason: '{}'" ,
858
- logPrefix (), reason .getMessage (), reason );
859
- toCloseChan .eventLoop ().schedule (() -> {
860
- if (toCloseChan .isActive ()) {
861
- toCloseChan .close ();
862
- }
863
- }, 1 , TimeUnit .SECONDS );
864
- }
865
-
866
795
private void cancelCommands (String message ) {
867
796
fulfillCommands (message , RedisCommand ::cancel );
868
797
}
@@ -904,7 +833,7 @@ private final void fulfillCommands(String message, Consumer<RedisCommand<?, ?, ?
904
833
}
905
834
906
835
if (totalCancelledTaskNum > 0 ) {
907
- logger .error ("cancel {} pending tasks, reason: '{}'" , totalCancelledTaskNum , message );
836
+ logger .error ("{} cancel {} pending tasks, reason: '{}'" , logPrefix () , totalCancelledTaskNum , message );
908
837
}
909
838
}
910
839
@@ -968,7 +897,7 @@ private Throwable validateWrite(@SuppressWarnings("unused") int commands) {
968
897
969
898
private void onUnexpectedState (String caller , ConnectionContext .State exp ) {
970
899
final ConnectionContext .State actual = this .channel .getInitialState ();
971
- logger .error ("[{}][unexpected] {} : unexpected state: exp '{}' got '{}'" , caller , logPrefix (), exp , actual );
900
+ logger .error ("{} [{}][unexpected] : unexpected state: exp '{}' got '{}'" , logPrefix (), caller , exp , actual );
972
901
cancelCommands (String .format ("%s: state not match: expect '%s', got '%s'" , caller , exp , actual ));
973
902
}
974
903
@@ -1010,4 +939,138 @@ private enum Reliability {
1010
939
AT_MOST_ONCE , AT_LEAST_ONCE
1011
940
}
1012
941
942
+ /**
943
+ * Add to stack listener. This listener is pooled and must be {@link #recycle() recycled after usage}.
944
+ */
945
+ static class WrittenToChannel implements GenericFutureListener <Future <Void >> {
946
+
947
+ private static final Recycler <WrittenToChannel > RECYCLER = new Recycler <WrittenToChannel >() {
948
+
949
+ @ Override
950
+ protected WrittenToChannel newObject (Recycler .Handle <WrittenToChannel > handle ) {
951
+ return new WrittenToChannel (handle );
952
+ }
953
+
954
+ };
955
+
956
+ private final Recycler .Handle <WrittenToChannel > handle ;
957
+
958
+ private DefaultBatchFlushEndpoint endpoint ;
959
+
960
+ private RedisCommand <?, ?, ?> command ;
961
+
962
+ private ContextualChannel chan ;
963
+
964
+ private WrittenToChannel (Recycler .Handle <WrittenToChannel > handle ) {
965
+ this .handle = handle ;
966
+ }
967
+
968
+ /**
969
+ * Allocate a new instance.
970
+ *
971
+ * @return new instance
972
+ */
973
+ static WrittenToChannel newInstance (DefaultBatchFlushEndpoint endpoint , ContextualChannel chan ,
974
+ RedisCommand <?, ?, ?> command ) {
975
+
976
+ WrittenToChannel entry = RECYCLER .get ();
977
+
978
+ entry .endpoint = endpoint ;
979
+ entry .chan = chan ;
980
+ entry .command = command ;
981
+
982
+ return entry ;
983
+ }
984
+
985
+ @ Override
986
+ public void operationComplete (Future <Void > future ) {
987
+ final BatchFlushEndPointContext batchFlushEndPointContext = chan .getContext ().getBatchFlushEndPointContext ();
988
+ try {
989
+ QUEUE_SIZE .decrementAndGet (endpoint );
990
+ batchFlushEndPointContext .done (1 );
991
+
992
+ final Throwable retryableErr = checkSendResult (future , chan , command );
993
+ if (retryableErr != null && batchFlushEndPointContext .addRetryableFailedToSendTask (command , retryableErr )) {
994
+ // Close connection on first transient write failure
995
+ internalCloseConnectionIfNeeded (chan , retryableErr );
996
+ }
997
+
998
+ endpoint .trySetEndpointQuiescence (chan );
999
+ } finally {
1000
+ recycle ();
1001
+ }
1002
+ }
1003
+
1004
+ /**
1005
+ * Check write result.
1006
+ *
1007
+ * @param sendFuture The future to check.
1008
+ * @param contextualChannel The channel instance associated with the future.
1009
+ * @param cmd The task.
1010
+ * @return The cause of the failure if is a retryable failed task, otherwise null.
1011
+ */
1012
+ private Throwable checkSendResult (Future <?> sendFuture , ContextualChannel contextualChannel ,
1013
+ RedisCommand <?, ?, ?> cmd ) {
1014
+ if (cmd .isDone ()) {
1015
+ ExceptionUtils .logUnexpectedDone (logger , endpoint .logPrefix (), cmd );
1016
+ return null ;
1017
+ }
1018
+
1019
+ final ConnectionContext .CloseStatus closeStatus = contextualChannel .getContext ().getCloseStatus ();
1020
+ if (closeStatus != null ) {
1021
+ logger .warn ("[checkSendResult][interesting][{}] callback called after onClose() event, close status: {}" ,
1022
+ endpoint .logPrefix (), contextualChannel .getContext ().getCloseStatus ());
1023
+ final Throwable err = sendFuture .isSuccess () ? closeStatus .getErr () : sendFuture .cause ();
1024
+ if (!closeStatus .isWillReconnect () || shouldNotRetry (err , cmd )) {
1025
+ cmd .completeExceptionally (err );
1026
+ return null ;
1027
+ } else {
1028
+ return err ;
1029
+ }
1030
+ }
1031
+
1032
+ if (sendFuture .isSuccess ()) {
1033
+ return null ;
1034
+ }
1035
+
1036
+ final Throwable cause = sendFuture .cause ();
1037
+ ExceptionUtils .maybeLogSendError (logger , cause );
1038
+ if (shouldNotRetry (cause , cmd )) {
1039
+ cmd .completeExceptionally (cause );
1040
+ return null ;
1041
+ }
1042
+
1043
+ return cause ;
1044
+ }
1045
+
1046
+ private boolean shouldNotRetry (Throwable cause , RedisCommand <?, ?, ?> cmd ) {
1047
+ return endpoint .reliability == Reliability .AT_MOST_ONCE || ActivationCommand .isActivationCommand (cmd )
1048
+ || ExceptionUtils .oneOf (cause , SHOULD_NOT_RETRY_EXCEPTION_TYPES );
1049
+ }
1050
+
1051
+ private void internalCloseConnectionIfNeeded (ContextualChannel toCloseChan , Throwable reason ) {
1052
+ if (toCloseChan .getContext ().isChannelInactiveEventFired () || !toCloseChan .isActive ()) {
1053
+ return ;
1054
+ }
1055
+
1056
+ logger .error (
1057
+ "[internalCloseConnectionIfNeeded][interesting][{}] close the connection due to write error, reason: '{}'" ,
1058
+ endpoint .logPrefix (), reason .getMessage (), reason );
1059
+ toCloseChan .eventLoop ().schedule (() -> {
1060
+ if (toCloseChan .isActive ()) {
1061
+ toCloseChan .close ();
1062
+ }
1063
+ }, 1 , TimeUnit .SECONDS );
1064
+ }
1065
+
1066
+ private void recycle () {
1067
+ this .endpoint = null ;
1068
+ this .chan = null ;
1069
+ this .command = null ;
1070
+
1071
+ handle .recycle (this );
1072
+ }
1073
+
1074
+ }
1075
+
1013
1076
}
0 commit comments