Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ string GetHeaders(string s3Key = null, string messageId = null)

if (!string.IsNullOrEmpty(s3Key))
{
nsbHeaders.Add("S3BodyKey", "s3Key");
nsbHeaders.Add("S3BodyKey", s3Key);
}

if (!string.IsNullOrEmpty(messageId))
Expand Down
2 changes: 0 additions & 2 deletions src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,6 @@ await SetupInitializedPump(onMessage: (ctx, ct) =>

[Theory]
[TestCase(TransportHeaders.Headers, "{}")]
// [TestCase(TransportHeaders.MessageTypeFullName)] special case that is forwarded due to historic reason
[TestCase(TransportHeaders.S3BodyKey)]
[TestCase(TransportHeaders.DelaySeconds)]
[TestCase(TransportHeaders.TimeToBeReceived)]
public async Task Excluded_transport_headers_are_not_propagate_to_transport_message_headers(string headerKey, string headerValue = "custom-header-value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,38 @@ namespace NServiceBus.Transport.SQS.Extensions
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3.Model;
using Amazon.SQS.Model;

static class TransportMessageExtensions
{
public static void CopyMessageAttributes(this TransportMessage transportMessage, Dictionary<string, MessageAttributeValue>? receiveMessageAttributes)
{
foreach (var messageAttribute in receiveMessageAttributes ??
Enumerable.Empty<KeyValuePair<string, MessageAttributeValue>>())
{
// The message ID requires complicated special handling and in the majority of cases
// the NServiceBus message ID when present on the headers might take precedence
if (messageAttribute.Key == Headers.MessageId)
{
continue;
}

transportMessage.Headers[messageAttribute.Key] = messageAttribute.Value.StringValue;
}

// These headers are not needed in the transport message and would only blow up the message size.
_ = transportMessage.Headers.Remove(TransportHeaders.Headers);
_ = transportMessage.Headers.Remove(TransportHeaders.DelaySeconds);
_ = transportMessage.Headers.Remove(TransportHeaders.TimeToBeReceived);
}

public static async ValueTask<(ReadOnlyMemory<byte> MessageBody, byte[]? MessageBodyBuffer)> RetrieveBody(this TransportMessage transportMessage, string messageId, S3Settings s3Settings, ArrayPool<byte> arrayPool,
CancellationToken cancellationToken = default)
{
Expand Down
79 changes: 29 additions & 50 deletions src/NServiceBus.Transport.SQS/InputQueuePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ internal async Task ProcessMessage(Message receivedMessage, CancellationToken ca

try
{
transportMessage = ExtractTransportMessage(receivedMessage, messageId);
transportMessage = ExtractTransportMessage(receivedMessage, nativeMessageId);
messageId = transportMessage.Headers[Headers.MessageId];
(messageBody, messageBodyBuffer) = await transportMessage.RetrieveBody(messageId, s3Settings, arrayPool, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -354,42 +354,34 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
TransportMessage transportMessage;
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.Headers, out var headersAttribute))
{
var headers = JsonSerializer.Deserialize<Dictionary<string, string>>(headersAttribute.StringValue) ?? [];
transportMessage = new TransportMessage
{
Headers = JsonSerializer.Deserialize<Dictionary<string, string>>(headersAttribute.StringValue) ?? [],
Headers = headers,
Body = receivedMessage.Body
};
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey))
{
transportMessage.Headers[TransportHeaders.S3BodyKey] = s3BodyKey.StringValue;
transportMessage.S3BodyKey = s3BodyKey.StringValue;
}
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);

// It is possible that the transport message already had a message ID and that one
// takes precedence
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey);
}
else
{
// When the MessageTypeFullName attribute is available, we're assuming native integration
if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.MessageTypeFullName, out var enclosedMessageType))
{
var headers = new Dictionary<string, string>
{
{ Headers.MessageId, messageIdOverride },
{ Headers.EnclosedMessageTypes, enclosedMessageType.StringValue },
{
TransportHeaders.MessageTypeFullName, enclosedMessageType.StringValue
} // we're copying over the value of the native message attribute into the headers, converting this into a nsb message
};

if (receivedMessage.MessageAttributes.TryGetValue(TransportHeaders.S3BodyKey, out var s3BodyKey))
{
headers.Add(TransportHeaders.S3BodyKey, s3BodyKey.StringValue);
}

transportMessage = new TransportMessage
{
Headers = headers,
S3BodyKey = s3BodyKey?.StringValue,
Headers = [],
Body = receivedMessage.Body
};
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);

