Skip to content

Commit bf969d1

Browse files
committed
Delay without exception is cheaper
1 parent a224240 commit bf969d1

17 files changed

+138
-43
lines changed

source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,8 @@ public async Task WhenUsingTheRedisQueue_ASimpleEchoServiceCanBeCalled(ClientAnd
971971
await echo.CountBytesAsync(DataStream.FromString("hello"));
972972
}
973973
}
974+
975+
Logger.Fatal("YYYYYY\n\n\nYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYTotal {S}\n\n\n\n", MessageSerialiserAndDataStreamStorage.totalDecompress);
974976
}
975977

976978
[Test]
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System;
16+
using System.Threading;
17+
using System.Threading.Tasks;
18+
using Halibut.Util;
19+
using NUnit.Framework;
20+
21+
namespace Halibut.Tests.Util
22+
{
23+
public class DelayWithoutExceptionTest : BaseTest
24+
{
25+
[Test]
26+
public async Task DelayWithoutException_ShouldNotThrow()
27+
{
28+
using var cts = new CancellationTokenSource();
29+
cts.CancelAfter(10);
30+
await DelayWithoutException.Delay(TimeSpan.FromDays(1), cts.Token);
31+
}
32+
}
33+
}

source/Halibut/Queue/QueueMessageSerializer.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@ public QueueMessageSerializer(Func<StreamCapturingJsonSerializer> createStreamCa
4343
stream = WrapInMessageSerialisationStreams(messageStreamWrappers, stream, wrappedStreamDisposables);
4444

4545
// TODO instead store
46-
using (var zip = new DeflateStream(stream, CompressionMode.Compress, true)) {
47-
using (var jsonTextWriter = new BsonDataWriter(zip) { CloseOutput = false })
46+
47+
using (var zip = new DeflateStream(stream, CompressionMode.Compress, true))
48+
using (var buf = new BufferedStream(zip))
49+
{
50+
51+
using (var jsonTextWriter = new BsonDataWriter(buf) { CloseOutput = false })
4852
{
4953
var streamCapturingSerializer = createStreamCapturingSerializer();
5054
streamCapturingSerializer.Serializer.Serialize(jsonTextWriter, new MessageEnvelope<T>(message));
@@ -188,7 +192,8 @@ public async Task<byte[]> PrepareBytesFromWire(byte[] responseBytes)
188192
stream = WrapStreamInMessageDeserialisationStreams(messageStreamWrappers, stream, disposables);
189193

190194
using var deflateStream = new DeflateStream(stream, CompressionMode.Decompress, true);
191-
using (var bson = new BsonDataReader(deflateStream) { CloseInput = false })
195+
using var buf = new BufferedStream(deflateStream);
196+
using (var bson = new BsonDataReader(buf) { CloseInput = false })
192197
{
193198
var streamCapturingSerializer = createStreamCapturingSerializer();
194199
var result = streamCapturingSerializer.Serializer.Deserialize<MessageEnvelope<T>>(bson);

source/Halibut/Queue/QueuedDataStreams/HeartBeatDrivenDataStreamProgressReporter.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class HeartBeatDrivenDataStreamProgressReporter : IAsyncDisposable, IGetN
1616
{
1717
readonly ImmutableDictionary<Guid, IDataStreamWithFileUploadProgress> dataStreamsToReportProgressOn;
1818

19-
readonly ConcurrentBag<Guid> completedDataStreams = new();
19+
readonly HashSet<Guid> completedDataStreams = new();
2020

2121
HeartBeatDrivenDataStreamProgressReporter(ImmutableDictionary<Guid, IDataStreamWithFileUploadProgress> dataStreamsToReportProgressOn)
2222
{
@@ -30,7 +30,11 @@ public async Task HeartBeatReceived(HeartBeatMessage heartBeatMessage, Cancellat
3030

3131
foreach (var keyValuePair in heartBeatMessage.DataStreamProgress)
3232
{
33-
if(completedDataStreams.Contains(keyValuePair.Key)) continue;
33+
lock (completedDataStreams)
34+
{
35+
if(completedDataStreams.Contains(keyValuePair.Key)) continue;
36+
}
37+
3438

3539
if (dataStreamsToReportProgressOn.TryGetValue(keyValuePair.Key, out var dataStreamWithTransferProgress))
3640
{
@@ -39,7 +43,10 @@ public async Task HeartBeatReceived(HeartBeatMessage heartBeatMessage, Cancellat
3943

4044
if (dataStreamWithTransferProgress.Length == keyValuePair.Value)
4145
{
42-
completedDataStreams.Add(keyValuePair.Key);
46+
lock (completedDataStreams)
47+
{
48+
completedDataStreams.Add(keyValuePair.Key);
49+
}
4350
}
4451
}
4552
}
@@ -57,13 +64,21 @@ public async ValueTask DisposeAsync()
5764
// this object is disposable and on dispose we note that file will no longer be uploading. Which
5865
// for the normal percentage based file transfer progress will result in marking the DataStreams as 100% uploaded.
5966
// If we don't do this at the end of a successful call we may find DataStream progress is reported as less than 100%.
67+
var localCopyCompletedDataStreams = new List<Guid>();
68+
69+
// Because of where this is used, it is hard to be sure this object won't be used while disposing,
70+
// so take a copy of streams we have already completed.
71+
lock (completedDataStreams)
72+
{
73+
localCopyCompletedDataStreams.AddRange(completedDataStreams);
74+
}
6075
foreach (var keyValuePair in dataStreamsToReportProgressOn)
6176
{
62-
if (!completedDataStreams.Contains(keyValuePair.Key))
77+
if (!localCopyCompletedDataStreams.Contains(keyValuePair.Key))
6378
{
6479
var progress = keyValuePair.Value.DataStreamTransferProgress;
80+
// Thus may be called twice if HeartBeats are received while disposing.
6581
await progress.NoLongerUploading(CancellationToken.None);
66-
completedDataStreams.Add(keyValuePair.Key);
6782
}
6883
}
6984
}
Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4+
using Halibut.Util;
45

56
namespace Halibut.Queue.Redis.Cancellation
67
{
@@ -15,14 +16,15 @@ public DelayBeforeSubscribingToRequestCancellation(TimeSpan delay)
1516

1617
public async Task WaitBeforeHeartBeatSendingOrReceiving(CancellationToken cancellationToken)
1718
{
18-
try
19-
{
20-
await Task.Delay(Delay, cancellationToken);
21-
}
22-
catch
23-
{
24-
// If only Delay had an option to not throw.
25-
}
19+
await DelayWithoutException.Delay(Delay, cancellationToken);
20+
// try
21+
// {
22+
// await DelayWithoutException.Delay(Delay, cancellationToken);
23+
// }
24+
// catch
25+
// {
26+
// // If only Delay had an option to not throw.
27+
// }
2628
}
2729
}
2830
}

source/Halibut/Queue/Redis/Cancellation/WatchForRequestCancellation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async Task WatchForCancellation(
6262
// Also poll to see if the request is cancelled since we can miss the publication.
6363
while (!token.IsCancellationRequested)
6464
{
65-
await Try.IgnoringError(async () => await Task.Delay(TimeSpan.FromSeconds(60), token));
65+
await DelayWithoutException.Delay(TimeSpan.FromSeconds(60), token);
6666

6767
if(token.IsCancellationRequested) return;
6868

source/Halibut/Queue/Redis/MessageStorage/MessageSerialiserAndDataStreamStorage.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.IO;
45
using System.Linq;
56
using System.Threading;
@@ -70,8 +71,17 @@ public MessageSerialiserAndDataStreamStorage(QueueMessageSerializer queueMessage
7071
this.storeDataStreamsForDistributedQueues = storeDataStreamsForDistributedQueues;
7172
}
7273

74+
// public static long total = 0;
7375
public async Task<(RedisStoredMessage, HeartBeatDrivenDataStreamProgressReporter)> PrepareRequest(RequestMessage request, CancellationToken cancellationToken)
7476
{
77+
// var sw = Stopwatch.StartNew();
78+
// for (int i = 0; i < 1000; i++)
79+
// {
80+
// await queueMessageSerializer.PrepareMessageForWireTransferAndForQueue(request);
81+
// }
82+
// sw.Stop();
83+
// Interlocked.Add(ref total, (long) sw.Elapsed.TotalMilliseconds);
84+
7585
var (jsonRequestMessage, dataStreams) = await queueMessageSerializer.PrepareMessageForWireTransferAndForQueue(request);
7686
SwitchDataStreamsToNotReportProgress(dataStreams);
7787
var dataStreamProgressReporter = HeartBeatDrivenDataStreamProgressReporter.CreateForDataStreams(dataStreams);
@@ -118,11 +128,6 @@ static void SwitchDataStreamsToNotReportProgress(IReadOnlyList<DataStream> dataS
118128
public async Task<RedisStoredMessage> PrepareResponseForStorageInRedis(Guid activityId, ResponseBytesAndDataStreams response, CancellationToken cancellationToken)
119129
{
120130
var responseBytesToStoreInRedis = await queueMessageSerializer.PrepareBytesFromWire(response.ResponseBytes);
121-
122-
if ("hello".Length == 0)
123-
{
124-
await queueMessageSerializer.ConvertStoredResponseToResponseMessage<ResponseMessage>(responseBytesToStoreInRedis);
125-
}
126131

127132

128133
var dataStreamMetadata = await storeDataStreamsForDistributedQueues.StoreDataStreams(response.DataStreams, cancellationToken);
@@ -134,9 +139,18 @@ public async Task<RedisStoredMessage> PrepareResponseForStorageInRedis(Guid acti
134139

135140
return new RedisStoredMessage(responseBytesToStoreInRedis, dataStreamMetadata);
136141
}
137-
142+
143+
public static long totalDecompress = 0;
138144
public async Task<ResponseMessage> ReadResponseFromRedisStoredMessage(RedisStoredMessage storedMessage, CancellationToken cancellationToken)
139145
{
146+
// var sw = Stopwatch.StartNew();
147+
// for (int i = 0; i < 1000; i++)
148+
// {
149+
// await queueMessageSerializer.ConvertStoredResponseToResponseMessage<ResponseMessage>(storedMessage.Message);
150+
// }
151+
// sw.Stop();
152+
// Interlocked.Add(ref totalDecompress, (long) sw.Elapsed.TotalMilliseconds);
153+
140154
var (response, dataStreams) = await queueMessageSerializer.ConvertStoredResponseToResponseMessage<ResponseMessage>(storedMessage.Message);
141155

142156
var rehydratableDataStreams = BuildUpRehydratableDataStreams(dataStreams, out _);

source/Halibut/Queue/Redis/NodeHeartBeat/HeartBeatInitialDelay.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4+
using Halibut.Util;
45

56
namespace Halibut.Queue.Redis.NodeHeartBeat
67
{
@@ -17,7 +18,7 @@ public async Task WaitBeforeHeartBeatSendingOrReceiving(CancellationToken cancel
1718
{
1819
try
1920
{
20-
await Task.Delay(InitialDelay, cancellationToken);
21+
await DelayWithoutException.Delay(InitialDelay, cancellationToken);
2122
}
2223
catch
2324
{

source/Halibut/Queue/Redis/NodeHeartBeat/NodeHeartBeatSender.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ async Task SendPulsesWhileProcessingRequest(
7474
log.WriteException(EventType.Diagnostic, "Failed to send heartbeat for {0} node, request {1}, switching to panic mode with {2} second intervals", ex, nodeSendingPulsesType, requestActivityId, delayBetweenPulse.TotalSeconds);
7575
}
7676

77-
await Try.IgnoringError(async () => await Task.Delay(delayBetweenPulse, cancellationToken));
77+
await DelayWithoutException.Delay(delayBetweenPulse, cancellationToken);
7878
}
7979

8080
log.Write(EventType.Diagnostic, "Heartbeat pulse loop ended for {0} node, request {1}", nodeSendingPulsesType, requestActivityId);

source/Halibut/Queue/Redis/NodeHeartBeat/NodeHeartBeatWatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ static async Task<NodeWatcherResult> WatchForPulsesFromNode(Uri endpoint,
136136
TimeSpan.FromSeconds(30),
137137
maxTimeBetweenHeartBeetsBeforeNodeIsAssumedToBeOffline - timeSinceLastHeartBeat + TimeSpan.FromSeconds(1));
138138

139-
await Try.IgnoringError(async () => await Task.Delay(timeToWait, watchCancellationToken));
139+
await DelayWithoutException.Delay(timeToWait, watchCancellationToken);
140140
}
141141

142142
log.Write(EventType.Diagnostic, "{0} node watcher cancelled, request {1}", watchingForPulsesFrom, requestActivityId);

0 commit comments

Comments
 (0)