Skip to content

Commit 4f38421

Browse files
committed
Apply optimistic locking for MongoDB repository
Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent 2bd5b84 commit 4f38421

File tree

6 files changed

+70
-6
lines changed

6 files changed

+70
-6
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoJobExecutionDao.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.batch.core.JobInstance;
2424
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2525
import org.springframework.batch.core.repository.persistence.converter.JobInstanceConverter;
26+
import org.springframework.dao.OptimisticLockingFailureException;
2627
import org.springframework.data.domain.Sort;
2728
import org.springframework.data.mongodb.core.MongoOperations;
2829
import org.springframework.data.mongodb.core.query.Query;
@@ -33,6 +34,7 @@
3334

3435
/**
3536
* @author Mahmoud Ben Hassine
37+
* @author Yanming Zhou
3638
* @since 5.2.0
3739
*/
3840
public class MongoJobExecutionDao implements JobExecutionDao {
@@ -72,10 +74,16 @@ public void saveJobExecution(JobExecution jobExecution) {
7274

7375
@Override
7476
public void updateJobExecution(JobExecution jobExecution) {
75-
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
77+
Query query = query(
78+
where("jobExecutionId").is(jobExecution.getId()).and("version").is(jobExecution.getVersion()));
7679
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter
7780
.fromJobExecution(jobExecution);
78-
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
81+
jobExecutionToUpdate.incrementVersion();
82+
if (this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME) == null) {
83+
throw new OptimisticLockingFailureException("Attempt to update step execution id=" + jobExecution.getId()
84+
+ " with wrong version (" + jobExecution.getVersion() + ")");
85+
}
86+
jobExecution.incrementVersion();
7987
}
8088

8189
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.batch.core.StepExecution;
2727
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
2828
import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter;
29+
import org.springframework.dao.OptimisticLockingFailureException;
2930
import org.springframework.data.mongodb.core.MongoOperations;
3031
import org.springframework.data.mongodb.core.query.Query;
3132
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
@@ -35,6 +36,7 @@
3536

3637
/**
3738
* @author Mahmoud Ben Hassine
39+
* @author Yanming Zhou
3840
* @since 5.2.0
3941
*/
4042
public class MongoStepExecutionDao implements StepExecutionDao {
@@ -81,10 +83,17 @@ public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
8183

8284
@Override
8385
public void updateStepExecution(StepExecution stepExecution) {
84-
Query query = query(where("stepExecutionId").is(stepExecution.getId()));
86+
Query query = query(
87+
where("stepExecutionId").is(stepExecution.getId()).and("version").is(stepExecution.getVersion()));
8588
org.springframework.batch.core.repository.persistence.StepExecution stepExecutionToUpdate = this.stepExecutionConverter
8689
.fromStepExecution(stepExecution);
87-
this.mongoOperations.findAndReplace(query, stepExecutionToUpdate, STEP_EXECUTIONS_COLLECTION_NAME);
90+
stepExecutionToUpdate.incrementVersion();
91+
if (this.mongoOperations.findAndReplace(query, stepExecutionToUpdate,
92+
STEP_EXECUTIONS_COLLECTION_NAME) == null) {
93+
throw new OptimisticLockingFailureException("Attempt to update step execution id=" + stepExecution.getId()
94+
+ " with wrong version (" + stepExecution.getVersion() + ")");
95+
}
96+
stepExecution.incrementVersion();
8897
}
8998

9099
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/JobExecution.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
/**
2727
* @author Mahmoud Ben Hassine
28+
* @author Yanming Zhou
2829
* @since 5.2.0
2930
*/
3031
public class JobExecution {
@@ -53,6 +54,8 @@ public class JobExecution {
5354

5455
private ExecutionContext executionContext;
5556

57+
private Integer version;
58+
5659
public JobExecution() {
5760
}
5861

@@ -148,13 +151,30 @@ public void setExecutionContext(ExecutionContext executionContext) {
148151
this.executionContext = executionContext;
149152
}
150153

154+
public Integer getVersion() {
155+
return version;
156+
}
157+
158+
public void setVersion(Integer version) {
159+
this.version = version;
160+
}
161+
162+
public void incrementVersion() {
163+
if (version == null) {
164+
version = 0;
165+
}
166+
else {
167+
version = version + 1;
168+
}
169+
}
170+
151171
@Override
152172
public String toString() {
153173
return "JobExecution{" + "id='" + id + '\'' + ", jobExecutionId=" + jobExecutionId + ", jobInstanceId="
154174
+ jobInstanceId + ", jobParameters=" + jobParameters + ", stepExecutions=" + stepExecutions
155175
+ ", status=" + status + ", startTime=" + startTime + ", createTime=" + createTime + ", endTime="
156176
+ endTime + ", lastUpdated=" + lastUpdated + ", exitStatus=" + exitStatus + ", executionContext="
157-
+ executionContext + '}';
177+
+ executionContext + ", version=" + version + '}';
158178
}
159179

160180
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/StepExecution.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
/**
2323
* @author Mahmoud Ben Hassine
24+
* @author Yanming Zhou
2425
* @since 5.2.0
2526
*/
2627
public class StepExecution {
@@ -65,6 +66,8 @@ public class StepExecution {
6566

6667
private boolean terminateOnly;
6768

69+
private Integer version;
70+
6871
public StepExecution() {
6972
}
7073

@@ -224,6 +227,23 @@ public void setTerminateOnly(boolean terminateOnly) {
224227
this.terminateOnly = terminateOnly;
225228
}
226229

230+
public Integer getVersion() {
231+
return version;
232+
}
233+
234+
public void setVersion(Integer version) {
235+
this.version = version;
236+
}
237+
238+
public void incrementVersion() {
239+
if (version == null) {
240+
version = 0;
241+
}
242+
else {
243+
version = version + 1;
244+
}
245+
}
246+
227247
@Override
228248
public String toString() {
229249
return "StepExecution{" + "id='" + id + '\'' + ", stepExecutionId=" + stepExecutionId + ", jobExecutionId='"
@@ -232,7 +252,8 @@ public String toString() {
232252
+ ", readSkipCount=" + readSkipCount + ", processSkipCount=" + processSkipCount + ", writeSkipCount="
233253
+ writeSkipCount + ", filterCount=" + filterCount + ", startTime=" + startTime + ", createTime="
234254
+ createTime + ", endTime=" + endTime + ", lastUpdated=" + lastUpdated + ", executionContext="
235-
+ executionContext + ", exitStatus=" + exitStatus + ", terminateOnly=" + terminateOnly + '}';
255+
+ executionContext + ", exitStatus=" + exitStatus + ", terminateOnly=" + terminateOnly + ", version="
256+
+ version + '}';
236257
}
237258

238259
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/JobExecutionConverter.java

+3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
/**
2929
* @author Mahmoud Ben Hassine
30+
* @author Yanming Zhou
3031
* @since 5.2.0
3132
*/
3233
public class JobExecutionConverter {
@@ -54,6 +55,7 @@ public org.springframework.batch.core.JobExecution toJobExecution(JobExecution s
5455
source.getExitStatus().exitDescription()));
5556
jobExecution.setExecutionContext(
5657
new org.springframework.batch.item.ExecutionContext(source.getExecutionContext().map()));
58+
jobExecution.setVersion(source.getVersion());
5759
return jobExecution;
5860
}
5961

@@ -77,6 +79,7 @@ public JobExecution fromJobExecution(org.springframework.batch.core.JobExecution
7779
new ExitStatus(source.getExitStatus().getExitCode(), source.getExitStatus().getExitDescription()));
7880
org.springframework.batch.item.ExecutionContext executionContext = source.getExecutionContext();
7981
jobExecution.setExecutionContext(new ExecutionContext(executionContext.toMap(), executionContext.isDirty()));
82+
jobExecution.setVersion(source.getVersion());
8083
return jobExecution;
8184
}
8285

spring-batch-core/src/main/java/org/springframework/batch/core/repository/persistence/converter/StepExecutionConverter.java

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
/**
2424
* @author Mahmoud Ben Hassine
25+
* @author Yanming Zhou
2526
* @since 5.2.0
2627
*/
2728
public class StepExecutionConverter {
@@ -50,6 +51,7 @@ public org.springframework.batch.core.StepExecution toStepExecution(StepExecutio
5051
if (source.isTerminateOnly()) {
5152
stepExecution.setTerminateOnly();
5253
}
54+
stepExecution.setVersion(source.getVersion());
5355
return stepExecution;
5456
}
5557

@@ -77,6 +79,7 @@ public StepExecution fromStepExecution(org.springframework.batch.core.StepExecut
7779
org.springframework.batch.item.ExecutionContext executionContext = source.getExecutionContext();
7880
stepExecution.setExecutionContext(new ExecutionContext(executionContext.toMap(), executionContext.isDirty()));
7981
stepExecution.setTerminateOnly(source.isTerminateOnly());
82+
stepExecution.setVersion(source.getVersion());
8083
return stepExecution;
8184
}
8285

0 commit comments

Comments
 (0)