Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit.Analyzers" Version="4.3.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
Expand Down
109 changes: 109 additions & 0 deletions src/AcceptanceTests/Receiving/When_message_visibility_expired.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using Azure.Messaging.ServiceBus;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_message_visibility_expired : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_complete_message_on_next_receive_when_pipeline_successful()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.CustomConfig(c =>
{
// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

[Test]
public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message()
{
var ctx = await Scenario.Define<Context>(c =>
{
c.ShouldThrow = true;
})
.WithEndpoint<Receiver>(b =>
{
b.DoNotFailOnErrorMessages();
b.CustomConfig(c =>
{
var recoverability = c.Recoverability();
recoverability.AddUnrecoverableException<InvalidOperationException>();

// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c)
=> l.Message.StartsWith($"Received message with id '{c.NativeMessageId}' was marked as successfully completed");

class Context : ScenarioContext
{
public bool ShouldThrow { get; set; }

public string NativeMessageId { get; set; }
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
var transport = c.ConfigureTransport<AzureServiceBusTransport>();
// Explicitly setting the transport transaction mode to ReceiveOnly because the message
// tracking only is implemented for this mode.
transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
});
}

public class MyMessage : IMessage;

class MyMessageHandler(Context testContext) : IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var messageEventArgs = context.Extensions.Get<ProcessMessageEventArgs>();
// By abandoning the message, the message will be "immediately available" for retrieval again and effectively the message pump
// has lost the message visibility timeout because any Complete or Abandon will be rejected by the azure service bus.
var serviceBusReceivedMessage = context.Extensions.Get<ServiceBusReceivedMessage>();
await messageEventArgs.AbandonMessageAsync(serviceBusReceivedMessage);

testContext.NativeMessageId = serviceBusReceivedMessage.MessageId;

if (testContext.ShouldThrow)
{
throw new InvalidOperationException("Simulated exception");
}
}
}
}
}
27 changes: 25 additions & 2 deletions src/Tests/FakeProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus.Tests
{
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
Expand All @@ -21,7 +24,27 @@ public class FakeProcessor : ServiceBusProcessor
return Task.CompletedTask;
}

public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver receiver = null, CancellationToken cancellationToken = default)
=> OnProcessMessageAsync(new ProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken));
public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver? receiver = null, CancellationToken cancellationToken = default)
{
var eventArgs = new CustomProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken);
receivedMessageToEventArgs.Add(message, eventArgs);
return OnProcessMessageAsync(eventArgs);
}

readonly ConditionalWeakTable<ServiceBusReceivedMessage, CustomProcessMessageEventArgs>
receivedMessageToEventArgs = [];

sealed class CustomProcessMessageEventArgs : ProcessMessageEventArgs
{
public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, CancellationToken cancellationToken) : base(message, receiver, cancellationToken)
{
}

public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, string identifier, CancellationToken cancellationToken) : base(message, receiver, identifier, cancellationToken)
{
}

public Task RaiseMessageLockLost(MessageLockLostEventArgs args, CancellationToken cancellationToken = default) => OnMessageLockLostAsync(args);
}
}
}
12 changes: 10 additions & 2 deletions src/Tests/FakeReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.Transport.AzureServiceBus.Tests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,25 +10,32 @@ public class FakeReceiver : ServiceBusReceiver
{
readonly List<(ServiceBusReceivedMessage, IDictionary<string, object> propertiesToModify)> abandonedMessages = [];
readonly List<ServiceBusReceivedMessage> completedMessages = [];
readonly List<ServiceBusReceivedMessage> completingMessages = [];

public Func<ServiceBusReceivedMessage, CancellationToken, Task> CompleteMessageCallback = (_, _) => Task.CompletedTask;

public IReadOnlyCollection<(ServiceBusReceivedMessage, IDictionary<string, object> propertiesToModify)> AbandonedMessages
=> abandonedMessages;

public IReadOnlyCollection<ServiceBusReceivedMessage> CompletedMessages
=> completedMessages;

public IReadOnlyCollection<ServiceBusReceivedMessage> CompletingMessages
=> completingMessages;

public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default)
{
abandonedMessages.Add((message, propertiesToModify ?? new Dictionary<string, object>(0)));
return Task.CompletedTask;
}

