@@ -16,6 +16,7 @@ public class When_changing_concurrency : NServiceBusTransportTest
16
16
public async Task Should_complete_current_message ( TransportTransactionMode transactionMode )
17
17
{
18
18
var triggeredChangeConcurrency = CreateTaskCompletionSource ( ) ;
19
+ var sentMessageReceived = CreateTaskCompletionSource ( ) ;
19
20
Task concurrencyChanged = null ;
20
21
int invocationCounter = 0 ;
21
22
@@ -30,6 +31,7 @@ await StartPump(async (context, ct) =>
30
31
await task ;
31
32
} , ct ) ;
32
33
34
+ sentMessageReceived . SetResult ( ) ;
33
35
await triggeredChangeConcurrency . Task ;
34
36
35
37
} , ( _ , _ ) =>
@@ -40,8 +42,10 @@ await StartPump(async (context, ct) =>
40
42
transactionMode ) ;
41
43
42
44
await SendMessage ( InputQueueName ) ;
45
+ await sentMessageReceived . Task ;
43
46
await concurrencyChanged ;
44
47
await StopPump ( ) ;
48
+
45
49
Assert . AreEqual ( 1 , invocationCounter , "message should successfully complete on first processing attempt" ) ;
46
50
}
47
51
@@ -62,6 +66,7 @@ await StartPump((context, _) =>
62
66
if ( context . Headers . TryGetValue ( "FromOnError" , out var value ) && value == bool . TrueString )
63
67
{
64
68
sentMessageReceived . SetResult ( ) ;
69
+ return Task . CompletedTask ;
65
70
}
66
71
67
72
throw new Exception ( "triggering recoverability pipeline" ) ;
@@ -84,9 +89,9 @@ await SendMessage(InputQueueName,
84
89
transactionMode ) ;
85
90
86
91
await SendMessage ( InputQueueName ) ;
87
-
88
92
await sentMessageReceived . Task ;
89
93
await StopPump ( ) ;
94
+
90
95
Assert . AreEqual ( 2 , invocationCounter , "there should be exactly 2 messages (initial message and new message from onError pipeline)" ) ;
91
96
}
92
97
}
0 commit comments