From 5b7dd595412f9a6b06ffb101da0d871d0c76a7b4 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 7 Apr 2025 16:02:04 +0200 Subject: [PATCH 1/2] Streamline the extracting logic to make the message ID handling explicit --- .../InputQueuePumpTests.cs | 2 - .../Extensions/TransportMessageExtensions.cs | 24 ++++++ .../InputQueuePump.cs | 79 +++++++------------ .../TransportHeaders.cs | 9 --- 4 files changed, 53 insertions(+), 61 deletions(-) diff --git a/src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs b/src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs index 8272c02dd..1824b90ce 100644 --- a/src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs @@ -429,8 +429,6 @@ await SetupInitializedPump(onMessage: (ctx, ct) => [Theory] [TestCase(TransportHeaders.Headers, "{}")] - // [TestCase(TransportHeaders.MessageTypeFullName)] special case that is forwarded due to historic reason - [TestCase(TransportHeaders.S3BodyKey)] [TestCase(TransportHeaders.DelaySeconds)] [TestCase(TransportHeaders.TimeToBeReceived)] public async Task Excluded_transport_headers_are_not_propagate_to_transport_message_headers(string headerKey, string headerValue = "custom-header-value") diff --git a/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs b/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs index b2c917e9b..4e569b1fc 100644 --- a/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs +++ b/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs @@ -3,14 +3,38 @@ namespace NServiceBus.Transport.SQS.Extensions { using System; using System.Buffers; + using System.Collections.Generic; using System.IO; + using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using Amazon.S3.Model; + using Amazon.SQS.Model; static class TransportMessageExtensions { + public static void CopyMessageAttributes(this TransportMessage transportMessage, Dictionary? receiveMessageAttributes) + { + foreach (var messageAttribute in receiveMessageAttributes ?? + Enumerable.Empty>()) + { + // The message ID requires complicated special handling and in the majority of cases + // the NServiceBus message ID when present on the headers might take precedence + if (messageAttribute.Key == Headers.MessageId) + { + continue; + } + + transportMessage.Headers[messageAttribute.Key] = messageAttribute.Value.StringValue; + } + + // These headers are not needed in the transport message and would only blow up the message size. + _ = transportMessage.Headers.Remove(TransportHeaders.Headers); + _ = transportMessage.Headers.Remove(TransportHeaders.DelaySeconds); + _ = transportMessage.Headers.Remove(TransportHeaders.TimeToBeReceived); + } + public static async ValueTask<(ReadOnlyMemory MessageBody, byte[]? MessageBodyBuffer)> RetrieveBody(this TransportMessage transportMessage, string messageId, S3Settings s3Settings, ArrayPool arrayPool, CancellationToken cancellationToken = default) { diff --git a/src/NServiceBus.Transport.SQS/InputQueuePump.cs b/src/NServiceBus.Transport.SQS/InputQueuePump.cs index 90362f233..4cdc8fd90 100644 --- a/src/NServiceBus.Transport.SQS/InputQueuePump.cs +++ b/src/NServiceBus.Transport.SQS/InputQueuePump.cs @@ -298,7 +298,7 @@ internal async Task ProcessMessage(Message receivedMessage, CancellationToken ca try { - transportMessage = ExtractTransportMessage(receivedMessage, messageId); + transportMessage = ExtractTransportMessage(receivedMessage, nativeMessageId); messageId = transportMessage.Headers[Headers.MessageId]; (messageBody, messageBodyBuffer) = await transportMessage.RetrieveBody(messageId, s3Settings, arrayPool, cancellationToken).ConfigureAwait(false); } @@ -354,42 +354,34 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage, TransportMessage transportMessage; if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.Headers, out var headersAttribute)) { + var headers = JsonSerializer.Deserialize>(headersAttribute.StringValue) ?? []; transportMessage = new TransportMessage { - Headers = JsonSerializer.Deserialize>(headersAttribute.StringValue) ?? [], + Headers = headers, Body = receivedMessage.Body }; - if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey)) - { - transportMessage.Headers[TransportHeaders.S3BodyKey] = s3BodyKey.StringValue; - transportMessage.S3BodyKey = s3BodyKey.StringValue; - } + transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes); + + // It is possible that the transport message already had a message ID and that one + // takes precedence + transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride); + transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey); } else { // When the MessageTypeFullName attribute is available, we're assuming native integration if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.MessageTypeFullName, out var enclosedMessageType)) { - var headers = new Dictionary - { - { Headers.MessageId, messageIdOverride }, - { Headers.EnclosedMessageTypes, enclosedMessageType.StringValue }, - { - TransportHeaders.MessageTypeFullName, enclosedMessageType.StringValue - } // we're copying over the value of the native message attribute into the headers, converting this into a nsb message - }; - - if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey)) - { - headers.Add(TransportHeaders.S3BodyKey, s3BodyKey.StringValue); - } - transportMessage = new TransportMessage { - Headers = headers, - S3BodyKey = s3BodyKey?.StringValue, + Headers = [], Body = receivedMessage.Body }; + transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes); + + transportMessage.Headers[Headers.MessageId] = messageIdOverride; + transportMessage.Headers[Headers.EnclosedMessageTypes] = enclosedMessageType.StringValue; + transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey); } else { @@ -405,12 +397,17 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage, transportMessage = new TransportMessage { Body = receivedMessage.Body, - Headers = new Dictionary - { - // HINT: Message Id is a required field for InnerProcessMessage - [Headers.MessageId] = receivedMessage.MessageId, - } + Headers = [] }; + transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes); + // For native integration scenarios the native message id should be used + transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId; + } + else + { + // It is possible that the transport message already had a message ID and that one + // takes precedence + transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride); } } catch (Exception ex) @@ -422,36 +419,18 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage, transportMessage = new TransportMessage { Body = receivedMessage.Body, - Headers = new Dictionary - { - // HINT: Message Id is a required field for InnerProcessMessage - [Headers.MessageId] = receivedMessage.MessageId, - } + Headers = [] }; + transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes); + // For native integration scenarios the native message id should be used + transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId; } } } - // HINT: Message Id is the only required header - transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride); - AddCustomNativeHeadersToNServiceBusHeaders(receivedMessage, transportMessage); return transportMessage; } - static void AddCustomNativeHeadersToNServiceBusHeaders(Message receivedMessage, TransportMessage transportMessage) - { - foreach (var receivedMessageMessageAttribute in receivedMessage.MessageAttributes) - { - // The code doesn't allow overriding the message ID at this point because - // message id has its own complex set of rules handled earlier in the process - if (TransportHeaders.NativeMessageAttributesNotCopiedToNServiceBusHeaders.Contains(receivedMessageMessageAttribute.Key) || receivedMessageMessageAttribute.Key == Headers.MessageId) - { - continue; - } - transportMessage.Headers[receivedMessageMessageAttribute.Key] = receivedMessageMessageAttribute.Value?.StringValue ?? string.Empty; - } - } - static bool CouldBeNativeMessage(TransportMessage msg) { if (msg.Headers == null) diff --git a/src/NServiceBus.Transport.SQS/TransportHeaders.cs b/src/NServiceBus.Transport.SQS/TransportHeaders.cs index 1bed11768..02227a97b 100644 --- a/src/NServiceBus.Transport.SQS/TransportHeaders.cs +++ b/src/NServiceBus.Transport.SQS/TransportHeaders.cs @@ -1,8 +1,5 @@ namespace NServiceBus.Transport.SQS { - using System.Collections.Frozen; - using System.Collections.Generic; - static class TransportHeaders { const string Prefix = "NServiceBus.AmazonSQS."; @@ -11,11 +8,5 @@ static class TransportHeaders public const string Headers = Prefix + nameof(Headers); public const string S3BodyKey = "S3BodyKey"; public const string MessageTypeFullName = "MessageTypeFullName"; - - // The following set represents the list of native message attributes that will - // not be copied to NServiceBus headers. When adding a new header to this class - // consider if that needs to be propagated to NServiceBus headers, if not, add it - // to this frozen set. - public static readonly FrozenSet NativeMessageAttributesNotCopiedToNServiceBusHeaders = new HashSet([TimeToBeReceived, DelaySeconds, Headers, S3BodyKey]).ToFrozenSet(); } } \ No newline at end of file From 1b3f434655f75c2155f8ee308b4ea9c37bf2cde8 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Tue, 8 Apr 2025 13:34:23 +0200 Subject: [PATCH 2/2] Fix header creation bug --- .../When_receiving_a_native_message_without_wrapper.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs index f07f56921..aa0529b5b 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs @@ -95,7 +95,7 @@ string GetHeaders(string s3Key = null, string messageId = null) if (!string.IsNullOrEmpty(s3Key)) { - nsbHeaders.Add("S3BodyKey", "s3Key"); + nsbHeaders.Add("S3BodyKey", s3Key); } if (!string.IsNullOrEmpty(messageId))