Skip to content

Commit 8836449

Browse files
Update the processing capacity consistently together with the prefetch count to make sure during throttling the number of prefetched messages is kept aligned with the settings of the user (#1058)
Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
1 parent fcd0dab commit 8836449

File tree

1 file changed

+12
-13
lines changed

1 file changed

+12
-13
lines changed

src/Transport/Receiving/MessagePump.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,9 @@ public Task Initialize(
6767

6868
public async Task StartReceive(CancellationToken cancellationToken = default)
6969
{
70-
int prefetchCount = CalculatePrefetchCount();
71-
7270
var receiveOptions = new ServiceBusProcessorOptions
7371
{
74-
PrefetchCount = prefetchCount,
72+
PrefetchCount = CalculatePrefetchCount(limitations.MaxConcurrency),
7573
ReceiveMode = TransactionMode == TransportTransactionMode.None
7674
? ServiceBusReceiveMode.ReceiveAndDelete
7775
: ServiceBusReceiveMode.PeekLock,
@@ -96,20 +94,18 @@ public async Task StartReceive(CancellationToken cancellationToken = default)
9694
{
9795
criticalErrorAction("Failed to receive message from Azure Service Bus.", ex,
9896
messageProcessingCancellationTokenSource.Token);
99-
}, () =>
100-
//We don't have to update the prefetch count since we are failing to receive anyway
101-
processor.UpdateConcurrency(1),
102-
() => processor.UpdateConcurrency(limitations.MaxConcurrency));
97+
}, () => UpdateProcessingCapacity(1),
98+
() => UpdateProcessingCapacity(limitations.MaxConcurrency));
10399

104100
await processor.StartProcessingAsync(cancellationToken)
105101
.ConfigureAwait(false);
106102
}
107103

108104
TransportTransactionMode TransactionMode => transportSettings.TransportTransactionMode;
109105

110-
int CalculatePrefetchCount()
106+
int CalculatePrefetchCount(int maxConcurrency)
111107
{
112-
var prefetchCount = limitations.MaxConcurrency * transportSettings.PrefetchMultiplier;
108+
var prefetchCount = maxConcurrency * transportSettings.PrefetchMultiplier;
113109

114110
if (transportSettings.PrefetchCount.HasValue)
115111
{
@@ -190,14 +186,17 @@ public Task ChangeConcurrency(PushRuntimeSettings newLimitations, CancellationTo
190186
{
191187
limitations = newLimitations;
192188

193-
processor.UpdateConcurrency(limitations.MaxConcurrency);
194-
195-
int prefetchCount = CalculatePrefetchCount();
196-
processor.UpdatePrefetchCount(prefetchCount);
189+
UpdateProcessingCapacity(limitations.MaxConcurrency);
197190

198191
return Task.CompletedTask;
199192
}
200193

194+
void UpdateProcessingCapacity(int maxConcurrency)
195+
{
196+
processor.UpdateConcurrency(maxConcurrency);
197+
processor.UpdatePrefetchCount(CalculatePrefetchCount(maxConcurrency));
198+
}
199+
201200
public async Task StopReceive(CancellationToken cancellationToken = default)
202201
{
203202
// Wiring up the stop token to trigger the cancellation token that is being

0 commit comments

Comments
 (0)