Skip to content

Commit f5f4a50

Browse files
committed
feat: add sub worflows run on FuncDSL
Signed-off-by: Matheus Andre <matheusandr2@gmail.com> Signed-off-by: Matheus André <matheusandr2@gmail.com>
1 parent 3aadb94 commit f5f4a50

5 files changed

Lines changed: 231 additions & 4 deletions

File tree

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.serverlessworkflow.fluent.func.spi.FuncDoFluent;
2020
import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations;
2121
import io.serverlessworkflow.fluent.spec.BaseDoTaskBuilder;
22+
import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder;
2223
import java.util.function.Consumer;
2324

2425
public class FuncDoTaskBuilder extends BaseDoTaskBuilder<FuncDoTaskBuilder, FuncTaskItemListBuilder>
@@ -96,4 +97,22 @@ public FuncDoTaskBuilder openapi(
9697
this.listBuilder().openapi(name, itemsConfigurer);
9798
return this;
9899
}
100+
101+
@Override
102+
public FuncDoTaskBuilder workflow(String name, Consumer<WorkflowTaskBuilder> itemsConfigurer) {
103+
this.listBuilder().workflow(name, itemsConfigurer);
104+
return this;
105+
}
106+
107+
@Override
108+
public FuncDoTaskBuilder subflow(String name, Consumer<WorkflowTaskBuilder> itemsConfigurer) {
109+
this.listBuilder().subflow(name, itemsConfigurer);
110+
return this;
111+
}
112+
113+
@Override
114+
public FuncDoTaskBuilder subflow(Consumer<WorkflowTaskBuilder> itemsConfigurer) {
115+
this.listBuilder().subflow(itemsConfigurer);
116+
return this;
117+
}
99118
}

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import io.serverlessworkflow.api.types.TaskItem;
2323
import io.serverlessworkflow.fluent.func.spi.FuncDoFluent;
2424
import io.serverlessworkflow.fluent.spec.BaseTaskItemListBuilder;
25+
import io.serverlessworkflow.fluent.spec.TaskItemListBuilder;
26+
import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder;
2527
import java.util.List;
28+
import java.util.UUID;
2629
import java.util.function.Consumer;
2730

