Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.aot.hint.annotation.RegisterReflectionForBinding;
import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;
import ca.bc.gov.nrs.vdyp.batch.controller.BatchJobRequest;
Expand All @@ -15,17 +17,46 @@ public class VdypBatchApplication {
private static final Logger logger = LoggerFactory.getLogger(VdypBatchApplication.class);

public static void main(String[] args) {
// Override VDYP properties before any VdypComponent initialization
// This ensures VdypComponent uses correct values regardless of classpath order
overrideVdypProperties();

SpringApplication.run(VdypBatchApplication.class, args);
}

/**
* This method is called once when the application is fully started and ready.
* Using ApplicationReadyEvent ensures
* the startup message is shown only once, not on every batch job execution.
*/
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
String separator = "============================================================";
logger.info(separator);
logger.info("VDYP Batch Processing Service Started!");
logger.info("API Endpoints:");
logger.info(" POST /api/batch/start - Start batch job");
logger.info(" GET /api/batch/status/{id} - Check job status");
logger.info(" GET /api/batch/status/{{id}} - Check job status");
logger.info(" GET /api/batch/jobs - List recent jobs");
logger.info(" GET /api/batch/metrics/{id} - Get detailed job metrics");
logger.info(" GET /api/batch/statistics - Get batch statistics");
logger.info(" GET /api/batch/metrics/{{id}} - Get detailed job metrics");
logger.info(" GET /api/batch/health - Health check");
logger.info(separator);
}

