Skip to content

Commit 8d4fb8b

Browse files
Streamline the extracting logic to make the message ID handling explicit (#2787) (#2789)
* Streamline the extracting logic to make the message ID handling explicit * Fix header creation bug --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent c980d2b commit 8d4fb8b

File tree

5 files changed

+54
-62
lines changed

5 files changed

+54
-62
lines changed

src/NServiceBus.Transport.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ string GetHeaders(string s3Key = null, string messageId = null)
9595

9696
if (!string.IsNullOrEmpty(s3Key))
9797
{
98-
nsbHeaders.Add("S3BodyKey", "s3Key");
98+
nsbHeaders.Add("S3BodyKey", s3Key);
9999
}
100100

101101
if (!string.IsNullOrEmpty(messageId))

src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,6 @@ await SetupInitializedPump(onMessage: (ctx, ct) =>
429429

430430
[Theory]
431431
[TestCase(TransportHeaders.Headers, "{}")]
432-
// [TestCase(TransportHeaders.MessageTypeFullName)] special case that is forwarded due to historic reason
433-
[TestCase(TransportHeaders.S3BodyKey)]
434432
[TestCase(TransportHeaders.DelaySeconds)]
435433
[TestCase(TransportHeaders.TimeToBeReceived)]
436434
public async Task Excluded_transport_headers_are_not_propagate_to_transport_message_headers(string headerKey, string headerValue = "custom-header-value")

src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,38 @@ namespace NServiceBus.Transport.SQS.Extensions
33
{
44
using System;
55
using System.Buffers;
6+
using System.Collections.Generic;
67
using System.IO;
8+
using System.Linq;
79
using System.Text;
810
using System.Threading;
911
using System.Threading.Tasks;
1012
using Amazon.S3.Model;
13+
using Amazon.SQS.Model;
1114

1215
static class TransportMessageExtensions
1316
{
17+
public static void CopyMessageAttributes(this TransportMessage transportMessage, Dictionary<string, MessageAttributeValue>? receiveMessageAttributes)
18+
{
19+
foreach (var messageAttribute in receiveMessageAttributes ??
20+
Enumerable.Empty<KeyValuePair<string, MessageAttributeValue>>())
21+
{
22+
// The message ID requires complicated special handling and in the majority of cases
23+
// the NServiceBus message ID when present on the headers might take precedence
24+
if (messageAttribute.Key == Headers.MessageId)
25+
{
26+
continue;
27+
}
28+
29+
transportMessage.Headers[messageAttribute.Key] = messageAttribute.Value.StringValue;
30+
}
31+
32+
// These headers are not needed in the transport message and would only blow up the message size.
33+
_ = transportMessage.Headers.Remove(TransportHeaders.Headers);
34+
_ = transportMessage.Headers.Remove(TransportHeaders.DelaySeconds);
35+
_ = transportMessage.Headers.Remove(TransportHeaders.TimeToBeReceived);
36+
}
37+
1438
public static async ValueTask<(ReadOnlyMemory<byte> MessageBody, byte[]? MessageBodyBuffer)> RetrieveBody(this TransportMessage transportMessage, string messageId, S3Settings s3Settings, ArrayPool<byte> arrayPool,
1539
CancellationToken cancellationToken = default)
1640
{

src/NServiceBus.Transport.SQS/InputQueuePump.cs

Lines changed: 29 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ internal async Task ProcessMessage(Message receivedMessage, CancellationToken ca
298298

299299
try
300300
{
301-
transportMessage = ExtractTransportMessage(receivedMessage, messageId);
301+
transportMessage = ExtractTransportMessage(receivedMessage, nativeMessageId);
302302
messageId = transportMessage.Headers[Headers.MessageId];
303303
(messageBody, messageBodyBuffer) = await transportMessage.RetrieveBody(messageId, s3Settings, arrayPool, cancellationToken).ConfigureAwait(false);
304304
}
@@ -354,42 +354,34 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
354354
TransportMessage transportMessage;
355355
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.Headers, out var headersAttribute))
356356
{
357+
var headers = JsonSerializer.Deserialize<Dictionary<string, string>>(headersAttribute.StringValue) ?? [];
357358
transportMessage = new TransportMessage
358359
{
359-
Headers = JsonSerializer.Deserialize<Dictionary<string, string>>(headersAttribute.StringValue) ?? [],
360+
Headers = headers,
360361
Body = receivedMessage.Body
361362
};
362-
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey))
363-
{
364-
transportMessage.Headers[TransportHeaders.S3BodyKey] = s3BodyKey.StringValue;
365-
transportMessage.S3BodyKey = s3BodyKey.StringValue;
366-
}
363+
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
364+
365+
// It is possible that the transport message already had a message ID and that one
366+
// takes precedence
367+
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
368+
transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey);
367369
}
368370
else
369371
{
370372
// When the MessageTypeFullName attribute is available, we're assuming native integration
371373
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.MessageTypeFullName, out var enclosedMessageType))
372374
{
373-
var headers = new Dictionary<string, string>
374-
{
375-
{ Headers.MessageId, messageIdOverride },
376-
{ Headers.EnclosedMessageTypes, enclosedMessageType.StringValue },
377-
{
378-
TransportHeaders.MessageTypeFullName, enclosedMessageType.StringValue
379-
} // we're copying over the value of the native message attribute into the headers, converting this into a nsb message
380-
};
381-
382-
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey))
383-
{
384-
headers.Add(TransportHeaders.S3BodyKey, s3BodyKey.StringValue);
385-
}
386-
387375
transportMessage = new TransportMessage
388376
{
389-
Headers = headers,
390-
S3BodyKey = s3BodyKey?.StringValue,
377+
Headers = [],
391378
Body = receivedMessage.Body
392379
};
380+
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
381+
382+
transportMessage.Headers[Headers.MessageId] = messageIdOverride;
383+
transportMessage.Headers[Headers.EnclosedMessageTypes] = enclosedMessageType.StringValue;
384+
transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey);
393385
}
394386
else
395387
{
@@ -405,12 +397,17 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
405397
transportMessage = new TransportMessage
406398
{
407399
Body = receivedMessage.Body,
408-
Headers = new Dictionary<string, string>
409-
{
410-
// HINT: Message Id is a required field for InnerProcessMessage
411-
[Headers.MessageId] = receivedMessage.MessageId,
412-
}
400+
Headers = []
413401
};
402+
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
403+
// For native integration scenarios the native message id should be used
404+
transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId;
405+
}
406+
else
407+
{
408+
// It is possible that the transport message already had a message ID and that one
409+
// takes precedence
410+
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
414411
}
415412
}
416413
catch (Exception ex)
@@ -422,36 +419,18 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
422419
transportMessage = new TransportMessage
423420
{
424421
Body = receivedMessage.Body,
425-
Headers = new Dictionary<string, string>
426-
{
427-
// HINT: Message Id is a required field for InnerProcessMessage
428-
[Headers.MessageId] = receivedMessage.MessageId,
429-
}
422+
Headers = []
430423
};
424+
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
425+
// For native integration scenarios the native message id should be used
426+
transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId;
431427
}
432428
}
433429
}
434-
// HINT: Message Id is the only required header
435-
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
436-
AddCustomNativeHeadersToNServiceBusHeaders(receivedMessage, transportMessage);
437430

