Skip to content

Commit 9eb5255

Browse files
committed
Improve reliability of channel provider in case of reconnects (#1435)
* Abort loop earlier if possible * Reduce nesting * Swap the connection when ready * Explicit break to reduce nesting * Nullable enable * Move connection related stuff into the connection folder * Adjust the channel provider design slightly to achieve better testability without too much test induced damage * Test various races that can occur during shutdown and reconnection --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent ee8c423 commit 9eb5255

File tree

4 files changed

+234
-19
lines changed

4 files changed

+234
-19
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using global::RabbitMQ.Client;
8+
using global::RabbitMQ.Client.Events;
9+
using NUnit.Framework;
10+
11+
[TestFixture]
12+
public class ChannelProviderTests
13+
{
14+
[Test]
15+
public async Task Should_recover_connection_and_dispose_old_one_when_connection_shutdown()
16+
{
17+
var channelProvider = new TestableChannelProvider();
18+
channelProvider.CreateConnection();
19+
20+
var publishConnection = channelProvider.PublishConnections.Dequeue();
21+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
22+
23+
channelProvider.DelayTaskCompletionSource.SetResult(true);
24+
25+
await channelProvider.FireAndForgetAction(CancellationToken.None);
26+
27+
var recoveredConnection = channelProvider.PublishConnections.Dequeue();
28+
29+
Assert.That(publishConnection.WasDisposed, Is.True);
30+
Assert.That(recoveredConnection.WasDisposed, Is.False);
31+
}
32+
33+
[Test]
34+
public void Should_dispose_connection_when_disposed()
35+
{
36+
var channelProvider = new TestableChannelProvider();
37+
channelProvider.CreateConnection();
38+
39+
var publishConnection = channelProvider.PublishConnections.Dequeue();
40+
channelProvider.Dispose();
41+
42+
Assert.That(publishConnection.WasDisposed, Is.True);
43+
}
44+
45+
[Test]
46+
public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_still_pending()
47+
{
48+
var channelProvider = new TestableChannelProvider();
49+
channelProvider.CreateConnection();
50+
51+
var publishConnection = channelProvider.PublishConnections.Dequeue();
52+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
53+
54+
// Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing
55+
// to simulate a pending delay task
56+
channelProvider.Dispose();
57+
58+
await channelProvider.FireAndForgetAction(CancellationToken.None);
59+
60+
Assert.That(publishConnection.WasDisposed, Is.True);
61+
Assert.That(channelProvider.PublishConnections, Has.Count.Zero);
62+
}
63+
64+
[Test]
65+
public async Task Should_dispose_newly_established_connection()
66+
{
67+
var channelProvider = new TestableChannelProvider();
68+
channelProvider.CreateConnection();
69+
70+
var publishConnection = channelProvider.PublishConnections.Dequeue();
71+
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));
72+
73+
// This simulates the race of the reconnection loop being fired off with the delay task completed during
74+
// the disposal of the channel provider. To achieve that it is necessary to kick off the reconnection loop
75+
// and await its completion after the channel provider has been disposed.
76+
var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None);
77+
channelProvider.DelayTaskCompletionSource.SetResult(true);
78+
channelProvider.Dispose();
79+
80+
await fireAndForgetTask;
81+
82+
var recoveredConnection = channelProvider.PublishConnections.Dequeue();
83+
84+
Assert.That(publishConnection.WasDisposed, Is.True);
85+
Assert.That(recoveredConnection.WasDisposed, Is.True);
86+
}
87+
88+
class TestableChannelProvider : ChannelProvider
89+
{
90+
public TestableChannelProvider() : base(null, TimeSpan.Zero, null)
91+
{
92+
}
93+
94+
public Queue<FakeConnection> PublishConnections { get; } = new Queue<FakeConnection>();
95+
96+
public TaskCompletionSource<bool> DelayTaskCompletionSource { get; } = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
97+
98+
public Func<CancellationToken, Task> FireAndForgetAction { get; private set; }
99+
100+
protected override IConnection CreatePublishConnection()
101+
{
102+
var connection = new FakeConnection();
103+
PublishConnections.Enqueue(connection);
104+
return connection;
105+
}
106+
107+
protected override void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default)
108+
=> FireAndForgetAction = _ => action(cancellationToken);
109+
110+
protected override async Task DelayReconnect(CancellationToken cancellationToken = default)
111+
{
112+
using (var _ = cancellationToken.Register(() => DelayTaskCompletionSource.TrySetCanceled(cancellationToken)))
113+
{
114+
await DelayTaskCompletionSource.Task;
115+
}
116+
}
117+
}
118+
119+
class FakeConnection : IConnection
120+
{
121+
public int LocalPort { get; }
122+
public int RemotePort { get; }
123+
124+
public void Dispose() => WasDisposed = true;
125+
126+
public bool WasDisposed { get; private set; }
127+
128+
public void UpdateSecret(string newSecret, string reason) => throw new NotImplementedException();
129+
130+
public void Abort() => throw new NotImplementedException();
131+
132+
public void Abort(ushort reasonCode, string reasonText) => throw new NotImplementedException();
133+
134+
public void Abort(TimeSpan timeout) => throw new NotImplementedException();
135+
136+
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
137+
138+
public void Close() => throw new NotImplementedException();
139+
140+
public void Close(ushort reasonCode, string reasonText) => throw new NotImplementedException();
141+
142+
public void Close(TimeSpan timeout) => throw new NotImplementedException();
143+
144+
public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();
145+
146+
public IModel CreateModel() => throw new NotImplementedException();
147+
148+
public void HandleConnectionBlocked(string reason) => throw new NotImplementedException();
149+
150+
public void HandleConnectionUnblocked() => throw new NotImplementedException();
151+
152+
public ushort ChannelMax { get; }
153+
public IDictionary<string, object> ClientProperties { get; }
154+
public ShutdownEventArgs CloseReason { get; }
155+
public AmqpTcpEndpoint Endpoint { get; }
156+
public uint FrameMax { get; }
157+
public TimeSpan Heartbeat { get; }
158+
public bool IsOpen { get; }
159+
public AmqpTcpEndpoint[] KnownHosts { get; }
160+
public IProtocol Protocol { get; }
161+
public IDictionary<string, object> ServerProperties { get; }
162+
public IList<ShutdownReportEntry> ShutdownReport { get; }
163+
public string ClientProvidedName { get; } = $"FakeConnection{Interlocked.Increment(ref connectionCounter)}";
164+
public event EventHandler<CallbackExceptionEventArgs> CallbackException = (sender, args) => { };
165+
public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked = (sender, args) => { };
166+
public event EventHandler<ShutdownEventArgs> ConnectionShutdown = (sender, args) => { };
167+
public event EventHandler<EventArgs> ConnectionUnblocked = (sender, args) => { };
168+
169+
public void RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdown?.Invoke(this, args);
170+
171+
static int connectionCounter;
172+
}
173+
}
174+
}
File renamed without changes.
File renamed without changes.

