Skip to content

Commit dd0341c

Browse files
author
Travis Nickels
committed
Add LRU array and update exception handling
1 parent 9e9a8a1 commit dd0341c

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

src/Transport/NServiceBus.Transport.AzureServiceBus.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
</ItemGroup>
1414

1515
<ItemGroup>
16+
<PackageReference Include="BitFaster.Caching" Version="2.1.1" />
1617
<PackageReference Include="Fody" Version="6.8.1" PrivateAssets="All" />
1718
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
1819
<PackageReference Include="Particular.Packaging" Version="4.1.0" PrivateAssets="All" />

src/Transport/Receiving/MessagePump.cs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading.Tasks;
77
using System.Transactions;
88
using Azure.Messaging.ServiceBus;
9+
using BitFaster.Caching.Lru;
910
using Extensibility;
1011
using Logging;
1112

@@ -15,6 +16,7 @@ class MessagePump : IMessageReceiver
1516
readonly ReceiveSettings receiveSettings;
1617
readonly Action<string, Exception, CancellationToken> criticalErrorAction;
1718
readonly ServiceBusClient serviceBusClient;
19+
readonly FastConcurrentLru<string, bool> messagesToBeDeleted = new(1_000);
1820

1921
OnMessage onMessage;
2022
OnError onError;
@@ -120,6 +122,30 @@ int CalculatePrefetchCount()
120122
return prefetchCount;
121123
}
122124

125+
#pragma warning disable PS0018
126+
async Task DeleteMessage(ProcessMessageEventArgs processMessageEventArgs, ServiceBusReceivedMessage message)
127+
#pragma warning restore PS0018
128+
{
129+
try
130+
{
131+
using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey))
132+
{
133+
await processMessageEventArgs.SafeCompleteMessageAsync(message,
134+
transportSettings.TransportTransactionMode,
135+
azureServiceBusTransaction, CancellationToken.None)
136+
.ConfigureAwait(false);
137+
138+
azureServiceBusTransaction.Commit();
139+
}
140+
}
141+
catch (Exception ex)
142+
{
143+
Logger.Warn($"Failed to delete message with id '{message.GetMessageId()}'. This message will be returned to the queue", ex);
144+
145+
messagesToBeDeleted.AddOrUpdate(message.GetMessageId(), true);
146+
}
147+
}
148+
123149
#pragma warning disable PS0018
124150
async Task OnProcessMessage(ProcessMessageEventArgs arg)
125151
#pragma warning restore PS0018
@@ -135,6 +161,12 @@ async Task OnProcessMessage(ProcessMessageEventArgs arg)
135161
{
136162
messageId = message.GetMessageId();
137163

164+
if (messagesToBeDeleted.TryGet(messageId, out _))
165+
{
166+
await DeleteMessage(arg, message).ConfigureAwait(false);
167+
return;
168+
}
169+
138170
if (processor.ReceiveMode == ServiceBusReceiveMode.PeekLock && message.LockedUntil < DateTimeOffset.UtcNow)
139171
{
140172
Logger.Warn(
@@ -297,6 +329,18 @@ await processMessageEventArgs.SafeCompleteMessageAsync(message,
297329
{
298330
try
299331
{
332+
if (ex is ServiceBusException serviceBusException && serviceBusException.Reason == ServiceBusFailureReason.MessageLockLost)
333+
{
334+
if (transportSettings.TransportTransactionMode == TransportTransactionMode.ReceiveOnly)
335+
{
336+
Logger.Warn($"Message with id `{messageId}` has been returned to the queue. NServiceBus recoverability is being skipped. {serviceBusException.Message}");
337+
messagesToBeDeleted.AddOrUpdate(messageId, true);
338+
//Since the message lock was lost, we can't complete or abandon the message without throwing an error
339+
//Recoverability should be skipped
340+
return;
341+
}
342+
}
343+
300344
ErrorHandleResult result;
301345

302346
using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey))
@@ -326,7 +370,7 @@ await processMessageEventArgs.SafeAbandonMessageAsync(message,
326370
.ConfigureAwait(false);
327371
}
328372
}
329-
catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient || onErrorEx.Reason is ServiceBusFailureReason.MessageLockLost)
373+
catch (ServiceBusException onErrorEx) when (onErrorEx.IsTransient)
330374
{
331375
Logger.Debug("Failed to execute recoverability.", onErrorEx);
332376

0 commit comments

Comments
 (0)