Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Auto detect text files and perform LF normalization
* text=auto
*.sh text eol=lf
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
creds: ${{ secrets.AZURE_ACI_CREDENTIALS }}
enable-AzPSSession: true
- name: Setup RabbitMQ
uses: Particular/setup-rabbitmq-action@v1.6.0
uses: Particular/setup-rabbitmq-action@v1.7.0
with:
connection-string-name: RabbitMQTransport_ConnectionString
tag: RabbitMQTransport
Expand Down
14 changes: 14 additions & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[*.{csproj,props,targets,xml}]
indent_style = space
indent_size = 2
xml_space_inside_empty_tag = true

[*.cs]

Expand Down Expand Up @@ -317,6 +318,10 @@ dotnet_naming_rule.non_field_members_should_be_pascal_case.severity = error
dotnet_naming_rule.non_field_members_should_be_pascal_case.symbols = non_field_members
dotnet_naming_rule.non_field_members_should_be_pascal_case.style = pascal_case

dotnet_naming_rule.fields.style = camel_case
dotnet_naming_rule.fields.symbols = fields
dotnet_naming_rule.fields.severity = none

# Symbol specifications
dotnet_naming_symbols.interface.applicable_kinds = interface
dotnet_naming_symbols.interface.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
Expand All @@ -330,6 +335,10 @@ dotnet_naming_symbols.non_field_members.applicable_kinds = property, event, meth
dotnet_naming_symbols.non_field_members.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.non_field_members.required_modifiers =

dotnet_naming_symbols.fields.applicable_kinds = field
dotnet_naming_symbols.fields.applicable_accessibilities = public, internal, private, protected, protected_internal, private_protected
dotnet_naming_symbols.fields.required_modifiers =

# Naming styles
dotnet_naming_style.pascal_case.required_prefix =
dotnet_naming_style.pascal_case.required_suffix =
Expand All @@ -340,3 +349,8 @@ dotnet_naming_style.begins_with_i.required_prefix = I
dotnet_naming_style.begins_with_i.required_suffix =
dotnet_naming_style.begins_with_i.word_separator =
dotnet_naming_style.begins_with_i.capitalization = pascal_case

dotnet_naming_style.camel_case.required_prefix =
dotnet_naming_style.camel_case.required_suffix =
dotnet_naming_style.camel_case.word_separator =
dotnet_naming_style.camel_case.capitalization = camel_case
4 changes: 2 additions & 2 deletions src/Custom.Build.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>

<PropertyGroup>
<MinVerMinimumMajorMinor>9.0</MinVerMinimumMajorMinor>
<MinVerAutoIncrement>minor</MinVerAutoIncrement>
<MinVerMinimumMajorMinor>9.1</MinVerMinimumMajorMinor>
<MinVerAutoIncrement>patch</MinVerAutoIncrement>
</PropertyGroup>

</Project>
4 changes: 2 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel Condition="'$(AnalysisLevel)' == ''">5.0</AnalysisLevel>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
<!-- NuGetAuditMode set to 'all' for tool projects in Directory.Build.targets, other project types default to 'direct' -->
<NuGetAuditLevel>low</NuGetAuditLevel>
<NuGetAuditMode Condition="'$(NuGetAuditMode)' == ''">all</NuGetAuditMode>
<!-- To lock the version of Particular.Analyzers, for example, in a release branch, set this property in Custom.Build.props -->
<ParticularAnalyzersVersion Condition="'$(ParticularAnalyzersVersion)' == ''">2.1.2</ParticularAnalyzersVersion>
<ParticularAnalyzersVersion Condition="'$(ParticularAnalyzersVersion)' == ''">2.1.3</ParticularAnalyzersVersion>
<NServiceBusKey>0024000004800000940000000602000000240000525341310004000001000100dde965e6172e019ac82c2639ffe494dd2e7dd16347c34762a05732b492e110f2e4e2e1b5ef2d85c848ccfb671ee20a47c8d1376276708dc30a90ff1121b647ba3b7259a6bc383b2034938ef0e275b58b920375ac605076178123693c6c4f1331661a62eba28c249386855637780e3ff5f23a6d854700eaa6803ef48907513b92</NServiceBusKey>
<NServiceBusTestsKey>00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5</NServiceBusTestsKey>
</PropertyGroup>
Expand Down
4 changes: 1 addition & 3 deletions src/Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
<Project>

