Skip to content

Commit ebae43a

Browse files
GH-3407 Draft
1 parent c44c339 commit ebae43a

File tree

3 files changed

+526
-0
lines changed

3 files changed

+526
-0
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2021-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.nio.ByteBuffer;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.common.header.Header;
24+
import org.apache.kafka.common.header.Headers;
25+
import org.apache.kafka.common.header.internals.RecordHeader;
26+
import org.springframework.kafka.support.KafkaHeaders;
27+
28+
/**
29+
* DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations.
30+
* The DeliveryAttemptAwareRetryListener adds the {@link KafkaHeaders}.DELIVERY_ATTEMPT header
31+
* to the record's headers when batch records fail and are retried.
32+
* Note that DeliveryAttemptAwareRetryListener modifies the headers of the original record.
33+
*
34+
* @author Sanghyeok An
35+
* @since 3.3
36+
* @see KafkaConsumerBackoffManager
37+
*/
38+
39+
public class DeliveryAttemptAwareRetryListener implements RetryListener {
40+
41+
@Override
42+
public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
43+
// Pass
44+
}
45+
46+
/**
47+
* Called after a delivery failed for batch records.
48+
* If the {@link KafkaHeaders}.DELIVERY_ATTEMPT header already exists in the {@link ConsumerRecord}'s headers,
49+
* it will be removed. Then, the provided `deliveryAttempt` is added to the {@link ConsumerRecord}'s headers.
50+
*
51+
* @param records the records.
52+
* @param ex the exception.
53+
* @param deliveryAttempt the delivery attempt, if available.
54+
*/
55+
@Override
56+
public void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
57+
for (ConsumerRecord<?, ?> record : records) {
58+
59+
Headers headers = record.headers();
60+
int headerCount = 0;
61+
Iterable<Header> iterator = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT);
62+
for (Header header : iterator) {
63+
headerCount += 1;
64+
}
65+
66+
if (headerCount > 0) {
67+
headers.remove(KafkaHeaders.DELIVERY_ATTEMPT);
68+
}
69+
70+
byte[] buff = new byte[4]; // NOSONAR (magic #)
71+
ByteBuffer bb = ByteBuffer.wrap(buff);
72+
bb.putInt(deliveryAttempt);
73+
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
74+
}
75+
}
76+
77+
}
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/*
2+
* Copyright 2019-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.fail;
21+
22+
import java.nio.ByteBuffer;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import org.apache.kafka.clients.admin.AdminClientConfig;
31+
import org.apache.kafka.clients.consumer.ConsumerConfig;
32+
import org.apache.kafka.clients.consumer.ConsumerRecord;
33+
import org.apache.kafka.clients.producer.ProducerConfig;
34+
import org.apache.kafka.common.header.Header;
35+
import org.apache.kafka.common.serialization.StringDeserializer;
36+
import org.apache.kafka.common.serialization.StringSerializer;
37+
import org.junit.jupiter.api.Test;
38+
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.context.annotation.Bean;
40+
import org.springframework.context.annotation.Configuration;
41+
import org.springframework.kafka.annotation.EnableKafka;
42+
import org.springframework.kafka.annotation.KafkaListener;
43+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
44+
import org.springframework.kafka.core.ConsumerFactory;
45+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
46+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
47+
import org.springframework.kafka.core.KafkaAdmin;
48+
import org.springframework.kafka.core.KafkaTemplate;
49+
import org.springframework.kafka.core.ProducerFactory;
50+
import org.springframework.kafka.support.KafkaHeaders;
51+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
52+
import org.springframework.kafka.test.context.EmbeddedKafka;
53+
import org.springframework.test.annotation.DirtiesContext;
54+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
55+
import org.springframework.util.backoff.FixedBackOff;
56+
57+
/**
58+
* @author Sanghyeok An
59+
* @since 3.3.0
60+
*/
61+
62+
@SpringJUnitConfig
63+
@DirtiesContext
64+
@EmbeddedKafka
65+
class DeliveryAttemptAwareRetryListenerIntegrationTest {
66+
67+
static final String MAIN_TOPIC_CONTAINER_FACTORY0 = "kafkaListenerContainerFactory0";
68+
69+
static final String TEST_TOPIC0 = "myBatchDeliveryAttemptTopic0";
70+
71+
static final int MAX_ATTEMPT_COUNT0 = 3;
72+
73+
static final CountDownLatch latch0 = new CountDownLatch(MAX_ATTEMPT_COUNT0 + 1);
74+
75+
static final String MAIN_TOPIC_CONTAINER_FACTORY1 = "kafkaListenerContainerFactory1";
76+
77+
static final String TEST_TOPIC1 = "myBatchDeliveryAttemptTopic1";
78+
79+
static final int MAX_ATTEMPT_COUNT1 = 10;
80+
81+
static final CountDownLatch latch1 = new CountDownLatch(MAX_ATTEMPT_COUNT1 + 1);
82+
83+
@Autowired
84+
private KafkaTemplate<String, String> kafkaTemplate;
85+
86+
@Test
87+
void should_have_delivery_attempt_header_in_each_consumer_record(@Autowired TestTopicListener0 listener) {
88+
89+
// Given
90+
String msg1 = "1";
91+
String msg2 = "2";
92+
String msg3 = "3";
93+
94+
// When
95+
kafkaTemplate.send(TEST_TOPIC0, msg1);
96+
kafkaTemplate.send(TEST_TOPIC0, msg2);
97+
kafkaTemplate.send(TEST_TOPIC0, msg3);
98+
99+
// Then
100+
assertThat(awaitLatch(latch0)).isTrue();
101+
102+
Map<Integer, Integer> deliveryAttemptCountMap = convertToMap(listener.receivedHeaders);
103+
104+
for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT0; attemptCnt++) {
105+
assertThat(deliveryAttemptCountMap.get(attemptCnt)).isEqualTo(3);
106+
}
107+
}
108+
109+
@Test
110+
void should_have_delivery_attempt_header_in_each_consumer_record_with_more_bigger_max_attempt(@Autowired TestTopicListener1 listener) {
111+
// Given
112+
String msg1 = "1";
113+
String msg2 = "2";
114+
String msg3 = "3";
115+
116+
// When
117+
kafkaTemplate.send(TEST_TOPIC1, msg1);
118+
kafkaTemplate.send(TEST_TOPIC1, msg2);
119+
kafkaTemplate.send(TEST_TOPIC1, msg3);
120+
121+
// Then
122+
assertThat(awaitLatch(latch1)).isTrue();
123+
124+
Map<Integer, Integer> deliveryAttemptCountMap = convertToMap(listener.receivedHeaders);
125+
126+
for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT1; attemptCnt++) {
127+
assertThat(deliveryAttemptCountMap.get(attemptCnt)).isEqualTo(3);
128+
}
129+
}
130+
131+
132+
private Map<Integer, Integer> convertToMap(List<Header> headers) {
133+
Map<Integer, Integer> map = new HashMap<>();
134+
for (Header header : headers) {
135+
int attemptCount = ByteBuffer.wrap(header.value()).getInt();
136+
Integer cnt = map.getOrDefault(attemptCount, 0);
137+
map.put(attemptCount, cnt + 1);
138+
}
139+
return map;
140+
}
141+
142+
143+
static class TestTopicListener0 {
144+
final List<Header> receivedHeaders = new ArrayList<>();
145+
146+
@KafkaListener(
147+
topics = TEST_TOPIC0,
148+
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY0,
149+
batch = "true")
150+
public void listen(List<ConsumerRecord<?, ?>> records) {
151+
latch0.countDown();
152+
for (ConsumerRecord<?, ?> record : records) {
153+
Iterable<Header> headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT);
154+
for (Header header : headers) {
155+
receivedHeaders.add(header);
156+
}
157+
}
158+
throw new RuntimeException("Failed.");
159+
}
160+
}
161+
162+
static class TestTopicListener1 {
163+
final List<Header> receivedHeaders = new ArrayList<>();
164+
165+
@KafkaListener(
166+
topics = TEST_TOPIC1,
167+
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY1,
168+
batch = "true")
169+
public void listen(List<ConsumerRecord<?, ?>> records) {
170+
latch1.countDown();
171+
for (ConsumerRecord<?, ?> record : records) {
172+
Iterable<Header> headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT);
173+
for (Header header : headers) {
174+
receivedHeaders.add(header);
175+
}
176+
}
177+
throw new RuntimeException("Failed.");
178+
}
179+
}
180+
181+
@Configuration
182+
static class TestConfiguration {
183+
184+
@Bean
185+
TestTopicListener0 testTopicListener0() {
186+
return new TestTopicListener0();
187+
}
188+
189+
@Bean
190+
TestTopicListener1 testTopicListener1() {
191+
return new TestTopicListener1();
192+
}
193+
}
194+
195+
@Configuration
196+
static class KafkaProducerConfig {
197+
198+
@Autowired
199+
EmbeddedKafkaBroker broker;
200+
201+
@Bean
202+
ProducerFactory<String, String> producerFactory() {
203+
Map<String, Object> configProps = new HashMap<>();
204+
configProps.put(
205+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
206+
this.broker.getBrokersAsString());
207+
configProps.put(
208+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
209+
StringSerializer.class);
210+
configProps.put(
211+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
212+
StringSerializer.class);
213+
return new DefaultKafkaProducerFactory<>(configProps);
214+
}
215+
216+
@Bean("customKafkaTemplate")
217+
KafkaTemplate<String, String> kafkaTemplate() {
218+
return new KafkaTemplate<>(producerFactory());
219+
}
220+
}
221+
222+
@EnableKafka
223+
@Configuration
224+
static class KafkaConsumerConfig {
225+
226+
@Autowired
227+
EmbeddedKafkaBroker broker;
228+
229+
@Bean
230+
KafkaAdmin kafkaAdmin() {
231+
Map<String, Object> configs = new HashMap<>();
232+
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
233+
return new KafkaAdmin(configs);
234+
}
235+
236+
@Bean
237+
ConsumerFactory<String, String> consumerFactory() {
238+
Map<String, Object> props = new HashMap<>();
239+
props.put(
240+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
241+
this.broker.getBrokersAsString());
242+
props.put(
243+
ConsumerConfig.GROUP_ID_CONFIG,
244+
"groupId");
245+
props.put(
246+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
247+
StringDeserializer.class);
248+
props.put(
249+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
250+
StringDeserializer.class);
251+
props.put(
252+
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
253+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
254+
255+
return new DefaultKafkaConsumerFactory<>(props);
256+
}
257+
258+
@Bean
259+
ConcurrentKafkaListenerContainerFactory<String, String>
260+
kafkaListenerContainerFactory0(ConsumerFactory<String, String> consumerFactory) {
261+
262+
final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT0);
263+
DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
264+
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
265+
266+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
267+
factory.setConsumerFactory(consumerFactory);
268+
factory.setCommonErrorHandler(errorHandler);
269+
270+
271+
final ContainerProperties containerProperties = factory.getContainerProperties();
272+
containerProperties.setDeliveryAttemptHeader(true);
273+
274+
return factory;
275+
}
276+
277+
@Bean
278+
ConcurrentKafkaListenerContainerFactory<String, String>
279+
kafkaListenerContainerFactory1(ConsumerFactory<String, String> consumerFactory) {
280+
281+
final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT1);
282+
DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
283+
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
284+
285+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
286+
factory.setConsumerFactory(consumerFactory);
287+
factory.setCommonErrorHandler(errorHandler);
288+
289+
290+
final ContainerProperties containerProperties = factory.getContainerProperties();
291+
containerProperties.setDeliveryAttemptHeader(true);
292+
293+
return factory;
294+
}
295+
296+
297+
}
298+
299+
private boolean awaitLatch(CountDownLatch latch) {
300+
try {
301+
return latch.await(60, TimeUnit.SECONDS);
302+
}
303+
catch (Exception e) {
304+
fail(e.getMessage());
305+
throw new RuntimeException(e);
306+
}
307+
}
308+
309+
}

0 commit comments

Comments
 (0)