Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* @author Yvette Quinby
* @author Adrian Chlebosz
* @author Omer Celik
* @author Hyunggeol Lee
* @since 2.7
*
*/
Expand Down Expand Up @@ -256,10 +257,16 @@ private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
for (int i = 0; i < destinationsToAdd.size(); i++) {
DestinationTopic destination = destinationsToAdd.get(i);
if (destination.isReusableRetryTopic()) {
Assert.isTrue((i == (destinationsToAdd.size() - 1) ||
((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))),
String.format("In the destination topic chain, the type %s can only be "
+ "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC));
// Allow multiple DLTs after REUSABLE_RETRY_TOPIC
boolean isLastOrFollowedOnlyByDlts = (i == destinationsToAdd.size() - 1) ||
destinationsToAdd.subList(i + 1, destinationsToAdd.size())
.stream()
.allMatch(DestinationTopic::isDltTopic);

Assert.isTrue(isLastOrFollowedOnlyByDlts,
() -> String.format("In the destination topic chain, the type %s can only be " +
"specified as the last retry topic (followed only by DLT topics).",
Type.REUSABLE_RETRY_TOPIC));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;

/**
* @author Tomaz Fernandes
* @author Yvette Quinby
* @author Gary Russell
* @author Adrian Chlebosz
* @author Hyunggeol Lee
* @since 2.7
*/
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -290,4 +292,103 @@ void shouldNotMarkContainerRefeshedOnOtherContextRefresh() {
assertThat(defaultDestinationTopicContainer.isContextRefreshed()).isFalse();
}

@Test
void shouldAllowReusableRetryTopicWithSingleDlt() {
assertThatNoException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", allFifthDestinationTopics));
}

@Test
void shouldAllowReusableRetryTopicWithMultipleDlts() {
assertThatNoException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", allSixthDestinationTopics));
}

@Test
void shouldAllowReusableRetryTopicAsLastTopic() {
List<DestinationTopic> topics = Arrays.asList(
mainDestinationTopic5,
reusableRetryDestinationTopic5
);

assertThatNoException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", topics));
}

@Test
void shouldRejectReusableRetryTopicFollowedByRegularRetry() {
List<DestinationTopic> topics = Arrays.asList(
mainDestinationTopic6,
reusableRetryDestinationTopic6,
invalidRetryDestinationTopic6,
dltDestinationTopic6
);

assertThatIllegalArgumentException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", topics))
.withMessageContaining("REUSABLE_RETRY_TOPIC")
.withMessageContaining("followed only by DLT topics");
}

@Test
void shouldRejectReusableRetryTopicFollowedByNoOps() {
List<DestinationTopic> topics = Arrays.asList(
mainDestinationTopic6,
reusableRetryDestinationTopic6,
noOpsDestinationTopic6,
dltDestinationTopic6
);

assertThatIllegalArgumentException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", topics))
.withMessageContaining("REUSABLE_RETRY_TOPIC")
.withMessageContaining("followed only by DLT topics");
}

@Test
void shouldAllowReusableRetryTopicWithTwoDlts() {
List<DestinationTopic> topics = Arrays.asList(
mainDestinationTopic6,
reusableRetryDestinationTopic6,
customDltDestinationTopic6,
dltDestinationTopic6
);

assertThatNoException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", topics));
}

@Test
void shouldAllowReusableRetryTopicWithDifferentDltCombinations() {
List<DestinationTopic> topics = Arrays.asList(
mainDestinationTopic6,
reusableRetryDestinationTopic6,
validationDltDestinationTopic6,
dltDestinationTopic6
);

assertThatNoException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", topics));
}

@Test
void shouldRejectReusableRetryTopicFollowedByMainTopic() {
List<DestinationTopic> topics = Arrays.asList(
mainDestinationTopic6,
reusableRetryDestinationTopic6,
mainDestinationTopic5
);

assertThatIllegalArgumentException()
.isThrownBy(() -> defaultDestinationTopicContainer
.addDestinationTopics("id", topics))
.withMessageContaining("REUSABLE_RETRY_TOPIC");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
/**
* @author Tomaz Fernandes
* @author Adrian Chlebosz
* @author Hyunggeol Lee
* @since 2.7
*/
public class DestinationTopicTests {
Expand Down Expand Up @@ -144,6 +145,33 @@ public class DestinationTopicTests {
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet());

protected DestinationTopic.Properties mainTopicProps6 =
new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);

protected DestinationTopic.Properties reusableRetryTopicProps6 =
new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);

protected DestinationTopic.Properties customDltTopicProps6 =
new DestinationTopic.Properties(0, "-custom" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
Set.of(IllegalStateException.class));

protected DestinationTopic.Properties validationDltTopicProps6 =
new DestinationTopic.Properties(0, "-validation" + dltSuffix, DestinationTopic.Type.DLT, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
Set.of(IllegalArgumentException.class));

protected DestinationTopic.Properties dltTopicProps6 =
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null,
Collections.emptySet());

protected DestinationTopic.Properties invalidRetryProps6 =
new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1,
DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout);

// Holders

protected final static String FIRST_TOPIC = "firstTopic";
Expand Down Expand Up @@ -285,6 +313,38 @@ public class DestinationTopicTests {
protected List<DestinationTopic> allFifthDestinationTopics = Arrays
.asList(mainDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5);

protected final static String SIXTH_TOPIC = "sixthTopic";

protected DestinationTopic mainDestinationTopic6 =
new DestinationTopic(SIXTH_TOPIC + mainTopicProps6.suffix(), mainTopicProps6);

protected DestinationTopic reusableRetryDestinationTopic6 =
new DestinationTopic(SIXTH_TOPIC + reusableRetryTopicProps6.suffix(), reusableRetryTopicProps6);

protected DestinationTopic customDltDestinationTopic6 =
new DestinationTopic(SIXTH_TOPIC + customDltTopicProps6.suffix(), customDltTopicProps6);

protected DestinationTopic validationDltDestinationTopic6 =
new DestinationTopic(SIXTH_TOPIC + validationDltTopicProps6.suffix(), validationDltTopicProps6);

protected DestinationTopic dltDestinationTopic6 =
new DestinationTopic(SIXTH_TOPIC + dltTopicProps6.suffix(), dltTopicProps6);

protected DestinationTopic invalidRetryDestinationTopic6 =
new DestinationTopic(SIXTH_TOPIC + invalidRetryProps6.suffix(), invalidRetryProps6);

protected DestinationTopic noOpsDestinationTopic6 =
new DestinationTopic(dltDestinationTopic6.getDestinationName() + "-noOps",
new DestinationTopic.Properties(dltTopicProps6, "-noOps", DestinationTopic.Type.NO_OPS));

protected List<DestinationTopic> allSixthDestinationTopics = Arrays.asList(
mainDestinationTopic6,
reusableRetryDestinationTopic6,
customDltDestinationTopic6,
validationDltDestinationTopic6,
dltDestinationTopic6
);

// Exception matchers

private final ExceptionMatcher allowListExceptionMatcher = ExceptionMatcher.forAllowList().add(IllegalArgumentException.class).build();
Expand Down
Loading