@@ -133,6 +133,8 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
133133
134134 protected volatile @ Nonnull ContextualChannel channel = DummyContextualChannelInstances .CHANNEL_CONNECTING ;
135135
136+ private volatile Throwable failedToReconnectReason ;
137+
136138 private final Consumer <RedisCommand <?, ?, ?>> callbackOnClose ;
137139
138140 private final boolean rejectCommandsWhileDisconnected ;
@@ -153,19 +155,17 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
153155
154156 private ConnectionFacade connectionFacade ;
155157
156- private volatile Throwable connectionError ;
157-
158158 private final String cachedEndpointId ;
159159
160160 protected final UnboundedOfferFirstQueue <Object > taskQueue ;
161161
162162 private final boolean canFire ;
163163
164- private volatile boolean inProtectMode ;
164+ private volatile EventLoop lastEventLoop = null ;
165165
166- private volatile Throwable failedToReconnectReason ;
166+ private volatile Throwable connectionError ;
167167
168- private volatile EventLoop lastEventLoop = null ;
168+ private volatile boolean inProtectMode ;
169169
170170 private final int writeSpinCount ;
171171
@@ -316,17 +316,16 @@ private <V, K> void writeAndFlushActivationCommands(ContextualChannel chan,
316316 @ Override
317317 public void notifyChannelActive (Channel channel ) {
318318 final ContextualChannel contextualChannel = new ContextualChannel (channel , ConnectionContext .State .CONNECTED );
319-
320- this .logPrefix = null ;
321- this .connectionError = null ;
322-
323319 if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_CONNECTING , contextualChannel )) {
324320 channel .close ();
325321 onUnexpectedState ("notifyChannelActive" , ConnectionContext .State .CONNECTING );
326322 return ;
327323 }
328324
329- lastEventLoop = channel .eventLoop ();
325+ this .lastEventLoop = channel .eventLoop ();
326+ this .connectionError = null ;
327+ this .inProtectMode = false ;
328+ this .logPrefix = null ;
330329
331330 // Created a synchronize-before with set channel to CHANNEL_CONNECTING,
332331 if (isClosed ()) {
@@ -398,7 +397,7 @@ public void notifyChannelInactive(Channel channel) {
398397
399398 @ Override
400399 public void notifyChannelInactiveAfterWatchdogDecision (Channel channel ,
401- Deque <RedisCommand <?, ?, ?>> retryableQueuedCommands ) {
400+ Deque <RedisCommand <?, ?, ?>> retryablePendingCommands ) {
402401 final ContextualChannel inactiveChan = this .channel ;
403402 if (!inactiveChan .context .initialState .isConnected ()) {
404403 logger .error ("[unexpected][{}] notifyChannelInactive: channel initial state not connected" , logPrefix ());
@@ -446,7 +445,7 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
446445 CHANNEL .set (this , DummyContextualChannelInstances .CHANNEL_ENDPOINT_CLOSED );
447446 }
448447 inactiveChan .context
449- .setCloseStatus (new ConnectionContext .CloseStatus (willReconnect , retryableQueuedCommands , exception ));
448+ .setCloseStatus (new ConnectionContext .CloseStatus (willReconnect , retryablePendingCommands , exception ));
450449 trySetEndpointQuiescence (inactiveChan );
451450 }
452451
@@ -945,11 +944,11 @@ private <K, V, T> RedisCommand<K, V, T> processActivationCommand(RedisCommand<K,
945944
946945 private Throwable validateWrite (ContextualChannel chan , int commands , boolean isActivationCommand ) {
947946 if (isClosed ()) {
948- return new RedisException ("Connection is closed" );
947+ return new RedisException ("Endpoint is closed" );
949948 }
950949
951950 final Throwable localConnectionErr = connectionError ;
952- if (localConnectionErr != null /* different logic of DefaultEndpoint */ ) {
951+ if (localConnectionErr != null /* attention: different logic of DefaultEndpoint */ ) {
953952 return localConnectionErr ;
954953 }
955954
@@ -961,18 +960,19 @@ private Throwable validateWrite(ContextualChannel chan, int commands, boolean is
961960
962961 final ConnectionContext .State initialState = chan .context .initialState ;
963962 final boolean rejectCommandsWhileDisconnectedLocal = this .rejectCommandsWhileDisconnected || isActivationCommand ;
963+ final String rejectDesc = isActivationCommand ? "isActivationCommand" : "rejectCommandsWhileDisconnected" ;
964964 switch (initialState ) {
965965 case ENDPOINT_CLOSED :
966966 return new RedisException ("Connection is closed" );
967967 case RECONNECT_FAILED :
968- return failedToReconnectReason ;
968+ return getFailedToReconnectReason () ;
969969 case WILL_RECONNECT :
970970 case CONNECTING :
971- return rejectCommandsWhileDisconnectedLocal
972- ? new RedisException ("Currently not connected. Commands are rejected." )
971+ return rejectCommandsWhileDisconnectedLocal ? new RedisException ("Currently not connected and " + rejectDesc )
973972 : null ;
974973 case CONNECTED :
975- return !chan .isActive () && rejectCommandsWhileDisconnectedLocal ? new RedisException ("Channel is closed" )
974+ return !chan .isActive () && rejectCommandsWhileDisconnectedLocal
975+ ? new RedisException ("Channel is inactive and " + rejectDesc )
976976 : null ;
977977 default :
978978 throw new IllegalStateException ("unexpected state: " + initialState );
0 commit comments