Skip to content

Commit 9536b34

Browse files
awright18bording
andauthored
Return message to queue between retry attempts (#1913)
* Return message to queue between retry attempts * Tweaks --------- Co-authored-by: Brandon Ording <bording@gmail.com>
1 parent 464f8ee commit 9536b34

File tree

2 files changed

+196
-46
lines changed

2 files changed

+196
-46
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
namespace NServiceBus.Transport.SQS
2+
{
3+
using System.Collections.Generic;
4+
5+
// The data structure has fixed maximum size. When the data structure reaches its maximum size,
6+
// the least recently used (LRU) message processing failure is removed from the storage.
7+
class FailureInfoStorage
8+
{
9+
public FailureInfoStorage(int maxElements)
10+
{
11+
this.maxElements = maxElements;
12+
}
13+
14+
public void RecordFailureInfoForMessage(string messageId)
15+
{
16+
lock (lockObject)
17+
{
18+
if (failureInfoPerMessage.TryGetValue(messageId, out var node))
19+
{
20+
// We have seen this message before, just update the counter
21+
node.Attempts++;
22+
23+
// Maintain invariant: leastRecentlyUsedMessages.First contains the LRU item.
24+
leastRecentlyUsedMessages.Remove(node.LeastRecentlyUsedEntry);
25+
leastRecentlyUsedMessages.AddLast(node.LeastRecentlyUsedEntry);
26+
}
27+
else
28+
{
29+
if (failureInfoPerMessage.Count == maxElements)
30+
{
31+
// We have reached the maximum allowed capacity. Remove the LRU item.
32+
var leastRecentlyUsedEntry = leastRecentlyUsedMessages.First;
33+
failureInfoPerMessage.Remove(leastRecentlyUsedEntry.Value);
34+
leastRecentlyUsedMessages.RemoveFirst();
35+
}
36+
37+
var newNode = new FailureInfoNode(messageId, 1);
38+
39+
failureInfoPerMessage[messageId] = newNode;
40+
41+
// Maintain invariant: leastRecentlyUsedMessages.First contains the LRU item.
42+
leastRecentlyUsedMessages.AddLast(newNode.LeastRecentlyUsedEntry);
43+
}
44+
}
45+
}
46+
47+
public bool TryGetFailureInfoForMessage(string messageId, out int attempts)
48+
{
49+
lock (lockObject)
50+
{
51+
if (failureInfoPerMessage.TryGetValue(messageId, out var node))
52+
{
53+
attempts = node.Attempts;
54+
return true;
55+
}
56+
attempts = 0;
57+
return false;
58+
}
59+
}
60+
61+
readonly Dictionary<string, FailureInfoNode> failureInfoPerMessage = new Dictionary<string, FailureInfoNode>();
62+
readonly LinkedList<string> leastRecentlyUsedMessages = new LinkedList<string>();
63+
readonly object lockObject = new object();
64+
readonly int maxElements;
65+
66+
class FailureInfoNode
67+
{
68+
public FailureInfoNode(string messageId, int attempts)
69+
{
70+
Attempts = attempts;
71+
LeastRecentlyUsedEntry = new LinkedListNode<string>(messageId);
72+
}
73+
74+
public int Attempts { get; set; }
75+
76+
public LinkedListNode<string> LeastRecentlyUsedEntry { get; }
77+
}
78+
}
79+
}

src/NServiceBus.Transport.SQS/InputQueuePump.cs

Lines changed: 117 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,18 @@ async Task ConsumeMessages(CancellationToken token)
136136
return;
137137
}
138138

