Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.0.0" />
<PackageReference Include="NServiceBus" Version="9.1.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>
Expand Down
49 changes: 37 additions & 12 deletions src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,47 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,

public void CreateConnection()
{
connection = connectionFactory.CreatePublishConnection();
connection = connectionFactory.CreatePublishConnection(); // Can take over 5 seconds
connection.ConnectionShutdown += Connection_ConnectionShutdown;
}

void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
if (e.Initiator != ShutdownInitiator.Application)
if (e.Initiator == ShutdownInitiator.Application)
{
var connection = (IConnection)sender;

// Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
_ = Task.Run(() => ReconnectSwallowingExceptions(connection.ClientProvidedName), CancellationToken.None);
return;
}

var connectionThatWasShutdown = (IConnection)sender;
var connectionName = connectionThatWasShutdown.ClientProvidedName;
connectionThatWasShutdown.Dispose();

// Task.Run() to clarify intent that the call MUST return immediately and not rely on current async call stack behavior
_ = Task.Run(() => ReconnectSwallowingExceptions(connectionName, stoppingTokenSource.Token), CancellationToken.None);
}

#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
async Task ReconnectSwallowingExceptions(string connectionName)
#pragma warning restore PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
async Task ReconnectSwallowingExceptions(string connectionName, CancellationToken cancellationToken)
{
while (true)
while (!cancellationToken.IsCancellationRequested)
{
Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds);

await Task.Delay(retryDelay).ConfigureAwait(false);

try
{
await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);
CreateConnection();

// A race condition is possible where CreateConnection is invoked during Dispose
// where the returned connection isn't disposed so invoking Dispose to be sure
if (cancellationToken.IsCancellationRequested)
{
connection.Dispose();
}
break;
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Logger.InfoFormat("'{0}': Stopped trying to reconnecting to the broker due to shutdown", connectionName);
break;
}
catch (Exception ex)
Expand Down Expand Up @@ -86,19 +99,31 @@ public void ReturnPublishChannel(ConfirmsAwareChannel channel)

public void Dispose()
{
if (disposed)
{
return;
}

stoppingTokenSource.Cancel();
stoppingTokenSource.Dispose();

connection?.Dispose();

foreach (var channel in channels)
{
channel.Dispose();
}

disposed = true;
}

readonly ConnectionFactory connectionFactory;
readonly TimeSpan retryDelay;
readonly IRoutingTopology routingTopology;
readonly ConcurrentQueue<ConfirmsAwareChannel> channels;
readonly CancellationTokenSource stoppingTokenSource = new();
IConnection connection;
bool disposed;

static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));
}
Expand Down