55
55
import io .lettuce .core .tracing .Tracer ;
56
56
import io .lettuce .core .tracing .Tracing ;
57
57
import io .netty .buffer .ByteBuf ;
58
+ import io .netty .buffer .ByteBufAllocator ;
58
59
import io .netty .channel .Channel ;
59
60
import io .netty .channel .ChannelDuplexHandler ;
60
61
import io .netty .channel .ChannelHandler ;
@@ -93,6 +94,8 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
93
94
94
95
private static final AtomicLong COMMAND_HANDLER_COUNTER = new AtomicLong ();
95
96
97
+ private static final int DEFAULT_READER_BUFFER_CAPACITY = 8192 * 8 ;
98
+
96
99
private final ClientOptions clientOptions ;
97
100
98
101
private final ClientResources clientResources ;
@@ -139,6 +142,10 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
139
142
140
143
private Tracing .Endpoint tracedEndpoint ;
141
144
145
+ private ByteBuf tmpReadBuffer ;
146
+
147
+ private ByteBufAllocator byteBufAllocator ;
148
+
142
149
/**
143
150
* Initialize a new instance that handles commands from the supplied queue.
144
151
*
@@ -222,7 +229,8 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
222
229
223
230
setState (LifecycleState .REGISTERED );
224
231
225
- readBuffer = ctx .alloc ().buffer (8192 * 8 );
232
+ byteBufAllocator = ctx .alloc ();
233
+ readBuffer = ctx .alloc ().buffer (DEFAULT_READER_BUFFER_CAPACITY );
226
234
rsm = new RedisStateMachine ();
227
235
ctx .fireChannelRegistered ();
228
236
}
@@ -614,6 +622,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
614
622
if (traceEnabled ) {
615
623
logger .trace ("{} Buffer: {}" , logPrefix (), input .toString (Charset .defaultCharset ()).trim ());
616
624
}
625
+ // if buffer capacity larger than default capacity, then create a new buffer with double capacity
626
+ // in most cases, key size is less than 64k
627
+ if (readBuffer .capacity () == DEFAULT_READER_BUFFER_CAPACITY && readBuffer .writableBytes () < input .readableBytes ()
628
+ && byteBufAllocator != null ) {
629
+ ByteBuf byteBuf = byteBufAllocator .directBuffer (readBuffer .capacity () << 1 );
630
+ byteBuf .writeBytes (readBuffer );
631
+ tmpReadBuffer = readBuffer ;
632
+ readBuffer = byteBuf ;
633
+ }
617
634
618
635
readBuffer .touch ("CommandHandler.read(…)" );
619
636
readBuffer .writeBytes (input );
@@ -661,6 +678,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
661
678
662
679
ctx .close ();
663
680
throw e ;
681
+ } finally {
682
+ releaseLargeBufferIfNecessary (buffer );
664
683
}
665
684
666
685
hasDecodeProgress = false ;
@@ -704,14 +723,26 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
704
723
complete (command );
705
724
} catch (Exception e ) {
706
725
logger .warn ("{} Unexpected exception during request: {}" , logPrefix , e .toString (), e );
726
+ } finally {
727
+ releaseLargeBufferIfNecessary (buffer );
707
728
}
708
729
}
709
730
}
710
731
afterDecode (ctx , command );
711
732
}
712
733
}
734
+ if (buffer .refCnt () > 0 ) {
735
+ decodeBufferPolicy .afterDecoding (buffer );
736
+ }
737
+ }
713
738
714
- decodeBufferPolicy .afterDecoding (buffer );
739
+ private void releaseLargeBufferIfNecessary (ByteBuf buffer ) {
740
+ if (this .tmpReadBuffer != null ) {
741
+ buffer .release ();
742
+ this .readBuffer = tmpReadBuffer ;
743
+ this .readBuffer .clear ();
744
+ this .tmpReadBuffer = null ;
745
+ }
715
746
}
716
747
717
748
protected void notifyPushListeners (PushMessage notification ) {
@@ -734,7 +765,7 @@ protected void notifyPushListeners(PushMessage notification) {
734
765
* @return
735
766
*/
736
767
protected boolean canDecode (ByteBuf buffer ) {
737
- return buffer .isReadable () && (isMessageDecode () || isPushDecode (buffer ));
768
+ return buffer .refCnt ()> 0 && buffer . isReadable () && (isMessageDecode () || isPushDecode (buffer ));
738
769
}
739
770
740
771
private boolean isPushMessage (ByteBuf buffer ) {
0 commit comments