Skip to content

Commit 9ddaa06

Browse files
Simplify dispatcher (#1052)
* Simplify the message dispatcher slightly to remove parts of the assumptions of things running synchronously to be able to share one common task list * Rolling back the yield since that is not needed anymore * Explicit array creation because I don't like params overloads * Rename methods --------- Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
1 parent 624f5dc commit 9ddaa06

File tree

2 files changed

+33
-18
lines changed

2 files changed

+33
-18
lines changed

src/Tests/FakeSender.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,26 @@ public IReadOnlyCollection<ServiceBusMessage> this[ServiceBusMessageBatch batch]
2727
set => throw new NotSupportedException();
2828
}
2929

30-
public override async ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(CancellationToken cancellationToken = default)
30+
public override ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(CancellationToken cancellationToken = default)
3131
{
3232
var batchMessageStore = new List<ServiceBusMessage>();
3333
ServiceBusMessageBatch serviceBusMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(256 * 1024, batchMessageStore, tryAddCallback: TryAdd);
3434
batchToBackingStore.Add(serviceBusMessageBatch, batchMessageStore);
35-
await Task.Yield();
36-
return serviceBusMessageBatch;
35+
return new ValueTask<ServiceBusMessageBatch>(serviceBusMessageBatch);
3736
}
3837

39-
public override async Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default)
38+
public override Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default)
4039
{
4140
cancellationToken.ThrowIfCancellationRequested();
4241
sentMessages.Add(message);
43-
await Task.Yield();
42+
return Task.CompletedTask;
4443
}
4544

46-
public override async Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default)
45+
public override Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default)
4746
{
4847
cancellationToken.ThrowIfCancellationRequested();
4948
batchedMessages.Add(messageBatch);
50-
await Task.Yield();
49+
return Task.CompletedTask;
5150
}
5251
}
5352
}

src/Transport/Sending/MessageDispatcher.cs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
4444
Dictionary<string, List<IOutgoingTransportOperation>>? defaultOperationsPerDestination = null;
4545
var numberOfDefaultOperations = 0;
4646
var numberOfIsolatedOperations = 0;
47-
var numberOfDefaultOperationDestinations = 0;
4847

4948
foreach (var operation in transportOperations)
5049
{
@@ -59,8 +58,6 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
5958
if (!defaultOperationsPerDestination.ContainsKey(destination))
6059
{
6160
defaultOperationsPerDestination[destination] = [operation];
62-
// because we batch only the number of destinations are relevant
63-
numberOfDefaultOperationDestinations++;
6461
}
6562
else
6663
{
@@ -91,14 +88,15 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
9188
throw new Exception($"The number of outgoing messages ({numberOfDefaultOperations}) exceeds the limits permitted by Azure Service Bus ({MaxMessageThresholdForTransaction}) in a single transaction");
9289
}
9390

94-
var concurrentDispatchTasks =
95-
new List<Task>(numberOfIsolatedOperations + numberOfDefaultOperationDestinations);
96-
AddIsolatedOperationsTo(concurrentDispatchTasks, isolatedOperationsPerDestination ?? emptyDestinationAndOperations, transaction, azureServiceBusTransaction, cancellationToken);
97-
AddBatchedOperationsTo(concurrentDispatchTasks, defaultOperationsPerDestination ?? emptyDestinationAndOperations, transaction, azureServiceBusTransaction, cancellationToken);
91+
Task[] dispatchTasks =
92+
[
93+
DispatchIsolatedOperations(isolatedOperationsPerDestination ?? emptyDestinationAndOperations, numberOfIsolatedOperations, transaction, azureServiceBusTransaction, cancellationToken),
94+
DispatchBatchedOperations(defaultOperationsPerDestination ?? emptyDestinationAndOperations, numberOfDefaultOperations, transaction, azureServiceBusTransaction, cancellationToken)
95+
];
9896

9997
try
10098
{
101-
await Task.WhenAll(concurrentDispatchTasks).ConfigureAwait(false);
99+
await Task.WhenAll(dispatchTasks).ConfigureAwait(false);
102100
}
103101
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
104102
{
@@ -109,11 +107,18 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
109107

110108
// The parameters of this method are deliberately mutable and of the original collection type to make sure
111109
// no boxing occurs
112-
void AddBatchedOperationsTo(List<Task> dispatchTasks,
110+
Task DispatchBatchedOperations(
113111
Dictionary<string, List<IOutgoingTransportOperation>> transportOperationsPerDestination,
112+
int numberOfTransportOperations,
114113
TransportTransaction transportTransaction,
115114
AzureServiceBusTransportTransaction? azureServiceBusTransportTransaction, CancellationToken cancellationToken)
116115
{
116+
if (numberOfTransportOperations == 0)
117+
{
118+
return Task.CompletedTask;
119+
}
120+
121+
var dispatchTasks = new List<Task>(transportOperationsPerDestination.Count);
117122
foreach (var destinationAndOperations in transportOperationsPerDestination)
118123
{
119124
var destination = destinationAndOperations.Key;
@@ -127,9 +132,12 @@ void AddBatchedOperationsTo(List<Task> dispatchTasks,
127132
messagesToSend.Enqueue(message);
128133
}
129134
// Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet
130-
// doing the access as late as possible but still on the synchronous path.
135+
// doing the access as late as possible but still on the synchronous path. Initializing the transaction
136+
// as late as possible is important because it will start the transaction timer. If the transaction
137+
// is started too early it might shorten the overall transaction time available.
131138
dispatchTasks.Add(DispatchBatchOrFallbackToIndividualSendsForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, cancellationToken));
132139
}
140+
return Task.WhenAll(dispatchTasks);
133141
}
134142

135143
async Task DispatchBatchOrFallbackToIndividualSendsForDestination(string destination, ServiceBusClient? client, Transaction? transaction,
@@ -230,16 +238,23 @@ await Task.WhenAll(individualSendTasks)
230238

231239
// The parameters of this method are deliberately mutable and of the original collection type to make sure
232240
// no boxing occurs
233-
void AddIsolatedOperationsTo(List<Task> dispatchTasks,
241+
Task DispatchIsolatedOperations(
234242
Dictionary<string, List<IOutgoingTransportOperation>> transportOperationsPerDestination,
243+
int numberOfTransportOperations,
235244
TransportTransaction transportTransaction,
236245
AzureServiceBusTransportTransaction? azureServiceBusTransportTransaction,
237246
CancellationToken cancellationToken)
238247
{
248+
if (numberOfTransportOperations == 0)
249+
{
250+
return Task.CompletedTask;
251+
}
252+
239253
// It is OK to use the pumps client and partition key (keeps things compliant as before) but
240254
// isolated dispatches should never use the committable transaction regardless whether it is present
241255
// or not.
242256
Transaction? noTransaction = default;
257+
var dispatchTasks = new List<Task>(numberOfTransportOperations);
243258
foreach (var destinationAndOperations in transportOperationsPerDestination)
244259
{
245260
var destination = destinationAndOperations.Key;
@@ -252,6 +267,7 @@ void AddIsolatedOperationsTo(List<Task> dispatchTasks,
252267
dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, noTransaction, message, cancellationToken));
253268
}
254269
}
270+
return Task.WhenAll(dispatchTasks);
255271
}
256272

257273
async Task DispatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)