Skip to content

Commit c682506

Browse files
committed
kafka ajout DLQ et retry
1 parent c7a98b5 commit c682506

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

messaging-tutorial/kafka-tutorial/kafka-receiver/README.md

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,35 @@ Assurez-vous que Kafka est démarré (via `docker-compose up -d` dans le dossier
2727
2. Exécutez `mvn spring-boot:run`.
2828
3. L'application va démarrer et commencer à écouter les messages. Envoyez des messages via l'application `kafka-sender` pour les voir apparaître ici.
2929
* Envoyez un message normal (ex: `{"message": "Hello Kafka!"}`) : Il sera traité normalement.
30-
* Envoyez un message contenant "erreur" (ex: `{"message": "Ceci est une erreur"}`) : Il sera loggé comme une erreur par le consommateur principal et **vous verrez un log indiquant qu'il a été reçu par le listener de la DLQ**.
30+
* Envoyez un message contenant "erreur" (ex: `{"message": "Ceci est une erreur"}`) : # Application Consommateur Kafka (kafka-receiver)
31+
32+
Cette application Spring Boot agit comme un **consommateur** de messages pour Apache Kafka.
33+
34+
## Description
35+
36+
Son unique rôle est d'écouter en continu un topic Kafka spécifique. Lorsqu'un message est publié dans ce topic (par l'application `kafka-sender`), cette application le consomme et l'affiche dans la console.
37+
38+
Elle démontre la manière la plus simple de recevoir des messages avec Spring Kafka.
39+
40+
## Concepts Clés
41+
42+
- **Spring Kafka `@KafkaListener`** : L'annotation qui transforme une simple méthode en un consommateur de messages Kafka puissant et managé par Spring.
43+
- **Consommateur Kafka** : Le rôle de cette application dans l'écosystème Kafka.
44+
- **Dead Letter Queue (DLQ)** : Un mécanisme pour gérer les messages en erreur, les redirigeant vers un topic dédié (`my-topic.DLT`).
45+
46+
## Fichiers Principaux
47+
48+
- `KafkaConsumerService.java` : Contient la méthode annotée avec `@KafkaListener` qui est automatiquement invoquée pour chaque nouveau message dans le topic, ainsi que le listener pour la DLQ.
49+
- `KafkaConsumerConfig.java` : Configure le `ConcurrentKafkaListenerContainerFactory` et le `DefaultErrorHandler` pour la gestion des DLQ.
50+
51+
## Comment l'exécuter
52+
53+
Assurez-vous que Kafka est démarré (via `docker-compose up -d` dans le dossier parent).
54+
55+
1. Naviguez dans ce dossier.
56+
2. Exécutez `mvn spring-boot:run`.
57+
3. L'application va démarrer et commencer à écouter les messages. Envoyez des messages via l'application `kafka-sender` pour les voir apparaître ici.
58+
* Envoyez un message normal (ex: `{"message": "Hello Kafka!"}`) : Il sera traité normalement.
59+
* Envoyez un message contenant "erreur" (ex: `{"message": "Ceci est une erreur"}`) : Il sera loggé comme une erreur par le consommateur principal, envoyé à la DLQ, et **vous verrez un log indiquant qu'il a été reçu par le listener de la DLQ pour une tentative de re-traitement**.
60+
* Envoyez un message contenant "re-erreur" (ex: `{"message": "Ceci est une re-erreur"}`) : Il sera d'abord envoyé à la DLQ, puis le listener de la DLQ tentera de le re-traiter, échouera à nouveau, et **vous verrez un log indiquant l'échec du re-traitement depuis la DLQ**.
61+

messaging-tutorial/kafka-tutorial/kafka-receiver/src/main/java/fr/eletutour/receiver/service/KafkaConsumerService.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ public class KafkaConsumerService {
1818
)
1919
public void listen(String message, Acknowledgment acknowledgment) {
2020
LOG.info("Message reçu : {}", message);
21-
21+
22+
// Simuler une erreur pour les messages contenant "erreur"
2223
if (message.contains("erreur")) {
2324
LOG.error("Simulating processing error for message: {}", message);
25+
// L'exception sera interceptée par le DefaultErrorHandler
2426
throw new RuntimeException("Erreur de traitement simulée pour le message: " + message);
2527
}
2628

@@ -29,13 +31,30 @@ public void listen(String message, Acknowledgment acknowledgment) {
2931
acknowledgment.acknowledge(); // Commit manuel de l'offset
3032
}
3133

34+
// Listener pour la Dead Letter Queue (DLQ)
3235
@KafkaListener(
3336
topics = "${app.kafka.dlt-topic}",
3437
groupId = "${spring.kafka.consumer.group-id}",
3538
containerFactory = "kafkaListenerContainerFactory"
3639
)
3740
public void listenDlt(String message, Acknowledgment acknowledgment) {
3841
LOG.warn("Message reçu de la DLQ : {}", message);
39-
acknowledgment.acknowledge();
42+
43+
try {
44+
// Simuler une tentative de re-traitement
45+
LOG.info("Tentative de re-traitement du message de la DLQ : {}", message);
46+
47+
// Pour la démonstration, si le message contient "re-erreur", il échoue à nouveau
48+
if (message.contains("re-erreur")) {
49+
throw new RuntimeException("Échec du re-traitement simulé pour le message: " + message);
50+
}
51+
52+
// Si le re-traitement réussit
53+
LOG.info("Message de la DLQ re-traité avec succès : {}", message);
54+
acknowledgment.acknowledge(); // Acknowledge le message de la DLQ
55+
} catch (Exception e) {
56+
LOG.error("Échec du re-traitement du message de la DLQ : {}", message, e);
57+
acknowledgment.acknowledge();
58+
}
4059
}
4160
}

0 commit comments

Comments
 (0)