diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj index 9bd61526a..000383c2b 100644 --- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj +++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs index 01419652c..5734a4386 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs @@ -21,34 +21,47 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, public void CreateConnection() { - connection = connectionFactory.CreatePublishConnection(); + connection = connectionFactory.CreatePublishConnection(); // Can take over 5 seconds connection.ConnectionShutdown += Connection_ConnectionShutdown; } void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) { - if (e.Initiator != ShutdownInitiator.Application) + if (e.Initiator == ShutdownInitiator.Application) { - var connection = (IConnection)sender; - - // Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack - _ = Task.Run(() => ReconnectSwallowingExceptions(connection.ClientProvidedName), CancellationToken.None); + return; } + + var connectionThatWasShutdown = (IConnection)sender; + var connectionName = connectionThatWasShutdown.ClientProvidedName; + connectionThatWasShutdown.Dispose(); + + // Task.Run() to clarify intent that the call MUST return immediately and not rely on current async call stack behavior + _ = Task.Run(() => ReconnectSwallowingExceptions(connectionName, stoppingTokenSource.Token), CancellationToken.None); } -#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext - async Task ReconnectSwallowingExceptions(string connectionName) -#pragma warning restore PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext + async Task ReconnectSwallowingExceptions(string connectionName, CancellationToken cancellationToken) { - while (true) + while (!cancellationToken.IsCancellationRequested) { Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds); - await Task.Delay(retryDelay).ConfigureAwait(false); - try { + await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false); CreateConnection(); + + // A race condition is possible where CreateConnection is invoked during Dispose + // where the returned connection isn't disposed so invoking Dispose to be sure + if (cancellationToken.IsCancellationRequested) + { + connection.Dispose(); + } + break; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + Logger.InfoFormat("'{0}': Stopped trying to reconnecting to the broker due to shutdown", connectionName); break; } catch (Exception ex) @@ -86,19 +99,31 @@ public void ReturnPublishChannel(ConfirmsAwareChannel channel) public void Dispose() { + if (disposed) + { + return; + } + + stoppingTokenSource.Cancel(); + stoppingTokenSource.Dispose(); + connection?.Dispose(); foreach (var channel in channels) { channel.Dispose(); } + + disposed = true; } readonly ConnectionFactory connectionFactory; readonly TimeSpan retryDelay; readonly IRoutingTopology routingTopology; readonly ConcurrentQueue channels; + readonly CancellationTokenSource stoppingTokenSource = new(); IConnection connection; + bool disposed; static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider)); }