139-
_ = ProcessMessage(receivedMessage, maxConcurrencySemaphore, token);
139+
try
140+
{
141+
_ = ProcessMessage(receivedMessage, maxConcurrencySemaphore, token);
142+
}
143+
catch (Exception ex)
144+
{
145+
Logger.Debug("Returning message to queue...", ex);
146+
147+
await ReturnMessageToQueue(receivedMessage).ConfigureAwait(false);
148+
149+
throw;
150+
}
140151
}
141152
}
142153
catch (OperationCanceledException)
@@ -161,6 +172,12 @@ internal async Task ProcessMessage(Message receivedMessage, SemaphoreSlim proces
161172
string messageId = null;
162173
var isPoisonMessage = false;
163174

175+
if (messagesToBeDeletedStorage.TryGetFailureInfoForMessage(nativeMessageId, out _))
176+
{
177+
await DeleteMessage(receivedMessage, null).ConfigureAwait(false);
178+
return;
179+
}
180+
164181
try
165182
{
166183
if (receivedMessage.MessageAttributes.TryGetValue(Headers.MessageId, out var messageIdAttribute))
@@ -230,81 +247,126 @@ internal async Task ProcessMessage(Message receivedMessage, SemaphoreSlim proces
230247
return;
231248
}
232249

233-
if (!IsMessageExpired(receivedMessage, transportMessage.Headers, messageId, CorrectClockSkew.GetClockCorrectionForEndpoint(awsEndpointUrl)))
250+
if (IsMessageExpired(receivedMessage, transportMessage.Headers, messageId, CorrectClockSkew.GetClockCorrectionForEndpoint(awsEndpointUrl)))
234251
{
235-
// here we also want to use the native message id because the core demands it like that
236-
await ProcessMessageWithInMemoryRetries(transportMessage.Headers, nativeMessageId, messageBody, receivedMessage, token).ConfigureAwait(false);
252+
await DeleteMessage(receivedMessage, transportMessage.S3BodyKey).ConfigureAwait(false);
237253
}
254+
else
255+
{
256+
// here we also want to use the native message id because the core demands it like that
257+
var messageProcessed = await InnerProcessMessage(transportMessage.Headers, nativeMessageId, messageBody, receivedMessage, token).ConfigureAwait(false);
238258

239-
// Always delete the message from the queue.
240-
// If processing failed, the onError handler will have moved the message
241-
// to a retry queue.
242-
await DeleteMessage(receivedMessage, transportMessage.S3BodyKey).ConfigureAwait(false);
259+
if (messageProcessed)
260+
{
261+
await DeleteMessage(receivedMessage, transportMessage.S3BodyKey).ConfigureAwait(false);
262+
}
263+
}
243264
}
244265
finally
245266
{
246267
processingSemaphoreSlim.Release();
247268
}
248269
}
249270

250-
async Task ProcessMessageWithInMemoryRetries(Dictionary<string, string> headers, string nativeMessageId, byte[] body, Message nativeMessage, CancellationToken token)
271+
async Task<bool> InnerProcessMessage(Dictionary<string, string> headers, string nativeMessageId, byte[] body, Message nativeMessage, CancellationToken token)
251272
{
252-
var immediateProcessingAttempts = 0;
253-
var messageProcessedOk = false;
254-
var errorHandled = false;
255-
256-
while (!errorHandled && !messageProcessedOk)
273+
// set the native message on the context for advanced usage scenario's
274+
var context = new ContextBag();
275+
context.Set(nativeMessage);
276+
// We add it to the transport transaction to make it available in dispatching scenario's so we copy over message attributes when moving messages to the error/audit queue
277+
var transportTransaction = new TransportTransaction();
278+
transportTransaction.Set(nativeMessage);
279+
transportTransaction.Set("IncomingMessageId", headers[Headers.MessageId]);
280+
281+
using (var messageContextCancellationTokenSource = new CancellationTokenSource())
257282
{
258-
// set the native message on the context for advanced usage scenario's
259-
var context = new ContextBag();
260-
context.Set(nativeMessage);
261-
// We add it to the transport transaction to make it available in dispatching scenario's so we copy over message attributes when moving messages to the error/audit queue
262-
var transportTransaction = new TransportTransaction();
263-
transportTransaction.Set(nativeMessage);
264-
transportTransaction.Set("IncomingMessageId", headers[Headers.MessageId]);
265-
266283
try
267284
{
268-
using (var messageContextCancellationTokenSource = new CancellationTokenSource())
269-
{
270-
var messageContext = new MessageContext(
271-
nativeMessageId,
272-
new Dictionary<string, string>(headers),
273-
body,
274-
transportTransaction,
275-
messageContextCancellationTokenSource,
276-
context);
277285

278-
await onMessage(messageContext).ConfigureAwait(false);
286+
var messageContext = new MessageContext(
287+
nativeMessageId,
288+
new Dictionary<string, string>(headers),
289+
body,
290+
transportTransaction,
291+
messageContextCancellationTokenSource,
292+
context);
293+
294+
await onMessage(messageContext).ConfigureAwait(false);
279295

280-
messageProcessedOk = !messageContextCancellationTokenSource.IsCancellationRequested;
281-
}
282296
}
283-
catch (Exception ex)
284-
when (!(ex is OperationCanceledException && token.IsCancellationRequested))
297+
catch (Exception ex) when (!(ex is OperationCanceledException && token.IsCancellationRequested))
285298
{
286-
immediateProcessingAttempts++;
287-
var errorHandlerResult = ErrorHandleResult.RetryRequired;
299+
var deliveryAttempts = GetDeliveryAttempts(nativeMessageId);
288300

289301
try
290302
{
291-
errorHandlerResult = await onError(new ErrorContext(ex,
303+
var errorHandlerResult = await onError(new ErrorContext(ex,
292304
new Dictionary<string, string>(headers),
293305
nativeMessageId,
294306
body,
295307
transportTransaction,
296-
immediateProcessingAttempts)).ConfigureAwait(false);
308+
deliveryAttempts)).ConfigureAwait(false);
309+
310+
if (errorHandlerResult == ErrorHandleResult.RetryRequired)
311+
{
312+
await ReturnMessageToQueue(nativeMessage).ConfigureAwait(false);
313+
314+
return false;
315+
}
297316
}
298317
catch (Exception onErrorEx)
299318
{
300319
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", onErrorEx);
320+
321+
await ReturnMessageToQueue(nativeMessage).ConfigureAwait(false);
322+
323+
return false;
301324
}
325+
}
326+
327+
if (messageContextCancellationTokenSource.IsCancellationRequested)
328+
{
329+
await ReturnMessageToQueue(nativeMessage).ConfigureAwait(false);
302330

303-
errorHandled = errorHandlerResult == ErrorHandleResult.Handled;
331+
return false;
304332
}
333+
334+
return true;
335+
}
336+
}
337+
338+
async Task ReturnMessageToQueue(Message message)
339+
{
340+
try
341+
{
342+
await sqsClient.ChangeMessageVisibilityAsync(
343+
new ChangeMessageVisibilityRequest()
344+
{
345+
QueueUrl = inputQueueUrl,
346+
ReceiptHandle = message.ReceiptHandle,
347+
VisibilityTimeout = 0
348+
},
349+
CancellationToken.None).ConfigureAwait(false);
350+
}
351+
catch (ReceiptHandleIsInvalidException ex)
352+
{
353+
Logger.Warn($"Failed to return message with native ID '{message.MessageId}' to the queue because the visibility timeout has expired. The message has already been returned to the queue.", ex);
354+
}
355+
catch (Exception ex)
356+
{
357+
Logger.Warn($"Failed to return message with native ID '{message.MessageId}' to the queue. The message will return to the queue after the visibility timeout expires.", ex);
305358
}
306359
}
307360

