diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java index 0b841087..ece84d3d 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceBulkUtil.java @@ -32,6 +32,7 @@ import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; import com.sforce.ws.SessionRenewer; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -65,8 +66,8 @@ public final class SalesforceBulkUtil { * @throws AsyncApiException if there is an issue creating the job */ - public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum, - @Nullable String externalIdField, + public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnection, String sObject, + OperationEnum operationEnum, @Nullable String externalIdField, ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException { JobInfo job = new JobInfo(); job.setObject(sObject); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java index 653026c3..886052c0 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java @@ -27,6 +27,8 @@ import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.OAuthInfo; +import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper; +import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +57,11 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) { .put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString()) .put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString()) .put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString()) - .put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString()); + .put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString()) + .put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, Long.toString(config.getInitialRetryDuration())) + .put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, Long.toString(config.getMaxRetryDuration())) + .put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, Integer.toString(config.getMaxRetryCount())) + .put(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, Boolean.toString(config.isRetryRequired())); if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) { configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl()); @@ -83,7 +89,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) { try { BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials)); - JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, 
config.getSObject(), config.getOperationEnum(),
+      BulkConnectionRetryWrapper retryWrapper = new BulkConnectionRetryWrapper(bulkConnection, config.isRetryRequired(),
+        config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount());
+      JobInfo job = SalesforceBulkUtil.createJob(retryWrapper, config.getSObject(), config.getOperationEnum(),
         config.getExternalIdField(), config.getConcurrencyModeEnum(), ContentType.ZIP_CSV);
       configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
index 73c2ec72..7abd3586 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
@@ -41,6 +41,7 @@
 import io.cdap.plugin.salesforce.plugin.SalesforceConnectorBaseConfig;
 import io.cdap.plugin.salesforce.plugin.SalesforceConnectorInfo;
 import io.cdap.plugin.salesforce.plugin.connector.SalesforceConnectorConfig;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,6 +153,26 @@ public class SalesforceSinkConfig extends ReferencePluginConfig {
   @Description("Whether to validate the field data types of the input schema as per Salesforce specific data types")
   private final Boolean datatypeValidation;
 
+  @Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION)
+  @Description("Wait time in seconds before the first retry. Default is 5 seconds.")
+  @Nullable
+  private Long initialRetryDuration;
+
+  @Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION)
+  @Description("Maximum wait time in seconds between retries. Default is 80 seconds.")
+  @Nullable
+  private Long maxRetryDuration;
+
+  @Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT)
+  @Description("Maximum number of retries allowed. Default is 5.")
+  @Nullable
+  private Integer maxRetryCount;
+
+  @Name(SalesforceSourceConstants.PROPERTY_RETRY_REQUIRED)
+  @Description("Whether to retry internal Salesforce API calls that fail with transient errors.")
+  @Nullable
+  private Boolean retryOnBackendError;
+
 public SalesforceSinkConfig(String referenceName, @Nullable String clientId,
                             @Nullable String clientSecret,
@@ -277,6 +298,23 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
     return partnerConnection.getUserInfo().getOrganizationId();
   }
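+  // An unset (null) property is treated as "retry enabled", so existing pipelines pick up retries by default.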
+  public boolean isRetryRequired() {
+    return retryOnBackendError == null || retryOnBackendError;
+  }
+
+  public long getInitialRetryDuration() {
+    return initialRetryDuration == null ? SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS :
+      initialRetryDuration;
+  }
+
+  public long getMaxRetryDuration() {
+    return maxRetryDuration == null ? SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
+  }
+
+  public int getMaxRetryCount() {
+    return maxRetryCount == null ? SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
+  }
+
   public void validate(Schema schema, FailureCollector collector, @Nullable OAuthInfo oAuthInfo) {
     if (connection != null) {
       getConnection().validate(collector, oAuthInfo);
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java
index f668e1fe..b8dd1434 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java
@@ -39,6 +39,7 @@
 import io.cdap.plugin.salesforce.SalesforceConstants;
 import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
 import io.cdap.plugin.salesforce.plugin.OAuthInfo;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
 import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
 
 import java.util.ArrayList;
@@ -103,8 +104,11 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
     String sObjectNameField = config.getSObjectNameField();
     authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
     BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
+    BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
+      config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+      config.getMaxRetryCount());
     List<SalesforceSplit> querySplits = queries.parallelStream()
-      .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(),
+      .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, false, config.getOperation(),
                                                        config.getInitialRetryDuration(), config.getMaxRetryDuration(),
                                                        config.getMaxRetryCount(), config.isRetryRequired()))
       .flatMap(Collection::stream).collect(Collectors.toList());
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
index cc8424c7..59c75586 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
@@ -45,6 +45,7 @@
 import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
 import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
 import io.cdap.plugin.salesforce.plugin.OAuthInfo;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
 import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
 import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -165,7 +166,10 @@ public static List<SalesforceSplit> getSplits(
       bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK, String.join(";", chunkHeaderValues));
     }
-    List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
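+    // Route Bulk API calls through the retry wrapper so transient backend failures are retried.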
+    BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
+      config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+      config.getMaxRetryCount());
+    List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, enablePKChunk,
       config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount(),
       config.isRetryRequired());
     return querySplits;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java
index bbf876d8..d315347a 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReader.java
@@ -28,9 +28,9 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.plugin.salesforce.BulkAPIBatchException;
 import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
-import io.cdap.plugin.salesforce.SalesforceConstants;
 import io.cdap.plugin.salesforce.authenticator.Authenticator;
 import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
 import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
 import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
 import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
@@ -78,13 +78,15 @@ public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String, ?>> {
-      queryResponseStream = Failsafe.with(retryPolicy).get(() -> getQueryResultStream(bulkConnection));
-    } else {
-      queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
-    }
-
+    final InputStream queryResponseStream = bulkConnectionRetryWrapper
+      .getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
     CSVFormat csvFormat = CSVFormat.DEFAULT
       .withHeader()
       .withQuoteMode(QuoteMode.ALL)
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java
new file mode 100644
index 00000000..e2e1c5cb
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/BulkConnectionRetryWrapper.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.salesforce.plugin.source.batch.util;
+
+import com.sforce.async.AsyncApiException;
+import com.sforce.async.BatchInfo;
+import com.sforce.async.BatchInfoList;
+import com.sforce.async.BulkConnection;
+import com.sforce.async.JobInfo;
+import com.sforce.ws.ConnectorConfig;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceBulkRecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps {@link BulkConnection} and retries Salesforce Bulk API calls that fail.
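+ * Each call goes through the Failsafe retry policy built by SalesforceSplitUtil#getRetryPolicy:
+ * exponential backoff from initialRetryDuration up to maxRetryDuration, for at most maxRetryCount
+ * attempts. When retryOnBackendError is false, calls are delegated directly to the wrapped connection.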
+ */
+public class BulkConnectionRetryWrapper {
+
+  private final BulkConnection bulkConnection;
+  private final RetryPolicy<Object> retryPolicy;
+  private static final Logger LOG = LoggerFactory.getLogger(BulkConnectionRetryWrapper.class);
+  private final boolean retryOnBackendError;
+  private final long maxRetryDuration;
+  private final int maxRetryCount;
+  private final long initialRetryDuration;
+
+  public BulkConnectionRetryWrapper(BulkConnection bulkConnection, boolean isRetryRequired,
+                                    long initialRetryDuration, long maxRetryDuration, int maxRetryCount) {
+    this.bulkConnection = bulkConnection;
+    this.retryOnBackendError = isRetryRequired;
+    this.initialRetryDuration = initialRetryDuration;
+    this.maxRetryDuration = maxRetryDuration;
+    this.maxRetryCount = maxRetryCount;
+    this.retryPolicy = SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount);
+  }
+
+  public JobInfo createJob(JobInfo jobInfo) throws AsyncApiException {
+    if (!retryOnBackendError) {
+      return bulkConnection.createJob(jobInfo);
+    }
+    Object resultJobInfo = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while creating job."))
+      .get(() -> {
+        try {
+          return bulkConnection.createJob(jobInfo);
+        } catch (AsyncApiException e) {
+          throw new SalesforceQueryExecutionException(e.getMessage());
+        }
+      });
+    return (JobInfo) resultJobInfo;
+  }
+
+  public JobInfo getJobStatus(String jobId) throws AsyncApiException {
+    if (!retryOnBackendError) {
+      return bulkConnection.getJobStatus(jobId);
+    }
+    Object resultJobInfo = Failsafe.with(retryPolicy)
+      .onFailure(event -> LOG.info("Failed while getting job status."))
+      .get(() -> {
+        try {
+          return bulkConnection.getJobStatus(jobId);
+        } catch (AsyncApiException e) {
+          throw new SalesforceQueryExecutionException(e.getMessage());
+        }
+      });
+    return (JobInfo) resultJobInfo;
+  }
+
+  public void updateJob(JobInfo jobInfo) throws AsyncApiException {
+    if (!retryOnBackendError) {
+      bulkConnection.updateJob(jobInfo);
+      return;
+    }
+    Failsafe.with(retryPolicy)
+      .onFailure(event -> LOG.info("Failed while updating job."))
+      .get(() -> {
+        try {
+          return bulkConnection.updateJob(jobInfo);
+        } catch (AsyncApiException e) {
+          throw new SalesforceQueryExecutionException(e.getMessage());
+        }
+      });
+  }
+
+  public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
+    if (!retryOnBackendError) {
+      return bulkConnection.getBatchInfoList(jobId);
+    }
+    Object batchInfoList = Failsafe.with(retryPolicy)
+      .onFailure(event -> LOG.info("Failed while getting batch info list."))
+      .get(() -> {
+        try {
+          return bulkConnection.getBatchInfoList(jobId);
+        } catch (AsyncApiException e) {
+          throw new SalesforceQueryExecutionException(e.getMessage());
+        }
+      });
+    return (BatchInfoList) batchInfoList;
+  }
+
+  public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException {
+    if (!retryOnBackendError) {
+      return bulkConnection.getBatchInfo(jobId, batchId);
+    }
+    Object batchInfo = Failsafe.with(retryPolicy)
+      .onFailure(event -> LOG.info("Failed while getting batch status."))
+      .get(() -> {
+        try {
+          return bulkConnection.getBatchInfo(jobId, batchId);
+        } catch (AsyncApiException e) {
+          throw new SalesforceQueryExecutionException(e.getMessage());
+        }
+      });
+    return (BatchInfo) batchInfo;
+  }
+
+  public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
+    if (!retryOnBackendError) {
+      return bulkConnection.getBatchResultStream(jobId, batchId);
+    }
+    Object inputStream = Failsafe.with(retryPolicy)
+ .onFailure(event -> LOG.info("Failed while getting batch result stream.")) + .get(() -> { + try { + return bulkConnection.getBatchResultStream(jobId, batchId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (InputStream) inputStream; + } + + public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException { + if (!retryOnBackendError) { + return bulkConnection.getQueryResultStream(jobId, batchId, resultId); + } + Object inputStream = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while getting query result stream.")) + .get(() -> { + try { + return bulkConnection.getQueryResultStream(jobId, batchId, resultId); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (InputStream) inputStream; + } + + public BatchInfo createBatchFromStream(String query, JobInfo job) throws AsyncApiException, + SalesforceQueryExecutionException, IOException { + if (!retryOnBackendError) { + return createBatchFromStreamI(query, job); + } + Object batchInfo = Failsafe.with(retryPolicy) + .onFailure(event -> LOG.info("Failed while creating batch from stream.")) + .get(() -> { + try { + return createBatchFromStreamI(query, job); + } catch (AsyncApiException e) { + throw new SalesforceQueryExecutionException(e.getMessage()); + } + }); + return (BatchInfo) batchInfo; + } + + private BatchInfo createBatchFromStreamI(String query, JobInfo job) throws + SalesforceQueryExecutionException, IOException, AsyncApiException { + BatchInfo batchInfo = null; + try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) { + batchInfo = bulkConnection.createBatchFromStream(job, bout); + } catch (AsyncApiException exception) { + LOG.warn("The bulk query job {} failed. 
Job State: {}.", job.getId(), job.getState()); + if (SalesforceBulkRecordReader.RETRY_ON_REASON.contains(exception.getExceptionCode())) { + throw new SalesforceQueryExecutionException(exception); + } + throw exception; + } + return batchInfo; + } + + public ConnectorConfig getConfig() { + return bulkConnection.getConfig(); + } + + public BulkConnection getBukConnection() { + return bulkConnection; + } +} diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index 6223c292..540d9fd0 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -65,7 +65,7 @@ public final class SalesforceSplitUtil { * @param enablePKChunk indicates if pk chunking is enabled * @return list of salesforce splits */ - public static List getQuerySplits(String query, BulkConnection bulkConnection, + public static List getQuerySplits(String query, BulkConnectionRetryWrapper bulkConnection, boolean enablePKChunk, String operation, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) { @@ -85,7 +85,7 @@ public static List getQuerySplits(String query, BulkConnection * @param enablePKChunk enable PK Chunking * @return array of batch info */ - private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection, + private static BatchInfo[] getBatches(String query, BulkConnectionRetryWrapper bulkConnection, boolean enablePKChunk, String operation, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) { @@ -114,7 +114,7 @@ private static BatchInfo[] getBatches(String query, BulkConnection bulkConnectio * @throws AsyncApiException if there is an issue creating the job * @throws IOException failed to close the query */ - private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, + private static BatchInfo[] runBulkQuery(BulkConnectionRetryWrapper bulkConnection, String query, boolean enablePKChunk, String operation, Long initialRetryDuration, Long maxRetryDuration, Integer maxRetryCount, Boolean retryOnBackendError) @@ -123,17 +123,9 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query); JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), getOperationEnum(operation), null, ConcurrencyMode.Parallel, ContentType.CSV); - final BatchInfo batchInfo; + BatchInfo batchInfo; try { - if (retryOnBackendError) { - batchInfo = - Failsafe.with(getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount)) - .get(() -> createBatchFromStream(bulkConnection, query, job)); - } else { - try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) { - batchInfo = bulkConnection.createBatchFromStream(job, bout); - } - } + batchInfo = bulkConnection.createBatchFromStream(query, job); if (enablePKChunk) { LOG.debug("PKChunking is enabled"); return waitForBatchChunks(bulkConnection, job.getId(), batchInfo.getId()); @@ -155,23 +147,11 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu throw (AsyncApiException) e.getCause(); } throw e; + } catch (SalesforceQueryExecutionException e) { + throw new RuntimeException(e); } } - private static 
+      throw new RuntimeException(e);
     }
   }
 
-  private static BatchInfo createBatchFromStream(BulkConnection bulkConnection, String query, JobInfo job) throws
-    SalesforceQueryExecutionException, IOException, AsyncApiException {
-    BatchInfo batchInfo = null;
-    try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) {
-      batchInfo = bulkConnection.createBatchFromStream(job, bout);
-    } catch (AsyncApiException exception) {
-      LOG.warn("The bulk query job {} failed. Job State: {}", job.getId(), job.getState());
-      if (SalesforceBulkRecordReader.RETRY_ON_REASON.contains(exception.getExceptionCode())) {
-        throw new SalesforceQueryExecutionException(exception);
-      }
-      throw exception;
-    }
-    return batchInfo;
-  }
 
   /**
    * Initializes bulk connection based on given Hadoop credentials configuration.
@@ -198,7 +178,8 @@ public static BulkConnection getBulkConnection(AuthenticatorCredentials authenti
    * @return Array with Batches created by Salesforce API
    * @throws AsyncApiException if there is an issue creating the job
    */
-  private static BatchInfo[] waitForBatchChunks(BulkConnection bulkConnection, String jobId, String initialBatchId)
+  private static BatchInfo[] waitForBatchChunks(BulkConnectionRetryWrapper bulkConnection,
+                                                String jobId, String initialBatchId)
     throws AsyncApiException {
     BatchInfo initialBatchInfo = null;
     for (int i = 0; i < SalesforceSourceConstants.GET_BATCH_RESULTS_TRIES; i++) {
@@ -269,8 +250,11 @@ public static RetryPolicy<Object> getRetryPolicy(Long initialRetryDuration, Long
       .handle(SalesforceQueryExecutionException.class)
       .withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration), 2)
       .withMaxRetries(maxRetryCount)
-      .onRetry(event -> LOG.debug("Retrying Salesforce Bulk Query. Retry count: {}", event
-        .getAttemptCount()))
+      .onRetry(event -> {
+        Throwable t = event.getLastException();
+        LOG.warn("Attempt #{} failed while executing job with error: {}", event.getAttemptCount(), t.getMessage(), t);
+        LOG.debug("Retrying Salesforce Bulk Query. 
Retry count: {}", event.getAttemptCount()); + }) .onSuccess(event -> LOG.debug("Salesforce Bulk Query executed successfully.")) .onRetriesExceeded(event -> LOG.error("Retry limit reached for Salesforce Bulk Query.")) .build(); diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java index 3491c140..0975388d 100644 --- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java @@ -292,8 +292,8 @@ private void assertRecordReaderOutputRecords(String[] csvStrings, Schema schema, resultIds[i] = String.format("result%d", i); } - SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, resultIds); BulkConnection mock = Mockito.mock(BulkConnection.class); + SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, resultIds, mock); FieldSetter.setField(reader, SalesforceBulkRecordReader.class.getDeclaredField("bulkConnection"), mock); for (int i = 0; i < csvStrings.length; i++) { Mockito.when(mock.getQueryResultStream(jobId, batchId, resultIds[i])) @@ -354,8 +354,8 @@ private void assertRecordReaderOutputRecordsRetryMechanism(String[] csvStrings, resultIds[i] = String.format("result%d", i); } - SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, resultIds); BulkConnection mock = Mockito.mock(BulkConnection.class); + SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, resultIds, mock); FieldSetter.setField(reader, SalesforceBulkRecordReader.class.getDeclaredField("bulkConnection"), mock); for (int i = 0; i < csvStrings.length; i++) { AsyncApiException salesforceQueryExecutionException = @@ -370,7 +370,7 @@ private void assertRecordReaderOutputRecordsRetryMechanism(String[] csvStrings, reader.setupParser(); } - @Test (expected = AsyncApiException.class) + @Test (expected = FailsafeException.class) public void testSetupParserWithoutRetry() throws Exception { String csvString1 = "\"Id\",\"IsDeleted\",\"ExpectedRevenue\",\"LastModifiedDate\",\"CloseDate\",\"Time\"\n" + "\"0061i000003XNcBAAW\",\"false\",\"1500.0\",\"2019-02-22T07:03:21.000Z\",\"2019-01-01\",\"12:00:30.000Z\"\n"; @@ -398,8 +398,8 @@ private void assertRecordReaderOutputRecordsWithoutRetryMechanism(String[] csvSt resultIds[i] = String.format("result%d", i); } - SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, resultIds); BulkConnection mock = Mockito.mock(BulkConnection.class); + SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, resultIds, mock); FieldSetter.setField(reader, SalesforceBulkRecordReader.class.getDeclaredField("bulkConnection"), mock); for (int i = 0; i < csvStrings.length; i++) { AsyncApiException salesforceQueryExecutionException = @@ -480,8 +480,9 @@ public void testSetupParserWithRetrySuccess() throws Exception { resultIds[i] = String.format("result%d", i); } - SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, new String[]{resultId}); BulkConnection mockConnection = Mockito.mock(BulkConnection.class); + SalesforceBulkRecordReader reader = new SalesforceBulkRecordReader(schema, jobId, batchId, new String[]{resultId}, + mockConnection); AsyncApiException 
salesforceQueryExecutionException = Mockito.mock(AsyncApiException.class); Mockito.when(mockConnection.getQueryResultStream(jobId, batchId, resultId)) diff --git a/widgets/Salesforce-batchsink.json b/widgets/Salesforce-batchsink.json index 4f338743..4c76f186 100644 --- a/widgets/Salesforce-batchsink.json +++ b/widgets/Salesforce-batchsink.json @@ -218,6 +218,49 @@ }, "default": "true" } + }, + { + "widget-type": "hidden", + "label": "Initial Retry Duration", + "name": "initialRetryDuration", + "widget-attributes": { + "min": "1", + "default": "5" + } + }, + { + "widget-type": "hidden", + "label": "Max Retry Duration", + "name": "maxRetryDuration", + "widget-attributes": { + "min": "6", + "default": "80" + } + }, + { + "widget-type": "hidden", + "label": "Max Retry Count", + "name": "maxRetryCount", + "widget-attributes": { + "min": "1", + "default": "5" + } + }, + { + "widget-type": "hidden", + "label": "Retry On Backend Error", + "name": "retryOnBackendError", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "true" + } } ] }
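Usage sketch (illustrative, not part of the patch): it assumes an AuthenticatorCredentials value named credentials is in scope, and the sObject name and retry values are placeholders that mirror the widget defaults.

  // Wrap the raw BulkConnection so job-management calls retry on transient backend errors.
  BulkConnection rawConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
  BulkConnectionRetryWrapper wrapper = new BulkConnectionRetryWrapper(rawConnection,
      true,  // retryOnBackendError
      5L,    // initialRetryDuration, seconds
      80L,   // maxRetryDuration, seconds
      5);    // maxRetryCount
  JobInfo job = SalesforceBulkUtil.createJob(wrapper, "Opportunity", OperationEnum.query, null,
      ConcurrencyMode.Parallel, ContentType.CSV);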