@@ -102,6 +102,7 @@ public async Task<IReadOnlyList<DataStream>> WriteMessageAsync<T>(Stream stream,
102
102
}
103
103
}
104
104
105
+ public static bool DontJsonDeserialise = true ;
105
106
async Task < ( T Message , IReadOnlyList < DataStream > DataStreams , byte [ ] ? messageBytes ) > ReadCompressedMessageAsync < T > ( ErrorRecordingStream errorRecordingStream ,
106
107
IRewindableBuffer rewindableBuffer ,
107
108
bool captureData ,
@@ -138,6 +139,33 @@ public async Task<IReadOnlyList<DataStream>> WriteMessageAsync<T>(Stream stream,
138
139
return ( new MessageEnvelope < T > ( ) . Message , Array . Empty < DataStream > ( ) , null ) ; // And hack around we can't return null
139
140
}
140
141
142
+ if ( copyToMemoryBufferStream != null && DontJsonDeserialise )
143
+ {
144
+ byte [ ] buf = new byte [ 4096 ] ;
145
+ while ( true )
146
+ {
147
+ var read = await deflatedInMemoryStream . ReadAsync ( buf , cancellationToken ) ;
148
+ if ( read == 0 ) break ;
149
+ }
150
+ // Find the unused bytes in the DeflateStream input buffer
151
+ if ( deflateReflector . TryGetAvailableInputBufferSize ( zip , out var unusedBytesCount ) )
152
+ {
153
+ rewindableBuffer . FinishAndRewind ( unusedBytesCount ) ;
154
+ }
155
+ else
156
+ {
157
+ rewindableBuffer . CancelBuffer ( ) ;
158
+ }
159
+
160
+ var compressedMessageSize = compressedByteCountingStream . BytesRead - unusedBytesCount ;
161
+ observer . MessageRead ( compressedMessageSize , decompressedByteCountingStream . BytesRead , deflatedInMemoryStream . BytesReadIntoMemory ) ;
162
+ if ( copyToMemoryBufferStream != null )
163
+ {
164
+ copyToMemoryBufferStream . memoryBuffer . SetLength ( compressedMessageSize ) ;
165
+ return ( new MessageEnvelope < T > ( ) . Message , new List < DataStream > ( ) . ToArray ( ) , copyToMemoryBufferStream . memoryBuffer . ToArray ( ) ) ;
166
+ }
167
+ }
168
+
141
169
using ( var bson = new BsonDataReader ( deflatedInMemoryStream ) { CloseInput = false } )
142
170
{
143
171
var ( messageEnvelope , dataStreams ) = DeserializeMessageAndDataStreams < T > ( bson ) ;
0 commit comments