2831
public class FuncTaskItemListBuilder extends BaseTaskItemListBuilder<FuncTaskItemListBuilder>
@@ -154,4 +157,38 @@ public FuncTaskItemListBuilder openapi(
154157

155158
return this.addTaskItem(new TaskItem(name, task));
156159
}
160+
161+
@Override
162+
public FuncTaskItemListBuilder workflow(
163+
String name, Consumer<WorkflowTaskBuilder> itemsConfigurer) {
164+
return this.addDelegatedWorkflow(name, itemsConfigurer);
165+
}
166+
167+
@Override
168+
public FuncTaskItemListBuilder workflow(Consumer<WorkflowTaskBuilder> itemsConfigurer) {
169+
return this.addDelegatedWorkflow(UUID.randomUUID().toString(), itemsConfigurer);
170+
}
171+
172+
@Override
173+
public FuncTaskItemListBuilder subflow(
174+
String name, Consumer<WorkflowTaskBuilder> itemsConfigurer) {
175+
return this.addDelegatedWorkflow(name, itemsConfigurer);
176+
}
177+
178+
@Override
179+
public FuncTaskItemListBuilder subflow(Consumer<WorkflowTaskBuilder> itemsConfigurer) {
180+
return this.workflow(itemsConfigurer);
181+
}
182+
183+
private FuncTaskItemListBuilder addDelegatedWorkflow(
184+
String name, Consumer<WorkflowTaskBuilder> itemsConfigurer) {
185+
final TaskItemListBuilder delegate = new TaskItemListBuilder(this.mutableList().size());
186+
delegate.workflow(name, itemsConfigurer);
187+
final List<TaskItem> taskItems = delegate.build();
188+
if (taskItems.size() != 1) {
189+
throw new IllegalStateException(
190+
"Expected workflow delegate to build exactly 1 TaskItem, but got " + taskItems.size());
191+
}
192+
return addTaskItem(taskItems.get(0));
193+
}
157194
}

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,12 @@
3838
import io.serverlessworkflow.fluent.spec.EventFilterBuilder;
3939
import io.serverlessworkflow.fluent.spec.ScheduleBuilder;
4040
import io.serverlessworkflow.fluent.spec.TimeoutBuilder;
41+
import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder;
4142
import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer;
43+
import io.serverlessworkflow.fluent.spec.configurers.WorkflowConfigurer;
4244
import io.serverlessworkflow.fluent.spec.dsl.DSL;
4345
import io.serverlessworkflow.fluent.spec.dsl.UseSpec;
46+
import io.serverlessworkflow.fluent.spec.dsl.WorkflowSpec;
4447
import io.serverlessworkflow.impl.TaskContextData;
4548
import io.serverlessworkflow.impl.WorkflowContextData;
4649
import java.net.URI;
@@ -927,14 +930,19 @@ public static FuncTaskConfigurer switchWhen(String jqExpression, String thenTask
927930
* @return list configurer
928931
*/
929932
public static <T> FuncTaskConfigurer switchWhenOrElse(
930-
Predicate<T> pred, String thenTask, FlowDirectiveEnum otherwise, Class<T> predClass) {
933+
Predicate<T> pred,
934+
String thenTask,
935+
io.serverlessworkflow.api.types.FlowDirectiveEnum otherwise,
936+
Class<T> predClass) {
931937
return list ->
932938
list.switchCase(
933939
FuncDSL.cases(caseOf(pred, predClass).then(thenTask), caseDefault(otherwise)));
934940
}
935941

936942
public static <T> FuncTaskConfigurer switchWhenOrElse(
937-
SerializablePredicate<T> pred, String thenTask, FlowDirectiveEnum otherwise) {
943+
SerializablePredicate<T> pred,
944+
String thenTask,
945+
io.serverlessworkflow.api.types.FlowDirectiveEnum otherwise) {
938946
return switchWhenOrElse(pred, thenTask, otherwise, ReflectionUtils.inferInputType(pred));
939947
}
940948

@@ -975,7 +983,9 @@ public static <T> FuncTaskConfigurer switchWhenOrElse(
975983
* @return list configurer
976984
*/
977985
public static FuncTaskConfigurer switchWhenOrElse(
978-
String jqExpression, String thenTask, FlowDirectiveEnum otherwise) {
986+
String jqExpression,
987+
String thenTask,
988+
io.serverlessworkflow.api.types.FlowDirectiveEnum otherwise) {
979989

980990
Objects.requireNonNull(jqExpression, "jqExpression");
981991
Objects.requireNonNull(thenTask, "thenTask");
@@ -1072,6 +1082,107 @@ public static FuncTaskConfigurer set(Map<String, Object> map) {
10721082
return list -> list.set(s -> s.expr(map));
10731083
}
10741084

1085+
/**
1086+
* Create a {@link FuncTaskConfigurer} that adds a sub-workflow call task using a {@link
1087+
* WorkflowConfigurer}.
1088+
*
1089+
* <pre>{@code
1090+
* tasks(
1091+
* subflow(
1092+
* workflow("org.acme", "sub-workflow", "0.1.0")
1093+
* .input("id", 99)
1094+
* .await(false)
1095+
* )
1096+
* );
1097+
* }</pre>
1098+
*
1099+
* @param configurer nested workflow configurer
1100+
* @return a {@link FuncTaskConfigurer} that adds a workflow task to the tasks list
1101+
*/
1102+
public static FuncTaskConfigurer subflow(WorkflowConfigurer configurer) {
1103+
Objects.requireNonNull(configurer, "configurer");
1104+
return list -> list.subflow(configurer);
1105+
}
1106+
1107+
/**
1108+
* Create a {@link FuncTaskConfigurer} that adds a named sub-workflow call task.
1109+
*
1110+
* @param name task name
1111+
* @param configurer nested workflow configurer
1112+
* @return a {@link FuncTaskConfigurer} that adds a workflow task to the tasks list
1113+
*/
1114+
public static FuncTaskConfigurer subflow(String name, Consumer<WorkflowTaskBuilder> configurer) {
1115+
Objects.requireNonNull(name, "name");
1116+
Objects.requireNonNull(configurer, "configurer");
1117+
return list -> list.subflow(name, configurer);
1118+
}
1119+
1120+
/**
1121+
* Create a {@link FuncTaskConfigurer} that adds an unnamed sub-workflow call task.
1122+
*
1123+
* @param configurer nested workflow configurer
1124+
* @return a {@link FuncTaskConfigurer} that adds a workflow task to the tasks list
1125+
*/
1126+
public static FuncTaskConfigurer subflow(Consumer<WorkflowTaskBuilder> configurer) {
1127+
Objects.requireNonNull(configurer, "configurer");
1128+
return list -> list.subflow(configurer);
1129+
}
1130+
1131+
/**
1132+
* Alias for {@link #subflow(WorkflowConfigurer)}.
1133+
*
1134+
* @param configurer nested workflow configurer
1135+
* @return a {@link FuncTaskConfigurer} that adds a workflow task to the tasks list
1136+
*/
1137+
public static FuncTaskConfigurer workflowTask(WorkflowConfigurer configurer) {
1138+
return subflow(configurer);
1139+
}
1140+
1141+
/**
1142+
* Create a {@link FuncTaskConfigurer} that adds a workflow subflow task.
1143+
*
1144+
* @param configurer configurer for the nested workflow task
1145+
* @return a {@link FuncTaskConfigurer} that adds a workflow task to the tasks list
1146+
* @deprecated use {@link #subflow(WorkflowConfigurer)} to avoid ambiguity with spec-side factory
1147+
* methods
1148+
*/
1149+
@Deprecated
1150+
public static FuncTaskConfigurer workflow(WorkflowConfigurer configurer) {
1151+
return subflow(configurer);
1152+
}
1153+
1154+
/**
1155+
* Create a new {@link WorkflowSpec} to be used as a factory for workflow definitions.
1156+
*
1157+
* @param namespace workflow namespace
1158+
* @param name workflow name
1159+
* @param version workflow version
1160+
* @return a new {@link WorkflowSpec} instance
1161+
*/
1162+
public static WorkflowSpec workflow(String namespace, String name, String version) {
1163+
return DSL.workflow(namespace, name, version);
1164+
}
1165+
1166+
/**
1167+
* Create a new {@link WorkflowSpec} to be used as a factory for workflow definitions.
1168+
*
1169+
* @param namespace workflow namespace
1170+
* @param name workflow name
1171+
* @return a new {@link WorkflowSpec} instance
1172+
*/
1173+
public static WorkflowSpec workflow(String namespace, String name) {
1174+
return DSL.workflow(namespace, name);
1175+
}
1176+
1177+
/**
1178+
* Create a new {@link WorkflowSpec} to be used as a factory for workflow definitions.
1179+
*
1180+
* @return a new {@link WorkflowSpec} instance
1181+
*/
1182+
public static WorkflowSpec workflow() {
1183+
return DSL.workflow();
1184+
}
1185+
10751186
// ---------------------------------------------------------------------------
10761187
// HTTP / OpenAPI
10771188
// ---------------------------------------------------------------------------

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder;
2525
import io.serverlessworkflow.fluent.func.FuncSetTaskBuilder;
2626
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
27+
import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder;
2728
import io.serverlessworkflow.fluent.spec.spi.CallHttpFluent;
2829
import io.serverlessworkflow.fluent.spec.spi.CallOpenAPIFluent;
2930
import io.serverlessworkflow.fluent.spec.spi.EmitFluent;
@@ -32,6 +33,8 @@
3233
import io.serverlessworkflow.fluent.spec.spi.ListenFluent;
3334
import io.serverlessworkflow.fluent.spec.spi.SetFluent;
3435
import io.serverlessworkflow.fluent.spec.spi.SwitchFluent;
36+
import io.serverlessworkflow.fluent.spec.spi.WorkflowFluent;
37+
import java.util.function.Consumer;
3538

3639
public interface FuncDoFluent<SELF extends FuncDoFluent<SELF>>
3740
extends SetFluent<FuncSetTaskBuilder, SELF>,
@@ -42,4 +45,14 @@ public interface FuncDoFluent<SELF extends FuncDoFluent<SELF>>
4245
ListenFluent<FuncListenTaskBuilder, SELF>,
4346
CallFnFluent<FuncCallTaskBuilder, SELF>,
4447
CallHttpFluent<FuncCallHttpTaskBuilder, SELF>,
45-
CallOpenAPIFluent<FuncCallOpenAPITaskBuilder, SELF> {}
48+
CallOpenAPIFluent<FuncCallOpenAPITaskBuilder, SELF>,
49+
WorkflowFluent<WorkflowTaskBuilder, SELF> {
50+
51+
default SELF subflow(String name, Consumer<WorkflowTaskBuilder> itemsConfigurer) {
52+
return this.workflow(name, itemsConfigurer);
53+
}
54+
55+
default SELF subflow(Consumer<WorkflowTaskBuilder> itemsConfigurer) {
56+
return this.workflow(itemsConfigurer);
57+
}
58+
}

experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http;
2424
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen;
2525
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.produced;
26+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.subflow;
2627
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse;
2728
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne;
29+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.workflow;
2830
import static io.serverlessworkflow.fluent.spec.dsl.DSL.use;
2931
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertFalse;
3033
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
3134
import static org.junit.jupiter.api.Assertions.assertNotNull;
3235
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -174,6 +177,50 @@ void mixed_chaining_order_and_exports() {
174177
assertNotNull(t2.getListenTask().getExport(), "listen step should carry export");
175178
}
176179

180+
@Test
181+
void subflow_task_builds_from_func_dsl() {
182+
Workflow wf =
183+
FuncWorkflowBuilder.workflow("step-subflow")
184+
.tasks(
185+
subflow(workflow("child.ns", "child-flow", "2.3.4").input("id", 99).await(false)))
186+
.build();
187+
188+
List<TaskItem> items = wf.getDo();
189+
assertEquals(1, items.size());
190+
191+
Task t = items.get(0).getTask();
192+
assertNotNull(t.getRunTask(), "RunTask expected");
193+
var run = t.getRunTask().getRun().getRunWorkflow();
194+
assertNotNull(run, "RunWorkflow should be present");
195+
assertEquals("child.ns", run.getWorkflow().getNamespace());
196+
assertEquals("child-flow", run.getWorkflow().getName());
197+
assertEquals("2.3.4", run.getWorkflow().getVersion());
198+
assertEquals(99, run.getWorkflow().getInput().getAdditionalProperties().get("id"));
199+
assertFalse(run.isAwait(), "await(false) should be preserved as false");
200+
}
201+
202+
@Test
203+
void subflow_builder_style_builds_correctly() {
204+
Workflow wf =
205+
FuncWorkflowBuilder.workflow("subflow-builder")
206+
.tasks(
207+
subflow(
208+
"my-subflow",
209+
sub ->
210+
sub.namespace("child.ns")
211+
.name("child-flow")
212+
.version("1.0.0")
213+
.input(Map.of("key", "val"))))
214+
.build();
215+
216+
Task t = wf.getDo().get(0).getTask();
217+
assertNotNull(t.getRunTask());
218+
var run = t.getRunTask().getRun().getRunWorkflow();
219+
assertEquals("my-subflow", wf.getDo().get(0).getName());
220+
assertEquals("child-flow", run.getWorkflow().getName());
221+
assertEquals("val", run.getWorkflow().getInput().getAdditionalProperties().get("key"));
222+
}
223+
177224
@Test
178225
void switchWhenOrElse_jq_to_taskName() {
179226
Workflow wf =

0 commit comments

Comments
 (0)