diff --git a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
index 9bd61526a..000383c2b 100644
--- a/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
+++ b/src/NServiceBus.Transport.RabbitMQ.CommandLine/NServiceBus.Transport.RabbitMQ.CommandLine.csproj
@@ -15,7 +15,7 @@
-
+
diff --git a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
index 01419652c..5734a4386 100644
--- a/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
+++ b/src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
@@ -21,34 +21,47 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,
public void CreateConnection()
{
- connection = connectionFactory.CreatePublishConnection();
+ connection = connectionFactory.CreatePublishConnection(); // Can take over 5 seconds
connection.ConnectionShutdown += Connection_ConnectionShutdown;
}
void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
- if (e.Initiator != ShutdownInitiator.Application)
+ if (e.Initiator == ShutdownInitiator.Application)
{
- var connection = (IConnection)sender;
-
- // Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
- _ = Task.Run(() => ReconnectSwallowingExceptions(connection.ClientProvidedName), CancellationToken.None);
+ return;
}
+
+ var connectionThatWasShutdown = (IConnection)sender;
+ var connectionName = connectionThatWasShutdown.ClientProvidedName;
+ connectionThatWasShutdown.Dispose();
+
+ // Task.Run() to clarify intent that the call MUST return immediately and not rely on current async call stack behavior
+ _ = Task.Run(() => ReconnectSwallowingExceptions(connectionName, stoppingTokenSource.Token), CancellationToken.None);
}
-#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
- async Task ReconnectSwallowingExceptions(string connectionName)
-#pragma warning restore PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
+ async Task ReconnectSwallowingExceptions(string connectionName, CancellationToken cancellationToken)
{
- while (true)
+ while (!cancellationToken.IsCancellationRequested)
{
Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds);
- await Task.Delay(retryDelay).ConfigureAwait(false);
-
try
{
+ await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);
CreateConnection();
+
+ // A race condition is possible where CreateConnection is invoked during Dispose
+ // where the returned connection isn't disposed so invoking Dispose to be sure
+ if (cancellationToken.IsCancellationRequested)
+ {
+ connection.Dispose();
+ }
+ break;
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ Logger.InfoFormat("'{0}': Stopped trying to reconnecting to the broker due to shutdown", connectionName);
break;
}
catch (Exception ex)
@@ -86,19 +99,31 @@ public void ReturnPublishChannel(ConfirmsAwareChannel channel)
public void Dispose()
{
+ if (disposed)
+ {
+ return;
+ }
+
+ stoppingTokenSource.Cancel();
+ stoppingTokenSource.Dispose();
+
connection?.Dispose();
foreach (var channel in channels)
{
channel.Dispose();
}
+
+ disposed = true;
}
readonly ConnectionFactory connectionFactory;
readonly TimeSpan retryDelay;
readonly IRoutingTopology routingTopology;
readonly ConcurrentQueue channels;
+ readonly CancellationTokenSource stoppingTokenSource = new();
IConnection connection;
+ bool disposed;
static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));
}