Skip to content

Commit b2ea0e2

Browse files
Backport of circuit breaker fixes involved in bug in later versions (#1062)
* Address races in the repeated failure circuit breaker by introducing explicit state transitions (#1057) * Use locks to serialize arm/disarm actions (#1066) * Use locks to serialize arm/disarm actions * Explicit state lock, Volatile read of state outside lock, actually reduce nesting where possible * Small cosmetics for better readability * Better logging * Verbose comment * Even better logging * Basic test coverage --------- Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com> * Adapt update to older C# version --------- Co-authored-by: Daniel Marbach <daniel.marbach@openplace.net> Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
1 parent 7984165 commit b2ea0e2

File tree

3 files changed

+386
-31
lines changed

3 files changed

+386
-31
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving
2+
{
3+
using System;
4+
using System.Diagnostics;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using NUnit.Framework;
9+
10+
// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
11+
// and we want to keep the changes to a minimum for now to allow backporting to older versions.
12+
[TestFixture]
13+
public class RepeatedFailuresOverTimeCircuitBreakerTests
14+
{
15+
[Test]
public async Task Should_disarm_on_success()
{
    // Flags flipped by the armed / disarmed callbacks handed to the circuit breaker.
    var wasArmed = false;
    var wasDisarmed = false;

    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.FromMilliseconds(100),
        _ => { },
        () => { wasArmed = true; },
        () => { wasDisarmed = true; },
        TimeSpan.Zero,
        TimeSpan.Zero);

    // Report a single failure and then a success.
    await breaker.Failure(new Exception("Test Exception"));
    breaker.Success();

    // Both callbacks must have been invoked by now.
    Assert.That(wasArmed, Is.True, "The armed action should be called.");
    Assert.That(wasDisarmed, Is.True, "The disarmed action should be called.");
}
37+
38+
[Test]
public async Task Should_rethrow_exception_on_success()
{
    // The disarmed callback always throws; the trigger and armed callbacks do nothing.
    Action throwingDisarmedAction = () => throw new Exception("Exception from disarmed action");

    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.FromMilliseconds(100),
        _ => { },
        () => { },
        throwingDisarmedAction,
        timeToWaitWhenTriggered: TimeSpan.Zero,
        timeToWaitWhenArmed: TimeSpan.Zero);

    await breaker.Failure(new Exception("Test Exception"));

    // Success() is expected to surface the exception thrown by the disarmed callback.
    var thrown = Assert.Throws<Exception>(() => breaker.Success());
    Assert.That(thrown.Message, Is.EqualTo("Exception from disarmed action"));
}
56+
57+
[Test]
public async Task Should_trigger_after_failure_timeout()
{
    // State recorded by the trigger callback.
    var triggered = false;
    Exception capturedException = null;

    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.Zero,
        exception =>
        {
            triggered = true;
            capturedException = exception;
        },
        timeToWaitWhenTriggered: TimeSpan.Zero,
        timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100));

    await breaker.Failure(new Exception("Test Exception"));

    // With a zero time-to-trigger the callback must have run, and must have received an exception.
    Assert.That(triggered, Is.True, "The trigger action should be called after timeout.");
    Assert.That(capturedException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
}
76+
77+
[Test]
public void Should_rethrow_exception_on_failure()
{
    // The armed callback always throws; the trigger and disarmed callbacks do nothing.
    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.FromMilliseconds(100),
        _ => { },
        () => throw new Exception("Exception from armed action"),
        () => { },
        timeToWaitWhenTriggered: TimeSpan.Zero,
        timeToWaitWhenArmed: TimeSpan.Zero);

    AsyncTestDelegate reportFailure = async () => await breaker.Failure(new Exception("Test Exception"));

    // Failure() is expected to surface the exception thrown by the armed callback.
    var thrown = Assert.ThrowsAsync<Exception>(reportFailure);
    Assert.That(thrown.Message, Is.EqualTo("Exception from armed action"));
}
93+
94+
[Test]
public async Task Should_delay_after_trigger_failure()
{
    var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
    var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);

    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.Zero,
        _ => { },
        timeToWaitWhenTriggered: timeToWaitWhenTriggered,
        timeToWaitWhenArmed: timeToWaitWhenArmed);

    // Measure the wall-clock time taken by two consecutive failures.
    var timer = Stopwatch.StartNew();
    await breaker.Failure(new Exception("Test Exception"));
    await breaker.Failure(new Exception("Test Exception After Trigger"));
    timer.Stop();

    // Expected lower bound: the sum of both configured wait times, with a 20 ms tolerance.
    var expectedMinimumMilliseconds = timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds;
    Assert.That(timer.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(expectedMinimumMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
}
117+
118+
[Test]
public async Task Should_not_trigger_if_disarmed_before_timeout()
{
    // Set by the trigger callback; expected to stay false in this scenario.
    var triggered = false;

    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.FromMilliseconds(100),
        _ => { triggered = true; },
        timeToWaitWhenTriggered: TimeSpan.Zero,
        timeToWaitWhenArmed: TimeSpan.Zero);

    // A success is reported right after the failure, before the assertion is evaluated.
    await breaker.Failure(new Exception("Test Exception"));
    breaker.Success();

    Assert.That(triggered, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
}
136+
137+
[Test]
public async Task Should_handle_concurrent_failure_and_success()
{
    // Flags flipped by the callbacks handed to the circuit breaker.
    var armedActionCalled = false;
    var disarmedActionCalled = false;
    var triggerActionCalled = false;

    var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.FromMilliseconds(100),
        ex => triggerActionCalled = true,
        () => armedActionCalled = true,
        () => disarmedActionCalled = true,
        TimeSpan.Zero,
        TimeSpan.Zero
    );

    var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));

    // Report the success from another thread after a short delay. The delay is awaited
    // (Task.Delay) rather than slept (Thread.Sleep) so no thread-pool thread is blocked
    // while waiting.
    var successTask = Task.Run(async () =>
    {
        await Task.Delay(50); // Simulate some delay before success
        circuitBreaker.Success();
    });

    await Task.WhenAll(failureTask, successTask);

    Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
    Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
    Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
}
167+
168+
[Test]
public async Task Should_handle_high_concurrent_failure_and_success()
{
    // Invocation counters, incremented atomically because the callbacks may run on several threads.
    var armedActionCalled = 0;
    var disarmedActionCalled = 0;
    var triggerActionCalled = 0;

    var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        TimeSpan.FromSeconds(5),
        ex => Interlocked.Increment(ref triggerActionCalled),
        () => Interlocked.Increment(ref armedActionCalled),
        () => Interlocked.Increment(ref disarmedActionCalled),
        TimeSpan.Zero,
        TimeSpan.FromMilliseconds(25)
    );

    // Even indices report a failure immediately; odd indices report a success after a 25 ms delay.
    // The delay is awaited (Task.Delay) instead of slept (Thread.Sleep): sleeping would park one
    // thread-pool thread per work item (500 here), starving the pool and spreading the Success
    // calls out over time instead of letting them run concurrently.
    var tasks = Enumerable.Range(0, 1000)
        .Select(
            i => i % 2 == 0 ?
                circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
                Task.Run(async () =>
                {
                    await Task.Delay(25); // Simulate some delay before success
                    circuitBreaker.Success();
                })
        ).ToArray();

    await Task.WhenAll(tasks);

    Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
    Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
    Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
}
202+
203+
[Test]
public async Task Should_trigger_after_multiple_failures_and_timeout()
{
    // Set by the trigger callback.
    var triggered = false;
    var fiftyMilliseconds = TimeSpan.FromMilliseconds(50);

    // The time-to-trigger and both wait times are all configured to 50 ms.
    var breaker = new RepeatedFailuresOverTimeCircuitBreaker(
        "TestCircuitBreaker",
        fiftyMilliseconds,
        _ => { triggered = true; },
        timeToWaitWhenTriggered: fiftyMilliseconds,
        timeToWaitWhenArmed: fiftyMilliseconds);

    // Two failures in a row, each awaited to completion.
    await breaker.Failure(new Exception("Test Exception"));
    await breaker.Failure(new Exception("Another Exception After Trigger"));

    Assert.That(triggered, Is.True, "The trigger action should be called after repeated failures and timeout.");
}
221+
}
222+
}

src/Transport/Receiving/MessagePump.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
6565
this.criticalError = criticalError;
6666
pushSettings = settings;
6767

68+
Action noOp = () => { };
69+
6870
circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker($"'{settings.InputQueue}'",
6971
timeToWaitBeforeTriggeringCircuitBreaker, ex =>
7072
{
7173
criticalError.Raise("Failed to receive message from Azure Service Bus.", ex);
72-
});
74+
}, noOp, noOp);
7375

7476
return Task.CompletedTask;
7577
}

0 commit comments

Comments
 (0)