438431
return transportMessage;
439432
}
440433

441-
static void AddCustomNativeHeadersToNServiceBusHeaders(Message receivedMessage, TransportMessage transportMessage)
442-
{
443-
foreach (var receivedMessageMessageAttribute in receivedMessage.MessageAttributes)
444-
{
445-
// The code doesn't allow overriding the message ID at this point because
446-
// message id has its own complex set of rules handled earlier in the process
447-
if (TransportHeaders.NativeMessageAttributesNotCopiedToNServiceBusHeaders.Contains(receivedMessageMessageAttribute.Key) || receivedMessageMessageAttribute.Key == Headers.MessageId)
448-
{
449-
continue;
450-
}
451-
transportMessage.Headers[receivedMessageMessageAttribute.Key] = receivedMessageMessageAttribute.Value?.StringValue ?? string.Empty;
452-
}
453-
}
454-
455434
static bool CouldBeNativeMessage(TransportMessage msg)
456435
{
457436
if (msg.Headers == null)
Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
namespace NServiceBus.Transport.SQS
22
{
3-
using System.Collections.Frozen;
4-
using System.Collections.Generic;
5-
63
static class TransportHeaders
74
{
85
const string Prefix = "NServiceBus.AmazonSQS.";
@@ -11,11 +8,5 @@ static class TransportHeaders
118
public const string Headers = Prefix + nameof(Headers);
129
public const string S3BodyKey = "S3BodyKey";
1310
public const string MessageTypeFullName = "MessageTypeFullName";
14-
15-
// The following set represents the list of native message attributes that will
16-
// not be copied to NServiceBus headers. When adding a new header to this class
17-
// consider if that needs to be propagated to NServiceBus headers, if not, add it
18-
// to this frozen set.
19-
public static readonly FrozenSet<string> NativeMessageAttributesNotCopiedToNServiceBusHeaders = new HashSet<string>([TimeToBeReceived, DelaySeconds, Headers, S3BodyKey]).ToFrozenSet();
2011
}
2112
}

0 commit comments

Comments
 (0)