Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 93 additions & 19 deletions src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
Original file line number Diff line number Diff line change
@@ -1,79 +1,153 @@
namespace NServiceBus.Transport.AzureServiceBus
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Logging;

/// <summary>
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
/// changing other state when the circuit breaker is armed or disarmed.
/// </summary>
sealed class RepeatedFailuresOverTimeCircuitBreaker
{
/// <summary>
/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
/// changing other state when the circuit breaker is armed or disarmed.
/// </summary>
/// <param name="name">A name that is output in log messages when the circuit breaker changes states.</param>
/// <param name="timeToWaitBeforeTriggering">The time to wait after the first failure before triggering.</param>
/// <param name="triggerAction">The action to take when the circuit breaker is triggered.</param>
/// <param name="armedAction">The action to execute on the first failure.
/// WARNING: This action is called from within a lock to serialize arming and disarming actions.</param>
/// <param name="disarmedAction">The action to execute when a success disarms the circuit breaker.
/// WARNING: This action is called from within a lock to serialize arming and disarming actions.</param>
/// <param name="timeToWaitWhenTriggered">How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.</param>
/// <param name="timeToWaitWhenArmed">How long to delay on each failure when in the Armed state. Defaults to 1 second.</param>
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering,
Action<Exception> triggerAction,
Action armedAction,
Action disarmedAction)
Action? armedAction = null,
Action? disarmedAction = null,
TimeSpan? timeToWaitWhenTriggered = default,
TimeSpan? timeToWaitWhenArmed = default)
{
this.name = name;
this.triggerAction = triggerAction;
this.armedAction = armedAction;
this.disarmedAction = disarmedAction;
this.armedAction = armedAction ?? (static () => { });
this.disarmedAction = disarmedAction ?? (static () => { });
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);

timer = new Timer(CircuitBreakerTriggered);
}

/// <summary>
/// Log a success, disarming the circuit breaker if it was previously armed.
/// </summary>
public void Success()
{
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Armed);
// Check the status of the circuit breaker, exiting early outside the lock if already disarmed
if (Volatile.Read(ref circuitBreakerState) == Disarmed)
{
return;
}

// If the circuit breaker was Armed or triggered before, disarm it
if (previousState == Armed || Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Triggered) == Triggered)
lock (stateLock)
{
// Recheck state after obtaining the lock
if (circuitBreakerState == Disarmed)
{
return;
}

circuitBreakerState = Disarmed;
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
disarmedAction();
}
}

/// <summary>
/// Log a failure, arming the circuit breaker if it was previously disarmed.
/// </summary>
/// <param name="exception">The exception that caused the failure.</param>
/// <param name="cancellationToken">A cancellation token.</param>
public Task Failure(Exception exception, CancellationToken cancellationToken = default)
{
// Atomically store the exception that caused the circuit breaker to trip
_ = Interlocked.Exchange(ref lastException, exception);

// Atomically set state to Armed if it was previously Disarmed
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed);
// Check the status of the circuit breaker, exiting early outside the lock if already armed or triggered
var previousState = Volatile.Read(ref circuitBreakerState);
if (previousState != Disarmed)
{
return Delay();
}

if (previousState == Disarmed)
lock (stateLock)
{
// Recheck state after obtaining the lock
previousState = circuitBreakerState;
if (previousState != Disarmed)
{
return Delay();
}

circuitBreakerState = Armed;
armedAction();
_ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state due to {1}", name, exception);
}

// If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus
return Task.Delay(previousState == Triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1), cancellationToken);
return Delay();

Task Delay() => Task.Delay(previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed, cancellationToken);
}

public void Dispose() => timer?.Dispose();
/// <summary>
/// Disposes the resources associated with the circuit breaker.
/// </summary>
public void Dispose() => timer.Dispose();

void CircuitBreakerTriggered(object state)
void CircuitBreakerTriggered(object? state)
{
if (Interlocked.CompareExchange(ref circuitBreakerState, Triggered, Armed) != Armed)
var previousState = Volatile.Read(ref circuitBreakerState);
if (previousState == Disarmed)
{
return;
}

Logger.WarnFormat("The circuit breaker for {0} will now be triggered with exception {1}", name, lastException);
triggerAction(lastException);
lock (stateLock)
{
if (circuitBreakerState == Disarmed)
{
return;
}

circuitBreakerState = Triggered;
Logger.WarnFormat("The circuit breaker for {0} will now be triggered with exception {1}", name, lastException);
triggerAction(lastException!);

}
}

int circuitBreakerState = Disarmed;
Exception lastException;
Exception? lastException;

readonly string name;
readonly Timer timer;
readonly TimeSpan timeToWaitBeforeTriggering;
readonly Action<Exception> triggerAction;
readonly Action armedAction;
readonly Action disarmedAction;
readonly TimeSpan timeToWaitWhenTriggered;
readonly TimeSpan timeToWaitWhenArmed;
readonly object stateLock = new();

const int Disarmed = 0;
const int Armed = 1;
Expand Down