diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java
index c9fcf36b92..f1d9265fc8 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java
@@ -53,6 +53,7 @@
  * @author Yvette Quinby
  * @author Adrian Chlebosz
  * @author Omer Celik
+ * @author Hyunggeol Lee
  * @since 2.7
  *
  */
@@ -256,10 +257,16 @@ private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
 		for (int i = 0; i < destinationsToAdd.size(); i++) {
 			DestinationTopic destination = destinationsToAdd.get(i);
 			if (destination.isReusableRetryTopic()) {
-				Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
-						((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
-						String.format("In the destination topic chain, the type %s can only be "
-								+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
+				// Allow multiple DLTs after REUSABLE_RETRY_TOPIC
+				boolean isLastOrFollowedOnlyByDlts = (i == destinationsToAdd.size() - 1) ||
+						destinationsToAdd.subList(i + 1, destinationsToAdd.size())
+								.stream()
+								.allMatch(DestinationTopic::isDltTopic);
+
+				Assert.isTrue(isLastOrFollowedOnlyByDlts,
+						() -> String.format("In the destination topic chain, the type %s can only be "
+								+ "specified as the last retry topic (followed only by DLT topics).",
+								Type.REUSABLE_RETRY_TOPIC));
 			}
 		}
 	}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java
index a6093c0317..ff186ae304 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java
@@ -39,6 +39,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatNullPointerException;
 
 /**
@@ -46,6 +47,7 @@
  * @author Yvette Quinby
  * @author Gary Russell
 * @author Adrian Chlebosz
+ * @author Hyunggeol Lee
 * @since 2.7
 */
 @ExtendWith(MockitoExtension.class)
@@ -290,4 +292,103 @@ void shouldNotMarkContainerRefeshedOnOtherContextRefresh() {
 		assertThat(defaultDestinationTopicContainer.isContextRefreshed()).isFalse();
 	}
 
+	@Test
+	void shouldAllowReusableRetryTopicWithSingleDlt() {
+		assertThatNoException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", allFifthDestinationTopics));
+	}
+
+	@Test
+	void shouldAllowReusableRetryTopicWithMultipleDlts() {
+		assertThatNoException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", allSixthDestinationTopics));
+	}
+
+	@Test
+	void shouldAllowReusableRetryTopicAsLastTopic() {
+		List<DestinationTopic> topics = Arrays.asList(
+				mainDestinationTopic5,
+				reusableRetryDestinationTopic5
+		);
+
+		assertThatNoException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", topics));
+	}
+
+	@Test
+	void shouldRejectReusableRetryTopicFollowedByRegularRetry() {
+		List<DestinationTopic> topics = Arrays.asList(
+				mainDestinationTopic6,
+				reusableRetryDestinationTopic6,
+				invalidRetryDestinationTopic6,
+				dltDestinationTopic6
+		);
+
+		assertThatIllegalArgumentException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", topics))
+				.withMessageContaining("REUSABLE_RETRY_TOPIC")
+				.withMessageContaining("followed only by DLT topics");
+	}
+
+	@Test
+	void shouldRejectReusableRetryTopicFollowedByNoOps() {
+		List<DestinationTopic> topics = Arrays.asList(
+				mainDestinationTopic6,
+				reusableRetryDestinationTopic6,
+				noOpsDestinationTopic6,
+				dltDestinationTopic6
+		);
+
+		assertThatIllegalArgumentException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", topics))
+				.withMessageContaining("REUSABLE_RETRY_TOPIC")
+				.withMessageContaining("followed only by DLT topics");
+	}
+
+	@Test
+	void shouldAllowReusableRetryTopicWithTwoDlts() {
+		List<DestinationTopic> topics = Arrays.asList(
+				mainDestinationTopic6,
+				reusableRetryDestinationTopic6,
+				customDltDestinationTopic6,
+				dltDestinationTopic6
+		);
+
+		assertThatNoException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", topics));
+	}
+
+	@Test
+	void shouldAllowReusableRetryTopicWithDifferentDltCombinations() {
+		List<DestinationTopic> topics = Arrays.asList(
+				mainDestinationTopic6,
+				reusableRetryDestinationTopic6,
+				validationDltDestinationTopic6,
+				dltDestinationTopic6
+		);
+
+		assertThatNoException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", topics));
+	}
+
+	@Test
+	void shouldRejectReusableRetryTopicFollowedByMainTopic() {
+		List<DestinationTopic> topics = Arrays.asList(
+				mainDestinationTopic6,
+				reusableRetryDestinationTopic6,
+				mainDestinationTopic5
+		);
+
+		assertThatIllegalArgumentException()
+				.isThrownBy(() -> defaultDestinationTopicContainer
+						.addDestinationTopics("id", topics))
+				.withMessageContaining("REUSABLE_RETRY_TOPIC");
+	}
 }
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java
index 69653a24c5..3e692aa804 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java
@@ -31,6 +31,7 @@
 /**
  * @author Tomaz Fernandes
  * @author Adrian Chlebosz
+ * @author Hyunggeol Lee
  * @since 2.7
  */
 public class DestinationTopicTests {
@@ -144,6 +145,33 @@ public class DestinationTopicTests {
 			new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
 					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
 					Collections.emptySet());
 
+	protected DestinationTopic.Properties mainTopicProps6 =
+			new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1,
+					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
+
+	protected DestinationTopic.Properties reusableRetryTopicProps6 =
+			new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1,
+					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
+
+	protected DestinationTopic.Properties customDltTopicProps6 =
+			new DestinationTopic.Properties(0, "-custom" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
+					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
+					Set.of(IllegalStateException.class));
+
+	protected DestinationTopic.Properties validationDltTopicProps6 =
+			new DestinationTopic.Properties(0, "-validation" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
+					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
+					Set.of(IllegalArgumentException.class));
+
+	protected DestinationTopic.Properties dltTopicProps6 =
+			new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
+					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
+					Collections.emptySet());
+
+	protected DestinationTopic.Properties invalidRetryProps6 =
+			new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1,
+					DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);
+
 	// Holders
 
 	protected final static String FIRST_TOPIC = "firstTopic";
