Skip to content

Commit 25c7cb0

Browse files
committed
Merging from master
2 parents 500b131 + 60ec426 commit 25c7cb0

19 files changed

+628
-43
lines changed

src/main/java/com/uber/cadence/client/WorkflowStub.java

+5
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ static <T> WorkflowStub fromTyped(T typed) {
6565

6666
void signal(String signalName, Object... args);
6767

68+
CompletableFuture<Void> signalAsync(String signalName, Object... args);
69+
70+
CompletableFuture<Void> signalAsyncWithTimeout(
71+
long timeout, TimeUnit unit, String signalName, Object... args);
72+
6873
WorkflowExecution start(Object... args);
6974

7075
CompletableFuture<WorkflowExecution> startAsync(Object... args);

src/main/java/com/uber/cadence/common/RetryOptions.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -92,22 +92,9 @@ public final RetryOptions addDoNotRetry(Class<? extends Throwable>... doNotRetry
9292
return this;
9393
}
9494

95-
double backoffCoefficient = getBackoffCoefficient();
96-
if (backoffCoefficient == 0) {
97-
backoffCoefficient = DEFAULT_BACKOFF_COEFFICIENT;
98-
}
99-
100-
RetryOptions.Builder builder =
101-
new RetryOptions.Builder()
102-
.setInitialInterval(getInitialInterval())
103-
.setExpiration(getExpiration())
104-
.setMaximumInterval(getMaximumInterval())
105-
.setBackoffCoefficient(backoffCoefficient)
106-
.setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry)));
95+
RetryOptions.Builder builder = new RetryOptions.Builder(this);
96+
builder.setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry)));
10797

108-
if (getMaximumAttempts() > 0) {
109-
builder.setMaximumAttempts(getMaximumAttempts());
110-
}
11198
return builder.validateBuildWithDefaults();
11299
}
113100

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java

+6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ CompletableFuture<WorkflowExecution> startWorkflowAsync(
4141

4242
void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);
4343

44+
CompletableFuture<Void> signalWorkflowExecutionAsync(
45+
SignalExternalWorkflowParameters signalParameters);
46+
47+
CompletableFuture<Void> signalWorkflowExecutionAsync(
48+
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis);
49+
4450
WorkflowExecution signalWithStartWorkflowExecution(
4551
SignalWithStartWorkflowExecutionParameters parameters);
4652

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

