Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/Tests/FakeSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,26 @@ public IReadOnlyCollection<ServiceBusMessage> this[ServiceBusMessageBatch batch]
set => throw new NotSupportedException();
}

public override async ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(CancellationToken cancellationToken = default)
public override ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(CancellationToken cancellationToken = default)
{
var batchMessageStore = new List<ServiceBusMessage>();
ServiceBusMessageBatch serviceBusMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(256 * 1024, batchMessageStore, tryAddCallback: TryAdd);
batchToBackingStore.Add(serviceBusMessageBatch, batchMessageStore);
await Task.Yield();
return serviceBusMessageBatch;
return new ValueTask<ServiceBusMessageBatch>(serviceBusMessageBatch);
}

public override async Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default)
public override Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
sentMessages.Add(message);
await Task.Yield();
return Task.CompletedTask;
}

public override async Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default)
public override Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
batchedMessages.Add(messageBatch);
await Task.Yield();
return Task.CompletedTask;
}
}
}
38 changes: 27 additions & 11 deletions src/Transport/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
Dictionary<string, List<IOutgoingTransportOperation>>? defaultOperationsPerDestination = null;
var numberOfDefaultOperations = 0;
var numberOfIsolatedOperations = 0;
var numberOfDefaultOperationDestinations = 0;

foreach (var operation in transportOperations)
{
Expand All @@ -59,8 +58,6 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
if (!defaultOperationsPerDestination.ContainsKey(destination))
{
defaultOperationsPerDestination[destination] = [operation];
// because we batch only the number of destinations are relevant
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting rid of this here also reduces the cognitive overhead because this code doesn't have knowledge anymore about the inner workings of the batching path.

numberOfDefaultOperationDestinations++;
}
else
{
Expand Down Expand Up @@ -91,14 +88,15 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
throw new Exception($"The number of outgoing messages ({numberOfDefaultOperations}) exceeds the limits permitted by Azure Service Bus ({MaxMessageThresholdForTransaction}) in a single transaction");
}

var concurrentDispatchTasks =
new List<Task>(numberOfIsolatedOperations + numberOfDefaultOperationDestinations);
AddIsolatedOperationsTo(concurrentDispatchTasks, isolatedOperationsPerDestination ?? emptyDestinationAndOperations, transaction, azureServiceBusTransaction, cancellationToken);
AddBatchedOperationsTo(concurrentDispatchTasks, defaultOperationsPerDestination ?? emptyDestinationAndOperations, transaction, azureServiceBusTransaction, cancellationToken);
Task[] dispatchTasks =
[
DispatchIsolatedOperations(isolatedOperationsPerDestination ?? emptyDestinationAndOperations, numberOfIsolatedOperations, transaction, azureServiceBusTransaction, cancellationToken),
DispatchBatchedOperations(defaultOperationsPerDestination ?? emptyDestinationAndOperations, numberOfDefaultOperations, transaction, azureServiceBusTransaction, cancellationToken)
];

try
{
await Task.WhenAll(concurrentDispatchTasks).ConfigureAwait(false);
await Task.WhenAll(dispatchTasks).ConfigureAwait(false);
}
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
{
Expand All @@ -109,11 +107,18 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa

// The parameters of this method are deliberately mutable and of the original collection type to make sure
// no boxing occurs
void AddBatchedOperationsTo(List<Task> dispatchTasks,
Task DispatchBatchedOperations(
Dictionary<string, List<IOutgoingTransportOperation>> transportOperationsPerDestination,
int numberOfTransportOperations,
TransportTransaction transportTransaction,
AzureServiceBusTransportTransaction? azureServiceBusTransportTransaction, CancellationToken cancellationToken)
{
if (numberOfTransportOperations == 0)
{
return Task.CompletedTask;
}

var dispatchTasks = new List<Task>(transportOperationsPerDestination.Count);
foreach (var destinationAndOperations in transportOperationsPerDestination)
{
var destination = destinationAndOperations.Key;
Expand All @@ -127,9 +132,12 @@ void AddBatchedOperationsTo(List<Task> dispatchTasks,
messagesToSend.Enqueue(message);
}
// Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet
// doing the access as late as possible but still on the synchronous path.
// doing the access as late as possible but still on the synchronous path. Initializing the transaction
// as late as possible is important because it will start the transaction timer. If the transaction
// is started too early it might shorten the overall transaction time available.
dispatchTasks.Add(DispatchBatchOrFallbackToIndividualSendsForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, cancellationToken));
}
return Task.WhenAll(dispatchTasks);
}

async Task DispatchBatchOrFallbackToIndividualSendsForDestination(string destination, ServiceBusClient? client, Transaction? transaction,
Expand Down Expand Up @@ -230,16 +238,23 @@ await Task.WhenAll(individualSendTasks)

// The parameters of this method are deliberately mutable and of the original collection type to make sure
// no boxing occurs
void AddIsolatedOperationsTo(List<Task> dispatchTasks,
Task DispatchIsolatedOperations(
Dictionary<string, List<IOutgoingTransportOperation>> transportOperationsPerDestination,
int numberOfTransportOperations,
TransportTransaction transportTransaction,
AzureServiceBusTransportTransaction? azureServiceBusTransportTransaction,
CancellationToken cancellationToken)
{
if (numberOfTransportOperations == 0)
{
return Task.CompletedTask;
}

// It is OK to use the pumps client and partition key (keeps things compliant as before) but
// isolated dispatches should never use the committable transaction regardless whether it is present
// or not.
Transaction? noTransaction = default;
var dispatchTasks = new List<Task>(numberOfTransportOperations);
foreach (var destinationAndOperations in transportOperationsPerDestination)
{
var destination = destinationAndOperations.Key;
Expand All @@ -252,6 +267,7 @@ void AddIsolatedOperationsTo(List<Task> dispatchTasks,
dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken));
}
}
return Task.WhenAll(dispatchTasks);
}

async Task DispatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken)
Expand Down