Skip to content

Commit a5e43a0

Browse files
jinwoobaejinwoobae
authored andcommitted
Fix KafkaItemReader TopicPartition serialization issue with JobRepository (spring-projects#3797)
- Replace Map<TopicPartition, Long> with List<Map<String, Object>> for offset storage - Ensure compatibility with Jackson serialization used by JobRepository - Add tests to verify proper serialization/deserialization of execution context - Maintain backward compatibility while fixing ClassCastException on job restart Resolves spring-projects#3797
1 parent 08c4cb1 commit a5e43a0

File tree

2 files changed

+174
-13
lines changed

2 files changed

+174
-13
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,18 @@
4848
*
4949
* @author Mathieu Ouellet
5050
* @author Mahmoud Ben Hassine
51+
* @author Jinwoo Bae
5152
* @since 4.2
5253
*/
5354
public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
5455

5556
private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";
57+
58+
private static final String KEY_TOPIC = "topic";
59+
60+
private static final String KEY_PARTITION = "partition";
61+
62+
private static final String KEY_OFFSET = "offset";
5663

5764
private static final long DEFAULT_POLL_TIMEOUT = 30L;
5865

@@ -167,21 +174,45 @@ public void setPartitionOffsets(Map<TopicPartition, Long> partitionOffsets) {
167174
@Override
168175
public void open(ExecutionContext executionContext) {
169176
this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
177+
initializePartitionOffsets();
178+
179+
if (this.saveState && executionContext.containsKey(getExecutionContextKey(TOPIC_PARTITION_OFFSETS))) {
180+
List<Map<String, Object>> storedOffsets = (List<Map<String, Object>>) executionContext.get(
181+
getExecutionContextKey(TOPIC_PARTITION_OFFSETS));
182+
restorePartitionOffsets(storedOffsets);
183+
}
184+
185+
this.kafkaConsumer.assign(this.topicPartitions);
186+
this.partitionOffsets.forEach(this.kafkaConsumer::seek);
187+
}
188+
189+
/**
190+
* Initialize partition offsets with default values if not already set.
191+
*/
192+
private void initializePartitionOffsets() {
170193
if (this.partitionOffsets == null) {
171194
this.partitionOffsets = new HashMap<>();
172195
for (TopicPartition topicPartition : this.topicPartitions) {
173196
this.partitionOffsets.put(topicPartition, 0L);
174197
}
175198
}
176-
if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
177-
Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext
178-
.get(TOPIC_PARTITION_OFFSETS);
179-
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
180-
this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
181-
}
199+
}
200+
201+
/**
202+
* Restore partition offsets from the stored list.
203+
* Each entry in the list contains topic, partition, and offset information.
204+
* @param storedOffsets the offsets stored in execution context
205+
*/
206+
private void restorePartitionOffsets(List<Map<String, Object>> storedOffsets) {
207+
for (Map<String, Object> offsetEntry : storedOffsets) {
208+
String topic = (String) offsetEntry.get(KEY_TOPIC);
209+
Number partition = (Number) offsetEntry.get(KEY_PARTITION);
210+
Number offset = (Number) offsetEntry.get(KEY_OFFSET);
211+
212+
TopicPartition topicPartition = new TopicPartition(topic, partition.intValue());
213+
long offsetValue = offset.longValue();
214+
this.partitionOffsets.put(topicPartition, offsetValue == 0 ? 0 : offsetValue + 1);
182215
}
183-
this.kafkaConsumer.assign(this.topicPartitions);
184-
this.partitionOffsets.forEach(this.kafkaConsumer::seek);
185216
}
186217

187218
@Nullable
@@ -202,8 +233,18 @@ public V read() {
202233

203234
@Override
204235
public void update(ExecutionContext executionContext) {
205-
if (this.saveState) {
206-
executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
236+
if (this.saveState && this.partitionOffsets != null) {
237+
List<Map<String, Object>> offsetsToStore = new ArrayList<>();
238+
239+
this.partitionOffsets.forEach((topicPartition, offset) -> {
240+
Map<String, Object> offsetEntry = new HashMap<>();
241+
offsetEntry.put(KEY_TOPIC, topicPartition.topic());
242+
offsetEntry.put(KEY_PARTITION, topicPartition.partition());
243+
offsetEntry.put(KEY_OFFSET, offset);
244+
offsetsToStore.add(offsetEntry);
245+
});
246+
247+
executionContext.put(getExecutionContextKey(TOPIC_PARTITION_OFFSETS), offsetsToStore);
207248
}
208249
this.kafkaConsumer.commitSync();
209250
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

Lines changed: 123 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@
1717
package org.springframework.batch.item.kafka;
1818

1919
import java.time.Duration;
20-
import java.util.Properties;
20+
import java.util.*;
2121

22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.kafka.clients.consumer.ConsumerConfig;
24+
import org.apache.kafka.common.TopicPartition;
2225
import org.apache.kafka.common.serialization.StringDeserializer;
2326
import org.junit.jupiter.api.Test;
27+
import org.mockito.MockedConstruction;
2428

25-
import static org.junit.jupiter.api.Assertions.assertEquals;
26-
import static org.junit.jupiter.api.Assertions.assertThrows;
29+
import org.springframework.batch.item.ExecutionContext;
30+
31+
import static org.junit.jupiter.api.Assertions.*;
32+
import static org.mockito.Mockito.mockConstruction;
2733

2834
/**
2935
* @author Mathieu Ouellet
@@ -77,4 +83,118 @@ void testValidation() {
7783
assertEquals("pollTimeout must not be negative", exception.getMessage());
7884
}
7985

86+
@Test
87+
void testExecutionContextSerializationWithJackson() throws Exception {
88+
Properties consumerProperties = new Properties();
89+
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mockServer");
90+
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
91+
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
92+
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
93+
94+
KafkaItemReader<String, String> reader = new KafkaItemReader<>(consumerProperties, "testTopic", 0, 1);
95+
reader.setName("kafkaItemReader");
96+
97+
// Simulate how Jackson would serialize/deserialize the offset data
98+
ExecutionContext executionContext = new ExecutionContext();
99+
List<Map<String, Object>> offsets = new ArrayList<>();
100+
101+
Map<String, Object> offset1 = new HashMap<>();
102+
offset1.put("topic", "testTopic");
103+
offset1.put("partition", 0);
104+
offset1.put("offset", 100L);
105+
offsets.add(offset1);
106+
107+
Map<String, Object> offset2 = new HashMap<>();
108+
offset2.put("topic", "testTopic");
109+
offset2.put("partition", 1);
110+
offset2.put("offset", 200L);
111+
offsets.add(offset2);
112+
113+
// Simulate Jackson serialization/deserialization
114+
ObjectMapper objectMapper = new ObjectMapper();
115+
String serialized = objectMapper.writeValueAsString(offsets);
116+
List<Map<String, Object>> deserializedOffsets = objectMapper.readValue(serialized, List.class);
117+
118+
executionContext.put("kafkaItemReader.topic.partition.offsets", deserializedOffsets);
119+
120+
try (MockedConstruction<org.apache.kafka.clients.consumer.KafkaConsumer> mockedConstruction = mockConstruction(
121+
org.apache.kafka.clients.consumer.KafkaConsumer.class)) {
122+
123+
reader.open(executionContext);
124+
125+
ExecutionContext newContext = new ExecutionContext();
126+
reader.update(newContext);
127+
128+
List<Map<String, Object>> savedOffsets = (List<Map<String, Object>>) newContext.get("kafkaItemReader.topic.partition.offsets");
129+
assertNotNull(savedOffsets);
130+
assertEquals(2, savedOffsets.size());
131+
132+
boolean foundPartition0 = false;
133+
boolean foundPartition1 = false;
134+
for (Map<String, Object> offsetEntry : savedOffsets) {
135+
String topic = (String) offsetEntry.get("topic");
136+
Integer partition = (Integer) offsetEntry.get("partition");
137+
Long offset = (Long) offsetEntry.get("offset");
138+
139+
assertEquals("testTopic", topic);
140+
assertNotNull(offset);
141+
142+
if (partition == 0) {
143+
foundPartition0 = true;
144+
assertEquals(101L, offset); // restored offset + 1
145+
} else if (partition == 1) {
146+
foundPartition1 = true;
147+
assertEquals(201L, offset); // restored offset + 1
148+
}
149+
}
150+
151+
assertTrue(foundPartition0);
152+
assertTrue(foundPartition1);
153+
}
154+
}
155+
156+
@Test
157+
void testExecutionContextWithStringKeys() throws Exception {
158+
Properties consumerProperties = new Properties();
159+
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mockServer");
160+
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
161+
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
162+
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
163+
164+
KafkaItemReader<String, String> reader = new KafkaItemReader<>(consumerProperties, "testTopic", 0, 1);
165+
reader.setName("kafkaItemReader");
166+
167+
// Create ExecutionContext with list of maps (as it would be after Jackson
168+
// deserialization)
169+
ExecutionContext executionContext = new ExecutionContext();
170+
List<Map<String, Object>> storedOffsets = new ArrayList<>();
171+
172+
Map<String, Object> offset1 = new HashMap<>();
173+
offset1.put("topic", "testTopic");
174+
offset1.put("partition", 0);
175+
offset1.put("offset", 100L);
176+
storedOffsets.add(offset1);
177+
178+
Map<String, Object> offset2 = new HashMap<>();
179+
offset2.put("topic", "testTopic");
180+
offset2.put("partition", 1);
181+
offset2.put("offset", 200L);
182+
storedOffsets.add(offset2);
183+
184+
executionContext.put("kafkaItemReader.topic.partition.offsets", storedOffsets);
185+
186+
try (MockedConstruction<org.apache.kafka.clients.consumer.KafkaConsumer> mockedConstruction = mockConstruction(
187+
org.apache.kafka.clients.consumer.KafkaConsumer.class)) {
188+
189+
reader.open(executionContext);
190+
191+
// Verify that offsets are saved correctly
192+
ExecutionContext newContext = new ExecutionContext();
193+
reader.update(newContext);
194+
195+
List<Map<String, Object>> savedOffsets = (List<Map<String, Object>>) newContext.get("kafkaItemReader.topic.partition.offsets");
196+
assertNotNull(savedOffsets);
197+
assertEquals(2, savedOffsets.size());
198+
}
199+
}
80200
}

0 commit comments

Comments
 (0)