<PropertyGroup>
<NuGetAuditMode Condition="'$(PackAsTool)' == 'true'">all</NuGetAuditMode>
</PropertyGroup>
<Import Project="msbuild\AutomaticVersionRanges.targets" Condition="Exists('msbuild\AutomaticVersionRanges.targets')" />

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.1" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.1.1" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.1.2" GeneratePathProperty="true" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.1.1" />
<PackageReference Include="NServiceBus" Version="9.1.2" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Particular.Packaging" Version="4.0.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="4.2.0" PrivateAssets="All" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Events;
using NUnit.Framework;

[TestFixture]
public class ChannelProviderTests
{
[Test]
public async Task Should_recover_connection_and_dispose_old_one_when_connection_shutdown()
{
var channelProvider = new TestableChannelProvider();
channelProvider.CreateConnection();

var publishConnection = channelProvider.PublishConnections.Dequeue();
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));

channelProvider.DelayTaskCompletionSource.SetResult();

await channelProvider.FireAndForgetAction(CancellationToken.None);

var recoveredConnection = channelProvider.PublishConnections.Dequeue();

Assert.That(publishConnection.WasDisposed, Is.True);
Assert.That(recoveredConnection.WasDisposed, Is.False);
}

[Test]
public void Should_dispose_connection_when_disposed()
{
var channelProvider = new TestableChannelProvider();
channelProvider.CreateConnection();

var publishConnection = channelProvider.PublishConnections.Dequeue();
channelProvider.Dispose();

Assert.That(publishConnection.WasDisposed, Is.True);
}

[Test]
public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_still_pending()
{
var channelProvider = new TestableChannelProvider();
channelProvider.CreateConnection();

var publishConnection = channelProvider.PublishConnections.Dequeue();
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));

// Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing
// to simulate a pending delay task
channelProvider.Dispose();

await channelProvider.FireAndForgetAction(CancellationToken.None);

Assert.That(publishConnection.WasDisposed, Is.True);
Assert.That(channelProvider.PublishConnections.TryDequeue(out _), Is.False);
}

[Test]
public async Task Should_dispose_newly_established_connection()
{
var channelProvider = new TestableChannelProvider();
channelProvider.CreateConnection();

var publishConnection = channelProvider.PublishConnections.Dequeue();
publishConnection.RaiseConnectionShutdown(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "Test"));

// This simulates the race of the reconnection loop being fired off with the delay task completed during
// the disposal of the channel provider. To achieve that it is necessary to kick off the reconnection loop
// and await its completion after the channel provider has been disposed.
var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None);
channelProvider.DelayTaskCompletionSource.SetResult();
channelProvider.Dispose();

await fireAndForgetTask;

var recoveredConnection = channelProvider.PublishConnections.Dequeue();

Assert.That(publishConnection.WasDisposed, Is.True);
Assert.That(recoveredConnection.WasDisposed, Is.True);
}

class TestableChannelProvider() : ChannelProvider(null!, TimeSpan.Zero, null!)
{
public Queue<FakeConnection> PublishConnections { get; } = new();

public TaskCompletionSource DelayTaskCompletionSource { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);

public Func<CancellationToken, Task> FireAndForgetAction { get; private set; }

protected override IConnection CreatePublishConnection()
{
var connection = new FakeConnection();
PublishConnections.Enqueue(connection);
return connection;
}

protected override void FireAndForget(Func<CancellationToken, Task> action, CancellationToken cancellationToken = default)
=> FireAndForgetAction = _ => action(cancellationToken);

protected override async Task DelayReconnect(CancellationToken cancellationToken = default)
{
await using var _ = cancellationToken.Register(() => DelayTaskCompletionSource.TrySetCanceled(cancellationToken));
await DelayTaskCompletionSource.Task;
}
}

