Skip to content

Commit 344e310

Browse files
Only create a single publish channel (#1620) (#1622)
* Use a single publish channel with atomic swaps * Cosmetics * Few tweaks * Simplify implementation * Remove null-forgiving --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com> Co-authored-by: Brandon Ording <bording@gmail.com>
1 parent e8126f2 commit 344e310

File tree

1 file changed

+46
-32
lines changed

1 file changed

+46
-32
lines changed

src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,15 @@
33
namespace NServiceBus.Transport.RabbitMQ
44
{
55
using System;
6-
using System.Collections.Concurrent;
76
using System.Threading;
87
using System.Threading.Tasks;
98
using global::RabbitMQ.Client;
109
using global::RabbitMQ.Client.Events;
1110
using Logging;
1211

13-
class ChannelProvider : IAsyncDisposable
12+
class ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
13+
: IAsyncDisposable
1414
{
15-
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
16-
{
17-
this.connectionFactory = connectionFactory;
18-
this.retryDelay = retryDelay;
19-
20-
this.routingTopology = routingTopology;
21-
22-
channels = new ConcurrentQueue<ConfirmsAwareChannel>();
23-
}
24-
2515
public async Task Initialize(CancellationToken cancellationToken = default) => connection = await CreateConnectionWithShutdownListener(cancellationToken).ConfigureAwait(false);
2616

2717
async Task<IConnection> CreateConnectionWithShutdownListener(CancellationToken cancellationToken)
@@ -60,7 +50,7 @@ async Task ReconnectSwallowingExceptions(string? connectionName, CancellationTok
6050

6151
var newConnection = await CreateConnectionWithShutdownListener(cancellationToken).ConfigureAwait(false);
6252

63-
// A race condition is possible where CreatePublishConnection is invoked during Dispose
53+
// A race condition is possible where CreatePublishConnection is invoked during Dispose
6454
// where the returned connection isn't disposed so invoking Dispose to be sure
6555
if (cancellationToken.IsCancellationRequested)
6656
{
@@ -94,32 +84,57 @@ protected virtual void FireAndForget(Func<CancellationToken, Task> action, Cance
9484

9585
public async ValueTask<ConfirmsAwareChannel> GetPublishChannel(CancellationToken cancellationToken = default)
9686
{
97-
if (channels.TryDequeue(out var channel) && !channel.IsClosed)
87+
if (publishChannel is { IsOpen: true })
9888
{
99-
return channel;
89+
return publishChannel;
10090
}
10191

102-
if (channel is not null)
92+
try
10393
{
104-
await channel.DisposeAsync()
105-
.ConfigureAwait(false);
106-
}
94+
await publishChannelSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
95+
if (publishChannel is { IsOpen: true })
96+
{
97+
return publishChannel;
98+
}
10799

108-
channel = new ConfirmsAwareChannel(connection, routingTopology);
109-
await channel.Initialize(cancellationToken).ConfigureAwait(false);
100+
var oldChannel = publishChannel;
101+
if (oldChannel is not null)
102+
{
103+
await oldChannel.DisposeAsync().ConfigureAwait(false);
104+
}
110105

111-
return channel;
106+
var newChannel = new ConfirmsAwareChannel(connection, routingTopology);
107+
await newChannel.Initialize(cancellationToken).ConfigureAwait(false);
108+
publishChannel = newChannel;
109+
return newChannel;
110+
}
111+
finally
112+
{
113+
publishChannelSemaphore.Release();
114+
}
112115
}
113116

114-
public ValueTask ReturnPublishChannel(ConfirmsAwareChannel channel, CancellationToken cancellationToken = default)
117+
public async ValueTask ReturnPublishChannel(ConfirmsAwareChannel channel, CancellationToken cancellationToken = default)
115118
{
116119
if (channel.IsOpen)
117120
{
118-
channels.Enqueue(channel);
119-
return ValueTask.CompletedTask;
121+
return;
120122
}
121123

122-
return channel.DisposeAsync();
124+
try
125+
{
126+
await publishChannelSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
127+
128+
if (ReferenceEquals(publishChannel, channel))
129+
{
130+
await channel.DisposeAsync().ConfigureAwait(false);
131+
publishChannel = null;
132+
}
133+
}
134+
finally
135+
{
136+
publishChannelSemaphore.Release();
137+
}
123138
}
124139

125140
#pragma warning disable PS0018
@@ -137,20 +152,19 @@ public async ValueTask DisposeAsync()
137152
var oldConnection = Interlocked.Exchange(ref connection, null);
138153
oldConnection?.Dispose();
139154

140-
foreach (var channel in channels)
155+
var oldChannel = Interlocked.Exchange(ref publishChannel, null);
156+
if (oldChannel is not null)
141157
{
142-
await channel.DisposeAsync().ConfigureAwait(false);
158+
await oldChannel.DisposeAsync().ConfigureAwait(false);
143159
}
144160

145161
disposed = true;
146162
}
147163

148-
readonly ConnectionFactory connectionFactory;
149-
readonly TimeSpan retryDelay;
150-
readonly IRoutingTopology routingTopology;
151-
readonly ConcurrentQueue<ConfirmsAwareChannel> channels;
152164
readonly CancellationTokenSource stoppingTokenSource = new();
153165
volatile IConnection? connection;
166+
readonly SemaphoreSlim publishChannelSemaphore = new(1, 1);
167+
volatile ConfirmsAwareChannel? publishChannel;
154168
bool disposed;
155169

156170
static readonly ILog Logger = LogManager.GetLogger(typeof(ChannelProvider));

0 commit comments

Comments
 (0)