public override Task CompleteMessageAsync(ServiceBusReceivedMessage message,
public override async Task CompleteMessageAsync(ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
completingMessages.Add(message);
await CompleteMessageCallback(message, cancellationToken);
completedMessages.Add(message);
return Task.CompletedTask;
}
}
}
12 changes: 8 additions & 4 deletions src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus" Version="9.2.2" />
<PackageReference Include="NServiceBus.Testing" Version="9.0.0" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit.Analyzers" Version="4.3.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
</ItemGroup>

</Project>
137 changes: 137 additions & 0 deletions src/Tests/Receiving/MessagePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
Expand Down Expand Up @@ -67,6 +68,142 @@ await pump.Initialize(new PushRuntimeSettings(1), (context, token) =>
});
}

[Test]
public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeClient = new FakeServiceBusClient();
var fakeReceiver = new FakeReceiver();
var onMessageCalled = 0;
var onErrorCalled = 0;

var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress",
new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null);

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled());

await pump.Initialize(new PushRuntimeSettings(1), (_, _) =>
{
onMessageCalled++;
return Task.CompletedTask;
},
(_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, CancellationToken.None);
await pump.StartReceive();

var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));
var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));

fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ?
Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) :
Task.CompletedTask;

var fakeProcessor = fakeClient.Processors["receiveAddress"];
await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver);
await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver);

Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage));
Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage));
Assert.That(fakeReceiver.AbandonedMessages, Is.Empty);
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.Zero);
}

[Test]
public async Task Should_abandon_message_in_atomic_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeClient = new FakeServiceBusClient();
var fakeReceiver = new FakeReceiver();
var onMessageCalled = 0;
var onErrorCalled = 0;

var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive }, "receiveAddress",
new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null);

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled());

await pump.Initialize(new PushRuntimeSettings(1), (_, _) =>
{
onMessageCalled++;
return Task.CompletedTask;
},
(_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, CancellationToken.None);
await pump.StartReceive();

var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));

fakeReceiver.CompleteMessageCallback = (message, _) => message == receivedMessage ?
Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) :
Task.CompletedTask;

var fakeProcessor = fakeClient.Processors["receiveAddress"];
await fakeProcessor.ProcessMessage(receivedMessage, fakeReceiver);

Assert.Multiple(() =>
{
Assert.That(fakeReceiver.AbandonedMessages.Select((tuple, _) => { var (message, _) = tuple; return message; })
.ToList(), Does.Contain(receivedMessage));
Assert.That(fakeReceiver.CompletedMessages, Is.Empty);
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.EqualTo(1));
});
}

[Test]
public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_error_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeClient = new FakeServiceBusClient();
var fakeReceiver = new FakeReceiver();
var onMessageCalled = 0;
var onErrorCalled = 0;

var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress",
new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null);

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled());

await pump.Initialize(new PushRuntimeSettings(1), (_, _) =>
{
onMessageCalled++;
return Task.FromException<InvalidOperationException>(new InvalidOperationException());
},
(_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, CancellationToken.None);
await pump.StartReceive();

var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));
var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));

fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ?
Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) :
Task.CompletedTask;

var fakeProcessor = fakeClient.Processors["receiveAddress"];
await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver);
await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver);

Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage));
Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage));
Assert.That(fakeReceiver.AbandonedMessages, Is.Empty);
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.EqualTo(1));
}

[Test]
public async Task Should_abandon_message_upon_failure_with_retry_required()
{
Expand Down
1 change: 1 addition & 0 deletions src/Transport/NServiceBus.Transport.AzureServiceBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="[7.18.1, 8.0.0)" />
<PackageReference Include="BitFaster.Caching" Version="[2.5.2, 3.0.0)" />
<PackageReference Include="NServiceBus" Version="[9.0.0, 10.0.0)" />
</ItemGroup>

Expand Down
Loading