Skip to content

Commit dd05a74

Browse files
authored
Batch ingestion not correctly handling OperationCancelledException which can cause the ingestion to never used incoming context tasks and hang. (#4792)
Backport of #4780
1 parent e14a3a8 commit dd05a74

File tree

30 files changed

+206
-141
lines changed

30 files changed

+206
-141
lines changed

src/Particular.LicensingComponent/AuditThroughput/AuditThroughputCollectorHostedService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
4040
}
4141
} while (await timer.WaitForNextTickAsync(cancellationToken));
4242
}
43-
catch (OperationCanceledException)
43+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
4444
{
4545
logger.LogInformation($"Stopping {nameof(AuditThroughputCollectorHostedService)}");
4646
}

src/Particular.LicensingComponent/BrokerThroughput/BrokerThroughputCollectorHostedService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static ReadOnlyDictionary<string, string> LoadBrokerSettingValues(IEnumerable<Ke
5353
}
5454
} while (await timer.WaitForNextTickAsync(stoppingToken));
5555
}
56-
catch (OperationCanceledException)
56+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
5757
{
5858
logger.LogInformation($"Stopping {nameof(BrokerThroughputCollectorHostedService)}");
5959
}

src/ServiceControl.AcceptanceTesting/ScenarioWithEndpointBehaviorExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,9 @@ public override async Task Stop(CancellationToken cancellationToken = default)
139139
{
140140
await checkTask;
141141
}
142-
catch (OperationCanceledException)
142+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
143143
{
144-
//Swallow
144+
// Even though we are stopping, ONLY swallow when OCE from callee to not hide any ungraceful stop errors
145145
}
146146
finally
147147
{

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 75 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
using ServiceControl.Infrastructure.Metrics;
1818
using Transports;
1919

20-
class AuditIngestion : IHostedService
20+
class AuditIngestion : BackgroundService
2121
{
2222
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
2323

@@ -59,23 +59,15 @@ public AuditIngestion(
5959

6060
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);
6161

62-
watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);
63-
64-
ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
65-
}
66-
67-
public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
68-
69-
public async Task StopAsync(CancellationToken cancellationToken)
70-
{
71-
await watchdog.Stop();
72-
channel.Writer.Complete();
73-
await ingestionWorker;
74-
75-
if (transportInfrastructure != null)
76-
{
77-
await transportInfrastructure.Shutdown(cancellationToken);
78-
}
62+
watchdog = new Watchdog(
63+
"audit message ingestion",
64+
EnsureStarted,
65+
EnsureStopped,
66+
ingestionState.ReportError,
67+
ingestionState.Clear,
68+
settings.TimeToRestartAuditIngestionAfterFailure,
69+
logger
70+
);
7971
}
8072

8173
Task OnCriticalError(string failure, Exception exception)
@@ -139,7 +131,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
139131
}
140132
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
141133
{
142-
// ignored
134+
logger.Info("StopReceive cancelled");
143135
}
144136
}
145137

@@ -176,7 +168,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
176168
}
177169
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
178170
{
179-
// ignored
171+
logger.Info("StopReceive cancelled");
180172
}
181173
finally
182174
{
@@ -202,51 +194,86 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
202194
await taskCompletionSource.Task;
203195
}
204196

205-
async Task Loop()
197+
public override async Task StartAsync(CancellationToken cancellationToken)
206198
{
207-
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
199+
await watchdog.Start(() => applicationLifetime.StopApplication());
200+
await base.StartAsync(cancellationToken);
201+
}
208202

209-
while (await channel.Reader.WaitToReadAsync())
203+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
204+
{
205+
try
210206
{
211-
// will only enter here if there is something to read.
212-
try
207+
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
208+
209+
while (await channel.Reader.WaitToReadAsync(stoppingToken))
213210
{
214-
// as long as there is something to read this will fetch up to MaximumConcurrency items
215-
while (channel.Reader.TryRead(out var context))
211+
// will only enter here if there is something to read.
212+
try
216213
{
217-
contexts.Add(context);
214+
// as long as there is something to read this will fetch up to MaximumConcurrency items
215+
while (channel.Reader.TryRead(out var context))
216+
{
217+
contexts.Add(context);
218+
}
219+
220+
batchSizeMeter.Mark(contexts.Count);
221+
using (batchDurationMeter.Measure())
222+
{
223+
await auditIngestor.Ingest(contexts);
224+
}
218225
}
226+
catch (Exception e)
227+
{
228+
// signal all message handling tasks to terminate
229+
foreach (var context in contexts)
230+
{
231+
_ = context.GetTaskCompletionSource().TrySetException(e);
232+
}
233+
234+
if (e is OperationCanceledException && stoppingToken.IsCancellationRequested)
235+
{
236+
logger.Info("Batch cancelled", e);
237+
break;
238+
}
219239

220-
batchSizeMeter.Mark(contexts.Count);
221-
using (batchDurationMeter.Measure())
240+
logger.Info("Ingesting messages failed", e);
241+
}
242+
finally
222243
{
223-
await auditIngestor.Ingest(contexts);
244+
contexts.Clear();
224245
}
225246
}
226-
catch (OperationCanceledException)
227-
{
228-
//Do nothing as we are shutting down
229-
continue;
230-
}
231-
catch (Exception e) // show must go on
247+
// will fall out here when writer is completed
248+
}
249+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
250+
{
251+
// ExecuteAsync cancelled
252+
}
253+
}
254+
255+
public override async Task StopAsync(CancellationToken cancellationToken)
256+
{
257+
try
258+
{
259+
await watchdog.Stop();
260+
channel.Writer.Complete();
261+
await base.StopAsync(cancellationToken);
262+
}
263+
finally
264+
{
265+
if (transportInfrastructure != null)
232266
{
233-
if (logger.IsInfoEnabled)
267+
try
234268
{
235-
logger.Info("Ingesting messages failed", e);
269+
await transportInfrastructure.Shutdown(cancellationToken);
236270
}
237-
238-
// signal all message handling tasks to terminate
239-
foreach (var context in contexts)
271+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
240272
{
241-
context.GetTaskCompletionSource().TrySetException(e);
273+
logger.Info("Shutdown cancelled", e);
242274
}
243275
}
244-
finally
245-
{
246-
contexts.Clear();
247-
}
248276
}
249-
// will fall out here when writer is completed
250277
}
251278

