Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Transport/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ Task DispatchBatchedOperations(
var messagesToSend = new Queue<ServiceBusMessage>(operations.Count);
foreach (var operation in operations)
{
var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
var message = operation.ToAzureServiceBusMessage(azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
operation.ApplyCustomizationToOutgoingNativeMessage(message, transportTransaction, Log);
messagesToSend.Enqueue(message);
}
Expand Down Expand Up @@ -262,7 +262,7 @@ Task DispatchIsolatedOperations(

foreach (var operation in operations)
{
var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
var message = operation.ToAzureServiceBusMessage(azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
operation.ApplyCustomizationToOutgoingNativeMessage(message, transportTransaction, Log);
dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken));
}
Expand Down
55 changes: 40 additions & 15 deletions src/Transport/Sending/OutgoingMessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,24 @@
using System;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;
using Configuration;

static class OutgoingMessageExtensions
{
public static ServiceBusMessage ToAzureServiceBusMessage(this OutgoingMessage outgoingMessage, DispatchProperties dispatchProperties, string incomingQueuePartitionKey)
public static ServiceBusMessage ToAzureServiceBusMessage(this IOutgoingTransportOperation outgoingTransportOperation, string incomingQueuePartitionKey)
{
var message = new ServiceBusMessage(outgoingMessage.Body)
{
// Cannot re-use MessageId to be compatible with ASB transport that could have native de-dup enabled
MessageId = Guid.NewGuid().ToString()
};
var outgoingMessage = outgoingTransportOperation.Message;
var message = new ServiceBusMessage(outgoingMessage.Body);
var dispatchProperties = outgoingTransportOperation.Properties;

// The value needs to be "application/octect-stream" and not "application/octet-stream" for interop with ASB transport
message.ApplicationProperties[TransportMessageHeaders.TransportEncoding] = "application/octect-stream";
ApplyMessageId(message, outgoingTransportOperation);

message.TransactionPartitionKey = incomingQueuePartitionKey;

ApplyDeliveryConstraints(message, dispatchProperties);

ApplyCorrelationId(message, outgoingMessage.Headers);
ApplyCorrelationId(message, outgoingTransportOperation);

ApplyContentType(message, outgoingMessage.Headers);
ApplyContentType(message, outgoingTransportOperation);

SetReplyToAddress(message, outgoingMessage.Headers);

Expand All @@ -51,17 +47,31 @@ static void ApplyDeliveryConstraints(ServiceBusMessage message, DispatchProperti
}
}

static void ApplyCorrelationId(ServiceBusMessage message, Dictionary<string, string> headers)
static void ApplyCorrelationId(ServiceBusMessage message, IOutgoingTransportOperation outgoingTransportOperation)
{
if (headers.TryGetValue(Headers.CorrelationId, out var correlationId))
var properties = outgoingTransportOperation.Properties;
var headers = outgoingTransportOperation.Message.Headers;

if (properties.TryGetValue(Headers.CorrelationId, out var correlationId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I recall, the properties get serialized into the outbox. It might be worthwhile considering removing those temporary properties after reading them or add a comment why they need to be preserved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question. Technically, already today, it is possible to add a customization delegate that allows you to customize the entire native message created. Any reason this can't be used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And yes, it seems the customizer should also be removed from the properties dictionary because I doubt we can every serialize action delegates into outbox records.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I didn't know that TransportTransaction was itself a ContextBag 🤯
  2. Looked at the type you linked but I haven't figured out how that could be used as so far all invoked types I see are internal

So, I need additional research to validate if this could be used for the ******* connector.

{
message.CorrelationId = correlationId;
}
else if (headers.TryGetValue(Headers.CorrelationId, out correlationId))
{
message.CorrelationId = correlationId;
}
}

static void ApplyContentType(ServiceBusMessage message, Dictionary<string, string> headers)
static void ApplyContentType(ServiceBusMessage message, IOutgoingTransportOperation outgoingTransportOperation)
{
if (headers.TryGetValue(Headers.ContentType, out var contentType))
var properties = outgoingTransportOperation.Properties;
var headers = outgoingTransportOperation.Message.Headers;

if (properties.TryGetValue(Headers.ContentType, out var contentType))
{
message.ContentType = contentType;
}
else if (headers.TryGetValue(Headers.ContentType, out contentType))
{
message.ContentType = contentType;
}
Expand All @@ -82,5 +92,20 @@ static void CopyHeaders(ServiceBusMessage outgoingMessage, Dictionary<string, st
outgoingMessage.ApplicationProperties[header.Key] = header.Value;
}
}

static void ApplyMessageId(ServiceBusMessage message, IOutgoingTransportOperation outgoingTransportOperation)
{
var properties = outgoingTransportOperation.Properties;

if (properties.TryGetValue(Headers.MessageId, out var messageId))
{
message.MessageId = messageId;
}
else
{
// Cannot re-use MessageId to be compatible with ASB transport that could have native de-dup enabled
message.MessageId = Guid.NewGuid().ToString();
}
}
}
}