transportMessage.Headers[Headers.MessageId] = messageIdOverride;
transportMessage.Headers[Headers.EnclosedMessageTypes] = enclosedMessageType.StringValue;
transportMessage.S3BodyKey = transportMessage.Headers.GetValueOrDefault(TransportHeaders.S3BodyKey);
}
else
{
Expand All @@ -405,12 +397,17 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
transportMessage = new TransportMessage
{
Body = receivedMessage.Body,
Headers = new Dictionary<string, string>
{
// HINT: Message Id is a required field for InnerProcessMessage
[Headers.MessageId] = receivedMessage.MessageId,
}
Headers = []
};
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
// For native integration scenarios the native message id should be used
transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId;
}
else
{
// It is possible that the transport message already had a message ID and that one
// takes precedence
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
}
}
catch (Exception ex)
Expand All @@ -422,36 +419,18 @@ public static TransportMessage ExtractTransportMessage(Message receivedMessage,
transportMessage = new TransportMessage
{
Body = receivedMessage.Body,
Headers = new Dictionary<string, string>
{
// HINT: Message Id is a required field for InnerProcessMessage
[Headers.MessageId] = receivedMessage.MessageId,
}
Headers = []
};
transportMessage.CopyMessageAttributes(receivedMessage.MessageAttributes);
// For native integration scenarios the native message id should be used
transportMessage.Headers[Headers.MessageId] = receivedMessage.MessageId;
}
}
}
// HINT: Message Id is the only required header
transportMessage.Headers.TryAdd(Headers.MessageId, messageIdOverride);
AddCustomNativeHeadersToNServiceBusHeaders(receivedMessage, transportMessage);

return transportMessage;
}

static void AddCustomNativeHeadersToNServiceBusHeaders(Message receivedMessage, TransportMessage transportMessage)
{
foreach (var receivedMessageMessageAttribute in receivedMessage.MessageAttributes)
{
// The code doesn't allow overriding the message ID at this point because
// message id has its own complex set of rules handled earlier in the process
if (TransportHeaders.NativeMessageAttributesNotCopiedToNServiceBusHeaders.Contains(receivedMessageMessageAttribute.Key) || receivedMessageMessageAttribute.Key == Headers.MessageId)
{
continue;
}
transportMessage.Headers[receivedMessageMessageAttribute.Key] = receivedMessageMessageAttribute.Value?.StringValue ?? string.Empty;
}
}

static bool CouldBeNativeMessage(TransportMessage msg)
{
if (msg.Headers == null)
Expand Down
9 changes: 0 additions & 9 deletions src/NServiceBus.Transport.SQS/TransportHeaders.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
namespace NServiceBus.Transport.SQS
{
using System.Collections.Frozen;
using System.Collections.Generic;

static class TransportHeaders
{
const string Prefix = "NServiceBus.AmazonSQS.";
Expand All @@ -11,11 +8,5 @@ static class TransportHeaders
public const string Headers = Prefix + nameof(Headers);
public const string S3BodyKey = "S3BodyKey";
public const string MessageTypeFullName = "MessageTypeFullName";

// The following set represents the list of native message attributes that will
// not be copied to NServiceBus headers. When adding a new header to this class
// consider if that needs to be propagated to NServiceBus headers, if not, add it
// to this frozen set.
public static readonly FrozenSet<string> NativeMessageAttributesNotCopiedToNServiceBusHeaders = new HashSet<string>([TimeToBeReceived, DelaySeconds, Headers, S3BodyKey]).ToFrozenSet();
}
}