Skip to content

[🍒][PLUGIN-1892] Add BulkConnectionRetryWrapper to make retry calls to salesforce api failures #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.SessionRenewer;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -65,8 +66,8 @@ public final class SalesforceBulkUtil {
* @throws AsyncApiException if there is an issue creating the job
*/

public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnection, String sObject,
OperationEnum operationEnum, @Nullable String externalIdField,
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
JobInfo job = new JobInfo();
job.setObject(sObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,7 +57,11 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
.put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString())
.put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString())
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString())
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString());
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString())
.put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, Long.toString(config.getInitialRetryDuration()))
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, Long.toString(config.getMaxRetryDuration()))
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, Integer.toString(config.getMaxRetryCount()))
.put(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, Boolean.toString(config.isRetryRequired()));

if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) {
configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl());
Expand Down Expand Up @@ -83,7 +89,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {

try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(),
BulkConnectionRetryWrapper retryWrapper = new BulkConnectionRetryWrapper(bulkConnection, config.isRetryRequired(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount());
JobInfo job = SalesforceBulkUtil.createJob(retryWrapper, config.getSObject(), config.getOperationEnum(),
config.getExternalIdField(), config.getConcurrencyModeEnum(),
ContentType.ZIP_CSV);
configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorBaseConfig;
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorInfo;
import io.cdap.plugin.salesforce.plugin.connector.SalesforceConnectorConfig;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -152,6 +153,26 @@ public class SalesforceSinkConfig extends ReferencePluginConfig {
@Description("Whether to validate the field data types of the input schema as per Salesforce specific data types")
private final Boolean datatypeValidation;

@Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION)
@Description("Time taken for the first retry. Default is 5 seconds.")
@Nullable
private Long initialRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION)
@Description("Maximum time in seconds retries can take. Default is 80 seconds.")
@Nullable
private Long maxRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT)
@Description("Maximum number of retries allowed. Default is 5.")
@Nullable
private Integer maxRetryCount;

@Name(SalesforceSourceConstants.PROPERTY_RETRY_REQUIRED)
@Description("Retry is required or not for some of the internal call failures.")
@Nullable
private Boolean retryOnBackendError;

public SalesforceSinkConfig(String referenceName,
@Nullable String clientId,
@Nullable String clientSecret,
Expand Down Expand Up @@ -277,6 +298,23 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
return partnerConnection.getUserInfo().getOrganizationId();
}

public boolean isRetryRequired() {
return retryOnBackendError == null || retryOnBackendError;
}

public long getInitialRetryDuration() {
return initialRetryDuration == null ? SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS :
initialRetryDuration;
}

public long getMaxRetryDuration() {
return maxRetryDuration == null ? SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
}

public int getMaxRetryCount() {
return maxRetryCount == null ? SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
}

public void validate(Schema schema, FailureCollector collector, @Nullable OAuthInfo oAuthInfo) {
if (connection != null) {
getConnection().validate(collector, oAuthInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import java.util.ArrayList;
Expand Down Expand Up @@ -103,8 +104,11 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
String sObjectNameField = config.getSObjectNameField();
authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(),
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, false, config.getOperation(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired()))
.flatMap(Collection::stream).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
import org.apache.hadoop.mapreduce.RecordReader;
Expand Down Expand Up @@ -165,7 +166,10 @@ public static List<SalesforceSplit> getSplits(
bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK,
String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper,
enablePKChunk, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired());
return querySplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
Expand Down Expand Up @@ -78,13 +78,15 @@ public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String,
private String batchId;
private String[] resultIds;
private int resultIdIndex;
private BulkConnectionRetryWrapper bulkConnectionRetryWrapper;

public SalesforceBulkRecordReader(Schema schema) {
this(schema, null, null, null);
this(schema, null, null, null, null);
}

@VisibleForTesting
SalesforceBulkRecordReader(Schema schema, String jobId, String batchId, String[] resultIds) {
SalesforceBulkRecordReader(Schema schema, String jobId, String batchId, String[] resultIds,
BulkConnection bulkConnection) {
this.schema = schema;
this.resultIdIndex = 0;
this.jobId = jobId;
Expand All @@ -94,6 +96,8 @@ public SalesforceBulkRecordReader(Schema schema) {
maxRetryDuration = SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS;
maxRetryCount = SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT;
isRetryRequired = true;
bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, isRetryRequired, initialRetryDuration,
maxRetryDuration, maxRetryCount);
}

/**
Expand Down Expand Up @@ -128,6 +132,8 @@ public SalesforceBulkRecordReader initialize(InputSplit inputSplit, Authenticato

try {
bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, isRetryRequired, initialRetryDuration,
maxRetryDuration, maxRetryCount);
resultIds = waitForBatchResults(bulkConnection);
LOG.debug("Batch {} returned {} results", batchId, resultIds.length);
setupParser();
Expand Down Expand Up @@ -205,15 +211,8 @@ void setupParser() throws IOException, AsyncApiException, InterruptedException {
resultIdIndex, resultIds.length));
}
try {
final InputStream queryResponseStream;
if (isRetryRequired) {
queryResponseStream =
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.get(() -> getQueryResultStream(bulkConnection));
} else {
queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
}

final InputStream queryResponseStream = bulkConnectionRetryWrapper
.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
.withQuoteMode(QuoteMode.ALL)
Expand Down
Loading
Loading