Skip to content

Commit 2da2a2b

Browse files
cditchergithubmamathainfstar
authored
Merging changes from hotfix 1.23.3 (#532)
* Update update-configmap.sh * Update pom.xml * GRAD2-2968: enable multi-partitioning to regenerate school reports in parallel. (#531) GRAD2-2968: enable multi-partitioning to regenerate school reports in parallel. --------- Co-authored-by: githubmamatha <106563495+githubmamatha@users.noreply.github.com> Co-authored-by: Jinil Sung <infstar@gmail.com>
1 parent aef7df2 commit 2da2a2b

9 files changed

+102
-114
lines changed

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/config/BatchJobConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,7 @@ public RegenerateSchoolReportsWriter itemWriterSchoolReportsRegen() {
10741074
@Bean
10751075
public Step schoolReportsRegenJobStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, SkipSQLTransactionExceptionsListener skipListener) {
10761076
return new StepBuilder("schoolReportsRegenJobStep", jobRepository)
1077-
.<List<String>, List<String>>chunk(1, transactionManager)
1077+
.<String, String>chunk(1, transactionManager)
10781078
.reader(itemReaderSchoolReportsRegen())
10791079
.processor(itemProcessorSchoolReportsRegen())
10801080
.writer(itemWriterSchoolReportsRegen())
@@ -1086,10 +1086,11 @@ public Step schoolReportsRegenJobStep(JobRepository jobRepository, PlatformTrans
10861086

10871087
@Bean
10881088
public Step masterStepSchoolReportsRegen(JobRepository jobRepository, PlatformTransactionManager transactionManager, EducGradBatchGraduationApiConstants constants, SkipSQLTransactionExceptionsListener skipListener) {
1089+
int partitionSize = constants.getNumberOfPartitions() / 2;
10891090
return new StepBuilder("masterStepSchoolReportsRegen", jobRepository)
10901091
.partitioner(schoolReportsRegenJobStep(jobRepository, transactionManager, skipListener).getName(), partitionerSchoolReportsRegen())
10911092
.step(schoolReportsRegenJobStep(jobRepository, transactionManager, skipListener))
1092-
.gridSize(constants.getNumberOfPartitions())
1093+
.gridSize(partitionSize != 0? partitionSize : 1)
10931094
.taskExecutor(taskExecutor())
10941095
.build();
10951096
}
Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package ca.bc.gov.educ.api.batchgraduation.listener;
22

3+
import ca.bc.gov.educ.api.batchgraduation.entity.BatchGradAlgorithmJobHistoryEntity;
4+
import ca.bc.gov.educ.api.batchgraduation.model.BaseSummaryDTO;
35
import ca.bc.gov.educ.api.batchgraduation.model.SchoolReportsRegenSummaryDTO;
46
import ca.bc.gov.educ.api.batchgraduation.util.DateUtils;
57
import lombok.extern.slf4j.Slf4j;
68
import org.springframework.batch.core.BatchStatus;
79
import org.springframework.batch.core.JobExecution;
8-
import org.springframework.batch.core.JobParameters;
910
import org.springframework.batch.item.ExecutionContext;
1011
import org.springframework.stereotype.Component;
1112

1213
import java.util.Date;
1314

14-
import static ca.bc.gov.educ.api.batchgraduation.util.EducGradBatchGraduationApiConstants.SEARCH_REQUEST;
15-
1615
@Slf4j
1716
@Component
1817
public class RegenSchoolReportsCompletionNotificationListener extends BaseRegenSchoolReportsCompletionNotificationListener {
@@ -22,34 +21,38 @@ public void afterJob(JobExecution jobExecution) {
2221
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
2322
long elapsedTimeMillis = getElapsedTimeMillis(jobExecution);
2423
log.info("=======================================================================================");
25-
JobParameters jobParameters = jobExecution.getJobParameters();
2624
ExecutionContext jobContext = jobExecution.getExecutionContext();
2725
Long jobExecutionId = jobExecution.getId();
28-
String jobType = jobParameters.getString("jobType");
29-
log.info("{} Regen School Reports Job {} completed in {} s with jobExecution status {}", jobType, jobExecutionId, elapsedTimeMillis / 1000, jobExecution.getStatus());
26+
log.info("Regen School Reports Job {} completed in {} s with jobExecution status {}", jobExecutionId, elapsedTimeMillis / 1000, jobExecution.getStatus());
3027

3128
String status = jobExecution.getStatus().toString();
32-
Date startTime = DateUtils.toDate(jobExecution.getStartTime());
3329
Date endTime = DateUtils.toDate(jobExecution.getEndTime());
34-
String jobTrigger = jobParameters.getString("jobTrigger");
3530

3631
SchoolReportsRegenSummaryDTO summaryDTO = (SchoolReportsRegenSummaryDTO) jobContext.get("schoolReportsRegenSummaryDTO");
3732

38-
String studentSearchRequest = jobParameters.getString(SEARCH_REQUEST, "{}");
3933
// display Summary Details
40-
log.info("Records read : {}", summaryDTO.getReadCount());
34+
assert summaryDTO != null;
35+
log.info("Records read : {}", summaryDTO.getReadCount());
4136
log.info("Processed count: {}", summaryDTO.getProcessedCount());
4237
log.info(" --------------------------------------------------------------------------------------");
4338
log.info("Errors:{}", summaryDTO.getErrors().size());
39+
log.info(" --------------------------------------------------------------------------------------");
40+
summaryDTO.getSchools().forEach(value -> log.debug("School Report regenerated for {}", value.getMincode()));
41+
// save batch job & error history
42+
saveBatchJobHistory(summaryDTO, jobExecutionId, status, endTime);
4443

45-
updateUserSchedulingJobs(jobParameters);
44+
}
45+
}
4646

47-
String jobParametersDTO = buildJobParametersDTO(jobType, studentSearchRequest, null, null);
48-
// save batch job & error history
49-
processBatchJobHistory(summaryDTO, jobExecutionId, status, jobTrigger, jobType, startTime, endTime, jobParametersDTO);
50-
log.info(" --------------------------------------------------------------------------------------");
51-
summaryDTO.getSchools().forEach((value) -> log.info("School {} number of Regen School Reports : {}", value.getMincode(), value.getNumberOfSchoolReports()));
47+
private void saveBatchJobHistory(BaseSummaryDTO summaryDTO, Long jobExecutionId, String status, Date endTime) {
48+
BatchGradAlgorithmJobHistoryEntity ent = gradBatchHistoryService.getGradAlgorithmJobHistory(jobExecutionId);
49+
if (ent != null) {
50+
ent.setActualStudentsProcessed(summaryDTO.getProcessedCount());
51+
ent.setFailedStudentsProcessed((int) summaryDTO.getErroredCount());
52+
ent.setEndTime(DateUtils.toLocalDateTime(endTime));
53+
ent.setStatus(status);
5254

55+
gradBatchHistoryService.saveGradAlgorithmJobHistory(ent);
5356
}
5457
}
5558
}

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/model/SchoolReportsRegenSummaryDTO.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import lombok.Data;
44
import lombok.NoArgsConstructor;
5-
import org.apache.commons.lang3.StringUtils;
65

76
import java.util.ArrayList;
87
import java.util.List;
@@ -11,9 +10,11 @@
1110
@NoArgsConstructor
1211
public class SchoolReportsRegenSummaryDTO extends BaseSummaryDTO {
1312

13+
private String reportBatchType; // REGALG or TVRRUN
14+
1415
private List<ProcessError> errors = new ArrayList<>();
1516
private List<School> globalList = new ArrayList<>();
1617
private List<School> schools = new ArrayList<>();
17-
private StudentSearchRequest studentSearchRequest;
1818

1919
}
20+

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/processor/RegenerateSchoolReportsProcessor.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,39 +7,28 @@
77
import org.springframework.beans.factory.annotation.Autowired;
88
import org.springframework.beans.factory.annotation.Value;
99

10-
import java.util.List;
11-
import java.util.stream.Collectors;
12-
1310
@Slf4j
14-
public class RegenerateSchoolReportsProcessor implements ItemProcessor<List<String>, List<String>> {
11+
public class RegenerateSchoolReportsProcessor implements ItemProcessor<String, String> {
1512

1613
@Autowired
1714
RestUtils restUtils;
1815

1916
@Value("#{stepExecutionContext['summary']}")
2017
SchoolReportsRegenSummaryDTO summaryDTO;
2118

19+
@Value("#{stepExecution.jobExecution.id}")
20+
Long batchId;
21+
2222
@Override
23-
public List<String> process(List<String> minCodes) throws Exception {
24-
Long batchId = summaryDTO.getBatchId();
25-
StudentSearchRequest searchRequest = summaryDTO.getStudentSearchRequest();
26-
long countRegeneratedSchoolReports = 0l;
27-
List<String> reportTypes = searchRequest.getReportTypes();
23+
public String process(String minCode) throws Exception {
24+
summaryDTO.setBatchId(batchId);
2825
if(log.isDebugEnabled()) {
29-
log.debug("Process Schools: {}", !minCodes.isEmpty() ? String.join(",", minCodes) : summaryDTO.getSchools().stream().map(School::getMincode).collect(Collectors.joining(",")));
26+
log.debug("Processing {} School Report: {} ", summaryDTO.getReportBatchType(), minCode);
3027
}
3128

32-
String reportType;
33-
if(reportTypes != null && !reportTypes.isEmpty() && "NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0)
34-
reportType = "TVRRUN";
35-
else
36-
reportType = "REGALG";
37-
38-
for (String minCode : minCodes) {
39-
countRegeneratedSchoolReports += restUtils.createAndStoreSchoolReports(minCode, reportType, summaryDTO);
40-
}
29+
long countRegeneratedSchoolReports = restUtils.createAndStoreSchoolReports(minCode, summaryDTO.getReportBatchType(), summaryDTO);
4130

42-
summaryDTO.setProcessedCount(countRegeneratedSchoolReports);
43-
return minCodes;
31+
summaryDTO.setProcessedCount(summaryDTO.getProcessedCount() + countRegeneratedSchoolReports);
32+
return minCode;
4433
}
4534
}

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/BaseSchoolReader.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package ca.bc.gov.educ.api.batchgraduation.reader;
22

3-
import ca.bc.gov.educ.api.batchgraduation.model.EdwSnapshotSchoolSummaryDTO;
4-
import ca.bc.gov.educ.api.batchgraduation.model.ResponseObj;
53
import ca.bc.gov.educ.api.batchgraduation.rest.RestUtils;
64
import org.springframework.batch.core.JobExecution;
75
import org.springframework.batch.item.ItemReader;
@@ -22,16 +20,7 @@ public abstract class BaseSchoolReader implements ItemReader<String> {
2220
@Value("#{stepExecutionContext['data']}")
2321
List<String> schools;
2422

25-
@Value("#{stepExecutionContext['summary']}")
26-
EdwSnapshotSchoolSummaryDTO summaryDTO;
27-
2823
@Value("#{stepExecution.jobExecution}")
2924
JobExecution jobExecution;
3025

31-
protected void fetchAccessToken() {
32-
ResponseObj res = restUtils.getTokenResponseObject();
33-
if (res != null) {
34-
summaryDTO.setAccessToken(res.getAccess_token());
35-
}
36-
}
3726
}

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/EDWSnapshotSchoolReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,19 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66
import org.springframework.batch.item.ItemReader;
7+
import org.springframework.beans.factory.annotation.Value;
78

89
public class EDWSnapshotSchoolReader extends BaseSchoolReader implements ItemReader<String> {
910

1011
private static final Logger LOGGER = LoggerFactory.getLogger(EDWSnapshotSchoolReader.class);
1112

13+
@Value("#{stepExecutionContext['summary']}")
14+
EdwSnapshotSchoolSummaryDTO summaryDTO;
15+
1216
@Override
1317
public String read() throws Exception {
1418
String nextSchool = null;
1519
if (nextSchoolForProcessing < schools.size()) {
16-
fetchAccessToken();
1720
nextSchool = schools.get(nextSchoolForProcessing);
1821
LOGGER.info("School: {} - {} of {}", nextSchool, nextSchoolForProcessing + 1, summaryDTO.getReadCount());
1922
nextSchoolForProcessing++;

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsPartitioner.java

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import java.util.*;
1212
import java.util.stream.Collectors;
1313

14-
import static ca.bc.gov.educ.api.batchgraduation.util.EducGradBatchGraduationApiConstants.SEARCH_REQUEST;
15-
1614
@Slf4j
1715
public class RegenerateSchoolReportsPartitioner extends BasePartitioner {
1816

@@ -51,22 +49,19 @@ public Map<String, ExecutionContext> partition(int gridSize) {
5149
}
5250

5351
summaryDTO.setBatchId(jobExecution.getId());
54-
summaryDTO.setStudentSearchRequest(searchRequest);
52+
summaryDTO.setReportBatchType(determineReportBatchType(searchRequest.getReportTypes()));
5553

5654
Long totalSchoolReportsCount = 0L;
57-
List<String> reportTypes = searchRequest.getReportTypes();
5855
Long schoolReportsCount = 0L;
5956

6057
List<String> finalSchoolDistricts = new ArrayList<>();
61-
List<SchoolReport> schoolReportsLite = new ArrayList<>();
58+
List<SchoolReport> schoolReportsLite;
6259

6360
if(processAllReports) {
64-
if (reportTypes != null && !reportTypes.isEmpty()) {
65-
if ("NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) {
66-
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType("NONGRADPRJ", summaryDTO);
67-
} else {
68-
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType( "GRADREG", summaryDTO);
69-
}
61+
if ("TVRRUN".compareToIgnoreCase(summaryDTO.getReportBatchType()) == 0) {
62+
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType("NONGRADPRJ", summaryDTO);
63+
} else {
64+
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType( "GRADREG", summaryDTO);
7065
}
7166

7267
if (schoolReportsLite != null && !schoolReportsLite.isEmpty()) {
@@ -81,41 +76,57 @@ public Map<String, ExecutionContext> partition(int gridSize) {
8176
totalSchoolReportsCount += finalSchoolDistricts.size();
8277
} else {
8378
for (String schoolOfRecord : schoolDistricts) {
84-
if (reportTypes != null && !reportTypes.isEmpty()) {
85-
if ("NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) {
86-
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "NONGRADPRJ", summaryDTO);
87-
} else {
88-
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "GRADREG", summaryDTO);
89-
}
90-
if (schoolReportsCount > 0) {
91-
finalSchoolDistricts.add(schoolOfRecord);
92-
School school = new School(schoolOfRecord);
93-
school.setNumberOfSchoolReports(schoolReportsCount);
94-
summaryDTO.getSchools().add(school);
95-
totalSchoolReportsCount += schoolReportsCount;
96-
}
97-
schoolReportsCount = 0L;
79+
if ("TVRRUN".compareToIgnoreCase(summaryDTO.getReportBatchType()) == 0) {
80+
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "NONGRADPRJ", summaryDTO);
81+
} else {
82+
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "GRADREG", summaryDTO);
83+
}
84+
if (schoolReportsCount > 0) {
85+
finalSchoolDistricts.add(schoolOfRecord);
86+
School school = new School(schoolOfRecord);
87+
school.setNumberOfSchoolReports(schoolReportsCount);
88+
summaryDTO.getSchools().add(school);
89+
totalSchoolReportsCount += schoolReportsCount;
9890
}
91+
schoolReportsCount = 0L;
9992
}
10093
}
10194

10295
long endTime = System.currentTimeMillis();
10396
long diff = (endTime - startTime)/1000;
10497
log.debug("Total {} schools after filters in {} sec", finalSchoolDistricts.size(), diff);
10598

106-
updateBatchJobHistory(createBatchJobHistory(), totalSchoolReportsCount);
10799
summaryDTO.setReadCount(totalSchoolReportsCount);
108100
summaryDTO.setProcessedCount(0);
109101

110-
Map<String, ExecutionContext> map = new HashMap<>();
111-
ExecutionContext executionContext = new ExecutionContext();
112-
executionContext.put(SEARCH_REQUEST, searchRequest);
113-
executionContext.put("data", finalSchoolDistricts);
114-
executionContext.put("summary", summaryDTO);
115-
executionContext.put("readCount", 0);
116-
map.put("partition0", executionContext);
102+
if (!finalSchoolDistricts.isEmpty()) {
103+
updateBatchJobHistory(createBatchJobHistory(), totalSchoolReportsCount);
104+
int partitionSize = finalSchoolDistricts.size()/gridSize + 1;
105+
List<List<String>> partitions = new LinkedList<>();
106+
for (int i = 0; i < finalSchoolDistricts.size(); i += partitionSize) {
107+
partitions.add(finalSchoolDistricts.subList(i, Math.min(i + partitionSize, finalSchoolDistricts.size())));
108+
}
109+
Map<String, ExecutionContext> map = new HashMap<>(partitions.size());
110+
for (int i = 0; i < partitions.size(); i++) {
111+
ExecutionContext executionContext = new ExecutionContext();
112+
SchoolReportsRegenSummaryDTO partitionSummaryDTO = new SchoolReportsRegenSummaryDTO();
113+
partitionSummaryDTO.setReportBatchType(summaryDTO.getReportBatchType());
114+
List<String> data = partitions.get(i);
115+
executionContext.put("data", data);
116+
partitionSummaryDTO.setReadCount(data.size());
117+
executionContext.put("summary", partitionSummaryDTO);
118+
executionContext.put("index",0);
119+
String key = "partition" + i;
120+
map.put(key, executionContext);
121+
}
122+
log.info("Found {} in total running on {} partitions",finalSchoolDistricts.size(),map.size());
123+
return map;
124+
}
125+
log.info("No Schools Found for School Reports Regeneration");
126+
return new HashMap<>();
127+
}
117128

118-
log.info("Found {} in total running on 1 partitions", totalSchoolReportsCount);
119-
return map;
129+
private String determineReportBatchType(List<String> reportTypes) {
130+
return reportTypes != null && !reportTypes.isEmpty() && "NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0 ? "TVRRUN" : "REGALG";
120131
}
121132
}

api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsReader.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,26 @@
22

33
import ca.bc.gov.educ.api.batchgraduation.model.SchoolReportsRegenSummaryDTO;
44
import lombok.extern.slf4j.Slf4j;
5-
import org.springframework.batch.core.JobExecution;
65
import org.springframework.batch.item.ItemReader;
76
import org.springframework.beans.factory.annotation.Value;
87

9-
import java.util.List;
10-
118
@Slf4j
12-
public class RegenerateSchoolReportsReader implements ItemReader<List<String>> {
13-
14-
@Value("#{stepExecutionContext['data']}")
15-
List<String> schools;
9+
public class RegenerateSchoolReportsReader extends BaseSchoolReader implements ItemReader<String> {
1610

1711
@Value("#{stepExecutionContext['summary']}")
1812
SchoolReportsRegenSummaryDTO summaryDTO;
1913

20-
@Value("#{stepExecution.jobExecution}")
21-
JobExecution jobExecution;
22-
23-
@Value("#{stepExecutionContext['readCount']}")
24-
Long readCount;
25-
2614
@Override
27-
public List<String> read() throws Exception {
28-
if(readCount > 0) return null;
29-
readCount++;
30-
if(log.isDebugEnabled()) {
31-
log.info("Read schools Codes -> {} of {} schools", readCount, schools.size());
15+
public String read() throws Exception {
16+
String nextSchool = null;
17+
if (nextSchoolForProcessing < schools.size()) {
18+
nextSchool = schools.get(nextSchoolForProcessing);
19+
log.info("School: {} - {} of {}", nextSchool, nextSchoolForProcessing + 1, summaryDTO.getReadCount());
20+
nextSchoolForProcessing++;
21+
} else {
22+
aggregate("schoolReportsRegenSummaryDTO");
3223
}
33-
aggregate("schoolReportsRegenSummaryDTO");
34-
return schools;
24+
return nextSchool;
3525
}
3626

3727
private void aggregate(String summaryContextName) {
@@ -41,7 +31,7 @@ private void aggregate(String summaryContextName) {
4131
jobExecution.getExecutionContext().put(summaryContextName, totalSummaryDTO);
4232
}
4333
totalSummaryDTO.setBatchId(summaryDTO.getBatchId());
44-
totalSummaryDTO.setReadCount(summaryDTO.getReadCount());
34+
totalSummaryDTO.setProcessedCount(totalSummaryDTO.getProcessedCount() + summaryDTO.getProcessedCount());
4535
totalSummaryDTO.getGlobalList().addAll(summaryDTO.getGlobalList());
4636
}
4737
}

0 commit comments

Comments
 (0)