Skip to content

Commit da7877d

Browse files
committed
kafka indempotence et transaction end
1 parent 87aa184 commit da7877d

File tree

2 files changed

+3
-15
lines changed

2 files changed

+3
-15
lines changed

messaging-tutorial/kafka-tutorial/kafka-receiver/src/main/java/fr/eletutour/receiver/configuration/KafkaConsumerConfig.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,17 @@ public class KafkaConsumerConfig {
2020
private String dltTopic;
2121

2222
@Bean
23-
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory( // Changé à Message
24-
ConsumerFactory<String, Message> consumerFactory,
23+
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory( ConsumerFactory<String, Message> consumerFactory,
2524
KafkaTemplate<String, Message> kafkaTemplate) { // Changé à Message
2625

2726
ConcurrentKafkaListenerContainerFactory<String, Message> factory = // Changé à Message
2827
new ConcurrentKafkaListenerContainerFactory<>();
2928

3029
factory.setConsumerFactory(consumerFactory);
31-
32-
// Active le mode Ack manuel
33-
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
3430

35-
// Configure le DefaultErrorHandler avec DeadLetterPublishingRecoverer
36-
// FixedBackOff(0L, 0) signifie pas de re-tentative avant d'envoyer au DLT
31+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
3732
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
38-
new DeadLetterPublishingRecoverer(kafkaTemplate, (consumerRecord, e) -> {
39-
// Définir le topic DLQ et la partition (null pour laisser Kafka choisir)
40-
return new TopicPartition(dltTopic, -1);
41-
}),
33+
new DeadLetterPublishingRecoverer(kafkaTemplate, (consumerRecord, e) -> new TopicPartition(dltTopic, -1)),
4234
new FixedBackOff(0L, 0)
4335
);
4436
factory.setCommonErrorHandler(errorHandler);

messaging-tutorial/kafka-tutorial/kafka-receiver/src/main/resources/application.properties

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,8 @@ spring.kafka.consumer.group-id=my-group
55
app.kafka.topic=my-topic
66
app.kafka.dlt-topic=my-topic.DLT
77

8-
# Kafka Consumer Deserialization Configuration
9-
# Use ErrorHandlingDeserializer to gracefully handle malformed messages
108
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
11-
# Delegate the actual deserialization to JsonDeserializer
129
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
13-
# Configure the delegate JsonDeserializer to trust all packages
1410
spring.kafka.consumer.properties[spring.json.trusted.packages]=*
1511

1612
# H2 Database Configuration

0 commit comments

Comments
 (0)