+50-8
Original file line numberDiff line numberDiff line change
@@ -269,15 +269,8 @@ private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {
269269

270270
@Override
271271
public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters) {
272-
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
273-
request.setDomain(domain);
272+
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
274273

275-
request.setInput(signalParameters.getInput());
276-
request.setSignalName(signalParameters.getSignalName());
277-
WorkflowExecution execution = new WorkflowExecution();
278-
execution.setRunId(signalParameters.getRunId());
279-
execution.setWorkflowId(signalParameters.getWorkflowId());
280-
request.setWorkflowExecution(execution);
281274
try {
282275
Retryer.retry(
283276
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
@@ -287,6 +280,55 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
287280
}
288281
}
289282

283+
@Override
284+
public CompletableFuture<Void> signalWorkflowExecutionAsync(
285+
SignalExternalWorkflowParameters signalParameters) {
286+
return signalWorkflowExecutionAsync(signalParameters, Long.MAX_VALUE);
287+
}
288+
289+
@Override
290+
public CompletableFuture<Void> signalWorkflowExecutionAsync(
291+
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
292+
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
293+
return Retryer.retryWithResultAsync(
294+
getRetryOptionsWithExpiration(
295+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
296+
() -> {
297+
CompletableFuture<Void> result = new CompletableFuture<>();
298+
try {
299+
service.SignalWorkflowExecution(
300+
request,
301+
new AsyncMethodCallback() {
302+
@Override
303+
public void onComplete(Object response) {
304+
result.complete(null);
305+
}
306+
307+
@Override
308+
public void onError(Exception exception) {
309+
result.completeExceptionally(exception);
310+
}
311+
});
312+
} catch (TException e) {
313+
result.completeExceptionally(e);
314+
}
315+
return result;
316+
});
317+
}
318+
319+
private SignalWorkflowExecutionRequest getSignalRequest(
320+
SignalExternalWorkflowParameters signalParameters) {
321+
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
322+
request.setDomain(domain);
323+
request.setInput(signalParameters.getInput());
324+
request.setSignalName(signalParameters.getSignalName());
325+
WorkflowExecution execution = new WorkflowExecution();
326+
execution.setRunId(signalParameters.getRunId());
327+
execution.setWorkflowId(signalParameters.getWorkflowId());
328+
request.setWorkflowExecution(execution);
329+
return request;
330+
}
331+
290332
@Override
291333
public WorkflowExecution signalWithStartWorkflowExecution(
292334
SignalWithStartWorkflowExecutionParameters parameters) {

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class MetricsType {
6767
CADENCE_METRICS_PREFIX + "decision-task-error";
6868
public static final String DECISION_TASK_COMPLETED_COUNTER =
6969
CADENCE_METRICS_PREFIX + "decision-task-completed";
70+
public static final String DECISION_TASK_FORCE_COMPLETED =
71+
CADENCE_METRICS_PREFIX + "decision-task-force-completed";
7072

7173
public static final String ACTIVITY_POLL_COUNTER = CADENCE_METRICS_PREFIX + "activity-poll-total";
7274
public static final String ACTIVITY_POLL_FAILED_COUNTER =
@@ -145,4 +147,7 @@ public class MetricsType {
145147
public static final String STICKY_CACHE_SIZE = CADENCE_METRICS_PREFIX + "sticky-cache-size";
146148
public static final String WORKFLOW_ACTIVE_THREAD_COUNT =
147149
CADENCE_METRICS_PREFIX + "workflow_active_thread_count";
150+
151+
public static final String NON_DETERMINISTIC_ERROR =
152+
CADENCE_METRICS_PREFIX + "non-deterministic-error";
148153
}

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public void accept(Exception reason) {
8888
private final DataConverter dataConverter;
8989
private final Condition taskCondition;
9090
private boolean taskCompleted = false;
91+
private final Map<String, Integer> versionMap = new HashMap<>();
9192

9293
ClockDecisionContext(
9394
DecisionsHelper decisions,
@@ -227,6 +228,8 @@ void handleMarkerRecorded(HistoryEvent event) {
227228
sideEffectResults.put(event.getEventId(), attributes.getDetails());
228229
} else if (LOCAL_ACTIVITY_MARKER_NAME.equals(name)) {
229230
handleLocalActivityMarker(attributes);
231+
} else if (VERSION_MARKER_NAME.equals(name)) {
232+
handleVersionMarker(attributes);
230233
} else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) {
231234
if (log.isWarnEnabled()) {
232235
log.warn("Unexpected marker: " + event);
@@ -276,6 +279,14 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
276279
}
277280
}
278281

282+
private void handleVersionMarker(MarkerRecordedEventAttributes attributes) {
283+
MarkerHandler.MarkerInterface markerData =
284+
MarkerHandler.MarkerInterface.fromEventAttributes(attributes, dataConverter);
285+
String versionID = markerData.getId();
286+
int version = dataConverter.fromData(attributes.getDetails(), Integer.class, Integer.class);
287+
versionMap.put(versionID, version);
288+
}
289+
279290
int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
280291
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
281292
(attributes) -> {
@@ -285,6 +296,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
285296
};
286297
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));
287298

299+
Integer version = versionMap.get(changeId);
300+
if (version != null) {
301+
validateVersion(changeId, version, minSupported, maxSupported);
302+
return version;
303+
}
304+
288305
Optional<byte[]> result =
289306
versionHandler.handle(
290307
changeId,
@@ -299,7 +316,7 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
299316
if (!result.isPresent()) {
300317
return WorkflowInternal.DEFAULT_VERSION;
301318
}
302-
int version = converter.fromData(result.get(), Integer.class, Integer.class);
319+
version = converter.fromData(result.get(), Integer.class, Integer.class);
303320
validateVersion(changeId, version, minSupported, maxSupported);
304321
return version;
305322
}

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@
5757
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
5858
import com.uber.cadence.WorkflowType;
5959
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
60+
import com.uber.cadence.internal.metrics.MetricsTag;
61+
import com.uber.cadence.internal.metrics.MetricsType;
6062
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
63+
import com.uber.cadence.internal.worker.SingleWorkerOptions;
6164
import com.uber.cadence.internal.worker.WorkflowExecutionException;
65+
import com.uber.m3.tally.Scope;
66+
import com.uber.m3.util.ImmutableMap;
6267
import java.util.ArrayList;
6368
import java.util.HashMap;
6469
import java.util.Iterator;
@@ -86,6 +91,7 @@ class DecisionsHelper {
8691
+ "change in the workflow definition.";
8792

8893
private final PollForDecisionTaskResponse task;
94+
private final SingleWorkerOptions options;
8995

9096
/**
9197
* When workflow task completes the decisions are converted to events that follow the decision
@@ -105,8 +111,9 @@ class DecisionsHelper {
105111
// TODO: removal of completed activities
106112
private final Map<String, Long> activityIdToScheduledEventId = new HashMap<>();
107113

108-
DecisionsHelper(PollForDecisionTaskResponse task) {
114+
DecisionsHelper(PollForDecisionTaskResponse task, SingleWorkerOptions options) {
109115
this.task = task;
116+
this.options = options;
110117
}
111118

112119
long getNextDecisionEventId() {
@@ -664,10 +671,10 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) {
664671
// is removed in replay.
665672
void addAllMissingVersionMarker(
666673
boolean isNextDecisionVersionMarker,
667-
Optional<Predicate<MarkerRecordedEventAttributes>> isDifferentChange) {
674+
Optional<Predicate<MarkerRecordedEventAttributes>> changeIdEquals) {
668675
boolean added;
669676
do {
670-
added = addMissingVersionMarker(isNextDecisionVersionMarker, isDifferentChange);
677+
added = addMissingVersionMarker(isNextDecisionVersionMarker, changeIdEquals);
671678
} while (added);
672679
}
673680

@@ -718,6 +725,11 @@ private boolean addMissingVersionMarker(
718725
private DecisionStateMachine getDecision(DecisionId decisionId) {
719726
DecisionStateMachine result = decisions.get(decisionId);
720727
if (result == null) {
728+
Scope metricsScope =
729+
options
730+
.getMetricsScope()
731+
.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, task.getWorkflowType().getName()));
732+
metricsScope.counter(MetricsType.NON_DETERMINISTIC_ERROR).inc(1);
721733
throw new NonDeterminisicWorkflowError(
722734
"Unknown " + decisionId + ". " + NON_DETERMINISTIC_MESSAGE);
723735
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,9 @@ private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.P
454454
// Reset state to before running the event loop
455455
decisionsHelper.handleDecisionTaskStartedEvent(decision);
456456
}
457-
457+
if (forceCreateNewDecisionTask) {
458+
metricsScope.counter(MetricsType.DECISION_TASK_FORCE_COMPLETED).inc(1);
459+
}
458460
return forceCreateNewDecisionTask;
459461
} catch (Error e) {
460462
if (this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy()
@@ -596,7 +598,7 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis
596598
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
597599
private Duration decisionTaskStartToCloseTimeout;
598600

599-
private final Duration retryServiceOperationExpirationInterval() {
601+
private final Duration decisionTaskRemainingTime() {
600602
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
601603
return decisionTaskStartToCloseTimeout.minus(passed);
602604
}
@@ -640,11 +642,18 @@ public HistoryEvent next() {
640642
return current.next();
641643
}
642644

645+
Duration decisionTaskRemainingTime = decisionTaskRemainingTime();
646+
if (decisionTaskRemainingTime.isNegative() || decisionTaskRemainingTime.isZero()) {
647+
throw new Error(
648+
"Decision task timed out while querying history. If this happens consistently please consider "
649+
+ "increase decision task timeout or reduce history size.");
650+
}
651+
643652
metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1);
644653
Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start();
645654
RetryOptions retryOptions =
646655
new RetryOptions.Builder()
647-
.setExpiration(retryServiceOperationExpirationInterval())
656+
.setExpiration(decisionTaskRemainingTime)
648657
.setInitialInterval(retryServiceOperationInitialInterval)
649658
.setMaximumInterval(retryServiceOperationMaxInterval)
650659
.build();

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ private Decider createDecider(PollForDecisionTaskResponse decisionTask) throws E
276276
decisionTask.setHistory(getHistoryResponse.getHistory());
277277
decisionTask.setNextPageToken(getHistoryResponse.getNextPageToken());
278278
}
279-
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask);
279+
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask, options);
280280
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
281281
return new ReplayDecider(
282282
service, domain, workflowType, workflow, decisionsHelper, options, laTaskPoller);

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

+9
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,15 @@ public void SignalWorkflowExecution(
699699
impl.SignalWorkflowExecution(signalRequest, resultHandler);
700700
}
701701

702+
@Override
703+
public void SignalWorkflowExecutionWithTimeout(
704+
SignalWorkflowExecutionRequest signalRequest,
705+
AsyncMethodCallback resultHandler,
706+
Long timeoutInMillis)
707+
throws TException {
708+
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
709+
}
710+
702711
@Override
703712
public void SignalWithStartWorkflowExecution(
704713
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

+20
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,15 @@ public void SignalWorkflowExecution(
550550
impl.SignalWorkflowExecution(signalRequest, resultHandler);
551551
}
552552

553+
@Override
554+
public void SignalWorkflowExecutionWithTimeout(
555+
SignalWorkflowExecutionRequest signalRequest,
556+
AsyncMethodCallback resultHandler,
557+
Long timeoutInMillis)
558+
throws TException {
559+
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
560+
}
561+
553562
@Override
554563
public void SignalWithStartWorkflowExecution(
555564
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,
@@ -806,6 +815,17 @@ public void signal(String signalName, Object... args) {
806815
next.signal(signalName, args);
807816
}
808817

818+
@Override
819+
public CompletableFuture<Void> signalAsync(String signalName, Object... args) {
820+
return next.signalAsync(signalName, args);
821+
}
822+
823+
@Override
824+
public CompletableFuture<Void> signalAsyncWithTimeout(
825+
long timeout, TimeUnit unit, String signalName, Object... args) {
826+
return next.signalAsyncWithTimeout(timeout, unit, signalName, args);
827+
}
828+
809829
@Override
810830
public WorkflowExecution start(Object... args) {
811831
return next.start(args);

0 commit comments

Comments
 (0)