Skip to content

Commit 86189b2

Browse files
authored
Add abitlity to do async completion without having to specify task token (temporalio#329)
1 parent 61396a9 commit 86189b2

11 files changed

+93
-26
lines changed

temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package io.temporal.activity;
2121

2222
import com.uber.m3.tally.Scope;
23-
import io.temporal.client.ActivityCompletionClient;
2423
import io.temporal.client.ActivityCompletionException;
2524
import io.temporal.worker.WorkerOptions;
2625
import java.lang.reflect.Type;
@@ -110,10 +109,10 @@ public interface ActivityExecutionContext {
110109
* #doNotCompleteOnReturn()} directly is that by using this method maximum number of concurrent
111110
* activities defined by {@link WorkerOptions#getMaxConcurrentActivityExecutionSize()} will be
112111
* respected. Users must be careful and always call completion method on the {@link
113-
* ActivityCompletionClient} otherwise activity worker could stop polling new work as it will
114-
* consider all activities that didn't explicitly finish as still running.
112+
* ManualActivityCompletionClient} otherwise activity worker could stop polling new work as it
113+
* will consider all activities that didn't explicitly finish as still running.
115114
*/
116-
ActivityCompletionClient useLocalManualCompletion();
115+
ManualActivityCompletionClient useLocalManualCompletion();
117116

118117
Scope getMetricsScope();
119118
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* permissions and limitations under the License.
1818
*/
1919

20-
package io.temporal.internal.external;
20+
package io.temporal.activity;
2121

2222
import io.temporal.failure.CanceledFailure;
2323

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.uber.m3.tally.Scope;
2323
import io.temporal.activity.ActivityExecutionContext;
2424
import io.temporal.activity.ActivityInfo;
25-
import io.temporal.client.ActivityCompletionClient;
25+
import io.temporal.activity.ManualActivityCompletionClient;
2626
import io.temporal.client.ActivityCompletionException;
2727
import java.lang.reflect.Type;
2828
import java.util.Optional;
@@ -76,7 +76,7 @@ public boolean isUseLocalManualCompletion() {
7676
}
7777

7878
@Override
79-
public ActivityCompletionClient useLocalManualCompletion() {
79+
public ManualActivityCompletionClient useLocalManualCompletion() {
8080
return next.useLocalManualCompletion();
8181
}
8282

temporal-sdk/src/main/java/io/temporal/internal/external/ManualActivityCompletionClientFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package io.temporal.internal.external;
2121

22+
import io.temporal.activity.ManualActivityCompletionClient;
2223
import io.temporal.api.common.v1.WorkflowExecution;
2324

2425
public interface ManualActivityCompletionClientFactory {

temporal-sdk/src/main/java/io/temporal/internal/external/ManualActivityCompletionClientFactoryImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.uber.m3.tally.Scope;
2323
import com.uber.m3.util.ImmutableMap;
24+
import io.temporal.activity.ManualActivityCompletionClient;
2425
import io.temporal.api.common.v1.WorkflowExecution;
2526
import io.temporal.common.converter.DataConverter;
2627
import io.temporal.serviceclient.MetricsTag;

temporal-sdk/src/main/java/io/temporal/internal/external/ManualActivityCompletionClientImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.m3.tally.Scope;
2626
import io.grpc.Status;
2727
import io.grpc.StatusRuntimeException;
28+
import io.temporal.activity.ManualActivityCompletionClient;
2829
import io.temporal.api.common.v1.Payloads;
2930
import io.temporal.api.common.v1.WorkflowExecution;
3031
import io.temporal.api.workflowservice.v1.*;

temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityExecutionContextImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import io.grpc.StatusRuntimeException;
2727
import io.temporal.activity.ActivityExecutionContext;
2828
import io.temporal.activity.ActivityInfo;
29+
import io.temporal.activity.ManualActivityCompletionClient;
2930
import io.temporal.api.common.v1.Payloads;
3031
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
3132
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
3233
import io.temporal.client.ActivityCanceledException;
33-
import io.temporal.client.ActivityCompletionClient;
3434
import io.temporal.client.ActivityCompletionException;
3535
import io.temporal.client.ActivityCompletionFailureException;
3636
import io.temporal.client.ActivityNotExistsException;
@@ -260,12 +260,13 @@ public boolean isUseLocalManualCompletion() {
260260
}
261261

262262
@Override
263-
public ActivityCompletionClient useLocalManualCompletion() {
263+
public ManualActivityCompletionClient useLocalManualCompletion() {
264264
lock.lock();
265265
try {
266266
doNotCompleteOnReturn();
267267
useLocalManualCompletion = true;
268-
return new ActivityCompletionClientImpl(manualCompletionClientFactory, completionHandle);
268+
return new CompletionAwareManualCompletionClient(
269+
manualCompletionClientFactory.getClient(info.getTaskToken()), completionHandle);
269270
} finally {
270271
lock.unlock();
271272
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.internal.sync;
21+
22+
import io.temporal.activity.ManualActivityCompletionClient;
23+
import io.temporal.failure.CanceledFailure;
24+
import io.temporal.workflow.Functions;
25+
26+
public final class CompletionAwareManualCompletionClient implements ManualActivityCompletionClient {
27+
private final ManualActivityCompletionClient client;
28+
private final Functions.Proc completionHandle;
29+
30+
public CompletionAwareManualCompletionClient(
31+
ManualActivityCompletionClient client, Functions.Proc completionHandle) {
32+
this.client = client;
33+
this.completionHandle = completionHandle;
34+
}
35+
36+
@Override
37+
public void complete(Object result) {
38+
try {
39+
client.complete(result);
40+
} finally {
41+
completionHandle.apply();
42+
}
43+
}
44+
45+
@Override
46+
public void fail(Throwable failure) {
47+
try {
48+
client.fail(failure);
49+
} finally {
50+
completionHandle.apply();
51+
}
52+
}
53+
54+
@Override
55+
public void recordHeartbeat(Object details) throws CanceledFailure {
56+
client.recordHeartbeat(details);
57+
}
58+
59+
@Override
60+
public void reportCancellation(Object details) {
61+
try {
62+
client.reportCancellation(details);
63+
} finally {
64+
completionHandle.apply();
65+
}
66+
}
67+
}

temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityExecutionContextImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.uber.m3.tally.Scope;
2323
import io.temporal.activity.ActivityExecutionContext;
2424
import io.temporal.activity.ActivityInfo;
25-
import io.temporal.client.ActivityCompletionClient;
25+
import io.temporal.activity.ManualActivityCompletionClient;
2626
import io.temporal.client.ActivityCompletionException;
2727
import java.lang.reflect.Type;
2828
import java.util.Optional;
@@ -80,7 +80,7 @@ public boolean isUseLocalManualCompletion() {
8080
}
8181

8282
@Override
83-
public ActivityCompletionClient useLocalManualCompletion() {
83+
public ManualActivityCompletionClient useLocalManualCompletion() {
8484
throw new UnsupportedOperationException(
8585
"getManualCompletionClient is not supported for local activities");
8686
}

temporal-sdk/src/test/java/io/temporal/workflow/AsyncActivityCompleteWithErrorTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package io.temporal.workflow;
2121

2222
import io.temporal.activity.*;
23-
import io.temporal.client.ActivityCompletionClient;
23+
import io.temporal.activity.ManualActivityCompletionClient;
2424
import io.temporal.client.WorkflowOptions;
2525
import io.temporal.common.RetryOptions;
2626
import io.temporal.failure.ApplicationFailure;
@@ -84,15 +84,13 @@ public static class AsyncActivityWithManualCompletion implements TestActivity {
8484
@Override
8585
public int execute() {
8686
ActivityExecutionContext context = Activity.getExecutionContext();
87-
ActivityCompletionClient completionClient = context.useLocalManualCompletion();
88-
ForkJoinPool.commonPool().execute(() -> asyncActivityFn(context, completionClient));
87+
ManualActivityCompletionClient completionClient = context.useLocalManualCompletion();
88+
ForkJoinPool.commonPool().execute(() -> asyncActivityFn(completionClient));
8989
return 0;
9090
}
9191

92-
private void asyncActivityFn(
93-
ActivityExecutionContext context, ActivityCompletionClient completionClient) {
94-
completionClient.completeExceptionally(
95-
context.getTaskToken(),
92+
private void asyncActivityFn(ManualActivityCompletionClient completionClient) {
93+
completionClient.fail(
9694
ApplicationFailure.newFailure("simulated failure", "test", "some details"));
9795
}
9896
}

0 commit comments

Comments
 (0)