Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 126 additions & 20 deletions src/Transport/Receiving/RepeatedFailuresOverTimeCircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,154 @@ public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBe
timer = new Timer(CircuitBreakerTriggered);
}

public void Success()
public void Success() => DisarmIfArmedOrTriggered();

public Task Failure(Exception exception, CancellationToken cancellationToken = default)
{
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Armed);
UpdateLastException(exception);

ArmIfDisarmed(exception);

return DelayIfTriggered(cancellationToken);
}

// If the circuit breaker was Armed or triggered before, disarm it
if (previousState == Armed || Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Triggered) == Triggered)
public void Dispose() => timer?.Dispose();

void CircuitBreakerTriggered(object state) => TriggerIfArmed();

void DisarmIfArmedOrTriggered()
{
if (TryDisarmIfArmed() || TryDisarmIfTriggered())
{
Disarm();
}

bool TryDisarmIfArmed()
{
return Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Armed) == Armed;
}

bool TryDisarmIfTriggered()
{
return Interlocked.CompareExchange(ref circuitBreakerState, Disarmed, Triggered) == Triggered;
}

void Disarm()
{
DisableTriggerTimer();
LogDisarmed();
OnDisarmed();
}

void LogDisarmed()
{
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
disarmedAction();
}

void OnDisarmed()
{
disarmedAction?.Invoke();
}

void DisableTriggerTimer()
{
_ = timer.Change(Timeout.Infinite, Timeout.Infinite);
}
}

public Task Failure(Exception exception, CancellationToken cancellationToken = default)
void UpdateLastException(Exception exception)
{
_ = Interlocked.Exchange(ref lastException, exception);
}

// Atomically set state to Armed if it was previously Disarmed
var previousState = Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed);
void ArmIfDisarmed(Exception exception)
{
if (TryArmIfDisarmed())
{
Arm();
}

if (previousState == Disarmed)
bool TryArmIfDisarmed()
{
return Interlocked.CompareExchange(ref circuitBreakerState, Armed, Disarmed) == Disarmed;
}

void Arm()
{
SetTimeToTrigger();

LogArmed(exception);

OnArmed();
}

void OnArmed() => armedAction?.Invoke();

void SetTimeToTrigger()
{
armedAction();
_ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state due to {1}", name, exception);
}

// If the circuit breaker has been triggered, wait for 10 seconds before proceeding to prevent flooding the logs and hammering the ServiceBus
return Task.Delay(previousState == Triggered ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(1), cancellationToken);
void LogArmed(Exception exception)
{
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state due to {1}", name, exception);
}
}

public void Dispose() => timer?.Dispose();
Task DelayIfTriggered(CancellationToken cancellationToken)
{
if (IsTriggered())
{
return DelayFor10Seconds();
}

void CircuitBreakerTriggered(object state)
return DelayFor1Second();

bool IsTriggered()
{
return Interlocked.CompareExchange(ref circuitBreakerState, Triggered, Triggered) == Triggered;
}

Task DelayFor10Seconds()
{
return Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
}

Task DelayFor1Second()
{
return Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}

void TriggerIfArmed()
{
if (Interlocked.CompareExchange(ref circuitBreakerState, Triggered, Armed) != Armed)
if (TryTriggerIfArmed())
{
Trigger();
}

void Trigger()
{
LogTriggered();

OnTriggered();
}

bool TryTriggerIfArmed()
{
return Interlocked.CompareExchange(ref circuitBreakerState, Triggered, Armed) == Armed;
}

void LogTriggered()
{
Logger.WarnFormat("The circuit breaker for {0} will now be triggered with exception {1}", name, lastException);
}

void OnTriggered()
{
return;
triggerAction?.Invoke(lastException);
}

Logger.WarnFormat("The circuit breaker for {0} will now be triggered with exception {1}", name, lastException);
triggerAction(lastException);
}

int circuitBreakerState = Disarmed;
Expand Down