private static void overrideVdypProperties() {
// Create a ClassLoader that prioritizes the application.properties
Thread.currentThread().setContextClassLoader(new ClassLoader(Thread.currentThread().getContextClassLoader()) {
@Override
public java.io.InputStream getResourceAsStream(String name) {
if ("application.properties".equals(name)) {
// Return the batch module's application.properties first
java.io.InputStream stream = VdypBatchApplication.class.getClassLoader().getResourceAsStream(name);
if (stream != null) {
return stream;
}
}
return super.getResourceAsStream(name);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
public class BatchProperties {

private Job job = new Job();
private Input input = new Input();
private Output output = new Output();
private Partitioning partitioning = new Partitioning();
private ThreadPool threadPool = new ThreadPool();
private Validation validation = new Validation();
private Error error = new Error();
private Retry retry = new Retry();
private Skip skip = new Skip();
private Vdyp vdyp = new Vdyp();

public static class Job {
private boolean autoCreate = true;
Expand All @@ -33,22 +32,8 @@ public void setAutoCreate(boolean autoCreate) {
}
}

public static class Input {
private String filePath;

public String getFilePath() {
return filePath;
}

public void setFilePath(String filePath) {
this.filePath = filePath;
}
}

public static class Output {
private Directory directory = new Directory();
private String filePrefix;
private String csvHeader;

public static class Directory {
private String defaultPath;
Expand All @@ -70,35 +55,10 @@ public void setDirectory(Directory directory) {
this.directory = directory;
}

public String getFilePrefix() {
return filePrefix;
}

public void setFilePrefix(String filePrefix) {
this.filePrefix = filePrefix;
}

public String getCsvHeader() {
return csvHeader;
}

public void setCsvHeader(String csvHeader) {
this.csvHeader = csvHeader;
}
}

public static class Partitioning {
private boolean enabled = true;
private int gridSize;
private int chunkSize;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public int getGridSize() {
return gridSize;
Expand All @@ -107,14 +67,6 @@ public int getGridSize() {
public void setGridSize(int gridSize) {
this.gridSize = gridSize;
}

public int getChunkSize() {
return chunkSize;
}

public void setChunkSize(int chunkSize) {
this.chunkSize = chunkSize;
}
}

public static class Retry {
Expand Down Expand Up @@ -198,36 +150,57 @@ public void setMaxPolygonIdLength(int maxPolygonIdLength) {
}
}

public static class Error {
private String transientPatterns;
private int maxConsecutiveFailures;
public static class Skip {
private int maxCount;

public String getTransientPatterns() {
return transientPatterns;
public int getMaxCount() {
return maxCount;
}

public void setTransientPatterns(String transientPatterns) {
this.transientPatterns = transientPatterns;
public void setMaxCount(int maxCount) {
this.maxCount = maxCount;
}
}

public int getMaxConsecutiveFailures() {
return maxConsecutiveFailures;
}
public static class Vdyp {
private Projection projection = new Projection();

public void setMaxConsecutiveFailures(int maxConsecutiveFailures) {
this.maxConsecutiveFailures = maxConsecutiveFailures;
}
}
public static class Projection {
private String polygonFile;
private String layerFile;
private String parametersFile;

public static class Skip {
private int maxCount;
public String getPolygonFile() {
return polygonFile;
}

public int getMaxCount() {
return maxCount;
public void setPolygonFile(String polygonFile) {
this.polygonFile = polygonFile;
}

public String getLayerFile() {
return layerFile;
}

public void setLayerFile(String layerFile) {
this.layerFile = layerFile;
}

public String getParametersFile() {
return parametersFile;
}

public void setParametersFile(String parametersFile) {
this.parametersFile = parametersFile;
}
}

public void setMaxCount(int maxCount) {
this.maxCount = maxCount;
public Projection getProjection() {
return projection;
}

public void setProjection(Projection projection) {
this.projection = projection;
}
}

Expand All @@ -239,14 +212,6 @@ public void setJob(Job job) {
this.job = job;
}

public Input getInput() {
return input;
}

public void setInput(Input input) {
this.input = input;
}

public Output getOutput() {
return output;
}
Expand Down Expand Up @@ -287,19 +252,19 @@ public void setValidation(Validation validation) {
this.validation = validation;
}

public Error getError() {
return error;
}

public void setError(Error error) {
this.error = error;
}

public Skip getSkip() {
return skip;
}

public void setSkip(Skip skip) {
this.skip = skip;
}

public Vdyp getVdyp() {
return vdyp;
}

public void setVdyp(Vdyp vdyp) {
this.vdyp = vdyp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ private BatchRecord extractRecord(Throwable t) {
return cachedRecord;
}

// Fallback: create a basic record with the extracted ID for tracking
// Fallback: create a basic record with the extracted recordId as featureId for tracking
BatchRecord batchRecord = new BatchRecord();
batchRecord.setId(recordId);
batchRecord.setFeatureId(String.valueOf(recordId));
return batchRecord;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import java.util.Collection;

/**
* Dynamic partition handler for VDYP batch processing that adjusts grid size based on job parameters.
* FEATURE_ID-based dynamic partition handler for VDYP batch processing.
*
* This handler manages the FEATURE_ID-based partitioning strategy, setting up polygon and layer resources for the
* DynamicPartitioner to ensure complete polygon data integrity across all partitions.
*
* This handler manages the parallel execution of VDYP processing partitions, allowing runtime configuration of
* partition count.
* partition count while maintaining FEATURE_ID-based data integrity.
*/
public class DynamicPartitionHandler implements PartitionHandler {

Expand Down Expand Up @@ -49,8 +52,6 @@ public DynamicPartitionHandler(
// Get dynamic parameters from job parameters
JobParameters jobParameters = masterStepExecution.getJobExecution().getJobParameters();
Long partitionSize = jobParameters.getLong("partitionSize");
Long chunkSize = jobParameters.getLong("chunkSize");
String inputFilePath = jobParameters.getString("inputFilePath");

// Get grid size
int actualGridSize;
Expand All @@ -62,35 +63,26 @@ public DynamicPartitionHandler(
throw new IllegalStateException("No grid size specified in job parameters or properties. ");
}

// Get input file path
String actualInputFilePath = inputFilePath;
if (actualInputFilePath == null || actualInputFilePath.trim().isEmpty()) {
actualInputFilePath = batchProperties.getInput().getFilePath();
}
if (actualInputFilePath == null || actualInputFilePath.trim().isEmpty()) {
throw new IllegalStateException("No input file path specified in job parameters or properties. ");
}
// Create polygon resource for FEATURE_ID-based partitioning
String polygonFilePath = batchProperties.getVdyp().getProjection().getPolygonFile();

// Create input resource from file path
Resource inputResource;
if (actualInputFilePath.startsWith("classpath:")) {
inputResource = new ClassPathResource(actualInputFilePath.substring(10));
Resource polygonResource;
if (polygonFilePath.startsWith("classpath:")) {
polygonResource = new ClassPathResource(polygonFilePath.substring(10));
} else {
inputResource = new FileSystemResource(actualInputFilePath);
polygonResource = new FileSystemResource(polygonFilePath);
}

dynamicPartitioner.setInputResource(inputResource);
logger.info("[VDYP Partition Handler] Using input file: {}", actualInputFilePath);
// Set polygon resource for FEATURE_ID-based partitioning
dynamicPartitioner.setPolygonResource(polygonResource);

logger.info("[VDYP FEATURE_ID Partition Handler] Using polygon file: {}", polygonFilePath);

logger.info(
"VDYP dynamic partitioning: Using {} partitions (requested: {}, from properties: {})", actualGridSize,
partitionSize, batchProperties.getPartitioning().getGridSize()
"VDYP FEATURE_ID-based partitioning: Using {} partitions (requested: {}, from properties: {})",
actualGridSize, partitionSize, batchProperties.getPartitioning().getGridSize()
);

if (chunkSize != null) {
logger.info("VDYP dynamic chunk size: Requested {}", chunkSize.intValue());
}

// Create and configure TaskExecutorPartitionHandler with dynamic grid size
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setTaskExecutor(taskExecutor);
Expand Down
Loading
Loading