Cannot deserialize TopicPartition from JobRepository #3797

Open
MinJunKweon opened this issue Nov 2, 2020 · 4 comments · May be fixed by #4863
Comments

MinJunKweon commented Nov 2, 2020

Hi.

I use MySQL for the JobRepository, which serializes the ExecutionContext to a String using Jackson's ObjectMapper. The serializer appears to force every map key to be a String (Map<String, Object>).

For example, here is SHORT_CONTEXT in BATCH_STEP_EXECUTION_CONTEXT:

{"batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet","topic.partition.offsets":["java.util.HashMap",{"test-topic":["java.lang.Long",42]}],"batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"}

However, KafkaItemReader uses TopicPartition as the map key, so deserializing the ExecutionContext fails with a ClassCastException:

@Override
public void open(ExecutionContext executionContext) {
	...
	if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
		Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
		for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
			this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
		}
	}
	...
}
2020-11-02 14:30:50 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step testStep in job testJob
java.lang.ClassCastException: java.lang.String incompatible with org.apache.kafka.common.TopicPartition
	at org.springframework.batch.item.kafka.KafkaItemReader$$Lambda$911/00000000EF270020.accept(Unknown Source)
	at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
	at org.springframework.batch.item.kafka.KafkaItemReader.open(KafkaItemReader.java:174)
	at org.springframework.batch.item.kafka.KafkaItemReader$$FastClassBySpringCGLIB$$9111feb4.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
	at org.springframework.batch.item.kafka.KafkaItemReader$$EnhancerBySpringCGLIB$$314cf4f9.open(<generated>)
	at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:104)
	at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy92.execute(Unknown Source)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
	at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
	at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy129.run(Unknown Source)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153)
	at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148)
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
        ...

I think the value should be deserialized as a Map with String keys first, and then each key should be converted back to a TopicPartition inside KafkaItemReader.

Like this:

Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
for (Map.Entry<String, Long> entry : offsets.entrySet()) {
	TopicPartition topicPartition = getTopicPartitionFromString(entry.getKey());
	this.partitionOffsets.put(topicPartition, entry.getValue() == 0 ? 0 : entry.getValue() + 1);
}
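
For reference, here is a minimal sketch of what such a helper might look like, assuming the keys are stored in the "<topic>-<partition>" form produced by TopicPartition.toString(); getTopicPartitionFromString is the hypothetical helper named in the snippet above, not an existing KafkaItemReader method:

// Hypothetical helper, not part of KafkaItemReader: rebuilds a TopicPartition
// from a String key of the form "<topic>-<partition>", e.g. "test-topic-0".
private TopicPartition getTopicPartitionFromString(String key) {
	int separatorIndex = key.lastIndexOf('-');
	String topic = key.substring(0, separatorIndex); // topic names may contain '-', so split on the last one
	int partition = Integer.parseInt(key.substring(separatorIndex + 1));
	return new TopicPartition(topic, partition);
}
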
@MinJunKweon MinJunKweon added status: waiting-for-triage Issues that we did not analyse yet type: bug labels Nov 2, 2020
@langzime

same question

noojung commented Dec 23, 2024

same question too

@fmbenhassine fmbenhassine added this to the 5.2.2 milestone Dec 23, 2024
@fmbenhassine fmbenhassine added in: core and removed status: waiting-for-triage Issues that we did not analyse yet labels Dec 23, 2024
@fmbenhassine fmbenhassine modified the milestones: 5.2.2, 5.2.3 Mar 18, 2025
noojung commented Jun 2, 2025

Jackson2ExecutionContextStringSerializer always forces map keys to be Strings, so we can't store a Map<TopicPartition, Long> directly.

Instead, I think we can store only the partition number (as a String) in update(), and then reconstruct the full TopicPartition in open() using the topic name provided to the constructor.

For example:

	@Override
	public void update(ExecutionContext executionContext) {
		if (this.saveState) {
			Map<String, Long> offsets = new HashMap<>();
			for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
				offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
			}
			executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
		}
		this.kafkaConsumer.commitSync();
	}

	@Override
	public void open(ExecutionContext executionContext) {
		this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
		if (this.partitionOffsets == null) {
			this.partitionOffsets = new HashMap<>();
			for (TopicPartition topicPartition : this.topicPartitions) {
				this.partitionOffsets.put(topicPartition, 0L);
			}
		}
		if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
			Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
			for (Map.Entry<String, Long> entry : offsets.entrySet()) {
				String topicName = this.topicPartitions.get(0).topic();
				this.partitionOffsets.put(new TopicPartition(topicName, Integer.parseInt(entry.getKey())), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
			}
		}
		this.kafkaConsumer.assign(this.topicPartitions);
		this.partitionOffsets.forEach(this.kafkaConsumer::seek);
	}

baezzys pushed a commit to baezzys/spring-batch that referenced this issue Jun 3, 2025
- Convert TopicPartition keys to String format (topic-partition) when saving to ExecutionContext
- Support both String and TopicPartition keys when restoring from ExecutionContext
- Prevent ClassCastException when using JobRepository with Jackson serialization

Resolves spring-projects#3797
baezzys pushed a commit to baezzys/spring-batch that referenced this issue Jun 3, 2025
…tory (spring-projects#3797)

- Replace Map<TopicPartition, Long> with List<Map<String, Object>> for offset storage
- Ensure compatibility with Jackson serialization used by JobRepository
- Add tests to verify proper serialization/deserialization of execution context
- Maintain backward compatibility while fixing ClassCastException on job restart

Resolves spring-projects#3797
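
For illustration, a minimal sketch (under stated assumptions, not the actual commit code) of storing the offsets as a List<Map<String, Object>> so that Jackson only ever sees String map keys; the entry names "topic", "partition", and "offset" are assumed here and may differ from what the commits use:

// Hypothetical sketch: serialize offsets as a List<Map<String, Object>> so Jackson
// only handles String map keys; "topic", "partition", and "offset" are assumed names.
List<Map<String, Object>> serializedOffsets = new ArrayList<>();
for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
	Map<String, Object> offsetEntry = new HashMap<>();
	offsetEntry.put("topic", entry.getKey().topic());
	offsetEntry.put("partition", entry.getKey().partition());
	offsetEntry.put("offset", entry.getValue());
	serializedOffsets.add(offsetEntry);
}
executionContext.put(TOPIC_PARTITION_OFFSETS, serializedOffsets);
// On restore, each map can be turned back into a TopicPartition with
// new TopicPartition((String) offsetEntry.get("topic"), (Integer) offsetEntry.get("partition")).
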
noojung commented Jun 4, 2025

Could I work on this issue?
