Skip to content

Commit b487561

Browse files
committed
dont always get the bytes
1 parent 56767fd commit b487561

File tree

7 files changed

+14
-11
lines changed

7 files changed

+14
-11
lines changed

source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ static byte[] DeflateString(string s)
305305

306306
async Task<T> ReadMessage<T>(MessageSerializer messageSerializer, RewindableBufferStream rewindableBufferStream)
307307
{
308-
return (await messageSerializer.ReadMessageAsync<T>(rewindableBufferStream, CancellationToken)).Message;
308+
return (await messageSerializer.ReadMessageAsync<T>(rewindableBufferStream, false, CancellationToken)).Message;
309309
}
310310

311311
async Task WriteMessage(MessageSerializer messageSerializer, Stream stream, string message)

source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ public async Task SendAsync<T>(T message, CancellationToken cancellationToken)
388388
output.AppendLine("--> " + typeof(T).Name);
389389
}
390390

391-
public Task SendAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken)
391+
public Task SendPrePreparedRequestAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken)
392392
{
393393
throw new NotImplementedException();
394394
}

source/Halibut/Transport/Protocol/IMessageExchangeStream.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public interface IMessageExchangeStream
3636

3737
Task SendAsync<T>(T message, CancellationToken cancellationToken);
3838

39-
Task SendAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken);
39+
Task SendPrePreparedRequestAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken);
4040

4141
Task<RequestMessage?> ReceiveRequestAsync(TimeSpan timeoutForReceivingTheFirstByte, CancellationToken cancellationToken);
4242
Task<ResponseMessage?> ReceiveResponseAsync(CancellationToken cancellationToken);

source/Halibut/Transport/Protocol/IMessageSerializer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace Halibut.Transport.Protocol
1010
public interface IMessageSerializer
1111
{
1212
Task<IReadOnlyList<DataStream>> WriteMessageAsync<T>(Stream stream, T message, CancellationToken cancellationToken);
13-
Task<(T Message, IReadOnlyList<DataStream> DataStreams, byte[]? CompressedMessageBytes)> ReadMessageAsync<T>(RewindableBufferStream stream, CancellationToken cancellationToken);
13+
Task<(T Message, IReadOnlyList<DataStream> DataStreams, byte[]? CompressedMessageBytes)> ReadMessageAsync<T>(
14+
RewindableBufferStream stream, bool captureData, CancellationToken cancellationToken);
1415
}
1516
}

source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ async Task<ResponseMessage> SendAndReceiveRequest(RequestMessage nextRequest, Ca
307307

308308
async Task<ResponseBytesAndDataStreams> SendAndReceiveRequest(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken)
309309
{
310-
await stream.SendAsync(preparedRequestMessage, cancellationToken);
310+
await stream.SendPrePreparedRequestAsync(preparedRequestMessage, cancellationToken);
311311
return (await stream.ReceiveResponseBytesAsync(cancellationToken))!;
312312
}
313313

source/Halibut/Transport/Protocol/MessageExchangeStream.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public async Task SendAsync<T>(T message, CancellationToken cancellationToken)
195195
log.Write(EventType.Diagnostic, "Sent: {0}", message);
196196
}
197197

198-
public async Task SendAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken)
198+
public async Task SendPrePreparedRequestAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken)
199199
{
200200
await stream.WriteAsync(preparedRequestMessage.RequestBytes, cancellationToken);
201201
await WriteEachStreamAsync(preparedRequestMessage.DataStreams, cancellationToken);
@@ -224,7 +224,7 @@ await stream.WithReadTimeout(
224224

225225
public async Task<ResponseBytesAndDataStreams?> ReceiveResponseBytesAsync(CancellationToken cancellationToken)
226226
{
227-
var (result, dataStreams, compressedMessageBytes) = await serializer.ReadMessageAsync<ResponseMessage>(stream, cancellationToken);
227+
var (result, dataStreams, compressedMessageBytes) = await serializer.ReadMessageAsync<ResponseMessage>(stream, true, cancellationToken);
228228
await ReadStreamsAsync(dataStreams, cancellationToken);
229229
log.Write(EventType.Diagnostic, "Received: {0}", result); // TODO stop sending the response to logs.
230230
if (compressedMessageBytes == null) return null;
@@ -233,7 +233,7 @@ await stream.WithReadTimeout(
233233

234234
async Task<T?> ReceiveAsync<T>(CancellationToken cancellationToken)
235235
{
236-
var (result, dataStreams, compressedMessageBytes) = await serializer.ReadMessageAsync<T>(stream, cancellationToken);
236+
var (result, dataStreams, compressedMessageBytes) = await serializer.ReadMessageAsync<T>(stream, false, cancellationToken);
237237
await ReadStreamsAsync(dataStreams, cancellationToken);
238238
log.Write(EventType.Diagnostic, "Received: {0}", result); // TODO stop sending the response to logs.
239239
return result;

source/Halibut/Transport/Protocol/MessageSerializer.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,17 @@ public async Task<IReadOnlyList<DataStream>> WriteMessageAsync<T>(Stream stream,
6464
return serializedStreams;
6565
}
6666

67-
public async Task<(T Message, IReadOnlyList<DataStream> DataStreams, byte[]? CompressedMessageBytes)> ReadMessageAsync<T>(RewindableBufferStream stream, CancellationToken cancellationToken)
67+
public async Task<(T Message, IReadOnlyList<DataStream> DataStreams, byte[]? CompressedMessageBytes)> ReadMessageAsync<T>(
68+
RewindableBufferStream stream,
69+
bool captureData,
70+
CancellationToken cancellationToken)
6871
{
6972
await using (var errorRecordingStream = new ErrorRecordingStream(stream, closeInner: false))
7073
{
7174
Exception? exceptionFromDeserialisation = null;
7275
try
7376
{
74-
// TODO don't always set true.
75-
return await ReadCompressedMessageAsync<T>(errorRecordingStream, stream, true, cancellationToken);
77+
return await ReadCompressedMessageAsync<T>(errorRecordingStream, stream, captureData, cancellationToken);
7678
}
7779
catch (Exception e)
7880
{

0 commit comments

Comments
 (0)