From 7662fb150549d685056fb09d9cb5668e81e35a32 Mon Sep 17 00:00:00 2001 From: Jason Blackburn Date: Tue, 25 Mar 2025 13:33:04 -0500 Subject: [PATCH] 413 - only process consume errors if decider does not return Directive.Resume --- .../Stages/Consumers/Actors/KafkaConsumerActor.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs index ec94cb9f..fc71c49d 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs @@ -976,10 +976,11 @@ private void ProcessExceptions(Exception? exception) return; var directive = _decider(exception); - ProcessError(exception); if (directive == Directive.Resume) return; + ProcessError(exception); + Timers.CancelAll(); if (directive == Directive.Stop && _log.IsErrorEnabled) _log.Error(exception, "Exception when polling from consumer, stopping actor: {0}", exception.Message);