Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,6 @@ await SetupInitializedPump(onMessage: (ctx, ct) =>

[Theory]
[TestCase(TransportHeaders.Headers, "{}")]
// [TestCase(TransportHeaders.MessageTypeFullName)] special case that is forwarded due to historic reason
[TestCase(TransportHeaders.S3BodyKey)]
[TestCase(TransportHeaders.DelaySeconds)]
[TestCase(TransportHeaders.TimeToBeReceived)]
public async Task Excluded_transport_headers_are_not_propagate_to_transport_message_headers(string headerKey, string headerValue = "custom-header-value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,38 @@ namespace NServiceBus.Transport.SQS.Extensions
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3.Model;
using Amazon.SQS.Model;

static class TransportMessageExtensions
{
public static void CopyMessageAttributes(this TransportMessage transportMessage, Dictionary<string, MessageAttributeValue>? receiveMessageAttributes)
{
foreach (var messageAttribute in receiveMessageAttributes ??
Enumerable.Empty<KeyValuePair<string, MessageAttributeValue>>())
{
// The message ID requires complicated special handling and in the majority of cases
// the NServiceBus message ID when present on the headers might take precedence
if (messageAttribute.Key == Headers.MessageId)
{
continue;
}

transportMessage.Headers[messageAttribute.Key] = messageAttribute.Value.StringValue;
}

// These headers are not needed in the transport message and would only blow up the message size.
_ = transportMessage.Headers.Remove(TransportHeaders.Headers);
_ = transportMessage.Headers.Remove(TransportHeaders.DelaySeconds);
_ = transportMessage.Headers.Remove(TransportHeaders.TimeToBeReceived);
}

public static async ValueTask<(ReadOnlyMemory<byte> MessageBody, byte[]? MessageBodyBuffer)> RetrieveBody(this TransportMessage transportMessage, string messageId, S3Settings s3Settings, ArrayPool<byte> arrayPool,
CancellationToken cancellationToken = default)
{
Expand Down
79 changes: 29 additions & 50 deletions src/NServiceBus.Transport.SQS/InputQueuePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ internal async Task ProcessMessage(Message receivedMessage, CancellationToken ca

try
{
transportMessage = ExtractTransportMessage(receivedMessage, messageId);
transportMessage = ExtractTransportMessage(receivedMessage, nativeMessageId);
messageId = transportMessage.Headers[Headers.MessageId];
(messageBody, messageBodyBuffer) = await transportMessage.RetrieveBody(messageId, s3Settings, arrayPool, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -354,42 +354,34 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
TransportMessage transportMessage;
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.Headers, out var headersAttribute))
{
var headers = JsonSerializer.Deserialize<Dictionary<string, string>>(headersAttribute.StringValue) ?? [];
transportMessage = new TransportMessage
{
Headers = JsonSerializer.Deserialize<Dictionary<string, string>>(headersAttribute.StringValue) ?? [],
Headers = headers,
Body = receivedMessage.Body
};
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey))
{
transportMessage.Headers[TransportHeaders.S3BodyKey] = s3BodyKey.StringValue;
transportMessage.S3BodyKey = s3BodyKey.StringValue;
}
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);

// It is possible that the transport message already had a message ID and that one
// takes precedence
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey);
}
else
{
// When the MessageTypeFullName attribute is available, we're assuming native integration
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.MessageTypeFullName, out var enclosedMessageType))
{
var headers = new Dictionary<string, string>
{
{ Headers.MessageId, messageIdOverride },
{ Headers.EnclosedMessageTypes, enclosedMessageType.StringValue },
{
TransportHeaders.MessageTypeFullName, enclosedMessageType.StringValue
} // we're copying over the value of the native message attribute into the headers, converting this into a nsb message
};

if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey))
{
headers.Add(TransportHeaders.S3BodyKey, s3BodyKey.StringValue);
}

transportMessage = new TransportMessage
{
Headers = headers,
S3BodyKey = s3BodyKey?.StringValue,
Headers = [],
Body = receivedMessage.Body
};
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);

transportMessage.Headers[Headers.MessageId] = messageIdOverride;
transportMessage.Headers[Headers.EnclosedMessageTypes] = enclosedMessageType.StringValue;
transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey);
}
else
{
Expand All @@ -405,12 +397,17 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
transportMessage = new TransportMessage
{
Body = receivedMessage.Body,
Headers = new Dictionary<string, string>
{
// HINT: Message Id is a required field for InnerProcessMessage
[Headers.MessageId] = receivedMessage.MessageId,
}
Headers = []
};
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
// For native integration scenarios the native message id should be used
transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId;
}
else
{
// It is possible that the transport message already had a message ID and that one
// takes precedence
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
}
}
catch (Exception ex)
Expand All @@ -422,36 +419,18 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
transportMessage = new TransportMessage
{
Body = receivedMessage.Body,
Headers = new Dictionary<string, string>
{
// HINT: Message Id is a required field for InnerProcessMessage
[Headers.MessageId] = receivedMessage.MessageId,
}
Headers = []
};
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
// For native integration scenarios the native message id should be used
transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId;
}
}
}
// HINT: Message Id is the only required header
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
AddCustomNativeHeadersToNServiceBusHeaders(receivedMessage, transportMessage);

return transportMessage;
}

static void AddCustomNativeHeadersToNServiceBusHeaders(Message receivedMessage, TransportMessage transportMessage)
{
foreach (var receivedMessageMessageAttribute in receivedMessage.MessageAttributes)
{
// The code doesn't allow overriding the message ID at this point because
// message id has its own complex set of rules handled earlier in the process
if (TransportHeaders.NativeMessageAttributesNotCopiedToNServiceBusHeaders.Contains(receivedMessageMessageAttribute.Key) || receivedMessageMessageAttribute.Key == Headers.MessageId)
{
continue;
}
transportMessage.Headers[receivedMessageMessageAttribute.Key] = receivedMessageMessageAttribute.Value?.StringValue ?? string.Empty;
}
}

static bool CouldBeNativeMessage(TransportMessage msg)
{
if (msg.Headers == null)
Expand Down
9 changes: 0 additions & 9 deletions src/NServiceBus.Transport.SQS/TransportHeaders.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
namespace NServiceBus.Transport.SQS
{
using System.Collections.Frozen;
using System.Collections.Generic;

static class TransportHeaders
{
const string Prefix = "NServiceBus.AmazonSQS.";
Expand All @@ -11,11 +8,5 @@ static class TransportHeaders
public const string Headers = Prefix + nameof(Headers);
public const string S3BodyKey = "S3BodyKey";
public const string MessageTypeFullName = "MessageTypeFullName";

// The following set represents the list of native message attributes that will
// not be copied to NServiceBus headers. When adding a new header to this class
// consider if that needs to be propagated to NServiceBus headers, if not, add it
// to this frozen set.
public static readonly FrozenSet<string> NativeMessageAttributesNotCopiedToNServiceBusHeaders = new HashSet<string>([TimeToBeReceived, DelaySeconds, Headers, S3BodyKey]).ToFrozenSet();
}
}
Loading