252279
TransportInfrastructure transportInfrastructure;
@@ -265,7 +292,6 @@ async Task Loop()
265292
readonly Meter batchDurationMeter;
266293
readonly Counter receivedMeter;
267294
readonly Watchdog watchdog;
268-
readonly Task ingestionWorker;
269295
readonly IHostApplicationLifetime applicationLifetime;
270296

271297
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();

src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ await failedAuditStore.ProcessFailedMessages(
4848
Logger.Debug($"Successfully re-imported failed audit message {transportMessage.Id}.");
4949
}
5050
}
51-
catch (OperationCanceledException)
51+
catch (OperationCanceledException e) when (token.IsCancellationRequested)
5252
{
53-
// no-op
53+
Logger.Info("Cancelled", e);
5454
}
5555
catch (Exception e)
5656
{

src/ServiceControl.Audit/Infrastructure/Hosting/Commands/ImportFailedAuditsCommand.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
using Microsoft.Extensions.DependencyInjection;
88
using Microsoft.Extensions.Hosting;
99
using NServiceBus;
10+
using NServiceBus.Logging;
1011
using Settings;
1112

1213
class ImportFailedAuditsCommand : AbstractCommand
1314
{
15+
readonly ILog logger = LogManager.GetLogger<ImportFailedAuditsCommand>();
16+
1417
public override async Task Execute(HostArguments args, Settings settings)
1518
{
1619
settings.IngestAuditMessages = false;
@@ -37,9 +40,9 @@ public override async Task Execute(HostArguments args, Settings settings)
3740
{
3841
await importer.Run(tokenSource.Token);
3942
}
40-
catch (OperationCanceledException)
43+
catch (OperationCanceledException e) when (tokenSource.IsCancellationRequested)
4144
{
42-
// no op
45+
logger.Info("Cancelled", e);
4346
}
4447
finally
4548
{

src/ServiceControl.Infrastructure.Metrics/MetricsReporter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void Start()
3232
await Task.Delay(interval, tokenSource.Token).ConfigureAwait(false);
3333
}
3434
}
35-
catch (OperationCanceledException)
35+
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
3636
{
3737
//no-op
3838
}

src/ServiceControl.Infrastructure/AsyncTimer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ public TimerJob(Func<CancellationToken, Task<TimerJobExecutionResult>> callback,
4040

4141
//Otherwise execute immediately
4242
}
43-
catch (OperationCanceledException)
43+
catch (OperationCanceledException) when (token.IsCancellationRequested)
4444
{
45-
// no-op
45+
break;
4646
}
4747
catch (Exception ex)
4848
{
4949
errorCallback(ex);
5050
}
5151
}
5252
}
53-
catch (OperationCanceledException)
53+
catch (OperationCanceledException) when (token.IsCancellationRequested)
5454
{
5555
// no-op
5656
}
@@ -64,7 +64,7 @@ public async Task Stop()
6464
return;
6565
}
6666

67-
tokenSource.Cancel();
67+
await tokenSource.CancelAsync().ConfigureAwait(false);
6868
tokenSource.Dispose();
6969

7070
if (task != null)
@@ -73,7 +73,7 @@ public async Task Stop()
7373
{
7474
await task.ConfigureAwait(false);
7575
}
76-
catch (OperationCanceledException)
76+
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
7777
{
7878
//NOOP
7979
}

src/ServiceControl.Infrastructure/ReadOnlyStream.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio
5151

5252
return Task.CompletedTask;
5353
}
54-
catch (OperationCanceledException e)
54+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
5555
{
5656
return Task.FromCanceled(e.CancellationToken);
5757
}
@@ -113,7 +113,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
113113

114114
return Task.FromResult(result);
115115
}
116-
catch (OperationCanceledException e)
116+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
117117
{
118118
return Task.FromCanceled<int>(e.CancellationToken);
119119
}
@@ -136,7 +136,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
136136

137137
return new ValueTask<int>(result);
138138
}
139-
catch (OperationCanceledException e)
139+
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
140140
{
141141
return new ValueTask<int>(Task.FromCanceled<int>(e.CancellationToken));
142142
}

src/ServiceControl.Infrastructure/Watchdog.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ public Task Start(Action onFailedOnStartup)
5454

5555
failedOnStartup ??= false;
5656
}
57-
catch (OperationCanceledException)
57+
catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested)
5858
{
59-
//Do not Delay
59+
// Continue, as OCE is not from caller
60+
log.Info("Start cancelled, retrying...", e);
6061
continue;
6162
}
6263
catch (Exception e)
@@ -81,9 +82,9 @@ public Task Start(Action onFailedOnStartup)
8182
{
8283
await Task.Delay(timeToWaitBetweenStartupAttempts, shutdownTokenSource.Token).ConfigureAwait(false);
8384
}
84-
catch (OperationCanceledException)
85+
catch (OperationCanceledException) when (shutdownTokenSource.IsCancellationRequested)
8586
{
86-
//Ignore
87+
//Ignore, no need to log cancellation of delay
8788
}
8889
}
8990
try

0 commit comments

Comments
 (0)