Skip to content

Commit 8fa0445

Browse files
Add publishToInternalChannel API (#306)
1 parent 7fed832 commit 8fa0445

File tree

6 files changed

+198
-45
lines changed

6 files changed

+198
-45
lines changed

iwf-idl

src/main/java/io/iworkflow/core/Client.java

Lines changed: 90 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,7 @@
55
import io.iworkflow.core.exceptions.WorkflowAlreadyStartedException;
66
import io.iworkflow.core.exceptions.WorkflowNotExistsException;
77
import io.iworkflow.core.persistence.PersistenceOptions;
8-
import io.iworkflow.gen.models.ErrorSubStatus;
9-
import io.iworkflow.gen.models.KeyValue;
10-
import io.iworkflow.gen.models.SearchAttribute;
11-
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
12-
import io.iworkflow.gen.models.SearchAttributeValueType;
13-
import io.iworkflow.gen.models.StateCompletionOutput;
14-
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
15-
import io.iworkflow.gen.models.WorkflowGetResponse;
16-
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
17-
import io.iworkflow.gen.models.WorkflowSearchRequest;
18-
import io.iworkflow.gen.models.WorkflowSearchResponse;
8+
import io.iworkflow.gen.models.*;
199
import io.iworkflow.gen.models.WorkflowStateOptions;
2010
import net.bytebuddy.ByteBuddy;
2111
import net.bytebuddy.implementation.MethodDelegation;
@@ -25,11 +15,7 @@
2515

2616
import java.lang.reflect.Constructor;
2717
import java.lang.reflect.InvocationTargetException;
28-
import java.util.ArrayList;
29-
import java.util.HashMap;
30-
import java.util.List;
31-
import java.util.Map;
32-
import java.util.Optional;
18+
import java.util.*;
3319
import java.util.stream.Collectors;
3420

3521
import static io.iworkflow.core.WorkflowState.shouldSkipWaitUntil;
@@ -505,6 +491,94 @@ public void signalWorkflow(
505491
signalWorkflow(workflowClass, workflowId, "", signalChannelName, signalValue);
506492
}
507493

494+
/**
495+
* Send a single empty message to internalChannel
496+
*
497+
* @param workflowClass required
498+
* @param workflowId required
499+
* @param internalChannelName required
500+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
501+
*/
502+
public void publishToInternalChannel(
503+
final Class<? extends ObjectWorkflow> workflowClass,
504+
final String workflowId,
505+
final String internalChannelName) {
506+
publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, null);
507+
}
508+
509+
/**
510+
* Send a single message to internalChannel
511+
*
512+
* @param workflowClass required
513+
* @param workflowId required
514+
* @param internalChannelName required
515+
* @param channelMessage optional, can be null.
516+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
517+
*/
518+
public void publishToInternalChannel(
519+
final Class<? extends ObjectWorkflow> workflowClass,
520+
final String workflowId,
521+
final String internalChannelName,
522+
final Object channelMessage) {
523+
publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, channelMessage);
524+
}
525+
526+
/**
527+
* Send a single message to internalChannel
528+
*
529+
* @param workflowClass required
530+
* @param workflowId required
531+
* @param workflowRunId optional, can be empty
532+
* @param internalChannelName required
533+
* @param channelMessage optional, can be null.
534+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
535+
*/
536+
public void publishToInternalChannel(
537+
final Class<? extends ObjectWorkflow> workflowClass,
538+
final String workflowId,
539+
final String workflowRunId,
540+
final String internalChannelName,
541+
final Object channelMessage) {
542+
publishToInternalChannelBatch(workflowClass, workflowId, workflowRunId, internalChannelName, Arrays.asList(channelMessage));
543+
}
544+
545+
/**
546+
* Send a batch of messages to internalChannel
547+
*
548+
* @param workflowClass required
549+
* @param workflowId required
550+
* @param workflowRunId optional, can be empty
551+
* @param internalChannelName required
552+
* @param channelMessages messages in batch
553+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
554+
*/
555+
public void publishToInternalChannelBatch(
556+
final Class<? extends ObjectWorkflow> workflowClass,
557+
final String workflowId,
558+
final String workflowRunId,
559+
final String internalChannelName,
560+
final List<Object> channelMessages) {
561+
final String wfType = workflowClass.getSimpleName();
562+
563+
checkWorkflowTypeExists(wfType);
564+
565+
final Class<?> channelValueType = registry.getInternalChannelTypeStore(wfType).getType(internalChannelName);
566+
567+
List<InterStateChannelPublishing> rawMessages = new ArrayList<>(channelMessages.size());
568+
for (Object channelValue : channelMessages) {
569+
if (channelValue != null && !channelValueType.isInstance(channelValue)) {
570+
throw new IllegalArgumentException(String.format("message value is not of channel type %s", channelValueType.getName()));
571+
}
572+
rawMessages.add(
573+
new InterStateChannelPublishing()
574+
.channelName(internalChannelName)
575+
.value(clientOptions.getObjectEncoder().encode(channelValue))
576+
);
577+
}
578+
579+
unregisteredClient.publishToInternalChannel(workflowId, workflowRunId, rawMessages);
580+
}
581+
508582
/**
509583
* @param workflowId required
510584
* @param workflowRunId optional, can be empty

src/main/java/io/iworkflow/core/UnregisteredClient.java

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,7 @@
88
import io.iworkflow.core.validator.CronScheduleValidator;
99
import io.iworkflow.gen.api.ApiClient;
1010
import io.iworkflow.gen.api.DefaultApi;
11-
import io.iworkflow.gen.models.EncodedObject;
12-
import io.iworkflow.gen.models.KeyValue;
13-
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
14-
import io.iworkflow.gen.models.SearchAttribute;
15-
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
16-
import io.iworkflow.gen.models.StateCompletionOutput;
17-
import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest;
18-
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
19-
import io.iworkflow.gen.models.WorkflowGetRequest;
20-
import io.iworkflow.gen.models.WorkflowGetResponse;
21-
import io.iworkflow.gen.models.WorkflowGetSearchAttributesRequest;
22-
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
23-
import io.iworkflow.gen.models.WorkflowResetRequest;
24-
import io.iworkflow.gen.models.WorkflowResetResponse;
25-
import io.iworkflow.gen.models.WorkflowRpcRequest;
26-
import io.iworkflow.gen.models.WorkflowRpcResponse;
27-
import io.iworkflow.gen.models.WorkflowSearchRequest;
28-
import io.iworkflow.gen.models.WorkflowSearchResponse;
29-
import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest;
30-
import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest;
31-
import io.iworkflow.gen.models.WorkflowSignalRequest;
32-
import io.iworkflow.gen.models.WorkflowSkipTimerRequest;
33-
import io.iworkflow.gen.models.WorkflowStartOptions;
34-
import io.iworkflow.gen.models.WorkflowStartRequest;
35-
import io.iworkflow.gen.models.WorkflowStartResponse;
36-
import io.iworkflow.gen.models.WorkflowStatus;
37-
import io.iworkflow.gen.models.WorkflowStopRequest;
38-
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;
11+
import io.iworkflow.gen.models.*;
3912

4013
import java.util.List;
4114
import java.util.Map;
@@ -449,6 +422,23 @@ public void signalWorkflow(
449422
}
450423
}
451424

425+
public void publishToInternalChannel(
426+
final String workflowId,
427+
final String workflowRunId,
428+
final List<InterStateChannelPublishing> messages){
429+
430+
try {
431+
defaultApi.apiV1WorkflowPublishToInternalChannelPost(
432+
new PublishToInternalChannelRequest()
433+
.messages(messages)
434+
.workflowId(workflowId)
435+
.workflowRunId(workflowRunId)
436+
);
437+
} catch (FeignException.FeignClientException exp) {
438+
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
439+
}
440+
}
441+
452442
/**
453443
* @param workflowId workflowId
454444
* @param workflowRunId workflowRunId

src/test/java/io/iworkflow/integ/InternalChannelTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import io.iworkflow.core.Client;
44
import io.iworkflow.core.ClientOptions;
55
import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow;
6+
import io.iworkflow.integ.internalchannel.WaitingInternalChannelWorkflow;
67
import io.iworkflow.spring.TestSingletonWorkerService;
78
import io.iworkflow.spring.controller.WorkflowRegistry;
89
import org.junit.jupiter.api.Assertions;
910
import org.junit.jupiter.api.BeforeEach;
1011
import org.junit.jupiter.api.Test;
1112

13+
import java.util.Arrays;
1214
import java.util.concurrent.ExecutionException;
1315

1416
public class InternalChannelTest {
@@ -28,4 +30,16 @@ public void testBasicInternalWorkflow() throws InterruptedException {
2830
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
2931
Assertions.assertEquals(3, output);
3032
}
33+
34+
@Test
35+
public void testWaitingInternalWorkflow() throws InterruptedException {
36+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
37+
final String wfId = "waiting-internal-test-id" + System.currentTimeMillis() / 1000;
38+
final Integer input = 1;
39+
final String runId = client.startWorkflow(
40+
WaitingInternalChannelWorkflow.class, wfId, 10, input);
41+
client.publishToInternalChannelBatch(WaitingInternalChannelWorkflow.class, wfId, runId, WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME, Arrays.asList(2, 3));
42+
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
43+
Assertions.assertEquals(6, output);
44+
}
3145
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.iworkflow.integ.internalchannel;
2+
3+
import io.iworkflow.core.ObjectWorkflow;
4+
import io.iworkflow.core.StateDef;
5+
import io.iworkflow.core.communication.CommunicationMethodDef;
6+
import io.iworkflow.core.communication.InternalChannelDef;
7+
import org.springframework.stereotype.Component;
8+
9+
import java.util.Arrays;
10+
import java.util.List;
11+
12+
@Component
13+
public class WaitingInternalChannelWorkflow implements ObjectWorkflow {
14+
public static final String INTER_STATE_CHANNEL_NAME = "test-inter-state-channel-1";
15+
16+
@Override
17+
public List<CommunicationMethodDef> getCommunicationSchema() {
18+
return Arrays.asList(
19+
InternalChannelDef.create(Integer.class, INTER_STATE_CHANNEL_NAME)
20+
);
21+
}
22+
23+
@Override
24+
public List<StateDef> getWorkflowStates() {
25+
return Arrays.asList(
26+
StateDef.startingState(new WaitingInternalChannelWorkflowState())
27+
);
28+
}
29+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.iworkflow.integ.internalchannel;
2+
3+
import io.iworkflow.core.Context;
4+
import io.iworkflow.core.StateDecision;
5+
import io.iworkflow.core.WorkflowState;
6+
import io.iworkflow.core.command.CommandRequest;
7+
import io.iworkflow.core.command.CommandResults;
8+
import io.iworkflow.core.communication.Communication;
9+
import io.iworkflow.core.communication.InternalChannelCommand;
10+
import io.iworkflow.core.communication.InternalChannelCommandResult;
11+
import io.iworkflow.core.persistence.Persistence;
12+
13+
public class WaitingInternalChannelWorkflowState implements WorkflowState<Integer> {
14+
15+
@Override
16+
public Class<Integer> getInputType() {
17+
return Integer.class;
18+
}
19+
20+
@Override
21+
public CommandRequest waitUntil(
22+
Context context,
23+
Integer input,
24+
Persistence persistence,
25+
final Communication communication) {
26+
return CommandRequest.forAllCommandCompleted(
27+
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME),
28+
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME)
29+
);
30+
}
31+
32+
@Override
33+
public StateDecision execute(
34+
Context context,
35+
Integer input,
36+
CommandResults commandResults,
37+
Persistence persistence,
38+
final Communication communication) {
39+
final InternalChannelCommandResult result1 = commandResults.getAllInternalChannelCommandResult().get(0);
40+
final InternalChannelCommandResult result2 = commandResults.getAllInternalChannelCommandResult().get(1);
41+
42+
Integer output = input + (Integer) result1.getValue().get() + (Integer) result2.getValue().get();
43+
44+
return StateDecision.gracefulCompleteWorkflow(output);
45+
}
46+
}

0 commit comments

Comments
 (0)