src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#nullable enable
2+
13
namespace NServiceBus.Transport.RabbitMQ
24
{
35
using System;
@@ -7,7 +9,7 @@ namespace NServiceBus.Transport.RabbitMQ
79
using global::RabbitMQ.Client;
810
using Logging;
911

10-
sealed class ChannelProvider : IDisposable
12+
class ChannelProvider : IDisposable
1113
{
1214
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
1315
{
@@ -19,36 +21,56 @@ public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay,
1921
channels = new ConcurrentQueue<ConfirmsAwareChannel>();
2022
}
2123

22-
public void CreateConnection()
24+
public void CreateConnection() => connection = CreateConnectionWithShutdownListener();
25+
26+
protected virtual IConnection CreatePublishConnection() => connectionFactory.CreatePublishConnection();
27+
28+
IConnection CreateConnectionWithShutdownListener()
2329
{
24-
connection = connectionFactory.CreatePublishConnection();
25-
connection.ConnectionShutdown += Connection_ConnectionShutdown;
30+
var newConnection = CreatePublishConnection();
31+
newConnection.ConnectionShutdown += Connection_ConnectionShutdown;
32+
return newConnection;
2633
}
2734

28-
void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
35+
void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e)
2936
{
30-
if (e.Initiator != ShutdownInitiator.Application)
37+
if (e.Initiator == ShutdownInitiator.Application || sender is null)
3138
{
32-
var connection = (IConnection)sender;
33-
34-
// Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
35-
_ = Task.Run(() => ReconnectSwallowingExceptions(connection.ClientProvidedName), CancellationToken.None);
39+
return;
3640
}
41+
42+
var connectionThatWasShutdown = (IConnection)sender;
43+
44+
FireAndForget(cancellationToken => ReconnectSwallowingExceptions(connectionThatWasShutdown.ClientProvidedName, cancellationToken), stoppingTokenSource.Token);
3745
}
3846

39-
#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
40-
async Task ReconnectSwallowingExceptions(string connectionName)
41-
#pragma warning restore PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
47+
async Task ReconnectSwallowingExceptions(string connectionName, CancellationToken cancellationToken)
4248
{
43-
while (true)
49+
while (!cancellationToken.IsCancellationRequested)
4450
{
4551
Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds);
4652

47-
await Task.Delay(retryDelay).ConfigureAwait(false);
48-
4953
try
5054
{
51-
CreateConnection();
55+
await DelayReconnect(cancellationToken).ConfigureAwait(false);
56+
57+
var newConnection = CreateConnectionWithShutdownListener();
58+
59+
// A race condition is possible where CreatePublishConnection is invoked during Dispose
60+
// where the returned connection isn't disposed so invoking Dispose to be sure
61+
if (cancellationToken.IsCancellationRequested)
62+
{
63+
newConnection.Dispose();
64+
break;
65+
}
66+
67+
var oldConnection = Interlocked.Exchange(ref connection, newConnection);
68+
oldConnection?.Dispose();
69+
break;
70+
}
71+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
72+
{
73+
Logger.InfoFormat("'{0}': Stopped trying to reconnecting to the broker due to shutdown", connectionName);
5274
break;
5375
}
5476
catch (Exception ex)
@@ -60,6 +82,12 @@ async Task ReconnectSwallowingExceptions(string connectionName)
6082
Logger.InfoFormat("'{0}': Connection to the broker reestablished successfully.", connectionName);
6183
}
6284

85+
protected virtual void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default) =>
86+
// Task.Run() so the call returns immediately instead of waiting for the first await or return down the call stack
87+
_ = Task.Run(() => action(cancellationToken), CancellationToken.None);
88+
89+
protected virtual Task DelayReconnect(CancellationToken cancellationToken = default) => Task.Delay(retryDelay, cancellationToken);
90+
6391
public ConfirmsAwareChannel GetPublishChannel()
6492
{
6593
if (!channels.TryDequeue(out var channel) || channel.IsClosed)
@@ -86,19 +114,32 @@ public void ReturnPublishChannel(ConfirmsAwareChannel channel)
86114

87115
public void Dispose()
88116
{
89-
connection?.Dispose();
117+
if (disposed)
118+
{
119+
return;
120+
}
121+
122+
stoppingTokenSource.Cancel();
123+
stoppingTokenSource.Dispose();
124+
125+
var oldConnection = Interlocked.Exchange(ref connection, null);
126+
oldConnection?.Dispose();
90127

91128
foreach (var channel in channels)
92129
{
93130
channel.Dispose();
94131
}
132+
133+
disposed = true;
95134
}
96135

97136
readonly ConnectionFactory connectionFactory;
98137
readonly TimeSpan retryDelay;
99138
readonly IRoutingTopology routingTopology;
100139
readonly ConcurrentQueue<ConfirmsAwareChannel> channels;
101-
IConnection connection;
140+
readonly CancellationTokenSource stoppingTokenSource = new();
141+
volatile IConnection? connection;
142+
bool disposed;
102143

103144
static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));
104145
}

0 commit comments

Comments
 (0)