From ebc9947611a71bd5745c71df6562d4782bd246ca Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 5 Jul 2024 14:27:33 +0200 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=20Allow=20overriding=20native=20`?= =?UTF-8?q?MessageId`=20via=20message=20property?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Sending/OutgoingMessageExtensions.cs | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/Transport/Sending/OutgoingMessageExtensions.cs b/src/Transport/Sending/OutgoingMessageExtensions.cs index e8b558fd..8c909084 100644 --- a/src/Transport/Sending/OutgoingMessageExtensions.cs +++ b/src/Transport/Sending/OutgoingMessageExtensions.cs @@ -9,14 +9,8 @@ static class OutgoingMessageExtensions { public static ServiceBusMessage ToAzureServiceBusMessage(this OutgoingMessage outgoingMessage, DispatchProperties dispatchProperties, string incomingQueuePartitionKey) { - var message = new ServiceBusMessage(outgoingMessage.Body) - { - // Cannot re-use MessageId to be compatible with ASB transport that could have native de-dup enabled - MessageId = Guid.NewGuid().ToString() - }; - // The value needs to be "application/octect-stream" and not "application/octet-stream" for interop with ASB transport - message.ApplicationProperties[TransportMessageHeaders.TransportEncoding] = "application/octect-stream"; + ApplyMessageId(message, outgoingTransportOperation); message.TransactionPartitionKey = incomingQueuePartitionKey; @@ -82,5 +76,20 @@ static void CopyHeaders(ServiceBusMessage outgoingMessage, Dictionary Date: Fri, 5 Jul 2024 14:33:26 +0200 Subject: [PATCH 2/3] =?UTF-8?q?=E2=9C=A8=20Allow=20setting=20native=20`Con?= =?UTF-8?q?tentType`=20and=20`CorrelationId`=20via=20operation=20propertie?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Transport/Sending/MessageDispatcher.cs | 4 +-- .../Sending/OutgoingMessageExtensions.cs | 31 ++++++++++++++----- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/Transport/Sending/MessageDispatcher.cs b/src/Transport/Sending/MessageDispatcher.cs index 31b9f207..00921270 100644 --- a/src/Transport/Sending/MessageDispatcher.cs +++ b/src/Transport/Sending/MessageDispatcher.cs @@ -127,7 +127,7 @@ Task DispatchBatchedOperations( var messagesToSend = new Queue(operations.Count); foreach (var operation in operations) { - var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); + var message = operation.ToAzureServiceBusMessage(azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); operation.ApplyCustomizationToOutgoingNativeMessage(message, transportTransaction, Log); messagesToSend.Enqueue(message); } @@ -262,7 +262,7 @@ Task DispatchIsolatedOperations( foreach (var operation in operations) { - var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); + var message = operation.ToAzureServiceBusMessage(azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); operation.ApplyCustomizationToOutgoingNativeMessage(message, transportTransaction, Log); dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken)); } diff --git a/src/Transport/Sending/OutgoingMessageExtensions.cs b/src/Transport/Sending/OutgoingMessageExtensions.cs index 8c909084..c9421291 100644 --- a/src/Transport/Sending/OutgoingMessageExtensions.cs +++ b/src/Transport/Sending/OutgoingMessageExtensions.cs @@ -7,8 +7,11 @@ static class OutgoingMessageExtensions { - public static ServiceBusMessage ToAzureServiceBusMessage(this OutgoingMessage outgoingMessage, DispatchProperties dispatchProperties, string incomingQueuePartitionKey) + public static ServiceBusMessage ToAzureServiceBusMessage(this IOutgoingTransportOperation outgoingTransportOperation, string incomingQueuePartitionKey) { + var outgoingMessage = outgoingTransportOperation.Message; + var message = new ServiceBusMessage(outgoingMessage.Body); + var dispatchProperties = outgoingTransportOperation.Properties; ApplyMessageId(message, outgoingTransportOperation); @@ -16,9 +19,9 @@ public static ServiceBusMessage ToAzureServiceBusMessage(this OutgoingMessage ou ApplyDeliveryConstraints(message, dispatchProperties); - ApplyCorrelationId(message, outgoingMessage.Headers); + ApplyCorrelationId(message, outgoingTransportOperation); - ApplyContentType(message, outgoingMessage.Headers); + ApplyContentType(message, outgoingTransportOperation); SetReplyToAddress(message, outgoingMessage.Headers); @@ -45,17 +48,31 @@ static void ApplyDeliveryConstraints(ServiceBusMessage message, DispatchProperti } } - static void ApplyCorrelationId(ServiceBusMessage message, Dictionary headers) + static void ApplyCorrelationId(ServiceBusMessage message, IOutgoingTransportOperation outgoingTransportOperation) { - if (headers.TryGetValue(Headers.CorrelationId, out var correlationId)) + var properties = outgoingTransportOperation.Properties; + var headers = outgoingTransportOperation.Message.Headers; + + if (properties.TryGetValue(Headers.CorrelationId, out var correlationId)) + { + message.CorrelationId = correlationId; + } + else if (headers.TryGetValue(Headers.CorrelationId, out correlationId)) { message.CorrelationId = correlationId; } } - static void ApplyContentType(ServiceBusMessage message, Dictionary headers) + static void ApplyContentType(ServiceBusMessage message, IOutgoingTransportOperation outgoingTransportOperation) { - if (headers.TryGetValue(Headers.ContentType, out var contentType)) + var properties = outgoingTransportOperation.Properties; + var headers = outgoingTransportOperation.Message.Headers; + + if (properties.TryGetValue(Headers.ContentType, out var contentType)) + { + message.ContentType = contentType; + } + else if (headers.TryGetValue(Headers.ContentType, out contentType)) { message.ContentType = contentType; } From 41c735476427ef5356a542948c7f4499da7379a2 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 10 Oct 2024 08:58:49 +0200 Subject: [PATCH 3/3] IDE0005: Using directive is unnecessary. --- src/Transport/Sending/OutgoingMessageExtensions.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Transport/Sending/OutgoingMessageExtensions.cs b/src/Transport/Sending/OutgoingMessageExtensions.cs index c9421291..82ff842f 100644 --- a/src/Transport/Sending/OutgoingMessageExtensions.cs +++ b/src/Transport/Sending/OutgoingMessageExtensions.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using Azure.Messaging.ServiceBus; - using Configuration; static class OutgoingMessageExtensions {