With a @KafkaListener that returns a Mono, the error handler does not retry #4198
Closed
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.7
Describe the bug
When a @KafkaListener method returns a Mono and an exception occurs, the error handler does not retry the record. I came across this bug in a Stack Overflow report.
To Reproduce
    @Component
    public class MyListener {

        // The variant shown here returns void. The failing variant returns
        // Mono<Void>, per the method signature in the second log below.
        @KafkaListener(
            clientIdPrefix = "myNewClient",
            groupId = "aa",
            topics = "monoTest"
        )
        public void listen(ConsumerRecord<?, ?> record) throws MyException {
            System.out.println("get data: offset" + record.offset());
            throw new MyException(); // always fail so the error handler is invoked
        }
    }

    @Configuration
    public class Config {

        private static final Logger logger = LoggerFactory.getLogger(Config.class);

        @Bean
        public CommonErrorHandler handler() {
            // 1 ms between attempts, up to 20 retries
            DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1L, 20L));
            errorHandler.addRetryableExceptions(MyException.class, TimeoutException.class, ListenerExecutionFailedException.class);
            // Log every failed delivery attempt
            errorHandler.setRetryListeners((record, ex, attempt) ->
                logger.info("[CUSTOM-RETRY] attempt={} topic={} partition={} offset={} ex={}",
                    attempt, record.topic(), record.partition(), record.offset(), ex.toString())
            );
            return errorHandler;
        }
    }

Expected behavior
When an exception occurs, the message is retried. For non-Mono listeners, retries behave as shown below.
2025-12-04T16:07:54.241+09:00 INFO 9045 --- [| adminclient-2] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
get data: offset0
2025-12-04T16:07:54.246+09:00 INFO 9045 --- [ntainer#0-0-C-1] org.example.consumer.Config : [CUSTOM-RETRY] attempt=1 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:07:54.257+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:07:54.258+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Record in retry and not yet recovered
get data: offset0
2025-12-04T16:07:54.742+09:00 INFO 9045 --- [ntainer#0-0-C-1] org.example.consumer.Config : [CUSTOM-RETRY] attempt=2 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:07:54.754+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:07:54.755+09:00 INFO 9045 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Record in retry and not yet recovered
get data: offset0
However, when the listener returns a Mono, the record is not retried after the first failed attempt, as shown below.
get data: offset0
2025-12-04T16:12:40.109+09:00 INFO 10101 --- [ntainer#0-0-C-1] org.example.consumer.Config : [CUSTOM-RETRY] attempt=1 topic=monoTest partition=0 offset=0 ex=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public reactor.core.publisher.Mono<java.lang.Void> org.example.consumer.MyListener.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>) throws org.example.consumer.MyException' threw exception
2025-12-04T16:12:40.122+09:00 INFO 10101 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=myNewClient-0, groupId=aa] Seeking to offset 0 for partition monoTest-0
2025-12-04T16:12:40.123+09:00 INFO 10101 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Record in retry and not yet recovered
... then no retry
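
For comparison, here is a rough, JDK-only model of the retry budget I would expect from `new FixedBackOff(1L, 20L)`. As far as I understand, the second argument counts retries, so a record that always fails should be delivered 21 times (1 initial delivery plus 20 retries) before the handler gives up. `FixedBackOffSketch`, `Delivery`, and `deliver` are hypothetical names made up for this illustration. They are not Spring classes and do not reproduce the container's seek/poll mechanics.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, JDK-only model of a fixed back-off retry loop.
 * This is NOT Spring's FixedBackOff or DefaultErrorHandler.
 */
public class FixedBackOffSketch {

    /** Stand-in for a listener invocation that may fail. */
    @FunctionalInterface
    public interface Delivery {
        void accept(int deliveryNumber) throws Exception;
    }

    /**
     * Delivers until the listener succeeds or maxRetries retries are used up.
     * Returns the delivery numbers (1-based) that failed.
     */
    public static List<Integer> deliver(long intervalMs, long maxRetries, Delivery listener)
            throws InterruptedException {
        List<Integer> failures = new ArrayList<>();
        for (int delivery = 1; delivery <= maxRetries + 1; delivery++) {
            try {
                listener.accept(delivery);
                return failures; // success, stop retrying
            } catch (Exception ex) {
                failures.add(delivery);
                if (delivery <= maxRetries) {
                    Thread.sleep(intervalMs); // fixed pause before the next attempt
                }
            }
        }
        return failures; // budget exhausted; a real handler would recover or log here
    }

    public static void main(String[] args) throws InterruptedException {
        // A listener that always throws, like the one in this report.
        List<Integer> failed = deliver(1L, 20L, n -> {
            throw new IllegalStateException("boom");
        });
        System.out.println("failed deliveries: " + failed.size()); // prints "failed deliveries: 21"
    }
}
```

In the Mono log above, only `attempt=1` is reported, which is far short of that budget.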