class FakeConnection : IConnection
{
public int LocalPort { get; }
public int RemotePort { get; }

public void Dispose() => WasDisposed = true;

public bool WasDisposed { get; private set; }

public void UpdateSecret(string newSecret, string reason) => throw new NotImplementedException();

public void Abort() => throw new NotImplementedException();

public void Abort(ushort reasonCode, string reasonText) => throw new NotImplementedException();

public void Abort(TimeSpan timeout) => throw new NotImplementedException();

public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();

public void Close() => throw new NotImplementedException();

public void Close(ushort reasonCode, string reasonText) => throw new NotImplementedException();

public void Close(TimeSpan timeout) => throw new NotImplementedException();

public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) => throw new NotImplementedException();

public IModel CreateModel() => throw new NotImplementedException();

public void HandleConnectionBlocked(string reason) => throw new NotImplementedException();

public void HandleConnectionUnblocked() => throw new NotImplementedException();

public ushort ChannelMax { get; }
public IDictionary<string, object> ClientProperties { get; }
public ShutdownEventArgs CloseReason { get; }
public AmqpTcpEndpoint Endpoint { get; }
public uint FrameMax { get; }
public TimeSpan Heartbeat { get; }
public bool IsOpen { get; }
public AmqpTcpEndpoint[] KnownHosts { get; }
public IProtocol Protocol { get; }
public IDictionary<string, object> ServerProperties { get; }
public IList<ShutdownReportEntry> ShutdownReport { get; }
public string ClientProvidedName { get; } = $"FakeConnection{Interlocked.Increment(ref connectionCounter)}";
public event EventHandler<CallbackExceptionEventArgs> CallbackException = (_, _) => { };
public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked = (_, _) => { };
public event EventHandler<ShutdownEventArgs> ConnectionShutdown = (_, _) => { };
public event EventHandler<EventArgs> ConnectionUnblocked = (_, _) => { };

public void RaiseConnectionShutdown(ShutdownEventArgs args) => ConnectionShutdown?.Invoke(this, args);

static int connectionCounter;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.1" />
<PackageReference Include="NServiceBus" Version="9.1.1" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="Particular.Approvals" Version="2.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.1" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="9.1.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="9.1.2" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class When_changing_concurrency : NServiceBusTransportTest
public async Task Should_complete_current_message(TransportTransactionMode transactionMode)
{
var triggeredChangeConcurrency = CreateTaskCompletionSource();
var sentMessageReceived = CreateTaskCompletionSource();
Task concurrencyChanged = null;
int invocationCounter = 0;

Expand All @@ -30,6 +31,7 @@ await StartPump(async (context, ct) =>
await task;
}, ct);

sentMessageReceived.SetResult();
await triggeredChangeConcurrency.Task;

}, (_, _) =>
Expand All @@ -40,8 +42,10 @@ await StartPump(async (context, ct) =>
transactionMode);

await SendMessage(InputQueueName);
await sentMessageReceived.Task;
await concurrencyChanged;
await StopPump();

Assert.AreEqual(1, invocationCounter, "message should successfully complete on first processing attempt");
}

Expand All @@ -62,6 +66,7 @@ await StartPump((context, _) =>
if (context.Headers.TryGetValue("FromOnError", out var value) && value == bool.TrueString)
{
sentMessageReceived.SetResult();
return Task.CompletedTask;
}

throw new Exception("triggering recoverability pipeline");
Expand All @@ -84,9 +89,9 @@ await SendMessage(InputQueueName,
transactionMode);

await SendMessage(InputQueueName);

await sentMessageReceived.Task;
await StopPump();

Assert.AreEqual(2, invocationCounter, "there should be exactly 2 messages (initial message and new message from onError pipeline)");
}
}
Expand Down
Loading
Loading