From 69e24861f07309dcf988cfc619a382bd7837ac24 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 3 Sep 2024 13:17:08 +0200 Subject: [PATCH 01/12] During receive only mode track messages that should have been completed --- src/Tests/FakeProcessor.cs | 27 +++- src/Tests/FakeReceiver.cs | 12 +- src/Tests/Receiving/MessagePumpTests.cs | 137 ++++++++++++++++++ ...erviceBus.Transport.AzureServiceBus.csproj | 1 + src/Transport/Receiving/MessagePump.cs | 56 ++----- .../ProcessMessageEventArgsExtensions.cs | 127 +++++++++++++++- .../Utilities/TransactionExtensions.cs | 10 +- 7 files changed, 312 insertions(+), 58 deletions(-) diff --git a/src/Tests/FakeProcessor.cs b/src/Tests/FakeProcessor.cs index 2195fb63..80e38553 100644 --- a/src/Tests/FakeProcessor.cs +++ b/src/Tests/FakeProcessor.cs @@ -1,5 +1,8 @@ +#nullable enable + namespace NServiceBus.Transport.AzureServiceBus.Tests { + using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; @@ -21,7 +24,27 @@ public class FakeProcessor : ServiceBusProcessor return Task.CompletedTask; } - public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver receiver = null, CancellationToken cancellationToken = default) - => OnProcessMessageAsync(new ProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken)); + public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver? receiver = null, CancellationToken cancellationToken = default) + { + var eventArgs = new CustomProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken); + receivedMessageToEventArgs.Add(message, eventArgs); + return OnProcessMessageAsync(eventArgs); + } + + readonly ConditionalWeakTable + receivedMessageToEventArgs = []; + + sealed class CustomProcessMessageEventArgs : ProcessMessageEventArgs + { + public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, CancellationToken cancellationToken) : base(message, receiver, cancellationToken) + { + } + + public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, string identifier, CancellationToken cancellationToken) : base(message, receiver, identifier, cancellationToken) + { + } + + public Task RaiseMessageLockLost(MessageLockLostEventArgs args, CancellationToken cancellationToken = default) => OnMessageLockLostAsync(args); + } } } \ No newline at end of file diff --git a/src/Tests/FakeReceiver.cs b/src/Tests/FakeReceiver.cs index 304c2eaf..239caa7f 100644 --- a/src/Tests/FakeReceiver.cs +++ b/src/Tests/FakeReceiver.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Transport.AzureServiceBus.Tests { + using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -9,6 +10,9 @@ public class FakeReceiver : ServiceBusReceiver { readonly List<(ServiceBusReceivedMessage, IDictionary propertiesToModify)> abandonedMessages = []; readonly List completedMessages = []; + readonly List completingMessages = []; + + public Func CompleteMessageCallback = (_, _) => Task.CompletedTask; public IReadOnlyCollection<(ServiceBusReceivedMessage, IDictionary propertiesToModify)> AbandonedMessages => abandonedMessages; @@ -16,6 +20,9 @@ public class FakeReceiver : ServiceBusReceiver public IReadOnlyCollection CompletedMessages => completedMessages; + public IReadOnlyCollection CompletingMessages + => completingMessages; + public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) { @@ -23,11 +30,12 @@ public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDic return Task.CompletedTask; } - public override Task CompleteMessageAsync(ServiceBusReceivedMessage message, + public override async Task CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken = default) { + completingMessages.Add(message); + await CompleteMessageCallback(message, cancellationToken); completedMessages.Add(message); - return Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/Tests/Receiving/MessagePumpTests.cs b/src/Tests/Receiving/MessagePumpTests.cs index a63c989e..f94f843e 100644 --- a/src/Tests/Receiving/MessagePumpTests.cs +++ b/src/Tests/Receiving/MessagePumpTests.cs @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving { using System; using System.Collections.Generic; + using System.Linq; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; @@ -67,6 +68,142 @@ await pump.Initialize(new PushRuntimeSettings(1), (context, token) => }); } + [Test] + public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease() + { + var fakeClient = new FakeServiceBusClient(); + var fakeReceiver = new FakeReceiver(); + var onMessageCalled = 0; + var onErrorCalled = 0; + + var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress", + new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null); + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled()); + + await pump.Initialize(new PushRuntimeSettings(1), (_, _) => + { + onMessageCalled++; + return Task.CompletedTask; + }, + (_, _) => + { + onErrorCalled++; + return Task.FromResult(ErrorHandleResult.Handled); + }, CancellationToken.None); + await pump.StartReceive(); + + var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + + fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ? + Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) : + Task.CompletedTask; + + var fakeProcessor = fakeClient.Processors["receiveAddress"]; + await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver); + await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver); + + Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage)); + Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage)); + Assert.That(fakeReceiver.AbandonedMessages, Is.Empty); + Assert.That(onMessageCalled, Is.EqualTo(1)); + Assert.That(onErrorCalled, Is.Zero); + } + + [Test] + public async Task Should_abandon_message_in_atomic_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease() + { + var fakeClient = new FakeServiceBusClient(); + var fakeReceiver = new FakeReceiver(); + var onMessageCalled = 0; + var onErrorCalled = 0; + + var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive }, "receiveAddress", + new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null); + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled()); + + await pump.Initialize(new PushRuntimeSettings(1), (_, _) => + { + onMessageCalled++; + return Task.CompletedTask; + }, + (_, _) => + { + onErrorCalled++; + return Task.FromResult(ErrorHandleResult.Handled); + }, CancellationToken.None); + await pump.StartReceive(); + + var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + + fakeReceiver.CompleteMessageCallback = (message, _) => message == receivedMessage ? + Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) : + Task.CompletedTask; + + var fakeProcessor = fakeClient.Processors["receiveAddress"]; + await fakeProcessor.ProcessMessage(receivedMessage, fakeReceiver); + + Assert.Multiple(() => + { + Assert.That(fakeReceiver.AbandonedMessages.Select((tuple, _) => { var (message, _) = tuple; return message; }) + .ToList(), Does.Contain(receivedMessage)); + Assert.That(fakeReceiver.CompletedMessages, Is.Empty); + Assert.That(onMessageCalled, Is.EqualTo(1)); + Assert.That(onErrorCalled, Is.EqualTo(1)); + }); + } + + [Test] + public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_error_pipeline_successful_but_completion_failed_due_to_expired_lease() + { + var fakeClient = new FakeServiceBusClient(); + var fakeReceiver = new FakeReceiver(); + var onMessageCalled = 0; + var onErrorCalled = 0; + + var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress", + new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null); + + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled()); + + await pump.Initialize(new PushRuntimeSettings(1), (_, _) => + { + onMessageCalled++; + return Task.FromException(new InvalidOperationException()); + }, + (_, _) => + { + onErrorCalled++; + return Task.FromResult(ErrorHandleResult.Handled); + }, CancellationToken.None); + await pump.StartReceive(); + + var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60)); + + fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ? + Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) : + Task.CompletedTask; + + var fakeProcessor = fakeClient.Processors["receiveAddress"]; + await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver); + await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver); + + Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage)); + Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage)); + Assert.That(fakeReceiver.AbandonedMessages, Is.Empty); + Assert.That(onMessageCalled, Is.EqualTo(1)); + Assert.That(onErrorCalled, Is.EqualTo(1)); + } + [Test] public async Task Should_abandon_message_upon_failure_with_retry_required() { diff --git a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj index 10580aaf..d49a88c4 100644 --- a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj +++ b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index daafad33..d53af65b 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using System.Transactions; using Azure.Messaging.ServiceBus; + using BitFaster.Caching.Lru; using Extensibility; using Logging; @@ -15,6 +16,7 @@ class MessagePump : IMessageReceiver readonly ReceiveSettings receiveSettings; readonly Action criticalErrorAction; readonly ServiceBusClient serviceBusClient; + readonly FastConcurrentLru messagesToBeCompleted = new(1_000); OnMessage onMessage; OnError onError; @@ -135,28 +137,17 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg) { messageId = message.GetMessageId(); - if (processor.ReceiveMode == ServiceBusReceiveMode.PeekLock && message.LockedUntil < DateTimeOffset.UtcNow) + // Deliberately not using the cancellation token to make sure we abandon the message even when the + // cancellation token is already set. + if (await arg.TrySafeCompleteMessageAsync(message, transportSettings.TransportTransactionMode, messagesToBeCompleted, CancellationToken.None).ConfigureAwait(false)) { - Logger.Warn( - $"Skip handling the message with id '{messageId}' because the lock has expired at '{message.LockedUntil}'. " + - "This is usually an indication that the endpoint prefetches more messages than it is able to handle within the configured" + - " peek lock duration. Consider tweaking the prefetch configuration to values that are better aligned with the concurrency" + - " of the endpoint and the time it takes to handle the messages."); + return; + } - try - { - // Deliberately not using the cancellation token to make sure we abandon the message even when the - // cancellation token is already set. - await arg.SafeAbandonMessageAsync(message, - transportSettings.TransportTransactionMode, - cancellationToken: CancellationToken.None) - .ConfigureAwait(false); - } - catch (Exception abandonException) - { - // nothing we can do about it, message will be retried - Logger.Debug($"Error abandoning the message with id '{messageId}' because the lock has expired at '{message.LockedUntil}.", abandonException); - } + // Deliberately not using the cancellation token to make sure we abandon the message even when the + // cancellation token is already set. + if (await arg.TrySafeAbandonMessageAsync(message, transportSettings.TransportTransactionMode, CancellationToken.None).ConfigureAwait(false)) + { return; } @@ -165,26 +156,7 @@ await arg.SafeAbandonMessageAsync(message, } catch (Exception ex) { - var tryDeadlettering = transportSettings.TransportTransactionMode != TransportTransactionMode.None; - - Logger.Warn($"Poison message detected. Message {(tryDeadlettering ? "will be moved to the poison queue" : "will be discarded, transaction mode is set to None")}. Exception: {ex.Message}", ex); - - if (tryDeadlettering) - { - try - { - await arg.DeadLetterMessageAsync(message, - deadLetterReason: "Poisoned message", - deadLetterErrorDescription: ex.Message, - cancellationToken: arg.CancellationToken) - .ConfigureAwait(false); - } - catch (Exception deadLetterEx) when (!deadLetterEx.IsCausedBy(arg.CancellationToken)) - { - // nothing we can do about it, message will be retried - Logger.Debug("Error dead lettering poisoned message.", deadLetterEx); - } - } + await arg.SafeDeadLetterMessageAsync(message, transportSettings.TransportTransactionMode, ex, CancellationToken.None).ConfigureAwait(false); return; } @@ -287,6 +259,7 @@ async Task ProcessMessage(ServiceBusReceivedMessage message, await processMessageEventArgs.SafeCompleteMessageAsync(message, transportSettings.TransportTransactionMode, azureServiceBusTransaction, + messagesToBeCompleted, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); @@ -311,6 +284,7 @@ await processMessageEventArgs.SafeCompleteMessageAsync(message, await processMessageEventArgs.SafeCompleteMessageAsync(message, transportSettings.TransportTransactionMode, azureServiceBusTransaction, + messagesToBeCompleted, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } @@ -326,7 +300,7 @@ await processMessageEventArgs.SafeAbandonMessageAsync(message, .ConfigureAwait(false); } } - catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient || onErrorEx.Reason is ServiceBusFailureReason.MessageLockLost) + catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient) { Logger.Debug("Failed to execute recoverability.", onErrorEx); diff --git a/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs b/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs index 12f32663..bc8c7072 100644 --- a/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs +++ b/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs @@ -1,29 +1,140 @@ namespace NServiceBus.Transport.AzureServiceBus { + using System; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; + using BitFaster.Caching; + using Logging; static class ProcessMessageEventArgsExtensions { + public static async ValueTask TrySafeCompleteMessageAsync(this ProcessMessageEventArgs args, + ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, + ICache messagesToBeCompleted, + CancellationToken cancellationToken = default) + { + if (transportTransactionMode == TransportTransactionMode.ReceiveOnly && messagesToBeCompleted.TryGet(message.GetMessageId(), out _)) + { + Logger.DebugFormat("Received message with id '{0}' was marked as successfully completed. Trying to immediately acknowledge the message without invoking the pipeline.", message.GetMessageId()); + + try + { + await args.CompleteMessageAsync(message, cancellationToken: cancellationToken) + .ConfigureAwait(false); + return true; + } + // Doing a more generous catch here to make sure we are not losing the ID and can mark it to be completed another time + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) + { + messagesToBeCompleted.AddOrUpdate(message.GetMessageId(), true); + throw; + } + } + return false; + } + + public static async ValueTask TrySafeAbandonMessageAsync(this ProcessMessageEventArgs args, + ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, + CancellationToken cancellationToken = default) + { + // TransportTransactionMode.None uses ReceiveAndDelete mode which means the message is already removed from the queue + // once we get it. Therefore, we don't need to abandon it. + if (transportTransactionMode != TransportTransactionMode.None && message.LockedUntil < DateTimeOffset.UtcNow) + { + Logger.Warn( + $"Skip handling the message with id '{message.GetMessageId()}' because the lock has expired at '{message.LockedUntil}'. " + + "This is usually an indication that the endpoint prefetches more messages than it is able to handle within the configured" + + " peek lock duration. Consider tweaking the prefetch configuration to values that are better aligned with the concurrency" + + " of the endpoint and the time it takes to handle the messages."); + + try + { + await args.SafeAbandonMessageAsync(message, transportTransactionMode, cancellationToken: cancellationToken) + .ConfigureAwait(false); + return true; + } + catch (Exception e) when (!e.IsCausedBy(cancellationToken)) + { + // nothing we can do about it, message will be retried + Logger.Debug($"Error abandoning the message with id '{message.GetMessageId()}' because the lock has expired at '{message.LockedUntil}.", e); + } + } + return false; + } + + public static async Task SafeDeadLetterMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, + TransportTransactionMode transportTransactionMode, Exception exception, CancellationToken cancellationToken = default) + { + if (transportTransactionMode != TransportTransactionMode.None) + { + Logger.Warn($"Poison message detected. Message will be moved to the poison queue. Exception: {exception.Message}", exception); + + try + { + await args.DeadLetterMessageAsync(message, + deadLetterReason: "Poisoned message", + deadLetterErrorDescription: exception.Message, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + catch (Exception deadLetterEx) when (!deadLetterEx.IsCausedBy(cancellationToken)) + { + // nothing we can do about it, message will be retried + Logger.Debug("Error dead lettering poisoned message.", deadLetterEx); + } + } + else + { + Logger.Warn($"Poison message detected. Message will be discarded, transaction mode is set to None. Exception: {exception.Message}", exception); + } + } + public static async Task SafeCompleteMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, AzureServiceBusTransportTransaction azureServiceBusTransaction, + ICache messagesToBeCompleted, CancellationToken cancellationToken = default) { if (transportTransactionMode != TransportTransactionMode.None) { - using var scope = azureServiceBusTransaction.ToTransactionScope(); - await args.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); - - scope.Complete(); + try + { + using var scope = azureServiceBusTransaction.ToTransactionScope(); + await args.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); + scope.Complete(); + } + catch (ServiceBusException e) when (transportTransactionMode == TransportTransactionMode.ReceiveOnly && e.Reason == ServiceBusFailureReason.MessageLockLost) + { + // We tried to complete the message because it was successfully either by the pipeline or recoverability, but the lock was lost. + // To make sure we are not reprocessing it unnecessarily we are tracking the message ID and will complete it + // on the next receive. For SendsWithAtomicReceive it is necessary to throw which causes the rollback + // of the transaction and will trigger recoverability. + messagesToBeCompleted.AddOrUpdate(message.GetMessageId(), true); + } } } - public static Task SafeAbandonMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, + public static async Task SafeAbandonMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, CancellationToken cancellationToken = default) - => transportTransactionMode != TransportTransactionMode.None - ? args.AbandonMessageAsync(message, cancellationToken: cancellationToken) - : Task.CompletedTask; + { + if (transportTransactionMode != TransportTransactionMode.None) + { + try + { + await args.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost) + { + // We tried to abandon the message because it needs to be retried, but the lock was lost. + // the message will reappear on the next receive anyway so we can just ignore this case. + Logger.DebugFormat("Attempted to abandon the message with id '{0}' but the lock was lost.", message.GetMessageId()); + } + } + } + + // The extension methods here are related to functionality of the message pump. Therefore the same logger name + // is used as the message pump. + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/Transport/Utilities/TransactionExtensions.cs b/src/Transport/Utilities/TransactionExtensions.cs index 2de71429..0d259703 100644 --- a/src/Transport/Utilities/TransactionExtensions.cs +++ b/src/Transport/Utilities/TransactionExtensions.cs @@ -1,14 +1,14 @@ -namespace NServiceBus.Transport.AzureServiceBus +#nullable enable + +namespace NServiceBus.Transport.AzureServiceBus { using System.Transactions; static class TransactionExtensions { - public static TransactionScope ToScope(this Transaction transaction) - { - return transaction != null + public static TransactionScope ToScope(this Transaction? transaction) => + transaction != null ? new TransactionScope(transaction, TransactionScopeAsyncFlowOption.Enabled) : new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled); - } } } \ No newline at end of file From 057195a239660ea63647fd8c265c06528f37d3e3 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Tue, 10 Sep 2024 20:12:40 -0700 Subject: [PATCH 02/12] Add check for onError messageLockLost exception (cherry picked from commit 4bd949b1a10aa6b60b705f057cb27bcc5b93aa89) --- src/Transport/Receiving/MessagePump.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index d53af65b..f4b2ce9d 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -300,7 +300,7 @@ await processMessageEventArgs.SafeAbandonMessageAsync(message, .ConfigureAwait(false); } } - catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient) + catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient || onErrorEx.Reason == ServiceBusFailureReason.MessageLockLost) { Logger.Debug("Failed to execute recoverability.", onErrorEx); From 6e7e49837b4be3b6adfda37dd3778d97955e1f76 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 10:58:20 +0200 Subject: [PATCH 03/12] Acceptance Test --- .../When_message_visibility_expired.cs | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 src/AcceptanceTests/Receiving/When_message_visibility_expired.cs diff --git a/src/AcceptanceTests/Receiving/When_message_visibility_expired.cs b/src/AcceptanceTests/Receiving/When_message_visibility_expired.cs new file mode 100644 index 00000000..2e4ff6ae --- /dev/null +++ b/src/AcceptanceTests/Receiving/When_message_visibility_expired.cs @@ -0,0 +1,109 @@ +namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests +{ + using System; + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using Azure.Messaging.ServiceBus; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + + public class When_message_visibility_expired : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_complete_message_on_next_receive_when_pipeline_successful() + { + var ctx = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(c => + { + // Limiting the concurrency for this test to make sure messages that are made available again are + // not concurrently processed. This is not necessary for the test to pass but it makes + // reasoning about the test easier. + c.LimitMessageProcessingConcurrencyTo(1); + }); + b.When((session, _) => session.SendLocal(new MyMessage())); + }) + .Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c))) + .Run(); + + var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray(); + + Assert.That(items, Is.Not.Empty); + } + + [Test] + public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message() + { + var ctx = await Scenario.Define(c => + { + c.ShouldThrow = true; + }) + .WithEndpoint(b => + { + b.DoNotFailOnErrorMessages(); + b.CustomConfig(c => + { + var recoverability = c.Recoverability(); + recoverability.AddUnrecoverableException(); + + // Limiting the concurrency for this test to make sure messages that are made available again are + // not concurrently processed. This is not necessary for the test to pass but it makes + // reasoning about the test easier. + c.LimitMessageProcessingConcurrencyTo(1); + }); + b.When((session, _) => session.SendLocal(new MyMessage())); + }) + .Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c))) + .Run(); + + var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray(); + + Assert.That(items, Is.Not.Empty); + } + + static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c) + => l.Message.StartsWith($"Received message with id '{c.NativeMessageId}' was marked as successfully completed"); + + class Context : ScenarioContext + { + public bool ShouldThrow { get; set; } + + public string NativeMessageId { get; set; } + } + + class Receiver : EndpointConfigurationBuilder + { + public Receiver() => EndpointSetup(c => + { + var transport = c.ConfigureTransport(); + // Explicitly setting the transport transaction mode to ReceiveOnly because the message + // tracking only is implemented for this mode. + transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + }); + } + + public class MyMessage : IMessage; + + class MyMessageHandler(Context testContext) : IHandleMessages + { + public async Task Handle(MyMessage message, IMessageHandlerContext context) + { + var messageEventArgs = context.Extensions.Get(); + // By abandoning the message, the message will be "immediately available" for retrieval again and effectively the message pump + // has lost the message visibility timeout because any Complete or Abandon will be rejected by the azure service bus. + var serviceBusReceivedMessage = context.Extensions.Get(); + await messageEventArgs.AbandonMessageAsync(serviceBusReceivedMessage); + + testContext.NativeMessageId = serviceBusReceivedMessage.MessageId; + + if (testContext.ShouldThrow) + { + throw new InvalidOperationException("Simulated exception"); + } + } + } + } +} \ No newline at end of file From e81c50ed7a12f3842fa89edb141d9e07df9b284a Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 10:59:16 +0200 Subject: [PATCH 04/12] Small cosmetic changes --- src/Transport/Receiving/MessagePump.cs | 31 ++++++++++++-------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index f4b2ce9d..b59c1c13 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -205,10 +205,10 @@ public async Task StopReceive(CancellationToken cancellationToken = default) { // Wiring up the stop token to trigger the cancellation token that is being // used inside the message handling pipeline - using var _ = cancellationToken + await using var _ = cancellationToken .Register(state => (state as CancellationTokenSource)?.Cancel(), messageProcessingCancellationTokenSource, - useSynchronizationContext: false); + useSynchronizationContext: false).ConfigureAwait(false); // Deliberately not passing the cancellation token forward in order to make sure // the processor waits until all processing handlers have returned. This makes // the code compliant to the previous version that uses manual receives and is aligned @@ -244,27 +244,24 @@ async Task ProcessMessage(ServiceBusReceivedMessage message, // args.CancellationToken is currently not used because the v8 version that supports cancellation was designed // to not flip the cancellation token until the very last moment in time when the stop token is flipped. var contextBag = new ContextBag(); + contextBag.Set(message); + contextBag.Set(processMessageEventArgs); try { - using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey)) - { - contextBag.Set(message); - contextBag.Set(processMessageEventArgs); - - var messageContext = new MessageContext(messageId, headers, body, azureServiceBusTransaction.TransportTransaction, ReceiveAddress, contextBag); + using var azureServiceBusTransaction = CreateTransaction(message.PartitionKey); + var messageContext = new MessageContext(messageId, headers, body, azureServiceBusTransaction.TransportTransaction, ReceiveAddress, contextBag); - await onMessage(messageContext, messageProcessingCancellationToken).ConfigureAwait(false); + await onMessage(messageContext, messageProcessingCancellationToken).ConfigureAwait(false); - await processMessageEventArgs.SafeCompleteMessageAsync(message, - transportSettings.TransportTransactionMode, - azureServiceBusTransaction, - messagesToBeCompleted, - cancellationToken: messageProcessingCancellationToken) - .ConfigureAwait(false); + await processMessageEventArgs.SafeCompleteMessageAsync(message, + transportSettings.TransportTransactionMode, + azureServiceBusTransaction, + messagesToBeCompleted, + cancellationToken: messageProcessingCancellationToken) + .ConfigureAwait(false); - azureServiceBusTransaction.Commit(); - } + azureServiceBusTransaction.Commit(); } catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken)) { From 98a75b834a0e3e5b52b0c1e6fd1548f4cf396b8a Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 11:54:18 +0200 Subject: [PATCH 05/12] Remove unnecessary async suffix --- src/Transport/Receiving/MessagePump.cs | 16 ++++++++-------- .../ProcessMessageEventArgsExtensions.cs | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index b59c1c13..0f04f8cf 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -139,14 +139,14 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg) // Deliberately not using the cancellation token to make sure we abandon the message even when the // cancellation token is already set. - if (await arg.TrySafeCompleteMessageAsync(message, transportSettings.TransportTransactionMode, messagesToBeCompleted, CancellationToken.None).ConfigureAwait(false)) + if (await arg.TrySafeCompleteMessage(message, transportSettings.TransportTransactionMode, messagesToBeCompleted, CancellationToken.None).ConfigureAwait(false)) { return; } // Deliberately not using the cancellation token to make sure we abandon the message even when the // cancellation token is already set. - if (await arg.TrySafeAbandonMessageAsync(message, transportSettings.TransportTransactionMode, CancellationToken.None).ConfigureAwait(false)) + if (await arg.TrySafeAbandonMessage(message, transportSettings.TransportTransactionMode, CancellationToken.None).ConfigureAwait(false)) { return; } @@ -156,7 +156,7 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg) } catch (Exception ex) { - await arg.SafeDeadLetterMessageAsync(message, transportSettings.TransportTransactionMode, ex, CancellationToken.None).ConfigureAwait(false); + await arg.SafeDeadLetterMessage(message, transportSettings.TransportTransactionMode, ex, CancellationToken.None).ConfigureAwait(false); return; } @@ -254,7 +254,7 @@ async Task ProcessMessage(ServiceBusReceivedMessage message, await onMessage(messageContext, messageProcessingCancellationToken).ConfigureAwait(false); - await processMessageEventArgs.SafeCompleteMessageAsync(message, + await processMessageEventArgs.SafeCompleteMessage(message, transportSettings.TransportTransactionMode, azureServiceBusTransaction, messagesToBeCompleted, @@ -278,7 +278,7 @@ await processMessageEventArgs.SafeCompleteMessageAsync(message, if (result == ErrorHandleResult.Handled) { - await processMessageEventArgs.SafeCompleteMessageAsync(message, + await processMessageEventArgs.SafeCompleteMessage(message, transportSettings.TransportTransactionMode, azureServiceBusTransaction, messagesToBeCompleted, @@ -291,7 +291,7 @@ await processMessageEventArgs.SafeCompleteMessageAsync(message, if (result == ErrorHandleResult.RetryRequired) { - await processMessageEventArgs.SafeAbandonMessageAsync(message, + await processMessageEventArgs.SafeAbandonMessage(message, transportSettings.TransportTransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); @@ -301,7 +301,7 @@ await processMessageEventArgs.SafeAbandonMessageAsync(message, { Logger.Debug("Failed to execute recoverability.", onErrorEx); - await processMessageEventArgs.SafeAbandonMessageAsync(message, + await processMessageEventArgs.SafeAbandonMessage(message, transportSettings.TransportTransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); @@ -314,7 +314,7 @@ await processMessageEventArgs.SafeAbandonMessageAsync(message, { criticalErrorAction($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorEx, messageProcessingCancellationToken); - await processMessageEventArgs.SafeAbandonMessageAsync(message, + await processMessageEventArgs.SafeAbandonMessage(message, transportSettings.TransportTransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); diff --git a/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs b/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs index bc8c7072..211ec1cc 100644 --- a/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs +++ b/src/Transport/Receiving/ProcessMessageEventArgsExtensions.cs @@ -9,7 +9,7 @@ static class ProcessMessageEventArgsExtensions { - public static async ValueTask TrySafeCompleteMessageAsync(this ProcessMessageEventArgs args, + public static async ValueTask TrySafeCompleteMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, ICache messagesToBeCompleted, CancellationToken cancellationToken = default) @@ -34,7 +34,7 @@ await args.CompleteMessageAsync(message, cancellationToken: cancellationToken) return false; } - public static async ValueTask TrySafeAbandonMessageAsync(this ProcessMessageEventArgs args, + public static async ValueTask TrySafeAbandonMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, CancellationToken cancellationToken = default) { @@ -50,7 +50,7 @@ public static async ValueTask TrySafeAbandonMessageAsync(this ProcessMessa try { - await args.SafeAbandonMessageAsync(message, transportTransactionMode, cancellationToken: cancellationToken) + await args.SafeAbandonMessage(message, transportTransactionMode, cancellationToken: cancellationToken) .ConfigureAwait(false); return true; } @@ -63,7 +63,7 @@ await args.SafeAbandonMessageAsync(message, transportTransactionMode, cancellati return false; } - public static async Task SafeDeadLetterMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, + public static async Task SafeDeadLetterMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, Exception exception, CancellationToken cancellationToken = default) { if (transportTransactionMode != TransportTransactionMode.None) @@ -90,7 +90,7 @@ await args.DeadLetterMessageAsync(message, } } - public static async Task SafeCompleteMessageAsync(this ProcessMessageEventArgs args, + public static async Task SafeCompleteMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, AzureServiceBusTransportTransaction azureServiceBusTransaction, ICache messagesToBeCompleted, @@ -115,7 +115,7 @@ public static async Task SafeCompleteMessageAsync(this ProcessMessageEventArgs a } } - public static async Task SafeAbandonMessageAsync(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, + public static async Task SafeAbandonMessage(this ProcessMessageEventArgs args, ServiceBusReceivedMessage message, TransportTransactionMode transportTransactionMode, CancellationToken cancellationToken = default) { if (transportTransactionMode != TransportTransactionMode.None) From 8133cfc73ccbb6cb7a947b86862dc3c38f3fe34c Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 11:56:47 +0200 Subject: [PATCH 06/12] Simplify catch guard --- src/Transport/Receiving/MessagePump.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 0f04f8cf..2f7f21ea 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -306,11 +306,7 @@ await processMessageEventArgs.SafeAbandonMessage(message, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } - catch (Exception onErrorEx) when (onErrorEx.IsCausedBy(messageProcessingCancellationToken)) - { - throw; - } - catch (Exception onErrorEx) + catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(messageProcessingCancellationToken)) { criticalErrorAction($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorEx, messageProcessingCancellationToken); From 93cc9297a9908059fbca72a53506436febf981b9 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 12:03:09 +0200 Subject: [PATCH 07/12] Cleanup transaction mode access --- src/Transport/Receiving/MessagePump.cs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 2f7f21ea..1b2afd82 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -72,7 +72,7 @@ public async Task StartReceive(CancellationToken cancellationToken = default) var receiveOptions = new ServiceBusProcessorOptions { PrefetchCount = prefetchCount, - ReceiveMode = transportSettings.TransportTransactionMode == TransportTransactionMode.None + ReceiveMode = TransactionMode == TransportTransactionMode.None ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock, Identifier = $"Processor-{Id}-{ReceiveAddress}-{Guid.NewGuid()}", @@ -110,6 +110,8 @@ await processor.StartProcessingAsync(cancellationToken) .ConfigureAwait(false); } + TransportTransactionMode TransactionMode => transportSettings.TransportTransactionMode; + int CalculatePrefetchCount() { var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier; @@ -139,14 +141,14 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg) // Deliberately not using the cancellation token to make sure we abandon the message even when the // cancellation token is already set. - if (await arg.TrySafeCompleteMessage(message, transportSettings.TransportTransactionMode, messagesToBeCompleted, CancellationToken.None).ConfigureAwait(false)) + if (await arg.TrySafeCompleteMessage(message, TransactionMode, messagesToBeCompleted, CancellationToken.None).ConfigureAwait(false)) { return; } // Deliberately not using the cancellation token to make sure we abandon the message even when the // cancellation token is already set. - if (await arg.TrySafeAbandonMessage(message, transportSettings.TransportTransactionMode, CancellationToken.None).ConfigureAwait(false)) + if (await arg.TrySafeAbandonMessage(message, TransactionMode, CancellationToken.None).ConfigureAwait(false)) { return; } @@ -156,7 +158,7 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg) } catch (Exception ex) { - await arg.SafeDeadLetterMessage(message, transportSettings.TransportTransactionMode, ex, CancellationToken.None).ConfigureAwait(false); + await arg.SafeDeadLetterMessage(message, TransactionMode, ex, CancellationToken.None).ConfigureAwait(false); return; } @@ -255,7 +257,7 @@ async Task ProcessMessage(ServiceBusReceivedMessage message, await onMessage(messageContext, messageProcessingCancellationToken).ConfigureAwait(false); await processMessageEventArgs.SafeCompleteMessage(message, - transportSettings.TransportTransactionMode, + TransactionMode, azureServiceBusTransaction, messagesToBeCompleted, cancellationToken: messageProcessingCancellationToken) @@ -279,7 +281,7 @@ await processMessageEventArgs.SafeCompleteMessage(message, if (result == ErrorHandleResult.Handled) { await processMessageEventArgs.SafeCompleteMessage(message, - transportSettings.TransportTransactionMode, + TransactionMode, azureServiceBusTransaction, messagesToBeCompleted, cancellationToken: messageProcessingCancellationToken) @@ -292,7 +294,7 @@ await processMessageEventArgs.SafeCompleteMessage(message, if (result == ErrorHandleResult.RetryRequired) { await processMessageEventArgs.SafeAbandonMessage(message, - transportSettings.TransportTransactionMode, + TransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } @@ -302,7 +304,7 @@ await processMessageEventArgs.SafeAbandonMessage(message, Logger.Debug("Failed to execute recoverability.", onErrorEx); await processMessageEventArgs.SafeAbandonMessage(message, - transportSettings.TransportTransactionMode, + TransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } @@ -311,7 +313,7 @@ await processMessageEventArgs.SafeAbandonMessage(message, criticalErrorAction($"Failed to execute recoverability policy for message with native ID: `{message.MessageId}`", onErrorEx, messageProcessingCancellationToken); await processMessageEventArgs.SafeAbandonMessage(message, - transportSettings.TransportTransactionMode, + TransactionMode, cancellationToken: messageProcessingCancellationToken) .ConfigureAwait(false); } @@ -319,7 +321,7 @@ await processMessageEventArgs.SafeAbandonMessage(message, } AzureServiceBusTransportTransaction CreateTransaction(string incomingQueuePartitionKey) => - transportSettings.TransportTransactionMode == TransportTransactionMode.SendsAtomicWithReceive + TransactionMode == TransportTransactionMode.SendsAtomicWithReceive ? new AzureServiceBusTransportTransaction(serviceBusClient, incomingQueuePartitionKey, new TransactionOptions { From 3349309adf9c17c76ccd355fd8a974631f1d0f6d Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 12:03:35 +0200 Subject: [PATCH 08/12] Lambda expression --- src/Transport/Receiving/MessagePump.cs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/Transport/Receiving/MessagePump.cs b/src/Transport/Receiving/MessagePump.cs index 1b2afd82..f8608bfd 100644 --- a/src/Transport/Receiving/MessagePump.cs +++ b/src/Transport/Receiving/MessagePump.cs @@ -97,14 +97,9 @@ public async Task StartReceive(CancellationToken cancellationToken = default) criticalErrorAction("Failed to receive message from Azure Service Bus.", ex, messageProcessingCancellationTokenSource.Token); }, () => - { //We don't have to update the prefetch count since we are failing to receive anyway - processor.UpdateConcurrency(1); - }, - () => - { - processor.UpdateConcurrency(limitations.MaxConcurrency); - }); + processor.UpdateConcurrency(1), + () => processor.UpdateConcurrency(limitations.MaxConcurrency)); await processor.StartProcessingAsync(cancellationToken) .ConfigureAwait(false); From 75c5cc97b6df724daca36bf207d9df30dc5d1bbb Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 12:06:04 +0200 Subject: [PATCH 09/12] BitFaster.Caching should be a range --- src/Transport/NServiceBus.Transport.AzureServiceBus.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj index d49a88c4..e638c089 100644 --- a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj +++ b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj @@ -9,7 +9,7 @@ - + From 8c90235d2e1b4b75d0754b7dd2b7f672a6775104 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 12:07:36 +0200 Subject: [PATCH 10/12] Add Bitfaster caching as dependency --- .../NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj | 1 + src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj | 1 + .../NServiceBus.Transport.AzureServiceBus.TransportTests.csproj | 1 + 3 files changed, 3 insertions(+) diff --git a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj index 1ffc376b..15502bea 100644 --- a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj +++ b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj @@ -13,6 +13,7 @@ + diff --git a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj index f18ba68b..88a44277 100644 --- a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj +++ b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj @@ -12,6 +12,7 @@ + diff --git a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj index 0abf8e3b..fb4fde5b 100644 --- a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj +++ b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj @@ -10,6 +10,7 @@ + From 0ba299eced20922fd69ccf7554691f9cb7edf086 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Fri, 13 Sep 2024 12:10:58 +0200 Subject: [PATCH 11/12] Properly group dependencies --- ...s.Transport.AzureServiceBus.AcceptanceTests.csproj | 5 ++++- ...NServiceBus.Transport.AzureServiceBus.Tests.csproj | 11 +++++++---- ...us.Transport.AzureServiceBus.TransportTests.csproj | 5 ++++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj index 15502bea..9c318d13 100644 --- a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj +++ b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj @@ -14,9 +14,12 @@ + + + + - diff --git a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj index 88a44277..179df10f 100644 --- a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj +++ b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj @@ -13,15 +13,18 @@ - - + + + + + + + - - \ No newline at end of file diff --git a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj index fb4fde5b..a615b1a3 100644 --- a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj +++ b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj @@ -11,9 +11,12 @@ + + + + - From d4621e416c17d70870d884f83bd9fbe91de45ae1 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 17 Sep 2024 12:25:25 +0200 Subject: [PATCH 12/12] Bump bitfaster caching --- ...NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj | 2 +- src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj | 2 +- src/Transport/NServiceBus.Transport.AzureServiceBus.csproj | 2 +- .../NServiceBus.Transport.AzureServiceBus.TransportTests.csproj | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj index 9c318d13..585cd307 100644 --- a/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj +++ b/src/AcceptanceTests/NServiceBus.Transport.AzureServiceBus.AcceptanceTests.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj index 179df10f..d9042a79 100644 --- a/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj +++ b/src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj index e638c089..2a358abe 100644 --- a/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj +++ b/src/Transport/NServiceBus.Transport.AzureServiceBus.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj index a615b1a3..705d1107 100644 --- a/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj +++ b/src/TransportTests/NServiceBus.Transport.AzureServiceBus.TransportTests.csproj @@ -10,7 +10,7 @@ - +