Skip to content

Commit a743831

Browse files
danielmarbachbordingpaulomorgadolukebakken
authored
Prepare for RabbitMQ.Client v7.0.0 (#1446)
* Wip upgrade * More changes * More changes * Fix warnings * Remove our now unneeded publisher confirms handling * Use CatchAsync * Add awaits to tests * Add missing await * ConfirmsAwareChannel init for now * Wire up unregister event for consumer cancellation * Headers can be null when nothing was sent * Add a done condition (still not an entirely perfect test but we'll go from there) * Cleanup test a bit * Update approval file * Don't cancel the connection close * ToHashSet Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com> * Cleanup test * Switch regex approach * Async continuation * Forward cancellation token with exception handling as a spike * Update to 7.0.0-rc.9 This version restores the parameter order from 6.x for `BasicPublishAsync` * Cleanup * More cleanup * Add more info to test exception message * Add more logging * Simplify * Return channel after all tasks are completed now that send is async * Put return in finally block * Try adding our own tracking back * Revert "Try adding our own tracking back" This reverts commit 99bc699. * WaitForConfirmsOrDieAsync for confirms * Upgrade to RC11 * Try adding our own tracking back * Comment * Synchronize for now the sequence number access around publish (deliberate wider scope for safety as a trial) * Move NextPublishSeqNo acquisition inside semaphore * Remove TODOs that are not necessary For example channel operations should not be parallelized. * Non-generic task completion source * readonly * It seems we have to pass the non-readonly version * UnsafeRegister * Bump version to 10.0 --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com> Co-authored-by: Brandon Ording <bording@gmail.com> Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com> Co-authored-by: Luke Bakken <luke@bakken.io>
1 parent 6e1b6fd commit a743831

File tree

52 files changed

+839
-780
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+839
-780
lines changed

src/Custom.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project>
22

33
<PropertyGroup>
4-
<MinVerMinimumMajorMinor>9.0</MinVerMinimumMajorMinor>
4+
<MinVerMinimumMajorMinor>10.0</MinVerMinimumMajorMinor>
55
<MinVerAutoIncrement>minor</MinVerAutoIncrement>
66
</PropertyGroup>
77

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
22
{
33
using System.Collections.Generic;
4+
using System.Threading.Tasks;
45
using global::RabbitMQ.Client;
56

67
static class ChannelExtensions
78
{
8-
public static void DeclareQuorumQueue(this IModel channel, string queueName)
9+
public static Task<QueueDeclareOk> DeclareQuorumQueue(this IChannel channel, string queueName)
910
{
10-
channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
11+
return channel.QueueDeclareAsync(queueName, true, false, false, new Dictionary<string, object>
1112
{
1213
{ "x-queue-type", "quorum" }
1314
});
1415
}
1516

16-
public static void DeclareClassicQueue(this IModel channel, string queueName)
17+
public static Task<QueueDeclareOk> DeclareClassicQueue(this IChannel channel, string queueName)
1718
{
18-
channel.QueueDeclare(queueName, true, false, false);
19+
return channel.QueueDeclareAsync(queueName, true, false, false);
1920
}
2021
}
2122
}

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/ConfigureEndpointRabbitMQTransport.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,28 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
2727
return Task.CompletedTask;
2828
}
2929

30-
public Task Cleanup()
30+
public async Task Cleanup()
3131
{
32-
PurgeQueues();
33-
34-
return Task.CompletedTask;
32+
await PurgeQueues();
3533
}
3634

37-
void PurgeQueues()
35+
async Task PurgeQueues()
3836
{
3937
if (transport == null)
4038
{
4139
return;
4240
}
4341

44-
var queues = transport.QueuesToCleanup.Distinct().ToArray();
42+
var queues = transport.QueuesToCleanup.ToHashSet();
4543

46-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection("Test Queue Purger"))
47-
using (var channel = connection.CreateModel())
44+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync("Test Queue Purger"))
45+
using (var channel = await connection.CreateChannelAsync())
4846
{
4947
foreach (var queue in queues)
5048
{
5149
try
5250
{
53-
channel.QueuePurge(queue);
51+
await channel.QueuePurgeAsync(queue);
5452
}
5553
catch (Exception ex)
5654
{

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/NServiceBus.Transport.RabbitMQ.AcceptanceTests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<ItemGroup>
2222
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
2323
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
24-
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
24+
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.11" />
2525
</ItemGroup>
2626

2727
<ItemGroup>

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_error_queue.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
22
{
33
using System;
4+
using System.Threading.Tasks;
45
using AcceptanceTesting;
56
using NServiceBus.AcceptanceTests;
67
using NServiceBus.AcceptanceTests.EndpointTemplates;
@@ -9,12 +10,12 @@
910
public class When_classic_endpoint_uses_quorum_error_queue : NServiceBusAcceptanceTest
1011
{
1112
[Test]
12-
public void Should_fail_to_start()
13+
public async Task Should_fail_to_start()
1314
{
14-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
15-
using (var channel = connection.CreateModel())
15+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
16+
using (var channel = await connection.CreateChannelAsync())
1617
{
17-
channel.DeclareQuorumQueue("rabbitmq.transport.tests.quorum-error");
18+
await channel.DeclareQuorumQueue("rabbitmq.transport.tests.quorum-error");
1819
}
1920

2021
var exception = Assert.CatchAsync<Exception>(async () => await Scenario.Define<ScenarioContext>()

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_classic_endpoint_uses_quorum_queue.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
22
{
33
using System;
4+
using System.Threading.Tasks;
45
using AcceptanceTesting;
56
using AcceptanceTesting.Customization;
67
using NServiceBus.AcceptanceTests;
@@ -10,12 +11,12 @@
1011
public class When_classic_endpoint_uses_quorum_queue : NServiceBusAcceptanceTest
1112
{
1213
[Test]
13-
public void Should_fail_to_start()
14+
public async Task Should_fail_to_start()
1415
{
15-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
16-
using (var channel = connection.CreateModel())
16+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
17+
using (var channel = await connection.CreateChannelAsync())
1718
{
18-
channel.DeclareQuorumQueue(Conventions.EndpointNamingConvention(typeof(ClassicQueueEndpoint)));
19+
await channel.DeclareQuorumQueue(Conventions.EndpointNamingConvention(typeof(ClassicQueueEndpoint)));
1920
}
2021

2122
var exception = Assert.CatchAsync<Exception>(async () => await Scenario.Define<ScenarioContext>()

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_error_queue.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
22
{
33
using System;
4+
using System.Threading.Tasks;
45
using AcceptanceTesting;
56
using NServiceBus.AcceptanceTests;
67
using NUnit.Framework;
78

89
public class When_quorum_endpoint_uses_classic_error_queue : NServiceBusAcceptanceTest
910
{
1011
[Test]
11-
public void Should_fail_to_start()
12+
public async Task Should_fail_to_start()
1213
{
13-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
14-
using (var channel = connection.CreateModel())
14+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
15+
using (var channel = await connection.CreateChannelAsync())
1516
{
16-
channel.DeclareClassicQueue("rabbitmq.transport.tests.classic-error");
17+
await channel.DeclareClassicQueue("rabbitmq.transport.tests.classic-error");
1718
}
1819

1920

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_quorum_endpoint_uses_classic_queue.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
22
{
33
using System;
4+
using System.Threading.Tasks;
45
using AcceptanceTesting;
56
using AcceptanceTesting.Customization;
67
using NServiceBus.AcceptanceTests;
@@ -9,12 +10,12 @@
910
public class When_quorum_endpoint_uses_classic_queue : NServiceBusAcceptanceTest
1011
{
1112
[Test]
12-
public void Should_fail_to_start()
13+
public async Task Should_fail_to_start()
1314
{
14-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
15-
using (var channel = connection.CreateModel())
15+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
16+
using (var channel = await connection.CreateChannelAsync())
1617
{
17-
channel.DeclareClassicQueue(Conventions.EndpointNamingConvention(typeof(QuorumQueueEndpoint)));
18+
await channel.DeclareClassicQueue(Conventions.EndpointNamingConvention(typeof(QuorumQueueEndpoint)));
1819
}
1920

2021
var exception = Assert.CatchAsync<Exception>(async () => await Scenario.Define<ScenarioContext>()

src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/QuorumQueues/When_starting_endpoint_using_quorum_queues.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ public async Task Should_create_receiving_queues_as_quorum_queues()
1515
{
1616
var endpointInputQueue = Conventions.EndpointNamingConvention(typeof(QuorumQueueEndpoint));
1717

18-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
19-
using (var channel = connection.CreateModel())
18+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
19+
using (var channel = await connection.CreateChannelAsync())
2020
{
21-
channel.QueueDelete(endpointInputQueue, false, false);
22-
channel.QueueDelete(endpointInputQueue + "-disc", false, false);
23-
channel.QueueDelete("QuorumQueueSatelliteReceiver", false, false);
21+
await channel.QueueDeleteAsync(endpointInputQueue, false, false);
22+
await channel.QueueDeleteAsync(endpointInputQueue + "-disc", false, false);
23+
await channel.QueueDeleteAsync("QuorumQueueSatelliteReceiver", false, false);
2424
}
2525

2626
await Scenario.Define<ScenarioContext>()
@@ -29,23 +29,23 @@ await Scenario.Define<ScenarioContext>()
2929
.Run();
3030

3131
// try to declare the same queue as a non-quorum queue, which should fail:
32-
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
32+
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
3333
{
34-
using (var channel = connection.CreateModel())
34+
using (var channel = await connection.CreateChannelAsync())
3535
{
36-
var mainQueueException = Assert.Catch<RabbitMQClientException>(() => channel.DeclareClassicQueue(endpointInputQueue));
36+
var mainQueueException = Assert.CatchAsync<RabbitMQClientException>(async () => await channel.DeclareClassicQueue(endpointInputQueue));
3737
Assert.That(mainQueueException.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type'"));
3838
}
3939

40-
using (var channel = connection.CreateModel())
40+
using (var channel = await connection.CreateChannelAsync())
4141
{
42-
var instanceSpecificQueueException = Assert.Catch<RabbitMQClientException>(() => channel.DeclareClassicQueue(endpointInputQueue + "-disc"));
42+
var instanceSpecificQueueException = Assert.CatchAsync<RabbitMQClientException>(async () => await channel.DeclareClassicQueue(endpointInputQueue + "-disc"));
4343
Assert.That(instanceSpecificQueueException.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type'"));
4444
}
4545

46-
using (var channel = connection.CreateModel())
46+
using (var channel = await connection.CreateChannelAsync())
4747
{
48-
var satelliteReceiver = Assert.Catch<RabbitMQClientException>(() => channel.DeclareClassicQueue("QuorumQueueSatelliteReceiver"));
48+
var satelliteReceiver = Assert.CatchAsync<RabbitMQClientException>(async () => await channel.DeclareClassicQueue("QuorumQueueSatelliteReceiver"));
4949
Assert.That(satelliteReceiver.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type'"));
5050
}
5151
}
Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using AcceptanceTesting;
67
using NServiceBus.AcceptanceTests;
@@ -32,59 +33,42 @@ public async Task Should_only_deliver_event_to_one_of_the_instances()
3233
c.ServerBSubscribed = true;
3334
});
3435
})
36+
.Done(ctx => ctx.Counter > 0)
3537
.Run(TimeSpan.FromSeconds(10));
3638

3739
Assert.That(context.Counter, Is.EqualTo(1), "One of the scaled out instances should get the event");
3840
}
3941

4042
public class ScaledOutSubscriber : EndpointConfigurationBuilder
4143
{
42-
public ScaledOutSubscriber()
43-
{
44-
EndpointSetup<DefaultPublisher>();
45-
}
44+
public ScaledOutSubscriber() => EndpointSetup<DefaultPublisher>();
4645

47-
class MyEventHandler : IHandleMessages<MyEvent>
46+
class MyEventHandler(MyContext myContext) : IHandleMessages<MyEvent>
4847
{
49-
readonly MyContext myContext;
50-
51-
public MyEventHandler(MyContext context)
52-
{
53-
myContext = context;
54-
}
55-
5648
public Task Handle(MyEvent message, IMessageHandlerContext context)
5749
{
58-
lock (objLock)
59-
{
60-
myContext.Counter++;
61-
}
62-
50+
myContext.IncrementCounter();
6351
return Task.CompletedTask;
6452
}
65-
66-
static object objLock = new object();
6753
}
6854
}
6955

7056
public class Publisher : EndpointConfigurationBuilder
7157
{
72-
public Publisher()
73-
{
74-
EndpointSetup<DefaultPublisher>();
75-
}
58+
public Publisher() => EndpointSetup<DefaultPublisher>();
7659
}
7760

78-
public class MyEvent : IEvent
79-
{
80-
81-
}
61+
public class MyEvent : IEvent;
8262

8363
class MyContext : ScenarioContext
8464
{
8565
public bool ServerASubscribed { get; set; }
8666
public bool ServerBSubscribed { get; set; }
87-
public int Counter { get; set; }
67+
public int Counter => counter;
68+
69+
public void IncrementCounter() => Interlocked.Increment(ref counter);
70+
71+
int counter;
8872
}
8973
}
9074
}

0 commit comments

Comments
 (0)