diff --git a/batch/pom.xml b/batch/pom.xml
index b394b0d2f..7b139c541 100644
--- a/batch/pom.xml
+++ b/batch/pom.xml
@@ -133,16 +133,22 @@
<groupId>net.revelc.code.formatter</groupId>
<artifactId>formatter-maven-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
<executions>
<execution>
- <goals>
- <goal>format</goal>
- </goals>
- <configuration>
- <configFile>../buildtools/src/main/resources/eclipse/formatter.xml</configFile>
- </configuration>
+ <id>default</id>
+ <phase>none</phase>
</execution>
</executions>
+ <dependencies>
+ <dependency>
+ <groupId>ca.bc.gov.nrs.vdyp</groupId>
+ <artifactId>vdyp-buildtools</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
@@ -224,4 +230,4 @@
- </project>
\ No newline at end of file
+ </project>
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/VdypBatchApplication.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/VdypBatchApplication.java
index 929d506d0..dce4f3c7c 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/VdypBatchApplication.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/VdypBatchApplication.java
@@ -4,6 +4,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.aot.hint.annotation.RegisterReflectionForBinding;
import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;
import ca.bc.gov.nrs.vdyp.batch.controller.BatchJobRequest;
@@ -15,17 +17,46 @@ public class VdypBatchApplication {
private static final Logger logger = LoggerFactory.getLogger(VdypBatchApplication.class);
public static void main(String[] args) {
+ // Override VDYP properties before any VdypComponent initialization
+ // This ensures VdypComponent uses correct values regardless of classpath order
+ overrideVdypProperties();
+
SpringApplication.run(VdypBatchApplication.class, args);
+ }
+
+ /**
+ * Logs the startup banner once the application is fully started and ready.
+ * Using ApplicationReadyEvent ensures the startup message is logged only
+ * once, not on every batch job execution.
+ */
+ @EventListener(ApplicationReadyEvent.class)
+ public void onApplicationReady() {
String separator = "============================================================";
logger.info(separator);
logger.info("VDYP Batch Processing Service Started!");
logger.info("API Endpoints:");
logger.info(" POST /api/batch/start - Start batch job");
- logger.info(" GET /api/batch/status/{id} - Check job status");
+ logger.info(" GET /api/batch/status/{{id}} - Check job status");
logger.info(" GET /api/batch/jobs - List recent jobs");
- logger.info(" GET /api/batch/metrics/{id} - Get detailed job metrics");
- logger.info(" GET /api/batch/statistics - Get batch statistics");
+ logger.info(" GET /api/batch/metrics/{{id}} - Get detailed job metrics");
logger.info(" GET /api/batch/health - Health check");
logger.info(separator);
}
+
+ private static void overrideVdypProperties() {
+ // Create a ClassLoader that prioritizes the application.properties
+ Thread.currentThread().setContextClassLoader(new ClassLoader(Thread.currentThread().getContextClassLoader()) {
+ @Override
+ public java.io.InputStream getResourceAsStream(String name) {
+ if ("application.properties".equals(name)) {
+ // Return the batch module's application.properties first
+ java.io.InputStream stream = VdypBatchApplication.class.getClassLoader().getResourceAsStream(name);
+ if (stream != null) {
+ return stream;
+ }
+ }
+ return super.getResourceAsStream(name);
+ }
+ });
+ }
}
\ No newline at end of file
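
Note for reviewers: a minimal, standalone sketch of the ClassLoader trick used in overrideVdypProperties() above. The class name here is illustrative; the point is the child-first lookup for a single resource name, with normal parent delegation for everything else.

    import java.io.InputStream;

    public class ResourceOverrideSketch {
        public static void main(String[] args) {
            ClassLoader parent = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(new ClassLoader(parent) {
                @Override
                public InputStream getResourceAsStream(String name) {
                    if ("application.properties".equals(name)) {
                        // Prefer the copy visible to this application's own loader
                        InputStream stream = ResourceOverrideSketch.class.getClassLoader().getResourceAsStream(name);
                        if (stream != null) {
                            return stream;
                        }
                    }
                    // Fall back to normal parent delegation for everything else
                    return super.getResourceAsStream(name);
                }
            });
            // Any code that resolves "application.properties" through the context
            // ClassLoader now sees the overriding copy first.
        }
    }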
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchProperties.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchProperties.java
index 067cbebb1..540a818ac 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchProperties.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchProperties.java
@@ -4,8 +4,8 @@
import org.springframework.stereotype.Component;
/**
- * Configuration properties for VDYP batch processing. This class handles all custom batch.* properties to eliminate
- * unknown property warnings.
+ * Configuration properties for VDYP batch processing. This class handles all
+ * custom batch.* properties to eliminate unknown property warnings.
*/
@Component
@ConfigurationProperties(prefix = "batch")
@@ -17,9 +17,9 @@ public class BatchProperties {
private Partitioning partitioning = new Partitioning();
private ThreadPool threadPool = new ThreadPool();
private Validation validation = new Validation();
- private Error error = new Error();
private Retry retry = new Retry();
private Skip skip = new Skip();
+ private Reader reader = new Reader();
public static class Job {
private boolean autoCreate = true;
@@ -34,21 +34,7 @@ public void setAutoCreate(boolean autoCreate) {
}
public static class Input {
- private String filePath;
-
- public String getFilePath() {
- return filePath;
- }
-
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
- }
-
- public static class Output {
private Directory directory = new Directory();
- private String filePrefix;
- private String csvHeader;
public static class Directory {
private String defaultPath;
@@ -70,35 +56,34 @@ public void setDirectory(Directory directory) {
this.directory = directory;
}
- public String getFilePrefix() {
- return filePrefix;
- }
+ }
+
+ public static class Output {
+ private Directory directory = new Directory();
+
+ public static class Directory {
+ private String defaultPath;
+
+ public String getDefaultPath() {
+ return defaultPath;
+ }
- public void setFilePrefix(String filePrefix) {
- this.filePrefix = filePrefix;
+ public void setDefaultPath(String defaultPath) {
+ this.defaultPath = defaultPath;
+ }
}
- public String getCsvHeader() {
- return csvHeader;
+ public Directory getDirectory() {
+ return directory;
}
- public void setCsvHeader(String csvHeader) {
- this.csvHeader = csvHeader;
+ public void setDirectory(Directory directory) {
+ this.directory = directory;
}
}
public static class Partitioning {
- private boolean enabled = true;
private int gridSize;
- private int chunkSize;
-
- public boolean isEnabled() {
- return enabled;
- }
-
- public void setEnabled(boolean enabled) {
- this.enabled = enabled;
- }
public int getGridSize() {
return gridSize;
@@ -107,14 +92,6 @@ public int getGridSize() {
public void setGridSize(int gridSize) {
this.gridSize = gridSize;
}
-
- public int getChunkSize() {
- return chunkSize;
- }
-
- public void setChunkSize(int chunkSize) {
- this.chunkSize = chunkSize;
- }
}
public static class Retry {
@@ -198,27 +175,6 @@ public void setMaxPolygonIdLength(int maxPolygonIdLength) {
}
}
- public static class Error {
- private String transientPatterns;
- private int maxConsecutiveFailures;
-
- public String getTransientPatterns() {
- return transientPatterns;
- }
-
- public void setTransientPatterns(String transientPatterns) {
- this.transientPatterns = transientPatterns;
- }
-
- public int getMaxConsecutiveFailures() {
- return maxConsecutiveFailures;
- }
-
- public void setMaxConsecutiveFailures(int maxConsecutiveFailures) {
- this.maxConsecutiveFailures = maxConsecutiveFailures;
- }
- }
-
public static class Skip {
private int maxCount;
@@ -231,6 +187,18 @@ public void setMaxCount(int maxCount) {
}
}
+ public static class Reader {
+ private int chunkSize = 10;
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+ }
+
public Job getJob() {
return job;
}
@@ -287,14 +255,6 @@ public void setValidation(Validation validation) {
this.validation = validation;
}
- public Error getError() {
- return error;
- }
-
- public void setError(Error error) {
- this.error = error;
- }
-
public Skip getSkip() {
return skip;
}
@@ -302,4 +262,13 @@ public Skip getSkip() {
public void setSkip(Skip skip) {
this.skip = skip;
}
-}
\ No newline at end of file
+
+ public Reader getReader() {
+ return reader;
+ }
+
+ public void setReader(Reader reader) {
+ this.reader = reader;
+ }
+
+}
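
Note for reviewers: with the restructured BatchProperties above, Spring's relaxed binding maps the nested classes to keys like the following. The values are illustrative placeholders, not the module's actual defaults; the key names follow from the field names and from the error messages elsewhere in this diff (e.g. batch.retry.max-attempts, batch.skip.max-count).

    batch.job.auto-create=true
    batch.input.directory.default-path=/tmp/vdyp/input
    batch.output.directory.default-path=/tmp/vdyp/output
    batch.partitioning.grid-size=4
    batch.reader.chunk-size=10
    batch.retry.max-attempts=3
    batch.retry.backoff-period=1000
    batch.skip.max-count=100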
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchSkipPolicy.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchSkipPolicy.java
index 4a45ade57..30d320ed6 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchSkipPolicy.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/BatchSkipPolicy.java
@@ -215,9 +215,9 @@ private BatchRecord extractRecord(Throwable t) {
return cachedRecord;
}
- // Fallback: create a basic record with the extracted ID for tracking
+ // Fallback: create a basic record with the extracted recordId as featureId for tracking
BatchRecord batchRecord = new BatchRecord();
- batchRecord.setId(recordId);
+ batchRecord.setFeatureId(String.valueOf(recordId));
return batchRecord;
}
return null;
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/ChunkBasedPolygonItemReader.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/ChunkBasedPolygonItemReader.java
new file mode 100644
index 000000000..858602ac4
--- /dev/null
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/ChunkBasedPolygonItemReader.java
@@ -0,0 +1,392 @@
+package ca.bc.gov.nrs.vdyp.batch.configuration;
+
+import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;
+import ca.bc.gov.nrs.vdyp.batch.service.BatchMetricsCollector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.lang.NonNull;
+import org.springframework.batch.item.ItemStreamReader;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+
+/**
+ * Memory-efficient ItemReader that processes polygon data in configurable chunks.
+ *
+ * This reader implements chunked processing to handle large datasets efficiently:
+ * - Reads polygon and layer CSV files in memory-bounded chunks
+ * - Groups data by FEATURE_ID for complete polygon processing
+ * - Stores raw CSV data to minimize memory usage and parsing overhead
+ * - Supports lazy loading with configurable chunk sizes
+ */
+public class ChunkBasedPolygonItemReader implements ItemStreamReader<BatchRecord> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ChunkBasedPolygonItemReader.class);
+
+ private final String partitionName;
+ private final BatchMetricsCollector metricsCollector;
+ private final Long jobExecutionId;
+ private final int chunkSize;
+
+ // File-based reading
+ private Path partitionDir;
+ private BufferedReader polygonReader;
+ private BufferedReader layerReader;
+ private String polygonHeader;
+ private String layerHeader;
+
+ // Chunk processing state
+ private List<String> currentChunk;
+ private Iterator<String> chunkIterator;
+ private Set<String> currentChunkFeatureIds;
+ private Map<String, List<String>> currentChunkLayers;
+
+ // State tracking
+ private boolean readerOpened = false;
+ private int processedCount = 0;
+ private int skippedCount = 0;
+
+ public ChunkBasedPolygonItemReader(String partitionName, BatchMetricsCollector metricsCollector,
+ Long jobExecutionId, int chunkSize) {
+ this.partitionName = partitionName != null ? partitionName : "unknown";
+ this.metricsCollector = metricsCollector;
+ this.jobExecutionId = jobExecutionId;
+ this.chunkSize = Math.max(chunkSize, 1); // Ensure minimum chunk size of 1
+ }
+
+ @Override
+ public BatchRecord read() throws Exception {
+ if (!readerOpened) {
+ throw new IllegalStateException("Reader not opened. Call open() first.");
+ }
+
+ if (!ensureChunkAvailable()) {
+ return null; // End of data
+ }
+
+ String polygonLine = chunkIterator.next();
+ logPolygonProcessing(polygonLine);
+
+ try {
+ return processPolygonLine(polygonLine);
+ } catch (Exception e) {
+ return handlePolygonProcessingException(polygonLine, e);
+ }
+ }
+
+ @Override
+ public void open(@NonNull ExecutionContext executionContext) throws ItemStreamException {
+ logger.info("[{}] Opening ChunkBasedPolygonItemReader with chunk size: {}", partitionName, chunkSize);
+
+ try {
+ // Get partition directory from job parameters
+ String partitionBaseDir = executionContext.getString("partitionBaseDir");
+ if (partitionBaseDir.trim().isEmpty()) {
+ throw new ItemStreamException("partitionBaseDir not found or empty in ExecutionContext");
+ }
+
+ partitionDir = Paths.get(partitionBaseDir, partitionName);
+ if (!Files.exists(partitionDir)) {
+ throw new ItemStreamException("Partition directory does not exist: " + partitionDir);
+ }
+
+ logger.info("[{}] Reading from partition directory: {}", partitionName, partitionDir);
+
+ initializeReaders();
+
+ readerOpened = true;
+ logger.info("[{}] ChunkBasedPolygonItemReader opened successfully", partitionName);
+
+ } catch (ItemStreamException ise) {
+ throw ise;
+ } catch (Exception e) {
+ throw handleReaderInitializationFailure(e, "Failed to initialize ChunkBasedPolygonItemReader");
+ }
+ }
+
+ @Override
+ public void update(@NonNull ExecutionContext executionContext) throws ItemStreamException {
+ executionContext.putInt(partitionName + ".processed", processedCount);
+ executionContext.putInt(partitionName + ".skipped", skippedCount);
+ }
+
+ @Override
+ public void close() throws ItemStreamException {
+ logger.info("[{}] Closing ChunkBasedPolygonItemReader. Processed: {}, Skipped: {}",
+ partitionName, processedCount, skippedCount);
+
+ closeReader(polygonReader, "polygon");
+ closeReader(layerReader, "layer");
+
+ clearChunkData();
+
+ readerOpened = false;
+ }
+
+ /**
+ * Initialize BufferedReaders for polygon and layer files.
+ */
+ private void initializeReaders() throws IOException {
+ Path polygonFile = partitionDir.resolve("polygons.csv");
+ Path layerFile = partitionDir.resolve("layers.csv");
+
+ if (!Files.exists(polygonFile)) {
+ throw new IOException("Polygon file not found: " + polygonFile);
+ }
+
+ // Initialize polygon reader and read header
+ polygonReader = new BufferedReader(new FileReader(polygonFile.toFile()));
+ polygonHeader = polygonReader.readLine();
+ if (polygonHeader == null) {
+ throw new IOException("Polygon file is empty or has no header");
+ }
+
+ // Initialize layer reader and read header (if file exists)
+ if (Files.exists(layerFile)) {
+ layerReader = new BufferedReader(new FileReader(layerFile.toFile()));
+ layerHeader = layerReader.readLine();
+ } else {
+ logger.warn("[{}] Layer file does not exist: {}", partitionName, layerFile);
+ layerHeader = ""; // Empty header for missing layer file
+ }
+
+ logger.info("[{}] Initialized readers - Polygon header: present, Layer header present: {}",
+ partitionName, layerHeader != null);
+ }
+
+ /**
+ * Load next chunk of polygon data and associated layers.
+ *
+ * @return true if chunk was loaded, false if no more data
+ */
+ private boolean loadNextChunk() throws IOException {
+ clearChunkData();
+
+ currentChunk = new ArrayList<>();
+ currentChunkFeatureIds = new HashSet<>();
+ currentChunkLayers = new HashMap<>();
+
+ // Read polygon lines for current chunk
+ String line;
+ int linesInChunk = 0;
+ while (linesInChunk < chunkSize && (line = polygonReader.readLine()) != null) {
+ if (!line.trim().isEmpty()) {
+ currentChunk.add(line);
+ String featureId = extractFeatureIdFromLine(line);
+ if (featureId != null) {
+ currentChunkFeatureIds.add(featureId);
+ }
+ linesInChunk++;
+ }
+ }
+
+ if (currentChunk.isEmpty()) {
+ logger.debug("[{}] No more polygon data to load", partitionName);
+ return false;
+ }
+
+ // Load associated layers for current chunk's FEATURE_IDs
+ loadLayersForCurrentChunk();
+
+ // Initialize chunk iterator
+ chunkIterator = currentChunk.iterator();
+
+ logger.debug("[{}] Loaded chunk with {} polygons and {} unique FEATURE_IDs",
+ partitionName, currentChunk.size(), currentChunkFeatureIds.size());
+
+ return true;
+ }
+
+ /**
+ * Load layers associated with FEATURE_IDs in current chunk.
+ */
+ private void loadLayersForCurrentChunk() throws IOException {
+ if (layerReader == null || currentChunkFeatureIds.isEmpty()) {
+ return;
+ }
+
+ // Reset layer reader to beginning (after header)
+ layerReader.close();
+ Path layerFile = partitionDir.resolve("layers.csv");
+ layerReader = new BufferedReader(new FileReader(layerFile.toFile()));
+ String header = layerReader.readLine(); // Skip header
+ if (header == null) {
+ logger.warn("[{}] Layer file has no header", partitionName);
+ }
+
+ String line;
+ while ((line = layerReader.readLine()) != null) {
+ if (!line.trim().isEmpty()) {
+ String featureId = extractFeatureIdFromLine(line);
+ if (featureId != null && currentChunkFeatureIds.contains(featureId)) {
+ currentChunkLayers.computeIfAbsent(featureId, k -> new ArrayList<>()).add(line);
+ }
+ }
+ }
+
+ logger.debug("[{}] Loaded layers for {} FEATURE_IDs in current chunk",
+ partitionName, currentChunkLayers.size());
+ }
+
+ /**
+ * Clear current chunk data to free memory.
+ */
+ private void clearChunkData() {
+ if (currentChunk != null) {
+ currentChunk.clear();
+ }
+ if (currentChunkFeatureIds != null) {
+ currentChunkFeatureIds.clear();
+ }
+ if (currentChunkLayers != null) {
+ currentChunkLayers.clear();
+ }
+ chunkIterator = null;
+ }
+
+ /**
+ * Extract FEATURE_ID from CSV line
+ */
+ private String extractFeatureIdFromLine(String line) {
+ if (line == null || line.trim().isEmpty()) {
+ return null;
+ }
+ int commaIndex = line.indexOf(',');
+ return commaIndex > 0 ? line.substring(0, commaIndex).trim() : line.trim();
+ }
+
+ /**
+ * Close a BufferedReader safely.
+ */
+ private void closeReader(BufferedReader reader, String readerType) {
+ if (reader != null) {
+ try {
+ reader.close();
+ logger.debug("[{}] Closed {} reader", partitionName, readerType);
+ } catch (IOException e) {
+ logger.warn("[{}] Failed to close {} reader", partitionName, readerType, e);
+ }
+ }
+ }
+
+ /**
+ * Handle reader initialization failures
+ */
+ private ItemStreamException handleReaderInitializationFailure(Exception cause, String errorDescription) {
+ performReaderCleanupAfterFailure();
+
+ String contextualMessage = String.format(
+ "[%s] %s. Partition: %s, Job execution: %s, Chunk size: %d, Exception type: %s, Root cause: %s",
+ partitionName, errorDescription, partitionName, jobExecutionId, chunkSize,
+ cause.getClass().getSimpleName(),
+ cause.getMessage() != null ? cause.getMessage() : "No error message available");
+
+ logger.error(contextualMessage, cause);
+
+ if (cause instanceof ItemStreamException itemStreamException) {
+ return itemStreamException;
+ }
+ return new ItemStreamException(contextualMessage, cause);
+ }
+
+ /**
+ * Ensure chunk is available for reading. Load new chunk if needed.
+ */
+ private boolean ensureChunkAvailable() throws IOException {
+ if ((chunkIterator == null || !chunkIterator.hasNext()) && !loadNextChunk()) {
+ logger.info("[{}] No more chunks to process - returning null", partitionName);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Log polygon processing details.
+ */
+ private void logPolygonProcessing(String polygonLine) {
+ logger.debug("[{}] Processing polygon line from chunk: {}", partitionName,
+ polygonLine.length() > 100 ? polygonLine.substring(0, 100) + "..." : polygonLine);
+ }
+
+ /**
+ * Process a polygon line and create BatchRecord.
+ */
+ private BatchRecord processPolygonLine(String polygonLine) throws Exception {
+ String featureId = extractFeatureIdFromLine(polygonLine);
+ if (featureId == null || featureId.trim().isEmpty()) {
+ logger.warn("[{}] Skipping polygon with null/empty FEATURE_ID", partitionName);
+ skippedCount++;
+ return read(); // Try next
+ }
+
+ return createBatchRecord(polygonLine, featureId);
+ }
+
+ /**
+ * Create a BatchRecord from polygon line and feature ID.
+ */
+ private BatchRecord createBatchRecord(String polygonLine, String featureId) {
+ List<String> layerLines = currentChunkLayers.getOrDefault(featureId, new ArrayList<>());
+
+ BatchRecord batchRecord = new BatchRecord();
+ batchRecord.setFeatureId(featureId);
+ batchRecord.setRawPolygonData(polygonLine);
+ batchRecord.setRawLayerData(layerLines);
+ batchRecord.setPolygonHeader(polygonHeader);
+ batchRecord.setLayerHeader(layerHeader);
+ batchRecord.setPartitionName(partitionName);
+
+ processedCount++;
+ logger.debug("[{}] Created BatchRecord for FEATURE_ID: {} with {} layers",
+ partitionName, featureId, layerLines.size());
+
+ return batchRecord;
+ }
+
+ /**
+ * Handle exceptions during polygon processing.
+ */
+ private BatchRecord handlePolygonProcessingException(String polygonLine, Exception e) throws Exception {
+ String featureId = extractFeatureIdFromLine(polygonLine);
+ logger.error("[{}] Exception processing polygon FEATURE_ID: {} - Exception: {}",
+ partitionName, featureId, e.getMessage(), e);
+
+ recordSkipMetrics(featureId, e);
+ skippedCount++;
+ return read(); // Try next
+ }
+
+ /**
+ * Record skip metrics for failed polygon processing.
+ */
+ private void recordSkipMetrics(String featureId, Exception e) {
+ if (metricsCollector != null && jobExecutionId != null) {
+ try {
+ Long featureIdLong = featureId != null ? Long.parseLong(featureId) : null;
+ metricsCollector.recordSkip(jobExecutionId, featureIdLong, null, e, partitionName, null);
+ } catch (NumberFormatException nfe) {
+ metricsCollector.recordSkip(jobExecutionId, null, null, e, partitionName, null);
+ }
+ }
+ }
+
+ /**
+ * Perform cleanup after initialization failure.
+ */
+ private void performReaderCleanupAfterFailure() {
+ try {
+ close();
+ } catch (Exception cleanupException) {
+ logger.warn("[{}] Failed to cleanup after initialization failure for job execution: {}",
+ partitionName, jobExecutionId, cleanupException);
+ }
+ }
+}
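
Note for reviewers: a hedged sketch of driving the new reader directly, e.g. from a unit test. The directory layout (<base>/partition0/polygons.csv plus optional layers.csv) is what open() expects; the paths are placeholders, and getFeatureId() is assumed to be the usual getter paired with setFeatureId().

    import org.springframework.batch.item.ExecutionContext;

    import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;

    public class ReaderSketch {
        public static void main(String[] args) throws Exception {
            ExecutionContext context = new ExecutionContext();
            // Must point at a directory containing partition0/polygons.csv
            context.putString("partitionBaseDir", "/tmp/vdyp/partitions");

            // A null metricsCollector is tolerated; recordSkipMetrics null-checks it
            ChunkBasedPolygonItemReader reader =
                    new ChunkBasedPolygonItemReader("partition0", null, 1L, 10);
            reader.open(context);
            try {
                BatchRecord batchRecord;
                while ((batchRecord = reader.read()) != null) {
                    System.out.println(batchRecord.getFeatureId()); // assumed getter
                }
            } finally {
                reader.close();
            }
        }
    }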
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitionHandler.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitionHandler.java
index 403531b9a..466db23c6 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitionHandler.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitionHandler.java
@@ -8,20 +8,11 @@
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.FileSystemResource;
-import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.NonNull;
import java.util.Collection;
-/**
- * Dynamic partition handler for VDYP batch processing that adjusts grid size based on job parameters.
- *
- * This handler manages the parallel execution of VDYP processing partitions, allowing runtime configuration of
- * partition count.
- */
public class DynamicPartitionHandler implements PartitionHandler {
private static final Logger logger = LoggerFactory.getLogger(DynamicPartitionHandler.class);
@@ -33,8 +24,7 @@ public class DynamicPartitionHandler implements PartitionHandler {
public DynamicPartitionHandler(
TaskExecutor taskExecutor, Step workerStep, DynamicPartitioner dynamicPartitioner,
- BatchProperties batchProperties
- ) {
+ BatchProperties batchProperties) {
this.taskExecutor = taskExecutor;
this.workerStep = workerStep;
this.dynamicPartitioner = dynamicPartitioner;
@@ -43,14 +33,12 @@ public DynamicPartitionHandler(
@Override
@NonNull
- public Collection<StepExecution>
- handle(@NonNull StepExecutionSplitter stepSplitter, @NonNull StepExecution masterStepExecution)
- throws Exception {
+ public Collection<StepExecution> handle(@NonNull StepExecutionSplitter stepSplitter,
+ @NonNull StepExecution masterStepExecution)
+ throws Exception {
// Get dynamic parameters from job parameters
JobParameters jobParameters = masterStepExecution.getJobExecution().getJobParameters();
Long partitionSize = jobParameters.getLong("partitionSize");
- Long chunkSize = jobParameters.getLong("chunkSize");
- String inputFilePath = jobParameters.getString("inputFilePath");
// Get grid size
int actualGridSize;
@@ -62,34 +50,18 @@ public DynamicPartitionHandler(
throw new IllegalStateException("No grid size specified in job parameters or properties. ");
}
- // Get input file path
- String actualInputFilePath = inputFilePath;
- if (actualInputFilePath == null || actualInputFilePath.trim().isEmpty()) {
- actualInputFilePath = batchProperties.getInput().getFilePath();
- }
- if (actualInputFilePath == null || actualInputFilePath.trim().isEmpty()) {
- throw new IllegalStateException("No input file path specified in job parameters or properties. ");
- }
-
- // Create input resource from file path
- Resource inputResource;
- if (actualInputFilePath.startsWith("classpath:")) {
- inputResource = new ClassPathResource(actualInputFilePath.substring(10));
+ // Set partition base directory for uploaded CSV files
+ String partitionBaseDir = jobParameters.getString("partitionBaseDir");
+ if (partitionBaseDir != null) {
+ dynamicPartitioner.setPartitionBaseDir(partitionBaseDir);
+ logger.info("[VDYP Uploaded File Partition Handler] Using partition base directory: {}", partitionBaseDir);
} else {
- inputResource = new FileSystemResource(actualInputFilePath);
+ logger.warn("[VDYP Uploaded File Partition Handler] No partition base directory found in job parameters");
}
- dynamicPartitioner.setInputResource(inputResource);
- logger.info("[VDYP Partition Handler] Using input file: {}", actualInputFilePath);
-
logger.info(
- "VDYP dynamic partitioning: Using {} partitions (requested: {}, from properties: {})", actualGridSize,
- partitionSize, batchProperties.getPartitioning().getGridSize()
- );
-
- if (chunkSize != null) {
- logger.info("VDYP dynamic chunk size: Requested {}", chunkSize.intValue());
- }
+ "VDYP FEATURE_ID-based partitioning: Using {} partitions (requested: {}, from properties: {})",
+ actualGridSize, partitionSize, batchProperties.getPartitioning().getGridSize());
// Create and configure TaskExecutorPartitionHandler with dynamic grid size
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
@@ -102,4 +74,4 @@ public DynamicPartitionHandler(
// Delegate to the configured handler
return handler.handle(stepSplitter, masterStepExecution);
}
-}
\ No newline at end of file
+}
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitioner.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitioner.java
index d967fd2eb..6fea9da3a 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitioner.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/DynamicPartitioner.java
@@ -1,153 +1,63 @@
package ca.bc.gov.nrs.vdyp.batch.configuration;
+import java.util.HashMap;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
-import org.springframework.core.io.Resource;
import org.springframework.lang.NonNull;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Dynamic partitioner for VDYP batch processing that divides CSV file processing by record position ranges.
- *
- * This partitioner determines the total record count and creates partitions based on sequential record positions.
- */
public class DynamicPartitioner implements Partitioner {
private static final Logger logger = LoggerFactory.getLogger(DynamicPartitioner.class);
- private static final String START_LINE = "startLine";
- private static final String END_LINE = "endLine";
private static final String PARTITION_NAME = "partitionName";
- private static final String PARTITION_0 = "partition0";
+ private static final String PARTITION_BASE_DIR = "partitionBaseDir";
+ private static final String ASSIGNED_FEATURE_IDS = "assignedFeatureIds";
- // Input resource will be set dynamically during execution
- private Resource inputResource;
+ private String partitionBaseDir;
- public void setInputResource(Resource inputResource) {
- this.inputResource = inputResource;
+ public void setPartitionBaseDir(String partitionBaseDir) {
+ this.partitionBaseDir = partitionBaseDir;
}
+ /**
+ * @param gridSize Number of partitions to create
+ * @return Map of partition execution contexts for existing partitions
+ */
@Override
@NonNull
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>();
- // Check if input resource is available
- if (inputResource == null) {
- logger.warn("[VDYP Partitioner] Warning: Input resource not set. Using default single partition.");
- // Create single empty partition
- ExecutionContext context = new ExecutionContext();
- context.putLong(START_LINE, 2);
- context.putLong(END_LINE, 2);
- context.putString(PARTITION_NAME, PARTITION_0);
- partitions.put(PARTITION_0, context);
- return partitions;
- }
-
- // Calculate total record count by reading the actual CSV file
- long totalRecords = calculateTotalRecords();
-
- if (totalRecords <= 0) {
- logger.warn("[VDYP Partitioner] Warning: No records found in CSV file. Using single partition.");
- // Fallback: create single partition
- ExecutionContext context = new ExecutionContext();
- context.putLong(START_LINE, 2); // Skip header (line 1)
- context.putLong(END_LINE, 2);
- context.putString(PARTITION_NAME, PARTITION_0);
- partitions.put(PARTITION_0, context);
- return partitions;
- }
-
- // Divide records by position, not by ID values
- long recordsPerPartition = totalRecords / gridSize;
- long remainder = totalRecords % gridSize;
-
- long currentStartLine = 2; // Start after header (line 1)
+ logger.info(
+ "[VDYP Uploaded File Partitioner] Creating execution contexts for {} uploaded file partitions",
+ gridSize);
+ // Create execution contexts for existing partition directories
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
- // Calculate line range for this partition
- long recordsInThisPartition = recordsPerPartition;
-
- // Add remainder to the last partition
- if (i == gridSize - 1) {
- recordsInThisPartition += remainder;
+ // Set partition parameters
+ context.putString(PARTITION_NAME, "partition" + i);
+ if (partitionBaseDir != null) {
+ context.putString(PARTITION_BASE_DIR, partitionBaseDir);
}
- long currentEndLine = currentStartLine + recordsInThisPartition - 1;
-
- // Set partition parameters - using line-based ranges
- context.putLong(START_LINE, currentStartLine);
- context.putLong(END_LINE, currentEndLine);
- context.putString(PARTITION_NAME, "partition" + i);
+ // Set empty FEATURE_IDs since they're already distributed in partition files
+ context.putString(ASSIGNED_FEATURE_IDS, "");
partitions.put("partition" + i, context);
logger.info(
- "VDYP partition{} created: lines {}-{} ({} records)", i, currentStartLine, currentEndLine,
- recordsInThisPartition
- );
-
- currentStartLine = currentEndLine + 1;
+ "VDYP partition{} execution context created for uploaded partition directory", i);
}
logger.info(
- "VDYP total partitions: {}, covering {} records (lines 2-{})", gridSize, totalRecords,
- currentStartLine - 1
- );
+ "Uploaded file partitioner created {} execution contexts for uploaded partitions", partitions.size());
return partitions;
}
-
- /**
- * Calculate total record count by reading the VDYP CSV file and counting data lines.
- *
- * This method counts the number of data records (excluding header) for position-based partitioning of VDYP data.
- *
- * @return Total number of data records
- */
- private long calculateTotalRecords() {
- logger.info("[VDYP Partitioner] Calculating total records from VDYP CSV file...");
-
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputResource.getInputStream()))) {
- String line;
- long recordCount = 0;
- int lineNumber = 0;
-
- // Skip header line
- String headerLine = reader.readLine();
- if (headerLine != null) {
- lineNumber = 1;
- logger.info("[VDYP Partitioner] Header: {}", headerLine);
- }
-
- // Count data records
- while ( (line = reader.readLine()) != null) {
- lineNumber++;
-
- if (!line.trim().isEmpty()) {
- recordCount++;
- }
- }
-
- logger.info("[VDYP Partitioner] CSV Analysis Complete:");
- logger.info(" - Total lines in file: {}", lineNumber);
- logger.info(" - VDYP data records found: {}", recordCount);
- logger.info(" - Using position-based partitioning for efficient parallel VDYP processing");
-
- return recordCount;
-
- } catch (IOException e) {
- logger.error("[VDYP Partitioner] Error reading CSV file: {}", e.getMessage(), e);
- return 0;
- }
- }
-}
\ No newline at end of file
+}
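
Note for reviewers: the reworked partitioner no longer reads the CSV at all; it only stamps out labelled execution contexts. A small sketch of what partition(4) now produces (the directory value is a placeholder):

    import java.util.Map;

    import org.springframework.batch.item.ExecutionContext;

    public class PartitionerSketch {
        public static void main(String[] args) {
            DynamicPartitioner partitioner = new DynamicPartitioner();
            partitioner.setPartitionBaseDir("/tmp/vdyp/partitions"); // placeholder
            Map<String, ExecutionContext> contexts = partitioner.partition(4);
            // Keys: partition0..partition3. Each context carries partitionName,
            // partitionBaseDir, and an empty assignedFeatureIds marker; the worker
            // reader resolves <partitionBaseDir>/<partitionName> itself.
            contexts.forEach((name, ctx) -> System.out.println(name + " -> " + ctx));
        }
    }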
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedBatchConfiguration.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedBatchConfiguration.java
index 0dae7ac25..dfbe01974 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedBatchConfiguration.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedBatchConfiguration.java
@@ -1,7 +1,10 @@
package ca.bc.gov.nrs.vdyp.batch.configuration;
+import ca.bc.gov.nrs.vdyp.batch.exception.ResultAggregationException;
import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;
import ca.bc.gov.nrs.vdyp.batch.service.BatchMetricsCollector;
+import ca.bc.gov.nrs.vdyp.batch.service.ResultAggregationService;
+import ca.bc.gov.nrs.vdyp.batch.service.VdypProjectionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
@@ -16,7 +19,8 @@
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
-import org.springframework.batch.item.file.FlatFileItemWriter;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@@ -26,14 +30,11 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
-import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
import java.util.stream.Collectors;
-/**
- * VDYP Batch Configuration with partitioning, error handling, and detailed metrics collection.
- */
@Configuration
public class PartitionedBatchConfiguration {
@@ -42,42 +43,33 @@ public class PartitionedBatchConfiguration {
private final JobRepository jobRepository;
private final BatchMetricsCollector metricsCollector;
private final BatchProperties batchProperties;
+ private final ResultAggregationService resultAggregationService;
public PartitionedBatchConfiguration(
- JobRepository jobRepository, BatchMetricsCollector metricsCollector, BatchProperties batchProperties
- ) {
+ JobRepository jobRepository, BatchMetricsCollector metricsCollector, BatchProperties batchProperties,
+ ResultAggregationService resultAggregationService) {
this.jobRepository = jobRepository;
this.metricsCollector = metricsCollector;
this.batchProperties = batchProperties;
+ this.resultAggregationService = resultAggregationService;
}
private static final String UNKNOWN = "unknown";
@Bean
@StepScope
- public BatchRetryPolicy retryPolicy(
- @Value("#{jobParameters['maxRetryAttempts']}") Long maxRetryAttemptsParam,
- @Value("#{jobParameters['retryBackoffPeriod']}") Long retryBackoffPeriodParam
- ) {
-
- // Get max attempts
- int maxAttempts;
- if (maxRetryAttemptsParam != null && maxRetryAttemptsParam > 0) {
- maxAttempts = maxRetryAttemptsParam.intValue();
- } else if (batchProperties.getRetry().getMaxAttempts() > 0) {
- maxAttempts = batchProperties.getRetry().getMaxAttempts();
- } else {
- throw new IllegalStateException("No max retry attempts specified in job parameters or properties. ");
+ public BatchRetryPolicy retryPolicy() {
+ // Get retry configuration from application.properties
+ int maxAttempts = batchProperties.getRetry().getMaxAttempts();
+ if (maxAttempts <= 0) {
+ throw new IllegalStateException(
+ "batch.retry.max-attempts must be configured with a positive value in application.properties");
}
- // Get backoff period
- int backoffPeriod;
- if (retryBackoffPeriodParam != null && retryBackoffPeriodParam > 0) {
- backoffPeriod = retryBackoffPeriodParam.intValue();
- } else if (batchProperties.getRetry().getBackoffPeriod() > 0) {
- backoffPeriod = batchProperties.getRetry().getBackoffPeriod();
- } else {
- throw new IllegalStateException("No retry backoff period specified in job parameters or properties. ");
+ int backoffPeriod = batchProperties.getRetry().getBackoffPeriod();
+ if (backoffPeriod <= 0) {
+ throw new IllegalStateException(
+ "batch.retry.backoff-period must be configured with a positive value in application.properties");
}
BatchRetryPolicy policy = new BatchRetryPolicy(maxAttempts, backoffPeriod);
@@ -86,19 +78,16 @@ public BatchRetryPolicy retryPolicy(
}
/**
- * Batch Skip policy with metrics - step scoped to access job parameters
+ * Batch Skip policy with metrics - configuration from application.properties
*/
@Bean
@StepScope
- public BatchSkipPolicy skipPolicy(@Value("#{jobParameters['maxSkipCount']}") Long maxSkipCountParam) {
- // Get max skip count
- int maxSkipCount;
- if (maxSkipCountParam != null && maxSkipCountParam > 0) {
- maxSkipCount = maxSkipCountParam.intValue();
- } else if (batchProperties.getSkip().getMaxCount() > 0) {
- maxSkipCount = batchProperties.getSkip().getMaxCount();
- } else {
- throw new IllegalStateException("No max skip count specified in job parameters or properties. ");
+ public BatchSkipPolicy skipPolicy() {
+ // Get skip configuration from application.properties
+ int maxSkipCount = batchProperties.getSkip().getMaxCount();
+ if (maxSkipCount <= 0) {
+ throw new IllegalStateException(
+ "batch.skip.max-count must be configured with a positive value in application.properties");
}
return new BatchSkipPolicy(maxSkipCount, metricsCollector);
@@ -112,22 +101,19 @@ public TaskExecutor taskExecutor() {
int corePoolSize = batchProperties.getThreadPool().getCorePoolSize();
if (corePoolSize <= 0) {
throw new IllegalStateException(
- "batch.thread-pool.core-pool-size must be configured with a positive value in application.properties"
- );
+ "batch.thread-pool.core-pool-size must be configured with a positive value in application.properties");
}
int maxPoolSizeMultiplier = batchProperties.getThreadPool().getMaxPoolSizeMultiplier();
if (maxPoolSizeMultiplier <= 0) {
throw new IllegalStateException(
- "batch.thread-pool.max-pool-size-multiplier must be configured with a positive value in application.properties"
- );
+ "batch.thread-pool.max-pool-size-multiplier must be configured with a positive value in application.properties");
}
String threadNamePrefix = batchProperties.getThreadPool().getThreadNamePrefix();
if (threadNamePrefix == null || threadNamePrefix.trim().isEmpty()) {
throw new IllegalStateException(
- "batch.thread-pool.thread-name-prefix must be configured in application.properties"
- );
+ "batch.thread-pool.thread-name-prefix must be configured in application.properties");
}
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
@@ -148,16 +134,14 @@ public DynamicPartitioner dynamicPartitioner() {
@Bean
public DynamicPartitionHandler dynamicPartitionHandler(
TaskExecutor taskExecutor, Step workerStep, DynamicPartitioner dynamicPartitioner,
- BatchProperties batchProperties
- ) {
+ BatchProperties batchProperties) {
return new DynamicPartitionHandler(taskExecutor, workerStep, dynamicPartitioner, batchProperties);
}
@Bean
public Step masterStep(
TaskExecutor taskExecutor, Step workerStep, DynamicPartitioner dynamicPartitioner,
- DynamicPartitionHandler dynamicPartitionHandler
- ) {
+ DynamicPartitionHandler dynamicPartitionHandler) {
return new StepBuilder("masterStep", jobRepository).partitioner("workerStep", dynamicPartitioner)
.partitionHandler(dynamicPartitionHandler).build();
}
@@ -168,20 +152,23 @@ public Step masterStep(
@Bean
public Step workerStep(
BatchRetryPolicy retryPolicy, BatchSkipPolicy skipPolicy, PlatformTransactionManager transactionManager,
- BatchMetricsCollector metricsCollector, BatchProperties batchProperties
- ) {
- int chunkSize = batchProperties.getPartitioning().getChunkSize();
- if (chunkSize <= 0) {
- throw new IllegalStateException(
- "batch.partitioning.chunk-size must be configured with a positive value in application.properties"
- );
- }
+ BatchMetricsCollector metricsCollector, BatchProperties batchProperties,
+ VdypProjectionService vdypProjectionService,
+ org.springframework.batch.item.ItemStreamReader<BatchRecord> partitionReader) {
+
+ int chunkSize = Math.max(batchProperties.getReader().getChunkSize(), 1);
+ logger.info("Worker step configured with chunk size: {}", chunkSize);
+
+ VdypChunkProjectionWriter writer = new VdypChunkProjectionWriter(vdypProjectionService, metricsCollector);
return new StepBuilder("workerStep", jobRepository)
.<BatchRecord, BatchRecord>chunk(chunkSize, transactionManager)
- .reader(partitionReader(metricsCollector, batchProperties))
- .processor(vdypProjectionProcessor(retryPolicy, metricsCollector)).writer(partitionWriter(null, null))
- .faultTolerant().retryPolicy(retryPolicy).skipPolicy(skipPolicy).listener(new StepExecutionListener() {
+ .reader(partitionReader)
+ .processor(vdypProjectionProcessor(retryPolicy, metricsCollector))
+ .writer(writer)
+ .listener(writer) // Add writer as step listener
+ .faultTolerant().retryPolicy(retryPolicy).skipPolicy(skipPolicy)
+ .listener(new StepExecutionListener() {
@Override
public void beforeStep(@NonNull StepExecution stepExecution) {
String partitionName = stepExecution.getExecutionContext().getString("partitionName", UNKNOWN);
@@ -193,8 +180,7 @@ public void beforeStep(@NonNull StepExecution stepExecution) {
metricsCollector.initializePartitionMetrics(jobExecutionId, partitionName, startLine, endLine);
logger.info(
- "[{}] VDYP Worker step starting for range {}-{}", partitionName, startLine, endLine
- );
+ "[{}] VDYP Worker step starting for range {}-{}", partitionName, startLine, endLine);
}
@Override
@@ -205,14 +191,12 @@ public ExitStatus afterStep(@NonNull StepExecution stepExecution) {
// Complete partition metrics
metricsCollector.completePartitionMetrics(
jobExecutionId, partitionName, stepExecution.getWriteCount(),
- stepExecution.getExitStatus().getExitCode()
- );
+ stepExecution.getExitStatus().getExitCode());
logger.info(
"[{}] VDYP Worker step completed. Read: {}, Written: {}, Skipped: {}", partitionName,
stepExecution.getReadCount(), stepExecution.getWriteCount(),
- stepExecution.getSkipCount()
- );
+ stepExecution.getSkipCount());
return stepExecution.getExitStatus();
}
@@ -220,13 +204,16 @@ public ExitStatus afterStep(@NonNull StepExecution stepExecution) {
}
/**
- * VDYP Batch Job with metrics initialization Only created when explicitly enabled via property
+ * VDYP Batch Job with metrics initialization. Only created when explicitly
+ * enabled via property.
*/
@Bean
@ConditionalOnProperty(name = "batch.job.auto-create", havingValue = "true", matchIfMissing = false)
- public Job partitionedJob(PartitionedJobExecutionListener jobExecutionListener, Step masterStep) {
+ public Job partitionedJob(
+ PartitionedJobExecutionListener jobExecutionListener, Step masterStep, Step postProcessingStep,
+ PlatformTransactionManager transactionManager) {
return new JobBuilder("VdypPartitionedJob", jobRepository).incrementer(new RunIdIncrementer()).start(masterStep)
- .listener(new JobExecutionListener() {
+ .next(postProcessingStep).listener(new JobExecutionListener() {
@Override
public void beforeJob(@NonNull JobExecution jobExecution) {
// Initialize job metrics
@@ -245,20 +232,16 @@ public void afterJob(@NonNull JobExecution jobExecution) {
.filter(stepExecution -> stepExecution.getStepName().startsWith("workerStep:"))
.mapToLong(StepExecution::getWriteCount).sum();
- // Debug logging for metrics validation
logger.debug(
"[VDYP Metrics Debug] Job {} - All steps: [{}]", jobExecution.getId(),
jobExecution.getStepExecutions().stream().map(StepExecution::getStepName)
- .collect(Collectors.joining(", "))
- );
+ .collect(Collectors.joining(", ")));
metricsCollector.finalizeJobMetrics(
- jobExecution.getId(), jobExecution.getStatus().toString(), totalRead, totalWritten
- );
+ jobExecution.getId(), jobExecution.getStatus().toString(), totalRead, totalWritten);
jobExecutionListener.afterJob(jobExecution);
- // Clean up old metrics
metricsCollector.cleanupOldMetrics(20);
logger.info("=== VDYP Batch Job Completed ===");
@@ -268,71 +251,128 @@ public void afterJob(@NonNull JobExecution jobExecution) {
@Bean
@StepScope
- public RangeAwareItemReader
- partitionReader(BatchMetricsCollector metricsCollector, BatchProperties batchProperties) {
- return new RangeAwareItemReader(null, metricsCollector, batchProperties);
+ public org.springframework.batch.item.ItemStreamReader<BatchRecord> partitionReader(
+ BatchMetricsCollector metricsCollector,
+ @Value("#{stepExecutionContext['partitionName']}") String partitionName,
+ @Value("#{stepExecution.jobExecutionId}") Long jobExecutionId,
+ BatchProperties batchProperties) {
+
+ logger.info("[{}] Using ChunkBasedPolygonItemReader with chunk size: {}",
+ partitionName, batchProperties.getReader().getChunkSize());
+ return new ChunkBasedPolygonItemReader(partitionName, metricsCollector, jobExecutionId,
+ batchProperties.getReader().getChunkSize());
}
@Bean
@StepScope
- public FlatFileItemWriter<BatchRecord> partitionWriter(
- @Value("#{stepExecutionContext['partitionName']}") String partitionName,
- @Value("#{jobParameters['outputFilePath']}") String outputFilePath
- ) {
-
- String actualPartitionName = partitionName != null ? partitionName : UNKNOWN;
+ public VdypProjectionProcessor vdypProjectionProcessor(
+ BatchRetryPolicy retryPolicy, BatchMetricsCollector metricsCollector) {
+ return new VdypProjectionProcessor(retryPolicy, metricsCollector);
+ }
- String actualOutputDirectory = outputFilePath;
- if (actualOutputDirectory == null) {
- actualOutputDirectory = batchProperties.getOutput().getDirectory().getDefaultPath();
- }
- if (actualOutputDirectory == null) {
- actualOutputDirectory = System.getProperty("java.io.tmpdir");
- logger.warn("No output directory specified, using system temp directory: {}", actualOutputDirectory);
- }
+ /**
+ * Post-processing step that aggregates results from all partitions into a
+ * single consolidated ZIP file. This step runs after all worker partitions have
+ * completed successfully.
+ */
+ @Bean
+ public Step postProcessingStep(PlatformTransactionManager transactionManager) {
+ return new StepBuilder("postProcessingStep", jobRepository)
+ .tasklet(resultAggregationTasklet(), transactionManager).build();
+ }
- String filePrefix = batchProperties.getOutput().getFilePrefix();
- if (filePrefix == null) {
- throw new IllegalStateException("batch.output.file-prefix must be configured in application.properties");
- }
+ /**
+ * Tasklet that performs result aggregation by collecting all partition results
+ * and creating a single consolidated output ZIP file.
+ */
+ @Bean
+ @StepScope
+ public Tasklet resultAggregationTasklet() {
+ return (contribution, chunkContext) -> {
+ Long jobExecutionId = chunkContext.getStepContext().getStepExecution().getJobExecutionId();
+ String baseOutputPath = batchProperties.getOutput().getDirectory().getDefaultPath();
+
+ if (baseOutputPath == null) {
+ baseOutputPath = System.getProperty("java.io.tmpdir");
+ logger.warn("No output directory configured, using system temp directory: {}", baseOutputPath);
+ }
+
+ logger.info(
+ "Starting result aggregation for job execution: {} from path: {}", jobExecutionId, baseOutputPath);
+
+ try {
+ // Aggregate all partition results into consolidated ZIP
+ Path consolidatedZip = resultAggregationService.aggregateResults(jobExecutionId, baseOutputPath);
+
+ // Store the final ZIP path in the execution context for potential retrieval
+ chunkContext.getStepContext().getStepExecution().getExecutionContext()
+ .putString("consolidatedOutputPath", consolidatedZip.toString());
+
+ logger.info("Result aggregation completed successfully. Consolidated output: {}", consolidatedZip);
+
+ return RepeatStatus.FINISHED;
+
+ } catch (IOException ioException) {
+ // Handle I/O specific failures: perform cleanup and wrap with enhanced context
+ throw handleResultAggregationFailure(
+ jobExecutionId, baseOutputPath, ioException, "I/O operation failed during result aggregation");
+ } catch (Exception generalException) {
+ // Handle all other failures: perform cleanup and wrap with enhanced context
+ throw handleResultAggregationFailure(
+ jobExecutionId, baseOutputPath, generalException, "Unexpected error during result aggregation");
+ }
+ };
+ }
- String csvHeader = batchProperties.getOutput().getCsvHeader();
- if (csvHeader == null || csvHeader.trim().isEmpty()) {
- throw new IllegalStateException("batch.output.csv-header must be configured in application.properties");
- }
+ /**
+ * Handles result aggregation failures by performing cleanup, logging, and
+ * creating appropriate exception.
+ */
+ private ResultAggregationException handleResultAggregationFailure(
+ Long jobExecutionId, String baseOutputPath, Exception cause, String errorDescription) {
+ // Perform cleanup of partial aggregation results
+ performAggregationCleanup(jobExecutionId, baseOutputPath);
- String partitionOutputPath = actualOutputDirectory + File.separator + filePrefix + "_" + actualPartitionName
- + ".csv";
+ String contextualMessage = String.format(
+ "%s for job execution: %d, Output path: %s, Exception type: %s, Root cause: %s", errorDescription,
+ jobExecutionId, baseOutputPath, cause.getClass().getSimpleName(),
+ cause.getMessage() != null ? cause.getMessage() : "No error message available");
- try {
- Files.createDirectories(Paths.get(actualOutputDirectory));
- } catch (Exception e) {
- logger.error("Failed to create output directory: {}", e.getMessage());
- }
+ // Log the failure with full context
+ logger.error(contextualMessage, cause);
- FlatFileItemWriter<BatchRecord> writer = new FlatFileItemWriter<>();
- writer.setResource(new org.springframework.core.io.FileSystemResource(partitionOutputPath));
- writer.setName("VdypItemWriter_" + actualPartitionName);
- writer.setHeaderCallback(w -> {
- logger.info("[{}] VDYP Writer: Writing header to file {}", actualPartitionName, partitionOutputPath);
- w.write(csvHeader);
- });
- writer.setLineAggregator(
- item -> item.getId() + ","
- + (item.getData() != null ? "\"" + item.getData().replace("\"", "\"\"") + "\"" : "") + ","
- + (item.getPolygonId() != null ? item.getPolygonId() : "") + ","
- + (item.getLayerId() != null ? item.getLayerId() : "") + "," + "PROCESSED"
- );
-
- logger.info("[{}] VDYP Writer configured for output path: {}", actualPartitionName, partitionOutputPath);
-
- return writer;
+ return new ResultAggregationException(contextualMessage, cause);
}
- @Bean
- @StepScope
- public VdypProjectionProcessor
- vdypProjectionProcessor(BatchRetryPolicy retryPolicy, BatchMetricsCollector metricsCollector) {
- return new VdypProjectionProcessor(retryPolicy, metricsCollector);
+ /**
+ * Performs cleanup of partial aggregation results when aggregation fails. This
+ * method safely handles cleanup without throwing exceptions.
+ *
+ * @param jobExecutionId The job execution ID for context
+ * @param baseOutputPath The base output path where cleanup should occur
+ */
+ private void performAggregationCleanup(Long jobExecutionId, String baseOutputPath) {
+ try {
+ // Attempt to clean up any partial files created during aggregation
+ java.nio.file.Path outputDir = java.nio.file.Paths.get(baseOutputPath);
+ if (Files.exists(outputDir)) {
+ // Clean up temporary files related to this job execution
+ String jobPrefix = "job_" + jobExecutionId;
+ try (java.util.stream.Stream<Path> pathStream = Files.list(outputDir)) {
+ pathStream.filter(path -> path.getFileName().toString().startsWith(jobPrefix)).forEach(path -> {
+ try {
+ Files.deleteIfExists(path);
+ logger.debug("Cleaned up partial aggregation file: {}", path);
+ } catch (Exception cleanupException) {
+ logger.warn("Failed to cleanup file: {}", path, cleanupException);
+ }
+ });
+ }
+ }
+ } catch (Exception cleanupException) {
+ // Log cleanup failure but don't throw exception to avoid masking original error
+ logger.warn(
+ "Failed to perform aggregation cleanup for job execution: {}", jobExecutionId, cleanupException);
+ }
}
-}
\ No newline at end of file
+}
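
Note for reviewers: with inputFilePath and chunkSize removed from the job parameters, a launch now only needs partitionSize and partitionBaseDir (the names DynamicPartitionHandler reads above). A hedged sketch, assuming jobLauncher and partitionedJob are injected beans and the directory already holds the pre-split partition files:

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;

    public class LaunchSketch {
        private final JobLauncher jobLauncher;
        private final Job partitionedJob;

        public LaunchSketch(JobLauncher jobLauncher, Job partitionedJob) {
            this.jobLauncher = jobLauncher;
            this.partitionedJob = partitionedJob;
        }

        public JobExecution launch() throws Exception {
            JobParameters params = new JobParametersBuilder()
                    .addLong("partitionSize", 4L)                          // grid size override
                    .addString("partitionBaseDir", "/tmp/vdyp/partitions") // placeholder path
                    .toJobParameters();
            return jobLauncher.run(partitionedJob, params);
        }
    }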
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedJobExecutionListener.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedJobExecutionListener.java
index 0cd0f1b86..4b75a5943 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedJobExecutionListener.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/PartitionedJobExecutionListener.java
@@ -7,15 +7,9 @@
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
-import java.util.List;
/**
* Job execution listener for partitioned VDYP batch job.
@@ -45,7 +39,6 @@ public void beforeJob(@NonNull JobExecution jobExecution) {
logger.info("VDYP PARTITIONED JOB STARTING");
Long partitionSize = jobExecution.getJobParameters().getLong("partitionSize");
- Long chunkSize = jobExecution.getJobParameters().getLong("chunkSize");
int actualPartitionSize;
if (partitionSize != null) {
@@ -54,23 +47,10 @@ public void beforeJob(@NonNull JobExecution jobExecution) {
actualPartitionSize = batchProperties.getPartitioning().getGridSize();
} else {
throw new IllegalStateException(
- "batch.partitioning.grid-size must be configured in application.properties"
- );
- }
-
- int actualChunkSize;
- if (chunkSize != null) {
- actualChunkSize = chunkSize.intValue();
- } else if (batchProperties.getPartitioning().getChunkSize() > 0) {
- actualChunkSize = batchProperties.getPartitioning().getChunkSize();
- } else {
- throw new IllegalStateException(
- "batch.partitioning.chunk-size must be configured in application.properties"
- );
+ "batch.partitioning.grid-size must be configured in application.properties");
}
logger.info("VDYP Grid Size: {}", actualPartitionSize);
- logger.info("VDYP Chunk Size: {}", actualChunkSize);
logger.info("Expected Partitions: {}", actualPartitionSize);
logger.info("Job Execution ID: {}", jobExecution.getId());
logger.info(separator);
@@ -114,36 +94,8 @@ public void afterJob(@NonNull JobExecution jobExecution) {
logger.warn("Duration: Unable to calculate (missing time information)");
}
- // Merge partition files
- try {
- Long partitionSize = jobExecution.getJobParameters().getLong("partitionSize");
- String outputDirectory = jobExecution.getJobParameters().getString("outputFilePath");
-
- String actualOutputDirectory = outputDirectory;
- if (actualOutputDirectory == null) {
- actualOutputDirectory = batchProperties.getOutput().getDirectory().getDefaultPath();
- }
- if (actualOutputDirectory == null) {
- actualOutputDirectory = System.getProperty("java.io.tmpdir");
- logger.warn(
- "No output directory specified, using system temp directory: {}", actualOutputDirectory
- );
- }
-
- int actualPartitionSize;
- if (partitionSize != null) {
- actualPartitionSize = partitionSize.intValue();
- } else if (batchProperties.getPartitioning().getGridSize() > 0) {
- actualPartitionSize = batchProperties.getPartitioning().getGridSize();
- } else {
- throw new IllegalStateException(
- "batch.partitioning.grid-size must be configured in application.properties"
- );
- }
- mergePartitionFiles(actualPartitionSize, jobExecutionId, actualOutputDirectory);
- } catch (Exception e) {
- logger.error("Failed to merge VDYP partition files: {}", e.getMessage(), e);
- }
+ // Note: Partition file merging has been disabled as results are now aggregated
+ // through the ResultAggregationService in the post-processing step
logger.info(separator);
}
@@ -159,76 +111,4 @@ private void cleanupOldJobTracker(Long currentJobId) {
jobCompletionTracker.entrySet().removeIf(entry -> entry.getKey() < currentJobId - 5);
}
}
-
- /**
- * Merges all VDYP partition output files into a single file.
- */
- private void mergePartitionFiles(int partitionCount, Long jobExecutionId, String outputDirectory)
- throws IOException {
- String filePrefix = batchProperties.getOutput().getFilePrefix();
- if (filePrefix == null) {
- throw new IllegalStateException("batch.output.file-prefix must be configured in application.properties");
- }
-
- String csvHeader = batchProperties.getOutput().getCsvHeader();
- if (csvHeader == null || csvHeader.trim().isEmpty()) {
- throw new IllegalStateException("batch.output.csv-header must be configured in application.properties");
- }
-
- String finalOutputPath = outputDirectory + File.separator + filePrefix + "_merged.csv";
-
- // Add job execution ID to avoid conflicts in concurrent executions
- String tempMergeFile = outputDirectory + File.separator + filePrefix + "_merged_temp_" + jobExecutionId
- + ".csv";
-
- logger.info("Starting VDYP file merge for {} partitions...", partitionCount);
-
- try (java.io.BufferedWriter writer = Files.newBufferedWriter(Paths.get(tempMergeFile))) {
- // Write VDYP header
- writer.write(csvHeader);
- writer.newLine();
-
- int mergedFiles = 0;
- long totalLines = 0;
-
- // Merge partition files
- for (int i = 0; i < partitionCount; i++) {
- String partitionFile = outputDirectory + File.separator + filePrefix + "_partition" + i + ".csv";
- if (Files.exists(Paths.get(partitionFile))) {
- try (Stream<String> lines = Files.lines(Paths.get(partitionFile))) {
- List<String> partitionLinesList = lines.skip(1).toList();
-
- for (String line : partitionLinesList) {
- try {
- writer.write(line);
- writer.newLine();
- } catch (Exception e) {
- logger.error("Error writing VDYP line: {}", e.getMessage());
- }
- }
-
- long partitionLines = partitionLinesList.size();
- totalLines += partitionLines;
- mergedFiles++;
- logger.info("Merged VDYP partition file: {} ({} records)", partitionFile, partitionLines);
- }
- } else {
- logger.warn("VDYP partition file not found: {}", partitionFile);
- }
- }
-
- logger.info("Merged {} VDYP partition files with total {} data records", mergedFiles, totalLines);
- }
-
- // Atomically move temp file to final location
- Files.move(
- Paths.get(tempMergeFile), Paths.get(finalOutputPath), java.nio.file.StandardCopyOption.REPLACE_EXISTING
- );
-
- logger.info("Final merged VDYP output created: {}", finalOutputPath);
- try (Stream<String> lines = Files.lines(Paths.get(finalOutputPath))) {
- long lineCount = lines.count();
- logger.info("Total lines in merged VDYP file: {} (including header)", lineCount);
- }
- }
}
\ No newline at end of file
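
The merge logic deleted above is replaced by a post-processing aggregation step, per the note in afterJob. A minimal sketch of how such a step could be chained after the partitioned step using the Spring Batch 5 builders; the resultAggregationStep bean and its placement in the flow are assumptions for illustration, not the repository's actual wiring:

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.context.annotation.Bean;

    // Hypothetical wiring: aggregation runs once, after every partition has completed.
    @Bean
    public Job partitionedJob(JobRepository jobRepository, Step masterStep, Step resultAggregationStep) {
        return new JobBuilder("partitionedJob", jobRepository)
                .start(masterStep)           // partitioned VDYP projection
                .next(resultAggregationStep) // ResultAggregationService gathers per-partition output
                .build();
    }
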
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/RangeAwareItemReader.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/RangeAwareItemReader.java
deleted file mode 100644
index eb02fc9fe..000000000
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/RangeAwareItemReader.java
+++ /dev/null
@@ -1,384 +0,0 @@
-package ca.bc.gov.nrs.vdyp.batch.configuration;
-
-import org.springframework.batch.core.StepExecution;
-import org.springframework.batch.core.annotation.BeforeStep;
-import org.springframework.batch.item.ExecutionContext;
-import org.springframework.batch.item.ItemReader;
-import org.springframework.batch.item.ItemStream;
-import org.springframework.batch.item.ItemStreamException;
-import org.springframework.batch.item.file.FlatFileItemReader;
-import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
-import org.springframework.batch.item.file.FlatFileParseException;
-import org.springframework.core.io.Resource;
-import org.springframework.lang.NonNull;
-
-import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;
-import ca.bc.gov.nrs.vdyp.batch.service.BatchMetricsCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * ItemReader that reads only records within a specified line range. Used for partitioned processing to ensure each
- * partition processes only its assigned range of data lines.
- */
-public class RangeAwareItemReader implements ItemReader<BatchRecord>, ItemStream {
-
- private static final Logger logger = LoggerFactory.getLogger(RangeAwareItemReader.class);
-
- private FlatFileItemReader<BatchRecord> delegate;
- private long startLine;
- private long endLine;
- private String partitionName;
- private long processedCount = 0;
- private long skippedCount = 0;
- private long currentLine = 0;
- private boolean readerOpened = false;
- private boolean rangeExhausted = false;
-
- // Job execution context for metrics
- private Long jobExecutionId;
-
- // Metrics collector for skip tracking
- private final BatchMetricsCollector metricsCollector;
-
- private final BatchProperties batchProperties;
-
- private static final String UNKNOWN = "unknown";
-
- // Skip tracking and statistics
- private final AtomicLong totalSkipsInReader = new AtomicLong(0);
- private final ConcurrentHashMap<String, AtomicLong> skipReasonCounts = new ConcurrentHashMap<>();
-
- private Resource inputResource;
-
- public RangeAwareItemReader(
- Resource resource, BatchMetricsCollector metricsCollector, BatchProperties batchProperties
- ) {
- this.inputResource = resource;
- this.metricsCollector = metricsCollector;
- this.batchProperties = batchProperties;
- }
-
- public void setInputResource(Resource resource) {
- this.inputResource = resource;
- }
-
- /**
- * Extracts partition parameters from the StepExecution context before the step starts.
- */
- @BeforeStep
- public void beforeStep(StepExecution stepExecution) {
- this.startLine = stepExecution.getExecutionContext().getLong("startLine", 2);
- this.endLine = stepExecution.getExecutionContext().getLong("endLine", Long.MAX_VALUE);
- this.partitionName = stepExecution.getExecutionContext().getString("partitionName", UNKNOWN);
- this.jobExecutionId = stepExecution.getJobExecutionId();
-
- // Initialize current line tracker
- this.currentLine = 1; // Start at header line
-
- String inputFilePath = stepExecution.getJobExecution().getJobParameters().getString("inputFilePath");
- if (inputFilePath == null || inputFilePath.trim().isEmpty()) {
- inputFilePath = batchProperties.getInput().getFilePath();
- }
- if (inputFilePath == null || inputFilePath.trim().isEmpty()) {
- throw new IllegalStateException(
- "No input file path specified in job parameters or properties. Cannot initialize reader for partition: "
- + partitionName
- );
- }
-
- // Create resource from file path
- if (inputFilePath.startsWith("classpath:")) {
- this.inputResource = new org.springframework.core.io.ClassPathResource(inputFilePath.substring(10));
- } else {
- this.inputResource = new org.springframework.core.io.FileSystemResource(inputFilePath);
- }
-
- // Check if the resource actually exists
- if (!inputResource.exists()) {
- throw new IllegalStateException(
- "VDYP input resource does not exist: " + inputFilePath
- + ". Cannot initialize reader for partition: " + partitionName
- );
- }
-
- // Create a new, independent delegate reader for this VDYP partition
- String uniqueReaderName = "VdypRangeAwareItemReader-" + partitionName + "-" + System.currentTimeMillis();
- this.delegate = new FlatFileItemReaderBuilder<BatchRecord>().name(uniqueReaderName).resource(inputResource)
- .delimited().names("id", "data", "polygonId", "layerId").linesToSkip(1) // Skip header
- .targetType(BatchRecord.class).build();
-
- // Calculate dynamic logging intervals based on partition size
- long partitionSize = endLine - startLine + 1;
-
- logger.info(
- "[{}] VDYP Reader initialized with line range: {} - {} (size: {})", partitionName, startLine, endLine,
- partitionSize
- );
-
- }
-
- @Override
- public void open(@NonNull ExecutionContext executionContext) throws ItemStreamException {
- if (!readerOpened) {
- delegate.open(executionContext);
- readerOpened = true;
- logger.info(
- "[{}] VDYP Reader opened successfully for line range {}-{} (total range: {} lines)", partitionName,
- startLine, endLine, (endLine - startLine + 1)
- );
- }
- }
-
- @Override
- public void update(@NonNull ExecutionContext executionContext) throws ItemStreamException {
- delegate.update(executionContext);
- }
-
- @Override
- public void close() throws ItemStreamException {
- if (readerOpened) {
- delegate.close();
- readerOpened = false;
-
- long totalReaderSkips = totalSkipsInReader.get();
- logger.info("[{}] VDYP Reader closed. Final statistics:", partitionName);
- logger.info(" - VDYP records processed: {}", processedCount);
- logger.info(" - Partition boundary skips: {}", skippedCount);
- logger.info(" - Data quality skips: {}", totalReaderSkips);
- logger.info(" - Total VDYP records examined: {}", processedCount + skippedCount + totalReaderSkips);
-
- if (totalReaderSkips > 0) {
- logger.info("[{}] VDYP data quality skip breakdown:", partitionName);
- skipReasonCounts.forEach((reason, count) -> logger.info(" - {}: {}", reason, count.get()));
- }
- }
- }
-
- /**
- * Reads the next BatchRecord that falls within the partition's line range.
- */
- @Override
- public BatchRecord read() throws ItemStreamException {
- if (!readerOpened) {
- open(new ExecutionContext());
- }
-
- if (rangeExhausted) {
- return null;
- }
-
- return readNextValidRecord();
- }
-
- /**
- * Reads the next valid record within the partition range.
- */
- private BatchRecord readNextValidRecord() throws ItemStreamException {
- while (true) {
- try {
- BatchRecord batchRecord = delegate.read();
- currentLine++;
-
- if (batchRecord == null) {
- return handleEndOfFile();
- }
-
- BatchRecord processedRecord = processRecordWithinRange(batchRecord);
- if (processedRecord != null) {
- return processedRecord;
- }
- // Continue to next record if not in range or invalid
-
- } catch (FlatFileParseException e) {
- handleSkipEvent(e, "VDYP_FILE_PARSE_ERROR", currentLine);
- } catch (IllegalArgumentException e) {
- handleSkipEvent(e, "VDYP_DATA_VALIDATION_ERROR", currentLine);
- } catch (Exception e) {
- handleSkipEvent(e, "VDYP_READER_ERROR", currentLine);
- throw new ItemStreamException("Error reading VDYP record at line " + currentLine, e);
- }
- }
- }
-
- /**
- * Handles end of file scenario.
- */
- private BatchRecord handleEndOfFile() {
- logger.info("[{}] End of VDYP file reached at line {}", partitionName, currentLine - 1);
- rangeExhausted = true;
- logFinalStatistics();
- return null;
- }
-
- /**
- * Processes a record, checking whether it falls within the partition range.
- */
- private BatchRecord processRecordWithinRange(BatchRecord batchRecord) throws IllegalArgumentException {
- if (currentLine < startLine) {
- skippedCount++;
- return null; // Not in range yet
- }
-
- if (currentLine > endLine) {
- return handleEndOfRange();
- }
-
- // Within range - validate and process
- return processVdypRecord(batchRecord);
- }
-
- /**
- * Handles the case where the reader has moved past the end of the partition range.
- */
- private BatchRecord handleEndOfRange() {
- if (!rangeExhausted) {
- rangeExhausted = true;
- logger.info(
- "[{}] Reached end of VDYP partition line range at line {}. Stopping reading.", partitionName,
- currentLine
- );
- logFinalStatistics();
- }
- return null;
- }
-
- /**
- * Process a successfully read data record, applying data validation.
- */
- private BatchRecord processVdypRecord(BatchRecord batchRecord) throws IllegalArgumentException {
- Long recordId = batchRecord.getId();
-
- if (recordId == null) {
- handleDataQualitySkip(batchRecord, "NULL_ID", "VDYP record has null ID");
- return null;
- }
-
- // Validate record data quality
- validateVdypRecordData(batchRecord);
-
- // Record is within line range and valid
- processedCount++;
-
- // Log first data record found in partition range
- if (processedCount == 1) {
- logger.info(
- "[{}] Found first VDYP record in partition range: line {}, ID {}", partitionName, currentLine,
- recordId
- );
- }
-
- return batchRecord;
- }
-
- /**
- * Validate record data quality and handle data-related skip events.
- */
- private void validateVdypRecordData(BatchRecord batchRecord) throws IllegalArgumentException {
- Long recordId = batchRecord.getId();
-
- if (batchRecord.getData() == null || batchRecord.getData().trim().isEmpty()) {
- handleDataQualitySkip(batchRecord, "MISSING_VDYP_DATA", "VDYP data field is missing or empty");
- throw new IllegalArgumentException("Missing required VDYP data field for record ID " + recordId);
- }
-
- if (batchRecord.getPolygonId() == null || batchRecord.getPolygonId().trim().isEmpty()) {
- handleDataQualitySkip(batchRecord, "MISSING_POLYGON_ID", "Polygon ID is missing or empty");
- throw new IllegalArgumentException("Missing required polygon ID for record ID " + recordId);
- }
-
- if (batchRecord.getLayerId() == null || batchRecord.getLayerId().trim().isEmpty()) {
- handleDataQualitySkip(batchRecord, "MISSING_LAYER_ID", "Layer ID is missing or empty");
- throw new IllegalArgumentException("Missing required layer ID for record ID " + recordId);
- }
- }
-
- /**
- * Handle skip events from file parsing errors.
- */
- private void handleSkipEvent(Exception exception, String skipReason, Long lineNumber) {
- totalSkipsInReader.incrementAndGet();
- skipReasonCounts.computeIfAbsent(skipReason, k -> new AtomicLong(0)).incrementAndGet();
-
- if (metricsCollector != null && jobExecutionId != null) {
- BatchRecord errorRecord = new BatchRecord();
- if (lineNumber != null) {
- errorRecord.setId(lineNumber);
- }
-
- metricsCollector
- .recordSkip(jobExecutionId, errorRecord.getId(), errorRecord, exception, partitionName, lineNumber);
- }
-
- logger.warn(
- "[{}] VDYP Skip event: {} at line {} - {}", partitionName, skipReason,
- lineNumber != null ? lineNumber.toString() : UNKNOWN, exception.getMessage()
- );
- }
-
- /**
- * Handle skip events from VDYP data quality issues.
- */
- private void handleDataQualitySkip(BatchRecord batchRecord, String skipReason, String description) {
- totalSkipsInReader.incrementAndGet();
- skipReasonCounts.computeIfAbsent(skipReason, k -> new AtomicLong(0)).incrementAndGet();
-
- if (batchRecord != null) {
- BatchSkipPolicy.cacheRecordData(batchRecord.getId(), batchRecord, Thread.currentThread().getName());
- }
-
- if (metricsCollector != null && jobExecutionId != null && batchRecord != null) {
- IllegalArgumentException dataQualityException = new IllegalArgumentException(description);
- Long lineNumber = batchRecord.getId() != null ? batchRecord.getId() + 1 : null;
-
- metricsCollector.recordSkip(
- jobExecutionId, batchRecord.getId(), batchRecord, dataQualityException, partitionName, lineNumber
- );
- }
-
- logger.warn(
- "[{}] VDYP Data quality skip: {} for record ID {} - {}", partitionName, skipReason,
- batchRecord != null ? batchRecord.getId() : UNKNOWN, description
- );
- }
-
- /**
- * Log final statistics when reading is complete.
- */
- private void logFinalStatistics() {
- long totalReaderSkips = totalSkipsInReader.get();
- logger.info(
- "[{}] VDYP Reader completed. Processed: {}, Partition boundary skips: {}, Data quality skips: {}",
- partitionName, processedCount, skippedCount, totalReaderSkips
- );
-
- if (totalReaderSkips > 0) {
- logger.info("[{}] VDYP Skip breakdown by reason:", partitionName);
- skipReasonCounts.forEach((reason, count) -> logger.info(" - {}: {}", reason, count.get()));
- }
- }
-
- public ConcurrentMap<String, AtomicLong> getSkipStatistics() {
- return new ConcurrentHashMap<>(skipReasonCounts);
- }
-
- public long getTotalDataSkips() {
- return totalSkipsInReader.get();
- }
-
- public long getTotalRangeSkips() {
- return skippedCount;
- }
-
- public long getTotalProcessed() {
- return processedCount;
- }
-
- public String getPartitionName() {
- return partitionName;
- }
-}
\ No newline at end of file
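
This reader is deleted because partition inputs are now physically split before the job runs (see the StreamingCsvPartitioner introduced in BatchController below), so each partition reads its own file and needs no start/end line gating. A sketch of the simpler per-partition reader this enables; the file name, column order, and directory layout are illustrative assumptions:

    import java.nio.file.Path;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.core.io.FileSystemResource;
    import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;

    // Hypothetical factory: each partition gets a reader bound to its own pre-split CSV.
    static FlatFileItemReader<BatchRecord> partitionReader(Path partitionDir) {
        return new FlatFileItemReaderBuilder<BatchRecord>()
                .name("partitionReader")
                .resource(new FileSystemResource(partitionDir.resolve("input.csv"))) // assumed layout
                .delimited()
                .names("featureId", "polygonId", "layerId", "data") // assumed column order
                .linesToSkip(1) // skip the CSV header
                .targetType(BatchRecord.class)
                .build();
    }
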
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypChunkProjectionWriter.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypChunkProjectionWriter.java
new file mode 100644
index 000000000..7f0387de5
--- /dev/null
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypChunkProjectionWriter.java
@@ -0,0 +1,223 @@
+package ca.bc.gov.nrs.vdyp.batch.configuration;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.lang.NonNull;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;
+import ca.bc.gov.nrs.vdyp.batch.service.BatchMetricsCollector;
+import ca.bc.gov.nrs.vdyp.batch.service.VdypProjectionService;
+import ca.bc.gov.nrs.vdyp.ecore.model.v1.Parameters;
+
+/**
+ * Chunk-based ItemWriter that processes multiple BatchRecords together for
+ * improved performance. This writer implements the efficient chunk-based
+ * projection strategy where multiple FEATURE_IDs are processed in a single VDYP
+ * projection operation.
+ */
+public class VdypChunkProjectionWriter implements ItemWriter<BatchRecord>, StepExecutionListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(VdypChunkProjectionWriter.class);
+
+ private final VdypProjectionService vdypProjectionService;
+ private final BatchMetricsCollector metricsCollector;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ // Step execution context
+ private String partitionName = "unknown";
+ private Long jobExecutionId;
+ private Parameters projectionParameters;
+
+ public VdypChunkProjectionWriter(VdypProjectionService vdypProjectionService,
+ BatchMetricsCollector metricsCollector) {
+ this.vdypProjectionService = vdypProjectionService;
+ this.metricsCollector = metricsCollector;
+ }
+
+ @Override
+ public void beforeStep(StepExecution stepExecution) {
+ logger.info("[{}] VdypChunkProjectionWriter.beforeStep() called", partitionName);
+ this.jobExecutionId = stepExecution.getJobExecutionId();
+ this.partitionName = stepExecution.getExecutionContext().getString("partitionName", "unknown");
+
+ // Debug: Log all available job parameters
+ logger.debug("[{}] Available job parameters: {}", partitionName,
+ stepExecution.getJobParameters().getParameters().keySet());
+
+ // Get projection parameters from job parameters (serialized as JSON)
+ String parametersJson = stepExecution.getJobParameters().getString("projectionParametersJson");
+
+ logger.debug("[{}] Retrieved projectionParametersJson: {} (length: {})",
+ partitionName, parametersJson != null ? "not null" : "null",
+ parametersJson != null ? parametersJson.length() : 0);
+
+ if (parametersJson == null || parametersJson.trim().isEmpty()) {
+ logger.error("[{}] VDYP projection parameters not found in job parameters. Available parameters: {}",
+ partitionName, stepExecution.getJobParameters().getParameters());
+ throw new IllegalStateException(
+ "VDYP projection parameters not found in job parameters. Parameters must be provided in BatchJobRequest.");
+ }
+
+ try {
+ this.projectionParameters = objectMapper.readValue(parametersJson, Parameters.class);
+ logger.info("[{}] VdypChunkProjectionWriter initialized with projection parameters. Parameters null: {}",
+ partitionName, this.projectionParameters == null);
+
+ if (this.projectionParameters != null) {
+ logger.debug("[{}] Projection parameters loaded successfully: selectedExecutionOptions={}",
+ partitionName,
+ this.projectionParameters.getSelectedExecutionOptions() != null
+ ? this.projectionParameters.getSelectedExecutionOptions().size()
+ : "null");
+ } else {
+ logger.error("[{}] Projection parameters deserialized to null from JSON: {}",
+ partitionName, parametersJson);
+ throw new IllegalStateException("Deserialized projection parameters are null");
+ }
+ } catch (JsonProcessingException jsonException) {
+ throw handleParameterDeserializationFailure(
+ parametersJson, jsonException, "JSON parsing failed during parameter deserialization");
+ } catch (Exception generalException) {
+ throw handleParameterDeserializationFailure(
+ parametersJson, generalException, "Unexpected error during parameter deserialization");
+ }
+ }
+
+ @Override
+ public ExitStatus afterStep(StepExecution stepExecution) {
+ logger.info("[{}] VdypChunkProjectionWriter.afterStep() called", partitionName);
+ return stepExecution.getExitStatus();
+ }
+
+ @Override
+ public void write(@NonNull Chunk<? extends BatchRecord> chunk) throws Exception {
+ if (chunk.isEmpty()) {
+ logger.debug("[{}] Empty chunk received, skipping", partitionName);
+ return;
+ }
+
+ List<BatchRecord> batchRecords = chunk.getItems().stream()
+ .collect(Collectors.toList());
+
+ // Get actual partition name from the first BatchRecord if available
+ String actualPartitionName = partitionName;
+ if (!batchRecords.isEmpty() && batchRecords.get(0).getPartitionName() != null) {
+ actualPartitionName = batchRecords.get(0).getPartitionName();
+ }
+
+ logger.info("[{}] Processing chunk of {} records using VdypProjectionService",
+ actualPartitionName, batchRecords.size());
+
+ try {
+ // Validate projection parameters before processing
+ if (projectionParameters == null) {
+ throw new IllegalStateException(
+ "VDYP projection parameters are null. Cannot perform chunk projection.");
+ }
+
+ // Perform chunk-based projection
+ String chunkResult = vdypProjectionService.performProjectionForChunk(
+ batchRecords, actualPartitionName, projectionParameters);
+
+ // Record metrics for successful chunk processing
+ recordChunkMetrics(batchRecords, actualPartitionName, true, null);
+
+ logger.info("[{}] Successfully processed chunk of {} records. Result: {}",
+ actualPartitionName, batchRecords.size(), chunkResult);
+
+ } catch (RuntimeException runtimeException) {
+ throw handleChunkProcessingFailure(
+ batchRecords, actualPartitionName, runtimeException, "Runtime error during chunk processing");
+ } catch (Exception generalException) {
+ throw handleChunkProcessingFailure(
+ batchRecords, actualPartitionName, generalException, "Unexpected error during chunk processing");
+ }
+ }
+
+ /**
+ * Records metrics for chunk processing results.
+ */
+ private void recordChunkMetrics(List<BatchRecord> batchRecords, String actualPartitionName, boolean success,
+ Exception error) {
+ if (metricsCollector != null && jobExecutionId != null) {
+ for (BatchRecord batchRecord : batchRecords) {
+ try {
+ Long recordIdHash = batchRecord.getFeatureId() != null
+ ? (long) batchRecord.getFeatureId().hashCode()
+ : 0L;
+
+ if (success) {
+ // Record successful processing
+ logger.trace("[{}] Recording successful processing for FEATURE_ID: {}",
+ actualPartitionName, batchRecord.getFeatureId());
+ } else {
+ // Record processing failure
+ metricsCollector.recordSkip(jobExecutionId, recordIdHash, batchRecord,
+ error, actualPartitionName, null);
+ }
+ } catch (Exception metricsException) {
+ logger.warn("[{}] Failed to record metrics for FEATURE_ID: {} - {}",
+ actualPartitionName, batchRecord.getFeatureId(), metricsException.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Handles parameter deserialization failures by logging and creating
+ * appropriate exception.
+ */
+ private IllegalStateException handleParameterDeserializationFailure(
+ String parametersJson, Exception cause, String errorDescription) {
+ // Create enhanced contextual message
+ String contextualMessage = String.format(
+ "[%s] %s. JSON length: %d, Exception type: %s, Root cause: %s",
+ partitionName, errorDescription,
+ parametersJson != null ? parametersJson.length() : 0,
+ cause.getClass().getSimpleName(),
+ cause.getMessage() != null ? cause.getMessage() : "No error message available");
+
+ // Log the failure with full context
+ logger.error(contextualMessage, cause);
+
+ return new IllegalStateException(contextualMessage, cause);
+ }
+
+ /**
+ * Handles chunk processing failures by logging, recording metrics, and creating
+ * appropriate exception.
+ */
+ private RuntimeException handleChunkProcessingFailure(
+ java.util.List<BatchRecord> batchRecords, String actualPartitionName,
+ Exception cause, String errorDescription) {
+ // Create enhanced contextual message
+ String contextualMessage = String.format(
+ "[%s] %s. Chunk size: %d, Exception type: %s, Root cause: %s",
+ actualPartitionName, errorDescription, batchRecords.size(),
+ cause.getClass().getSimpleName(),
+ cause.getMessage() != null ? cause.getMessage() : "No error message available");
+
+ // Log the failure with full context
+ logger.error(contextualMessage, cause);
+
+ // Record metrics for failed chunk processing
+ recordChunkMetrics(batchRecords, actualPartitionName, false, cause);
+
+ if (cause instanceof RuntimeException runtimeException) {
+ return runtimeException;
+ } else {
+ return new RuntimeException(contextualMessage, cause);
+ }
+ }
+}
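The writer above expects its Parameters as JSON under the projectionParametersJson job parameter. A sketch of the launching side, assuming plain Jackson serialization mirrors the deserialization in beforeStep; the helper method and the timestamp parameter are illustrative, not the repository's actual launcher code:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import ca.bc.gov.nrs.vdyp.ecore.model.v1.Parameters;

    // Sketch: hand projection parameters to the chunk writer via a JSON job parameter.
    static JobExecution launch(JobLauncher jobLauncher, Job partitionedJob, Parameters params) throws Exception {
        String parametersJson = new ObjectMapper().writeValueAsString(params);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("projectionParametersJson", parametersJson)
                .addLong("timestamp", System.currentTimeMillis()) // forces a fresh JobInstance
                .toJobParameters();
        return jobLauncher.run(partitionedJob, jobParameters);
    }
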
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypProjectionProcessor.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypProjectionProcessor.java
index 4c8fd318f..3c09aa474 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypProjectionProcessor.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/configuration/VdypProjectionProcessor.java
@@ -11,8 +11,6 @@
import org.springframework.lang.NonNull;
import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 public class VdypProjectionProcessor implements ItemProcessor<BatchRecord, BatchRecord> {
@@ -23,10 +21,6 @@ public class VdypProjectionProcessor implements ItemProcessor<BatchRecord, BatchRecord> {
- private final Set<Long> retriedRecords = ConcurrentHashMap.newKeySet();
// Validation thresholds
@Value("${batch.validation.max-data-length:50000}")
@@ -38,7 +32,8 @@ public class VdypProjectionProcessor implements ItemProcessor<BatchRecord, BatchRecord> {
- if (batchRecord.getData().length() > maxDataLength) {
- throw new IllegalArgumentException(
- String.format(
- "VDYP data field too long for record ID %d (length: %d, max: %d)", recordId,
- batchRecord.getData().length(), maxDataLength
- )
- );
- }
-
- String polygonId = batchRecord.getPolygonId();
- if (polygonId.length() < minPolygonIdLength || polygonId.length() > maxPolygonIdLength) {
- throw new IllegalArgumentException(
- String.format(
- "Invalid polygon ID length for record ID %d (length: %d)", recordId, polygonId.length()
- )
- );
- }
- }
-
- /**
- * Perform VDYP projection processing with proper error handling.
- *
- * This method handles both retryable errors (IOException) and non-retryable validation errors.
- */
- private String performVdypProjectionWithErrorHandling(BatchRecord batchRecord)
- throws IOException, IllegalArgumentException {
- try {
- String result = performVdypProjection(batchRecord);
- return validateProjectionResult(result, batchRecord.getId());
- } catch (Exception e) {
- handleProjectionException(e, batchRecord);
- reclassifyAndThrowException(e, batchRecord.getId());
- return null;
- }
- }
-
- /**
- * Validates the projection result and throws IOException if empty.
- */
- private String validateProjectionResult(String result, Long recordId) throws IOException {
- if (result == null || result.trim().isEmpty()) {
- throw new IOException(String.format("VDYP projection returned empty result for record ID %d", recordId));
- }
- return result;
- }
-
- /**
- * Handles exception by recording appropriate metrics.
- */
- private void handleProjectionException(Exception e, BatchRecord batchRecord) {
- if (metricsCollector != null && jobExecutionId != null) {
- if (isRetryableException(e)) {
- metricsCollector.recordRetryAttempt(
- jobExecutionId, batchRecord.getId(), batchRecord, 1, e, false, partitionName
- );
- } else {
- metricsCollector.recordSkip(jobExecutionId, batchRecord.getId(), batchRecord, e, partitionName, null);
- }
- }
- }
-
- /**
- * Determines if an exception should be retried.
- */
- private boolean isRetryableException(Exception e) {
- return e instanceof IOException || (e instanceof RuntimeException && isTransientError(e));
- }
-
- /**
- * Reclassifies and throws exceptions for proper Spring Batch handling.
- */
- private void reclassifyAndThrowException(Exception e, Long recordId) throws IOException, IllegalArgumentException {
- if (e instanceof IOException ioException) {
- throw ioException;
- }
-
- if (e instanceof IllegalArgumentException illegalArgException) {
- throw illegalArgException;
- }
-
- if (e instanceof RuntimeException && isTransientError(e)) {
- throw new IOException("Transient error during VDYP projection for record ID " + recordId, e);
- }
-
- // Unknown errors treated as data quality issues
- throw new IllegalArgumentException(
- "VDYP projection failed for record ID " + recordId + ": " + e.getMessage(), e
- );
- }
-
- /**
- * Determine if a runtime exception represents a transient error that should be retried.
- */
- private boolean isTransientError(Exception e) {
- String message = e.getMessage() != null ? e.getMessage().toLowerCase() : "";
- String className = e.getClass().getSimpleName().toLowerCase();
-
- return hasTransientMessagePattern(message) || hasTransientClassNamePattern(className);
- }
-
- /**
- * Checks if error message contains transient error patterns.
- */
- private boolean hasTransientMessagePattern(String message) {
- return message.contains("timeout") || message.contains("connection") || message.contains("network")
- || message.contains("temporary") || message.contains("unavailable");
- }
-
- /**
- * Checks if class name contains transient error patterns.
- */
- private boolean hasTransientClassNamePattern(String className) {
- return className.contains("timeout") || className.contains("connection");
+ return batchRecord;
}
- /**
- * This is a placeholder implementation that will be replaced with actual VDYP extended core service calls.
- *
- * @param batchRecord The VDYP record containing polygon and layer information
- * @return Projection result string
- */
- private String performVdypProjection(BatchRecord batchRecord) throws IOException {
- String polygonId = batchRecord.getPolygonId();
- String layerId = batchRecord.getLayerId();
- String data = batchRecord.getData();
-
- try {
- Thread.sleep(10); // Minimal delay to simulate processing
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Processing interrupted for record ID " + batchRecord.getId(), e);
- }
-
- return String.format(
- "PROJECTED[P:%s,L:%s,Data:%s]", polygonId != null ? polygonId : "N/A",
- layerId != null ? layerId : "N/A",
- data != null && data.length() > 10 ? data.substring(0, 10) + "..." : data
- );
- }
-}
\ No newline at end of file
+}
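With per-record projection, retry classification, and the placeholder stub removed, the processor reduces to a validation pass-through; the real VDYP work now happens once per chunk in VdypChunkProjectionWriter. A reduced sketch of the resulting shape (the class name here is illustrative, and the actual class keeps its @Value validation thresholds):

    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.lang.NonNull;
    import ca.bc.gov.nrs.vdyp.batch.model.BatchRecord;

    // Sketch: records pass through unchanged; chunk-level projection lives in the writer.
    public class PassThroughProjectionProcessor implements ItemProcessor<BatchRecord, BatchRecord> {
        @Override
        public BatchRecord process(@NonNull BatchRecord batchRecord) {
            return batchRecord; // lightweight validation could run here before hand-off
        }
    }
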
diff --git a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/controller/BatchController.java b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/controller/BatchController.java
index ab2e24871..999803d26 100644
--- a/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/controller/BatchController.java
+++ b/batch/src/main/java/ca/bc/gov/nrs/vdyp/batch/controller/BatchController.java
@@ -1,20 +1,45 @@
package ca.bc.gov.nrs.vdyp.batch.controller;
-import ca.bc.gov.nrs.vdyp.batch.model.BatchMetrics;
-import ca.bc.gov.nrs.vdyp.batch.service.BatchMetricsCollector;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.batch.core.*;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobInstance;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
-import java.time.Duration;
-import java.util.*;
+import ca.bc.gov.nrs.vdyp.batch.model.BatchMetrics;
+import ca.bc.gov.nrs.vdyp.batch.service.BatchMetricsCollector;
+import ca.bc.gov.nrs.vdyp.batch.service.StreamingCsvPartitioner;
+import ca.bc.gov.nrs.vdyp.batch.util.Utils;
+import ca.bc.gov.nrs.vdyp.ecore.api.v1.exceptions.ProjectionRequestValidationException;
+import ca.bc.gov.nrs.vdyp.ecore.model.v1.ValidationMessage;
+import ca.bc.gov.nrs.vdyp.ecore.model.v1.ValidationMessageKind;
@RestController
@RequestMapping("/api/batch")
@@ -37,32 +62,49 @@ public class BatchController {
private final Job partitionedJob;
private final JobExplorer jobExplorer;
private final BatchMetricsCollector metricsCollector;
+ private final StreamingCsvPartitioner csvPartitioner;
+
+ @Value("${batch.input.directory.default-path}")
+ private String inputBasePath;
+
+ @Value("${batch.output.directory.default-path}")
+ private String outputBasePath;
public BatchController(
- JobLauncher jobLauncher, Job partitionedJob, JobExplorer jobExplorer, BatchMetricsCollector metricsCollector
- ) {
+ JobLauncher jobLauncher, Job partitionedJob, JobExplorer jobExplorer,
+ BatchMetricsCollector metricsCollector, StreamingCsvPartitioner csvPartitioner) {
this.jobLauncher = jobLauncher;
this.partitionedJob = partitionedJob;
this.jobExplorer = jobExplorer;
this.metricsCollector = metricsCollector;
+ this.csvPartitioner = csvPartitioner;
}
/**
- * Start a new batch job execution with configuration options.
+ * Start a new batch job execution with uploaded CSV files.
*
- * @param request Optional configuration parameters for the batch job
+ * @param polygonFile CSV file containing polygon data
+ * @param layerFile CSV file containing layer data
+ * @param partitionSize Number of partitions (optional, default from config)
+ * @param parametersJson JSON string containing VDYP projection parameters
* @return ResponseEntity containing job execution details and metrics endpoint
*/
- @PostMapping("/start")
- public ResponseEntity