From ebae43a5cc54db760f7a861a333995fb8ed088dd Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Mon, 7 Oct 2024 20:29:13 +0900 Subject: [PATCH 1/8] GH-3407 Draft --- .../DeliveryAttemptAwareRetryListener.java | 77 +++++ ...emptAwareRetryListenerIntegrationTest.java | 309 ++++++++++++++++++ ...DeliveryAttemptAwareRetryListenerTest.java | 140 ++++++++ 3 files changed, 526 insertions(+) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java new file mode 100644 index 0000000000..a6ee9a4676 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java @@ -0,0 +1,77 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.nio.ByteBuffer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.kafka.support.KafkaHeaders; + +/** + * DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations. + * The DeliveryAttemptAwareRetryListener adds the {@link KafkaHeaders}.DELIVERY_ATTEMPT header + * to the record's headers when batch records fail and are retried. + * Note that DeliveryAttemptAwareRetryListener modifies the headers of the original record. + * + * @author Sanghyeok An + * @since 3.3 + * @see KafkaConsumerBackoffManager + */ + +public class DeliveryAttemptAwareRetryListener implements RetryListener { + + @Override + public void failedDelivery(ConsumerRecord record, Exception ex, int deliveryAttempt) { + // Pass + } + + /** + * Called after a delivery failed for batch records. + * If the {@link KafkaHeaders}.DELIVERY_ATTEMPT header already exists in the {@link ConsumerRecord}'s headers, + * it will be removed. Then, the provided `deliveryAttempt` is added to the {@link ConsumerRecord}'s headers. + * + * @param records the records. + * @param ex the exception. + * @param deliveryAttempt the delivery attempt, if available. + */ + @Override + public void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) { + for (ConsumerRecord record : records) { + + Headers headers = record.headers(); + int headerCount = 0; + Iterable
iterator = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : iterator) { + headerCount += 1; + } + + if (headerCount > 0) { + headers.remove(KafkaHeaders.DELIVERY_ATTEMPT); + } + + byte[] buff = new byte[4]; // NOSONAR (magic #) + ByteBuffer bb = ByteBuffer.wrap(buff); + bb.putInt(deliveryAttempt); + record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff)); + } + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java new file mode 100644 index 0000000000..bee77b9c72 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java @@ -0,0 +1,309 @@ +/* + * Copyright 2019-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; + +/** + * @author Sanghyeok An + * @since 3.3.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +class 
DeliveryAttemptAwareRetryListenerIntegrationTest { + + static final String MAIN_TOPIC_CONTAINER_FACTORY0 = "kafkaListenerContainerFactory0"; + + static final String TEST_TOPIC0 = "myBatchDeliveryAttemptTopic0"; + + static final int MAX_ATTEMPT_COUNT0 = 3; + + static final CountDownLatch latch0 = new CountDownLatch(MAX_ATTEMPT_COUNT0 + 1); + + static final String MAIN_TOPIC_CONTAINER_FACTORY1 = "kafkaListenerContainerFactory1"; + + static final String TEST_TOPIC1 = "myBatchDeliveryAttemptTopic1"; + + static final int MAX_ATTEMPT_COUNT1 = 10; + + static final CountDownLatch latch1 = new CountDownLatch(MAX_ATTEMPT_COUNT1 + 1); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Test + void should_have_delivery_attempt_header_in_each_consumer_record(@Autowired TestTopicListener0 listener) { + + // Given + String msg1 = "1"; + String msg2 = "2"; + String msg3 = "3"; + + // When + kafkaTemplate.send(TEST_TOPIC0, msg1); + kafkaTemplate.send(TEST_TOPIC0, msg2); + kafkaTemplate.send(TEST_TOPIC0, msg3); + + // Then + assertThat(awaitLatch(latch0)).isTrue(); + + Map deliveryAttemptCountMap = convertToMap(listener.receivedHeaders); + + for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT0; attemptCnt++) { + assertThat(deliveryAttemptCountMap.get(attemptCnt)).isEqualTo(3); + } + } + + @Test + void should_have_delivery_attempt_header_in_each_consumer_record_with_more_bigger_max_attempt(@Autowired TestTopicListener1 listener) { + // Given + String msg1 = "1"; + String msg2 = "2"; + String msg3 = "3"; + + // When + kafkaTemplate.send(TEST_TOPIC1, msg1); + kafkaTemplate.send(TEST_TOPIC1, msg2); + kafkaTemplate.send(TEST_TOPIC1, msg3); + + // Then + assertThat(awaitLatch(latch1)).isTrue(); + + Map deliveryAttemptCountMap = convertToMap(listener.receivedHeaders); + + for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT1; attemptCnt++) { + assertThat(deliveryAttemptCountMap.get(attemptCnt)).isEqualTo(3); + } + } + + + private Map convertToMap(List
headers) { + Map map = new HashMap<>(); + for (Header header : headers) { + int attemptCount = ByteBuffer.wrap(header.value()).getInt(); + Integer cnt = map.getOrDefault(attemptCount, 0); + map.put(attemptCount, cnt + 1); + } + return map; + } + + + static class TestTopicListener0 { + final List
receivedHeaders = new ArrayList<>(); + + @KafkaListener( + topics = TEST_TOPIC0, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY0, + batch = "true") + public void listen(List> records) { + latch0.countDown(); + for (ConsumerRecord record : records) { + Iterable
headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : headers) { + receivedHeaders.add(header); + } + } + throw new RuntimeException("Failed."); + } + } + + static class TestTopicListener1 { + final List
receivedHeaders = new ArrayList<>(); + + @KafkaListener( + topics = TEST_TOPIC1, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY1, + batch = "true") + public void listen(List> records) { + latch1.countDown(); + for (ConsumerRecord record : records) { + Iterable
headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : headers) { + receivedHeaders.add(header); + } + } + throw new RuntimeException("Failed."); + } + } + + @Configuration + static class TestConfiguration { + + @Bean + TestTopicListener0 testTopicListener0() { + return new TestTopicListener0(); + } + + @Bean + TestTopicListener1 testTopicListener1() { + return new TestTopicListener1(); + } + } + + @Configuration + static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + configProps.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + configProps.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean("customKafkaTemplate") + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); + return new KafkaAdmin(configs); + } + + @Bean + ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + props.put( + ConsumerConfig.GROUP_ID_CONFIG, + "groupId"); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + ConcurrentKafkaListenerContainerFactory + kafkaListenerContainerFactory0(ConsumerFactory consumerFactory) { + + final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT0); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); + errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(errorHandler); + + + final ContainerProperties containerProperties = factory.getContainerProperties(); + containerProperties.setDeliveryAttemptHeader(true); + + return factory; + } + + @Bean + ConcurrentKafkaListenerContainerFactory + kafkaListenerContainerFactory1(ConsumerFactory consumerFactory) { + + final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT1); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); + errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(errorHandler); + + + final ContainerProperties containerProperties = factory.getContainerProperties(); + containerProperties.setDeliveryAttemptHeader(true); + + return factory; + } + + + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, 
TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java new file mode 100644 index 0000000000..e4413246c8 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.*; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.KafkaHeaders; + +/** + * @author Sanghyeok An + * @since 3.3 + * + */ + +class DeliveryAttemptAwareRetryListenerTest { + + + @Test + void should_have_single_header_and_header_value_should_be_1() { + // Given + TopicPartition tpForTopicA = new TopicPartition("topicA", 1); + TopicPartition tpForTopicB = new TopicPartition("topicB", 1); + + ConsumerRecord record1 = new ConsumerRecord<>("topicA", 1, 1, "key", "value1"); + ConsumerRecord record2 = new ConsumerRecord<>("topicA", 1, 2, "key", "value2"); + ConsumerRecord record3 = new ConsumerRecord<>("topicA", 1, 3, "key", "value3"); + + ConsumerRecord record4 = new ConsumerRecord<>("topicB", 1, 1, "key", "value4"); + ConsumerRecord record5 = new ConsumerRecord<>("topicB", 1, 2, "key", "value5"); + ConsumerRecord record6 = new ConsumerRecord<>("topicB", 1, 3, "key", "value6"); + + Map>> map = new HashMap<>(); + + List> topicARecords = List.of(record1, record2, record3); + List> topicBRecords = List.of(record4, record5, record6); + + map.put(tpForTopicA, topicARecords); + map.put(tpForTopicB, topicBRecords); + + ConsumerRecords consumerRecords = new ConsumerRecords<>(map); + final DeliveryAttemptAwareRetryListener listener = new DeliveryAttemptAwareRetryListener(); + Exception ex = new RuntimeException("Dummy Exception"); + + // Given : Expected Value + int expectedDeliveryAttemptInHeader = 1; + + // When + listener.failedDelivery(consumerRecords, ex, 1); + + // Then + for (ConsumerRecord consumerRecord : consumerRecords) { + int deliveryAttemptHeaderCount = 0; + Iterable
headers = consumerRecord.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + + for (Header header : headers) { + int deliveryAttempt = ByteBuffer.wrap(header.value()).getInt(); + deliveryAttemptHeaderCount++; + + // Assertion + assertThat(deliveryAttempt).isEqualTo(expectedDeliveryAttemptInHeader); + assertThat(deliveryAttemptHeaderCount).isEqualTo(1); + } + } + } + + @Test + void should_have_single_header_and_header_value_should_be_4() { + // Given + TopicPartition tpForTopicA = new TopicPartition("topicA", 1); + TopicPartition tpForTopicB = new TopicPartition("topicB", 1); + + ConsumerRecord record1 = new ConsumerRecord<>("topicA", 1, 1, "key", "value1"); + ConsumerRecord record2 = new ConsumerRecord<>("topicA", 1, 2, "key", "value2"); + ConsumerRecord record3 = new ConsumerRecord<>("topicA", 1, 3, "key", "value3"); + + ConsumerRecord record4 = new ConsumerRecord<>("topicB", 1, 1, "key", "value4"); + ConsumerRecord record5 = new ConsumerRecord<>("topicB", 1, 2, "key", "value5"); + ConsumerRecord record6 = new ConsumerRecord<>("topicB", 1, 3, "key", "value6"); + + Map>> map = new HashMap<>(); + + List> topicARecords = List.of(record1, record2, record3); + List> topicBRecords = List.of(record4, record5, record6); + + map.put(tpForTopicA, topicARecords); + map.put(tpForTopicB, topicBRecords); + + ConsumerRecords consumerRecords = new ConsumerRecords<>(map); + final DeliveryAttemptAwareRetryListener listener = new DeliveryAttemptAwareRetryListener(); + Exception ex = new RuntimeException("Dummy Exception"); + + // Given : Expected Value + int expectedDeliveryAttemptInHeader = 4; + + // When + for (int deliveryAttempt = 1; deliveryAttempt < 5; deliveryAttempt++) { + listener.failedDelivery(consumerRecords, ex, deliveryAttempt); + } + + // Then + for (ConsumerRecord consumerRecord : consumerRecords) { + int deliveryAttemptHeaderCount = 0; + Iterable
headers = consumerRecord.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : headers) { + int deliveryAttempt = ByteBuffer.wrap(header.value()).getInt(); + deliveryAttemptHeaderCount++; + + // Assertion + assertThat(deliveryAttempt).isEqualTo(expectedDeliveryAttemptInHeader); + assertThat(deliveryAttemptHeaderCount).isEqualTo(1); + } + } + + } + +} From 9ce3d042d6470417b22e749148f1ac6c924d6eb4 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Mon, 7 Oct 2024 20:55:13 +0900 Subject: [PATCH 2/8] Fixes lint error. --- .../DeliveryAttemptAwareRetryListener.java | 3 ++- ...emptAwareRetryListenerIntegrationTest.java | 23 +++++++++---------- ...DeliveryAttemptAwareRetryListenerTest.java | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java index a6ee9a4676..056522ba6a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java @@ -23,10 +23,11 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; + import org.springframework.kafka.support.KafkaHeaders; /** - * DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations. + * The DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations. * The DeliveryAttemptAwareRetryListener adds the {@link KafkaHeaders}.DELIVERY_ATTEMPT header * to the record's headers when batch records fail and are retried. * Note that DeliveryAttemptAwareRetryListener modifies the headers of the original record. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java index bee77b9c72..8b1268c71f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -128,7 +129,6 @@ void should_have_delivery_attempt_header_in_each_consumer_record_with_more_bigge } } - private Map convertToMap(List
headers) { Map map = new HashMap<>(); for (Header header : headers) { @@ -139,6 +139,15 @@ private Map convertToMap(List
headers) { return map; } + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } static class TestTopicListener0 { final List
receivedHeaders = new ArrayList<>(); @@ -291,19 +300,9 @@ ConsumerFactory consumerFactory() { containerProperties.setDeliveryAttemptHeader(true); return factory; - } - - } - - private boolean awaitLatch(CountDownLatch latch) { - try { - return latch.await(60, TimeUnit.SECONDS); - } - catch (Exception e) { - fail(e.getMessage()); - throw new RuntimeException(e); } + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java index e4413246c8..c28d3508c4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java @@ -16,7 +16,7 @@ package org.springframework.kafka.listener; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.nio.ByteBuffer; import java.util.HashMap; @@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.junit.jupiter.api.Test; + import org.springframework.kafka.support.KafkaHeaders; /** @@ -38,7 +39,6 @@ class DeliveryAttemptAwareRetryListenerTest { - @Test void should_have_single_header_and_header_value_should_be_1() { // Given From fd3e4ffb43563834f94a169882b1e400646cd0d8 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Mon, 7 Oct 2024 21:48:49 +0900 Subject: [PATCH 3/8] Fixes flaky integration tests. --- .../DeliveryAttemptAwareRetryListenerIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java index 8b1268c71f..c9712478d3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java @@ -157,13 +157,13 @@ static class TestTopicListener0 { containerFactory = MAIN_TOPIC_CONTAINER_FACTORY0, batch = "true") public void listen(List> records) { - latch0.countDown(); for (ConsumerRecord record : records) { Iterable
headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); for (Header header : headers) { receivedHeaders.add(header); } } + latch0.countDown(); throw new RuntimeException("Failed."); } } @@ -176,13 +176,13 @@ static class TestTopicListener1 { containerFactory = MAIN_TOPIC_CONTAINER_FACTORY1, batch = "true") public void listen(List> records) { - latch1.countDown(); for (ConsumerRecord record : records) { Iterable
headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); for (Header header : headers) { receivedHeaders.add(header); } } + latch1.countDown(); throw new RuntimeException("Failed."); } } From 47f13e5204ff7296534414bec097e0e604c99027 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 8 Oct 2024 20:52:49 +0900 Subject: [PATCH 4/8] Add adocs for new feature. --- .../kafka-headers-for-batch-listener.adoc | 18 ++++++++++++++++++ .../antora/modules/ROOT/pages/whats-new.adoc | 8 +++++++- .../DeliveryAttemptAwareRetryListener.java | 17 ++--------------- ...mptAwareRetryListenerIntegrationTests.java} | 18 +++++++++--------- ...eliveryAttemptAwareRetryListenerTests.java} | 3 +-- 5 files changed, 37 insertions(+), 27 deletions(-) create mode 100644 spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc rename spring-kafka/src/test/java/org/springframework/kafka/listener/{DeliveryAttemptAwareRetryListenerIntegrationTest.java => DeliveryAttemptAwareRetryListenerIntegrationTests.java} (92%) rename spring-kafka/src/test/java/org/springframework/kafka/listener/{DeliveryAttemptAwareRetryListenerTest.java => DeliveryAttemptAwareRetryListenerTests.java} (99%) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc new file mode 100644 index 0000000000..513cb2b1d0 --- /dev/null +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc @@ -0,0 +1,18 @@ +[[kafka-headers-for-batch-listener]] += Kafka Headers for batch listener + +When processing `ConsumerRecord` with the `BatchListener`, the `KafkaHeaders.DELIVERY_ATTEMPT` header can be present in a different way compared to `SingleRecordListener`. + +To inject the `KafkaHeaders.DELIVERY_ATTEMPT` header into `ConsumerRecord` when using the `BatchListener`, set the `DeliveryAttemptAwareRetryListener` as the `RetryListener` in the `ErrorHandler`. + +Please refer to the code below. +[source, java] +---- +final FixedBackOff fixedBackOff = new FixedBackOff(1, 10); +final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); +errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + +ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +factory.setConsumerFactory(consumerFactory); +factory.setCommonErrorHandler(errorHandler); +---- diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index ad8caa2e02..a2295b63e1 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -49,4 +49,10 @@ When using `DeadLetterPublishingRecovererFactory`, the user applications can ove [[x33-customize-kafka-streams-implementation]] === Customizing The Implementation of Kafka Streams -When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. \ No newline at end of file +When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. 
+
+[[x33-kafka-headers-for-batch-listeners]]
+=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners
+When using a `BatchListener`, each `ConsumerRecord` can now carry the `KafkaHeaders.DELIVERY_ATTEMPT` header.
+If the `DeliveryAttemptAwareRetryListener` is configured on the error handler as a retry listener, every retried `ConsumerRecord` receives the delivery attempt header.
+For more details, see xref:retrytopic/kafka-headers-for-batch-listener.adoc[kafka-headers-for-batch-listener].
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java
index 056522ba6a..3866a95d62 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java
@@ -20,8 +20,6 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
 import org.springframework.kafka.support.KafkaHeaders;
@@ -45,10 +43,9 @@ public void failedDelivery(ConsumerRecord record, Exception ex, int delive
 	}
 
 	/**
-	 * Called after a delivery failed for batch records.
+	 * Invoked after a delivery failure for batch records.
 	 * If the {@link KafkaHeaders}.DELIVERY_ATTEMPT header already exists in the {@link ConsumerRecord}'s headers,
 	 * it will be removed. Then, the provided `deliveryAttempt` is added to the {@link ConsumerRecord}'s headers.
-	 *
 	 * @param records the records.
 	 * @param ex the exception.
 	 * @param deliveryAttempt the delivery attempt, if available.
@@ -56,17 +53,7 @@ public void failedDelivery(ConsumerRecord record, Exception ex, int delive
 	@Override
 	public void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) {
 		for (ConsumerRecord record : records) {
-
-			Headers headers = record.headers();
-			int headerCount = 0;
-			Iterable<Header>
iterator = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); - for (Header header : iterator) { - headerCount += 1; - } - - if (headerCount > 0) { - headers.remove(KafkaHeaders.DELIVERY_ATTEMPT); - } + record.headers().remove(KafkaHeaders.DELIVERY_ATTEMPT); byte[] buff = new byte[4]; // NOSONAR (magic #) ByteBuffer bb = ByteBuffer.wrap(buff); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java similarity index 92% rename from spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java rename to spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java index c9712478d3..58efeeccaf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -63,9 +63,9 @@ @SpringJUnitConfig @DirtiesContext @EmbeddedKafka -class DeliveryAttemptAwareRetryListenerIntegrationTest { +class DeliveryAttemptAwareRetryListenerIntegrationTests { - static final String MAIN_TOPIC_CONTAINER_FACTORY0 = "kafkaListenerContainerFactory0"; + static final String MAIN_TOPIC_CONTAINER_FACTORY0 = "deliveryMyTestKafkaListenerContainerFactory0"; static final String TEST_TOPIC0 = "myBatchDeliveryAttemptTopic0"; @@ -73,7 +73,7 @@ class DeliveryAttemptAwareRetryListenerIntegrationTest { static final CountDownLatch latch0 = new CountDownLatch(MAX_ATTEMPT_COUNT0 + 1); - static final String MAIN_TOPIC_CONTAINER_FACTORY1 = "kafkaListenerContainerFactory1"; + static final String MAIN_TOPIC_CONTAINER_FACTORY1 = "deliveryMyTestKafkaListenerContainerFactory1"; static final String TEST_TOPIC1 = "myBatchDeliveryAttemptTopic1"; @@ -103,7 +103,7 @@ void should_have_delivery_attempt_header_in_each_consumer_record(@Autowired Test Map deliveryAttemptCountMap = convertToMap(listener.receivedHeaders); for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT0; attemptCnt++) { - assertThat(deliveryAttemptCountMap.get(attemptCnt)).isEqualTo(3); + assertThat(deliveryAttemptCountMap.get(attemptCnt)).isGreaterThan(0); } } @@ -125,7 +125,7 @@ void should_have_delivery_attempt_header_in_each_consumer_record_with_more_bigge Map deliveryAttemptCountMap = convertToMap(listener.receivedHeaders); for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT1; attemptCnt++) { - assertThat(deliveryAttemptCountMap.get(attemptCnt)).isEqualTo(3); + assertThat(deliveryAttemptCountMap.get(attemptCnt)).isGreaterThan(0); } } @@ -266,9 +266,9 @@ ConsumerFactory consumerFactory() { @Bean ConcurrentKafkaListenerContainerFactory - kafkaListenerContainerFactory0(ConsumerFactory consumerFactory) { + deliveryMyTestKafkaListenerContainerFactory0(ConsumerFactory consumerFactory) { - final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT0); + final FixedBackOff fixedBackOff = new FixedBackOff(1, MAX_ATTEMPT_COUNT0); DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); @@ -285,9 +285,9 @@ ConsumerFactory consumerFactory() { @Bean ConcurrentKafkaListenerContainerFactory - kafkaListenerContainerFactory1(ConsumerFactory consumerFactory) { + 
deliveryMyTestKafkaListenerContainerFactory1(ConsumerFactory consumerFactory) { - final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT1); + final FixedBackOff fixedBackOff = new FixedBackOff(1, MAX_ATTEMPT_COUNT1); DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTests.java similarity index 99% rename from spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java rename to spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTests.java index c28d3508c4..6d79218e2a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTests.java @@ -34,10 +34,9 @@ /** * @author Sanghyeok An * @since 3.3 - * */ -class DeliveryAttemptAwareRetryListenerTest { +class DeliveryAttemptAwareRetryListenerTests { @Test void should_have_single_header_and_header_value_should_be_1() { From 0d852f1aecd9504d4db8d63b606a76826d390ff0 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Wed, 9 Oct 2024 08:14:40 +0900 Subject: [PATCH 5/8] Update adocs, test code refactoring, remove useless comment. --- .../kafka/annotation-error-handling.adoc | 22 ++++- .../kafka-headers-for-batch-listener.adoc | 18 ---- .../antora/modules/ROOT/pages/whats-new.adoc | 2 +- .../DeliveryAttemptAwareRetryListener.java | 1 - ...mptAwareRetryListenerIntegrationTests.java | 96 +++++++------------ 5 files changed, 57 insertions(+), 82 deletions(-) delete mode 100644 spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index 9ea0f20391..777f46589a 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -562,6 +562,27 @@ It is disabled by default to avoid the (small) overhead of looking up the state The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature. +[[delivery-attempts-header-for-batch-listener]] +== Delivery Attempts Header for batch listener + +When processing `ConsumerRecord` with the `BatchListener`, the `KafkaHeaders.DELIVERY_ATTEMPT` header can be present in a different way compared to `SingleRecordListener`. + +Starting with version 3.3, if you want to inject the `KafkaHeaders.DELIVERY_ATTEMPT` header into the `ConsumerRecord` when using the `BatchListener`, set the `DeliveryAttemptAwareRetryListener` as the `RetryListener` in the `ErrorHandler`. + +Please refer to the code below. 
+[source, java]
+----
+final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
+final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
+errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
+
+ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+factory.setConsumerFactory(consumerFactory);
+factory.setCommonErrorHandler(errorHandler);
+----
+
+Then, whenever a batch fails, the `DeliveryAttemptAwareRetryListener` injects the `KafkaHeaders.DELIVERY_ATTEMPT` header into each `ConsumerRecord`.
+
 [[li-header]]
 == Listener Info Header
 
@@ -796,4 +817,3 @@ DefaultErrorHandler handler() {
 ----
 
 This will retry after `1, 2, 4, 8, 10, 10` seconds, before calling the recoverer.
-
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc
deleted file mode 100644
index 513cb2b1d0..0000000000
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/kafka-headers-for-batch-listener.adoc
+++ /dev/null
@@ -1,18 +0,0 @@
-[[kafka-headers-for-batch-listener]]
-= Kafka Headers for batch listener
-
-When processing `ConsumerRecord` with the `BatchListener`, the `KafkaHeaders.DELIVERY_ATTEMPT` header can be present in a different way compared to `SingleRecordListener`.
-
-To inject the `KafkaHeaders.DELIVERY_ATTEMPT` header into `ConsumerRecord` when using the `BatchListener`, set the `DeliveryAttemptAwareRetryListener` as the `RetryListener` in the `ErrorHandler`.
-
-Please refer to the code below.
-[source, java]
-----
-final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
-final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
-errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
-
-ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
-factory.setConsumerFactory(consumerFactory);
-factory.setCommonErrorHandler(errorHandler);
-----
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
index a2295b63e1..dc2de09474 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
@@ -55,4 +55,4 @@ When using `KafkaStreamsCustomizer` it is now possible to return a custom implem
 === KafkaHeaders.DELIVERY_ATTEMPT for batch listeners
 When using a `BatchListener`, each `ConsumerRecord` can now carry the `KafkaHeaders.DELIVERY_ATTEMPT` header.
 If the `DeliveryAttemptAwareRetryListener` is configured on the error handler as a retry listener, every retried `ConsumerRecord` receives the delivery attempt header.
-For more details, see xref:retrytopic/kafka-headers-for-batch-listener.adoc[kafka-headers-for-batch-listener].
+For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[kafka-headers-for-batch-listener].
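As a companion to the documentation added in this commit, here is a minimal sketch of how a batch listener might read the injected header. It is illustrative only and not part of the patch; the listener id, topic name, and `String` key/value types are assumptions, and `Header` is `org.apache.kafka.common.header.Header`.

[source, java]
----
@KafkaListener(id = "batchWithAttempts", topics = "someBatchTopic", batch = "true")
public void listen(List<ConsumerRecord<String, String>> records) {
	for (ConsumerRecord<String, String> record : records) {
		// Absent on the first delivery; present on retries, holding a big-endian int.
		Header attempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
		if (attempt != null) {
			int deliveryAttempt = ByteBuffer.wrap(attempt.value()).getInt();
			// e.g. switch to a fallback path once deliveryAttempt passes a threshold
		}
	}
}
----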
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java index 3866a95d62..b35fbb371d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java @@ -32,7 +32,6 @@ * * @author Sanghyeok An * @since 3.3 - * @see KafkaConsumerBackoffManager */ public class DeliveryAttemptAwareRetryListener implements RetryListener { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java index 58efeeccaf..eec10815bc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -51,6 +51,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.util.backoff.FixedBackOff; @@ -209,17 +210,11 @@ static class KafkaProducerConfig { @Bean ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Map props = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); - configProps.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class); - configProps.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class); - return new DefaultKafkaProducerFactory<>(configProps); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); } @Bean("customKafkaTemplate") @@ -228,6 +223,25 @@ KafkaTemplate kafkaTemplate() { } } + private static CommonErrorHandler createErrorHandler(int interval, int maxAttemptCount) { + final FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); + errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + return errorHandler; + } + + private static ConcurrentKafkaListenerContainerFactory createListenerContainerFactory( + ConsumerFactory consumerFactory, CommonErrorHandler errorHandler) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(errorHandler); + + final ContainerProperties containerProperties = factory.getContainerProperties(); + containerProperties.setDeliveryAttemptHeader(true); + return factory; + } + @EnableKafka @Configuration static class KafkaConsumerConfig { @@ -235,30 +249,15 @@ static class KafkaConsumerConfig { @Autowired EmbeddedKafkaBroker broker; - @Bean - KafkaAdmin kafkaAdmin() { - Map configs = new HashMap<>(); - 
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); - return new KafkaAdmin(configs); - } - @Bean ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); - props.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put( - ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "DeliveryAttemptAwareRetryListenerIntegrationTestsGroupId", + "true"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(props); @@ -267,40 +266,15 @@ ConsumerFactory consumerFactory() { @Bean ConcurrentKafkaListenerContainerFactory deliveryMyTestKafkaListenerContainerFactory0(ConsumerFactory consumerFactory) { - - final FixedBackOff fixedBackOff = new FixedBackOff(1, MAX_ATTEMPT_COUNT0); - DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); - errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); - - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - factory.setCommonErrorHandler(errorHandler); - - - final ContainerProperties containerProperties = factory.getContainerProperties(); - containerProperties.setDeliveryAttemptHeader(true); - - return factory; + CommonErrorHandler errorHandler = createErrorHandler(1, MAX_ATTEMPT_COUNT0); + return createListenerContainerFactory(consumerFactory, errorHandler); } @Bean ConcurrentKafkaListenerContainerFactory deliveryMyTestKafkaListenerContainerFactory1(ConsumerFactory consumerFactory) { - - final FixedBackOff fixedBackOff = new FixedBackOff(1, MAX_ATTEMPT_COUNT1); - DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); - errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); - - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - factory.setCommonErrorHandler(errorHandler); - - - final ContainerProperties containerProperties = factory.getContainerProperties(); - containerProperties.setDeliveryAttemptHeader(true); - - return factory; - + CommonErrorHandler errorHandler = createErrorHandler(1, MAX_ATTEMPT_COUNT1); + return createListenerContainerFactory(consumerFactory, errorHandler); } } From 5a9a68f5769062b9d96c48989560d7a83311b8a7 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Wed, 9 Oct 2024 14:12:12 +0900 Subject: [PATCH 6/8] Fixes lint error --- ...mptAwareRetryListenerIntegrationTests.java | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java index eec10815bc..2ddde9d0af 100644 --- 
a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -27,7 +27,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; @@ -45,7 +44,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.KafkaHeaders; @@ -150,6 +148,25 @@ private boolean awaitLatch(CountDownLatch latch) { } } + private static CommonErrorHandler createErrorHandler(int interval, int maxAttemptCount) { + final FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); + errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + return errorHandler; + } + + private static ConcurrentKafkaListenerContainerFactory createListenerContainerFactory( + ConsumerFactory consumerFactory, CommonErrorHandler errorHandler) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(errorHandler); + + final ContainerProperties containerProperties = factory.getContainerProperties(); + containerProperties.setDeliveryAttemptHeader(true); + return factory; + } + static class TestTopicListener0 { final List
receivedHeaders = new ArrayList<>(); @@ -223,25 +240,6 @@ KafkaTemplate kafkaTemplate() { } } - private static CommonErrorHandler createErrorHandler(int interval, int maxAttemptCount) { - final FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount); - DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); - errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); - return errorHandler; - } - - private static ConcurrentKafkaListenerContainerFactory createListenerContainerFactory( - ConsumerFactory consumerFactory, CommonErrorHandler errorHandler) { - - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - factory.setCommonErrorHandler(errorHandler); - - final ContainerProperties containerProperties = factory.getContainerProperties(); - containerProperties.setDeliveryAttemptHeader(true); - return factory; - } - @EnableKafka @Configuration static class KafkaConsumerConfig { From 6ab72b9cabaf58652050c52ddcb3736b2c33722e Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Thu, 10 Oct 2024 07:06:39 +0900 Subject: [PATCH 7/8] Remove useless code. --- ...veryAttemptAwareRetryListenerIntegrationTests.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java index 2ddde9d0af..e3d7d1a8db 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -149,7 +149,7 @@ private boolean awaitLatch(CountDownLatch latch) { } private static CommonErrorHandler createErrorHandler(int interval, int maxAttemptCount) { - final FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount); + FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount); DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); return errorHandler; @@ -162,7 +162,7 @@ private static ConcurrentKafkaListenerContainerFactory createLis factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); - final ContainerProperties containerProperties = factory.getContainerProperties(); + ContainerProperties containerProperties = factory.getContainerProperties(); containerProperties.setDeliveryAttemptHeader(true); return factory; } @@ -229,8 +229,6 @@ static class KafkaProducerConfig { ProducerFactory producerFactory() { Map props = KafkaTestUtils.producerProps( this.broker.getBrokersAsString()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } @@ -253,11 +251,6 @@ ConsumerFactory consumerFactory() { this.broker.getBrokersAsString(), "DeliveryAttemptAwareRetryListenerIntegrationTestsGroupId", "true"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); - 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return new DefaultKafkaConsumerFactory<>(props); } From f33aa4920f59ed27e6d7bb6b958d3f54078814c3 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Thu, 10 Oct 2024 07:11:00 +0900 Subject: [PATCH 8/8] Fixes lint error --- .../DeliveryAttemptAwareRetryListenerIntegrationTests.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java index e3d7d1a8db..20b7cea8fb 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -27,12 +27,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired;
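To close out the series, here is a self-contained sketch of the behavior the unit tests above assert: the listener removes any previous `KafkaHeaders.DELIVERY_ATTEMPT` header and writes the latest attempt as a 4-byte int, so repeated failures leave exactly one header with the newest value. The class name, topic, and literal values below are illustrative assumptions, not taken from the patch.

[source, java]
----
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.DeliveryAttemptAwareRetryListener;
import org.springframework.kafka.support.KafkaHeaders;

public class DeliveryAttemptHeaderDemo {

	public static void main(String[] args) {
		ConsumerRecord<String, String> record = new ConsumerRecord<>("demoTopic", 0, 0, "key", "value");
		ConsumerRecords<String, String> records = new ConsumerRecords<>(
				Map.of(new TopicPartition("demoTopic", 0), List.of(record)));

		DeliveryAttemptAwareRetryListener listener = new DeliveryAttemptAwareRetryListener();

		// Two failed deliveries: the second call replaces the header written by the first.
		listener.failedDelivery(records, new RuntimeException("first failure"), 1);
		listener.failedDelivery(records, new RuntimeException("second failure"), 2);

		int attempt = ByteBuffer.wrap(
				record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
		System.out.println(attempt); // 2
	}
}
----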