@@ -285,6 +313,38 @@
 	protected List<DestinationTopic> allFifthDestinationTopics = Arrays
 			.asList(mainDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5);
 
+	protected final static String SIXTH_TOPIC = "sixthTopic";
+
+	protected DestinationTopic mainDestinationTopic6 =
+			new DestinationTopic(SIXTH_TOPIC + mainTopicProps6.suffix(), mainTopicProps6);
+
+	protected DestinationTopic reusableRetryDestinationTopic6 =
+			new DestinationTopic(SIXTH_TOPIC + reusableRetryTopicProps6.suffix(), reusableRetryTopicProps6);
+
+	protected DestinationTopic customDltDestinationTopic6 =
+			new DestinationTopic(SIXTH_TOPIC + customDltTopicProps6.suffix(), customDltTopicProps6);
+
+	protected DestinationTopic validationDltDestinationTopic6 =
+			new DestinationTopic(SIXTH_TOPIC + validationDltTopicProps6.suffix(), validationDltTopicProps6);
+
+	protected DestinationTopic dltDestinationTopic6 =
+			new DestinationTopic(SIXTH_TOPIC + dltTopicProps6.suffix(), dltTopicProps6);
+
+	protected DestinationTopic invalidRetryDestinationTopic6 =
+			new DestinationTopic(SIXTH_TOPIC + invalidRetryProps6.suffix(), invalidRetryProps6);
+
+	protected DestinationTopic noOpsDestinationTopic6 =
+			new DestinationTopic(dltDestinationTopic6.getDestinationName() + "-noOps",
+					new DestinationTopic.Properties(dltTopicProps6, "-noOps", DestinationTopic.Type.NO_OPS));
+
+	protected List<DestinationTopic> allSixthDestinationTopics = Arrays.asList(
+			mainDestinationTopic6,
+			reusableRetryDestinationTopic6,
+			customDltDestinationTopic6,
+			validationDltDestinationTopic6,
+			dltDestinationTopic6
+	);
+
 	// Exception matchers
 
 	private final ExceptionMatcher allowListExceptionMatcher = ExceptionMatcher.forAllowList().add(IllegalArgumentException.class).build();
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ReusableRetryTopicMultipleDltIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ReusableRetryTopicMultipleDltIntegrationTests.java
new file mode 100644
index 0000000000..ed0e79c6a5
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ReusableRetryTopicMultipleDltIntegrationTests.java
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.retrytopic;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.BackOff;
+import org.springframework.kafka.annotation.DltHandler;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.RetryableTopic;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/**
+ * @author Hyunggeol Lee
+ * @since 4.0
+ */
+@SpringJUnitConfig
+@DirtiesContext
+@EmbeddedKafka
+public class ReusableRetryTopicMultipleDltIntegrationTests {
+
+	private static final Logger logger = LoggerFactory.getLogger(ReusableRetryTopicMultipleDltIntegrationTests.class);
+
+	public static final String SINGLE_DLT_TOPIC = "reusableRetryWithSingleDlt";
+
+	public static final String MULTI_DLT_TOPIC = "reusableRetryWithMultiDlt";
+
+	public static final String NO_DLT_TOPIC = "reusableRetryWithNoDlt";
+
+	@Autowired
+	private KafkaTemplate<String, String> kafkaTemplate;
+
+	@Autowired
+	private KafkaListenerEndpointRegistry registry;
+
+	@Autowired
+	private CountDownLatchContainer latchContainer;
+
+	@Test
+	void shouldStartContextWithReusableRetryTopicAndMultipleDlts() {
+		assertThat(registry.getListenerContainer("multiDltListenerId")).isNotNull();
+		assertThat(registry.getListenerContainer("multiDltListenerId").isRunning()).isTrue();
+
+		kafkaTemplate.send(MULTI_DLT_TOPIC, "Testing multiple DLTs with custom exception");
+		assertThat(awaitLatch(latchContainer.multiDltLatch)).isTrue();
+		assertThat(latchContainer.multiDltInvocations.get()).isEqualTo(3);
+	}
+
+	@Test
+	void shouldMaintainBackwardCompatibilityWithSingleDlt() {
+		assertThat(registry.getListenerContainer("singleDltListenerId")).isNotNull();
+		assertThat(registry.getListenerContainer("singleDltListenerId").isRunning()).isTrue();
+
+		kafkaTemplate.send(SINGLE_DLT_TOPIC, "Testing single DLT");
+		assertThat(awaitLatch(latchContainer.singleDltLatch)).isTrue();
+		assertThat(latchContainer.singleDltInvocations.get()).isEqualTo(3);
+	}
+
+	@Test
+	void shouldWorkWithReusableRetryTopicAndNoDlt() {
+		assertThat(registry.getListenerContainer("noDltListenerId")).isNotNull();
+		assertThat(registry.getListenerContainer("noDltListenerId").isRunning()).isTrue();
+
+		kafkaTemplate.send(NO_DLT_TOPIC, "Testing reusable retry with no DLT");
+		assertThat(awaitLatch(latchContainer.noDltProcessed)).isTrue();
+		assertThat(latchContainer.noDltInvocations.get()).isEqualTo(3);
+	}
+
+	private boolean awaitLatch(CountDownLatch latch) {
+		try {
+			return latch.await(30, TimeUnit.SECONDS);
+		}
+		catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			fail(e.getMessage());
+			throw new RuntimeException(e);
+		}
+	}
+
+	static class SingleDltListener {
+
+		@Autowired
+		CountDownLatchContainer container;
+
+		@RetryableTopic(
+				attempts = "3",
+				backOff = @BackOff(50),
+				sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC
+		)
+		@KafkaListener(id = "singleDltListenerId", topics = SINGLE_DLT_TOPIC)
+		public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
+			logger.debug("Single DLT listener: message {} received in topic {}", message, receivedTopic);
+			container.singleDltInvocations.incrementAndGet();
+			throw new RuntimeException("Test exception for single DLT");
+		}
+
+		@DltHandler
+		public void handleDlt(Object message) {
+			logger.debug("Single DLT handler received message");
+			container.singleDltLatch.countDown();
+		}
+	}
+
+	static class MultiDltListener {
+
+		@Autowired
+		CountDownLatchContainer container;
+
+		@RetryableTopic(
+				attempts = "3",
+				backOff = @BackOff(50),
+				sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC,
+				exceptionBasedDltRouting = {
+						@ExceptionBasedDltDestination(suffix = "-custom", exceptions = { CustomRetryException.class }),
+						@ExceptionBasedDltDestination(suffix = "-validation", exceptions = { ValidationRetryException.class })
+				}
+		)
+		@KafkaListener(id = "multiDltListenerId", topics = MULTI_DLT_TOPIC)
+		public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
+			logger.debug("Multi DLT listener: message {} received in topic {}", message, receivedTopic);
+			container.multiDltInvocations.incrementAndGet();
+
+			if (message.contains("custom exception")) {
+				throw new CustomRetryException("Testing custom DLT routing");
+			}
+			else if (message.contains("validation exception")) {
+				throw new ValidationRetryException("Testing validation DLT routing");
+			}
+			else {
+				throw new RuntimeException("Testing default DLT routing");
+			}
+		}
+
+		@DltHandler
+		public void handleDlt(Object message) {
+			logger.debug("Multi DLT handler received message");
+			container.multiDltLatch.countDown();
+		}
+	}
+
+	static class NoDltListener {
+
+		@Autowired
+		CountDownLatchContainer container;
+
+		@RetryableTopic(
+				attempts = "3",
+				backOff = @BackOff(50),
+				dltStrategy = DltStrategy.NO_DLT,
+				sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC
+		)
+		@KafkaListener(id = "noDltListenerId", topics = NO_DLT_TOPIC)
+		public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
+			logger.debug("No DLT listener: message {} received in topic {}", message, receivedTopic);
+			container.noDltInvocations.incrementAndGet();
+			if (container.noDltInvocations.get() == 3) {
+				container.noDltProcessed.countDown();
+			}
+			throw new RuntimeException("Test exception for no DLT");
+		}
+
+		@DltHandler
+		public void shouldNotBeInvoked() {
+			fail("DLT handler should not be invoked when dltStrategy = NO_DLT");
+		}
+	}
+
+	static class CountDownLatchContainer {
+
+		final CountDownLatch singleDltLatch = new CountDownLatch(1);
+
+		final CountDownLatch multiDltLatch = new CountDownLatch(1);
+
+		final CountDownLatch noDltProcessed = new CountDownLatch(1);
+
+		final AtomicInteger singleDltInvocations = new AtomicInteger(0);
+
+		final AtomicInteger multiDltInvocations = new AtomicInteger(0);
+
+		final AtomicInteger noDltInvocations = new AtomicInteger(0);
+	}
+
+	@SuppressWarnings("serial")
+	public static class CustomRetryException extends RuntimeException {
+
+		public CustomRetryException(String msg) {
+			super(msg);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static class ValidationRetryException extends RuntimeException {
+
+		public ValidationRetryException(String msg) {
+			super(msg);
+		}
+	}
+
+	@Configuration
+	static class RetryTopicConfigurations extends RetryTopicConfigurationSupport {
+
+		@Bean
+		public SingleDltListener singleDltListener() {
+			return new SingleDltListener();
+		}
+
+		@Bean
+		public MultiDltListener multiDltListener() {
+			return new MultiDltListener();
+		}
+
+		@Bean
+		public NoDltListener noDltListener() {
+			return new NoDltListener();
+		}
+
+		@Bean
+		CountDownLatchContainer latchContainer() {
+			return new CountDownLatchContainer();
+		}
+
+		@Bean
+		TaskScheduler taskScheduler() {
+			return new ThreadPoolTaskScheduler();
+		}
+	}
+
+	@Configuration
+	public static class KafkaProducerConfig {
+
+		@Autowired
+		EmbeddedKafkaBroker broker;
+
+		@Bean
+		public ProducerFactory<String, String> producerFactory() {
+			Map<String, Object> configProps = KafkaTestUtils.producerProps(broker);
+			configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+			configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+			return new DefaultKafkaProducerFactory<>(configProps);
+		}
+
+		@Bean
+		public KafkaTemplate<String, String> kafkaTemplate() {
+			return new KafkaTemplate<>(producerFactory());
+		}
+	}
+
+	@EnableKafka
+	@Configuration
+	public static class KafkaConsumerConfig {
+
+		@Autowired
+		EmbeddedKafkaBroker broker;
+
+		@Bean
+		public KafkaAdmin kafkaAdmin() {
+			Map<String, Object> configs = new HashMap<>();
+			configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
+			return new KafkaAdmin(configs);
+		}
+
+		@Bean
+		public ConsumerFactory<String, String> consumerFactory() {
+			Map<String, Object> props = KafkaTestUtils.consumerProps(broker.getBrokersAsString(), "test-group");
+			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+			props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+			return new DefaultKafkaConsumerFactory<>(props);
+		}
+
+		@Bean
+		public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
+				ConsumerFactory<String, String> consumerFactory) {
+
+			ConcurrentKafkaListenerContainerFactory<String, String> factory =
+					new ConcurrentKafkaListenerContainerFactory<>();
+			factory.setConsumerFactory(consumerFactory);
+			factory.setConcurrency(1);
+			return factory;
+		}
+	}
+}