361+
int GetDeliveryAttempts(string nativeMessageId)
362+
{
363+
deliveryAttemptsStorage.RecordFailureInfoForMessage(nativeMessageId);
364+
365+
deliveryAttemptsStorage.TryGetFailureInfoForMessage(nativeMessageId, out var attempts);
366+
367+
return attempts;
368+
}
369+
308370
static bool IsMessageExpired(Message receivedMessage, Dictionary<string, string> headers, string messageId, TimeSpan clockOffset)
309371
{
310372
if (!headers.TryGetValue(TimeToBeReceived, out var rawTtbr))
@@ -338,16 +400,23 @@ async Task DeleteMessage(Message message, string s3BodyKey)
338400
{
339401
// should not be cancelled
340402
await sqsClient.DeleteMessageAsync(inputQueueUrl, message.ReceiptHandle, CancellationToken.None).ConfigureAwait(false);
403+
404+
if (!string.IsNullOrEmpty(s3BodyKey))
405+
{
406+
Logger.Info($"Message body data with key '{s3BodyKey}' will be aged out by the S3 lifecycle policy when the TTL expires.");
407+
}
341408
}
342409
catch (ReceiptHandleIsInvalidException ex)
343410
{
344-
Logger.Info($"Message receipt handle {message.ReceiptHandle} no longer valid.", ex);
345-
return; // if another receiver fetches the data from S3
346-
}
411+
Logger.Error($"Failed to delete message with native ID '{message.MessageId}' because the handler execution time exceeded the visibility timeout. Increase the length of the timeout on the queue. The message was returned to the queue.", ex);
347412

348-
if (!string.IsNullOrEmpty(s3BodyKey))
413+
messagesToBeDeletedStorage.RecordFailureInfoForMessage(message.MessageId);
414+
}
415+
catch (Exception ex)
349416
{
350-
Logger.Info($"Message body data with key '{s3BodyKey}' will be aged out by the S3 lifecycle policy when the TTL expires.");
417+
Logger.Warn($"Failed to delete message with native ID '{message.MessageId}'. The message will be returned to the queue when the visibility timeout expires.", ex);
418+
419+
messagesToBeDeletedStorage.RecordFailureInfoForMessage(message.MessageId);
351420
}
352421
}
353422

@@ -405,6 +474,8 @@ await sqsClient.DeleteMessageAsync(new DeleteMessageRequest
405474
// If there is a message body in S3, simply leave it there
406475
}
407476

477+
readonly FailureInfoStorage deliveryAttemptsStorage = new FailureInfoStorage(1_000);
478+
readonly FailureInfoStorage messagesToBeDeletedStorage = new FailureInfoStorage(1_000);
408479
List<Task> pumpTasks;
409480
Func<ErrorContext, Task<ErrorHandleResult>> onError;
410481
Func<MessageContext, Task> onMessage;

0 commit comments

Comments
 (0)