Skip to content

Commit 072d74f

Browse files
committed
Fix batcher logic to never create empty batches (#2793)
* Reproduce the batcher bug * Verify there is something to batch * Fix batcher logic --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent 0017275 commit 072d74f

File tree

2 files changed

+77
-30
lines changed

2 files changed

+77
-30
lines changed

src/NServiceBus.Transport.SQS.Tests/SqsPreparedMessageBatcherTests.cs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,44 @@ public void SingleBatchForLessOrEqual10Entries()
9090
});
9191
}
9292

93+
[Test]
94+
public void SingleBatchEvenWhenSingleMessageNotFittingIntoBatchDueToMessageSize()
95+
{
96+
var preparedMessages = new[]
97+
{
98+
new SqsPreparedMessage {MessageId = Guid.NewGuid().ToString(), Destination = "destination1", QueueUrl = "https://destination1", Body = GenerateBody(256)},
99+
};
100+
PrecalculateSize(preparedMessages);
101+
102+
var batches = SqsPreparedMessageBatcher.Batch(preparedMessages);
103+
104+
Assert.Multiple(() =>
105+
{
106+
Assert.That(batches, Has.Count.EqualTo(1));
107+
Assert.That(batches.ElementAt(0).BatchRequest.Entries, Has.Count.EqualTo(1));
108+
});
109+
}
110+
111+
[Test]
112+
public void BatchPerMessageNotFittingIntoBatchDueToMessageSize()
113+
{
114+
var preparedMessages = new[]
115+
{
116+
new SqsPreparedMessage {MessageId = Guid.NewGuid().ToString(), Destination = "destination1", QueueUrl = "https://destination1", Body = GenerateBody(256)},
117+
new SqsPreparedMessage {MessageId = Guid.NewGuid().ToString(), Destination = "destination1", QueueUrl = "https://destination1", Body = GenerateBody(256)},
118+
};
119+
PrecalculateSize(preparedMessages);
120+
121+
var batches = SqsPreparedMessageBatcher.Batch(preparedMessages);
122+
123+
Assert.Multiple(() =>
124+
{
125+
Assert.That(batches, Has.Count.EqualTo(2));
126+
Assert.That(batches.ElementAt(0).BatchRequest.Entries, Has.Count.EqualTo(1));
127+
Assert.That(batches.ElementAt(1).BatchRequest.Entries, Has.Count.EqualTo(1));
128+
});
129+
}
130+
93131
[Test]
94132
public void MultipleBatchesForGreaterThan10Entries()
95133
{
@@ -116,7 +154,7 @@ public void MultipleBatchesForGreaterThan10Entries()
116154

117155
Assert.Multiple(() =>
118156
{
119-
Assert.That(batches.Count(), Is.EqualTo(2));
157+
Assert.That(batches, Has.Count.EqualTo(2));
120158
Assert.That(batches.ElementAt(0).BatchRequest.Entries, Has.Count.EqualTo(10));
121159
Assert.That(batches.ElementAt(1).BatchRequest.Entries, Has.Count.EqualTo(3));
122160
});
@@ -182,7 +220,7 @@ public void MultipleBatchesForMessagesNotFittingIntoBatchDueToMessageSize()
182220

183221
Assert.Multiple(() =>
184222
{
185-
Assert.That(batches.Count(), Is.EqualTo(7));
223+
Assert.That(batches, Has.Count.EqualTo(7));
186224
Assert.That(batches.ElementAt(0).BatchRequest.Entries, Has.Count.EqualTo(1));
187225
Assert.That(batches.ElementAt(1).BatchRequest.Entries, Has.Count.EqualTo(1));
188226
Assert.That(batches.ElementAt(2).BatchRequest.Entries, Has.Count.EqualTo(4));
@@ -253,7 +291,7 @@ public void MultipleBatchesForMessagesWithMessageIdNotFittingIntoBatchDueToMessa
253291

254292
Assert.Multiple(() =>
255293
{
256-
Assert.That(batches.Count(), Is.EqualTo(7));
294+
Assert.That(batches, Has.Count.EqualTo(7));
257295
Assert.That(batches.ElementAt(0).BatchRequest.Entries, Has.Count.EqualTo(1));
258296
Assert.That(batches.ElementAt(1).BatchRequest.Entries, Has.Count.EqualTo(1));
259297
Assert.That(batches.ElementAt(2).BatchRequest.Entries, Has.Count.EqualTo(4));
@@ -304,7 +342,7 @@ public void BatchPerDestination_MultipleBatchesForGreaterThan10Entries()
304342

305343
Assert.Multiple(() =>
306344
{
307-
Assert.That(batches.Count(), Is.EqualTo(4));
345+
Assert.That(batches, Has.Count.EqualTo(4));
308346
Assert.That(batches.ElementAt(0).BatchRequest.Entries, Has.Count.EqualTo(10));
309347
Assert.That(batches.ElementAt(1).BatchRequest.Entries, Has.Count.EqualTo(3));
310348
Assert.That(batches.ElementAt(2).BatchRequest.Entries, Has.Count.EqualTo(10));

src/NServiceBus.Transport.SQS/SqsPreparedMessageBatcher.cs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,47 +11,56 @@ static class SqsPreparedMessageBatcher
1111
public static IReadOnlyList<SqsBatchEntry> Batch(IEnumerable<SqsPreparedMessage> preparedMessages)
1212
{
1313
var allBatches = new List<SqsBatchEntry>();
14-
var currentDestinationBatches = new Dictionary<string, SqsPreparedMessage>(TransportConstraints.MaximumItemsInBatch);
14+
var currentBatch = new Dictionary<string, SqsPreparedMessage>(TransportConstraints.MaximumItemsInBatch);
1515

16+
// Group messages by destination to ensure batches only contain messages for the same queue
1617
var groupByDestination = preparedMessages.GroupBy(m => m.QueueUrl, StringComparer.Ordinal);
17-
foreach (var group in groupByDestination)
18+
19+
foreach (var destinationGroup in groupByDestination)
1820
{
19-
SqsPreparedMessage? firstMessage = null;
20-
var payloadSize = 0L;
21-
foreach (var message in group)
21+
SqsPreparedMessage? referenceMessage = null;
22+
var currentBatchSize = 0L;
23+
24+
foreach (var message in destinationGroup)
2225
{
23-
firstMessage ??= message;
26+
referenceMessage ??= message;
27+
var messageSize = message.Size; // Size calculation is assumed to be done previously
2428

25-
// Assumes the size was already calculated by the dispatcher
26-
var size = message.Size;
27-
payloadSize += size;
29+
// Check if this message would push the batch over the size limit
30+
if (currentBatchSize + messageSize > TransportConstraints.MaximumMessageSize)
31+
{
32+
// Finalize current batch if it has any messages
33+
if (currentBatch.Count > 0)
34+
{
35+
allBatches.Add(referenceMessage.ToBatchRequest(currentBatch));
36+
currentBatch.Clear();
37+
}
2838

29-
if (payloadSize > TransportConstraints.MaximumMessageSize)
39+
currentBatchSize = messageSize;
40+
}
41+
else
3042
{
31-
allBatches.Add(message.ToBatchRequest(currentDestinationBatches));
32-
currentDestinationBatches.Clear();
33-
payloadSize = size;
43+
// Message will fit within the current batch
44+
currentBatchSize += messageSize;
3445
}
3546

36-
// we don't have to recheck payload size here because the support layer checks that a request can always fit 256 KB size limit
37-
// we can't take MessageId because batch request ID can only contain alphanumeric characters, hyphen and underscores, message id could be overloaded
38-
currentDestinationBatches.Add(Guid.NewGuid().ToString(), message);
47+
// Add message to the current batch with a unique batch ID
48+
currentBatch.Add(Guid.NewGuid().ToString(), message);
3949

40-
var currentCount = currentDestinationBatches.Count;
41-
if (currentCount != TransportConstraints.MaximumItemsInBatch)
50+
// Check if we've reached the maximum items per batch
51+
if (currentBatch.Count == TransportConstraints.MaximumItemsInBatch)
4252
{
43-
continue;
53+
allBatches.Add(message.ToBatchRequest(currentBatch));
54+
currentBatch.Clear();
55+
currentBatchSize = 0;
4456
}
45-
46-
allBatches.Add(message.ToBatchRequest(currentDestinationBatches));
47-
currentDestinationBatches.Clear();
48-
payloadSize = 0;
4957
}
5058

51-
if (currentDestinationBatches.Count > 0)
59+
// Finalize any remaining messages in the batch
60+
if (currentBatch.Count > 0)
5261
{
53-
allBatches.Add(firstMessage!.ToBatchRequest(currentDestinationBatches));
54-
currentDestinationBatches.Clear();
62+
allBatches.Add(referenceMessage!.ToBatchRequest(currentBatch));
63+
currentBatch.Clear();
5564
}
5665
}
5766

0 commit comments

Comments
 (0)