Skip to content

Commit 6b093a7

Browse files
committed
Upgrade clients
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent f69de2c commit 6b093a7

File tree

3 files changed

+28
-19
lines changed

3 files changed

+28
-19
lines changed

src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,11 @@ public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int del
120120
// Remove self to cause only delay once (in case the same message gets repeated)
121121
pauseAtOffsets.Remove(message.Message.Counter);
122122

123-
consumerControl.Stop().ContinueWith(async (_) =>
123+
// Use fire-and-forget pattern to avoid blocking the event handler
124+
_ = Task.Run(async () =>
124125
{
125126
Logger.LogInformation("Consumer stopped at message {MessageCounter}, waiting {DelayTime} before restart", message.Message.Counter, DelayTimeSpan);
127+
await consumerControl.Stop();
126128
await Task.Delay(DelayTimeSpan);
127129
await consumerControl.Start();
128130

@@ -133,7 +135,7 @@ public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int del
133135
await Task.Delay(100);
134136
}
135137
Logger.LogInformation("Consumer restarted after {ElapsedTime}, IsStarted: {IsStarted}", timeout.Elapsed, consumerControl.IsStarted);
136-
}).Wait();
138+
});
137139
}
138140
};
139141

@@ -177,7 +179,7 @@ public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int del
177179
await consumedMessages.WaitUntilArriving(newMessagesTimeout: consumeTimeout, expectedCount: messages.Count);
178180

179181
stopwatch.Stop();
180-
Logger.LogInformation("Consumed {MessageCount} messages in {ConsumeTime} including simlulated delay {DelayTime}", consumedMessages.Count, stopwatch.Elapsed, DelayTimeSpan);
182+
Logger.LogInformation("Consumed {ActualCount}/{ExpectedCount} messages in {ConsumeTime} including simlulated delay {DelayTime}", consumedMessages.Count, messages.Count, stopwatch.Elapsed, DelayTimeSpan);
181183

182184
// assert
183185

src/Tests/SlimMessageBus.Host.Outbox.Sql.DbContext.Test/BaseOutboxIntegrationTest.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ protected async Task WaitForOutboxToBeDrained(int timeoutSeconds = 10)
2828
{
2929
var stopwatch = Stopwatch.StartNew();
3030
var lastPendingCount = int.MaxValue;
31+
var stuckCount = 0;
32+
const int maxStuckIterations = 5; // If count doesn't change after 5 checks, exit early
3133

3234
while (stopwatch.Elapsed.TotalSeconds < timeoutSeconds)
3335
{
@@ -58,8 +60,20 @@ protected async Task WaitForOutboxToBeDrained(int timeoutSeconds = 10)
5860
return;
5961
}
6062

61-
if (pendingCount != lastPendingCount)
63+
// Track if count is stuck (not changing)
64+
if (pendingCount == lastPendingCount)
6265
{
66+
stuckCount++;
67+
if (stuckCount >= maxStuckIterations)
68+
{
69+
Logger.LogWarning("Outbox pending count stuck at {PendingCount} for {StuckIterations} iterations after {Elapsed}, exiting early",
70+
pendingCount, stuckCount, stopwatch.Elapsed);
71+
return;
72+
}
73+
}
74+
else
75+
{
76+
stuckCount = 0;
6377
Logger.LogInformation("Outbox has {PendingCount} pending messages after {Elapsed}", pendingCount, stopwatch.Elapsed);
6478
lastPendingCount = pendingCount;
6579
}

src/Tests/SlimMessageBus.Host.Outbox.Sql.DbContext.Test/OutboxTests.cs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,10 @@ public async Task Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHa
177177

178178
await Task.WhenAll(sendTasks);
179179

180-
// Wait for outbox to be fully processed and drained
181-
await WaitForOutboxToBeDrained(timeoutSeconds: 20);
182-
183-
// Wait for messages to be processed through outbox:
184-
// - Max processing delay per message: 250ms
185-
// - Total max delay: 100 messages / 3 valid per batch × 250ms ≈ 8.3s
186-
// - Outbox poll interval: 250ms (reduced from 1s)
187-
// - Add buffer: 5s (reduced since outbox is drained)
188-
// Total: ~15s timeout
189-
await store.WaitUntilArriving(newMessagesTimeout: 15, expectedCount: validCommands.Count);
180+
// Wait for messages to arrive first (they should start arriving quickly)
181+
await store.WaitUntilArriving(newMessagesTimeout: 20);
182+
183+
Logger.LogInformation("Received {ActualCount}/{ExpectedCount} messages after initial wait", store.Count, validCommands.Count);
190184

191185
// assert
192186
using var scope = ServiceProvider!.CreateScope();
@@ -281,11 +275,10 @@ public async Task Given_PublishExternalEventInTransaction_When_ExceptionThrownDu
281275

282276
await Task.WhenAll(publishTasks);
283277

284-
// Wait for outbox to be fully processed and drained
285-
await WaitForOutboxToBeDrained(timeoutSeconds: 20);
286-
287-
// Wait for messages to be processed through outbox (same reasoning as other test)
288-
await store.WaitUntilArriving(newMessagesTimeout: 15, expectedCount: validEvents.Count);
278+
// Wait for messages to arrive (they should start arriving quickly)
279+
await store.WaitUntilArriving(newMessagesTimeout: 20);
280+
281+
Logger.LogInformation("Received {ActualCount}/{ExpectedCount} messages after initial wait", store.Count, validEvents.Count);
289282

290283
// assert
291284

0 commit comments

Comments
 (0)