Skip to content

Commit 0cc396a

Browse files
authored
Merge pull request #483 from fjtirado/Fix_#474
[Fix_#474] For task implementation
2 parents 9087cb9 + 399ad9d commit 0cc396a

26 files changed

+332
-119
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.Map;
19+
20+
public interface ContextAware {
21+
Map<String, Object> variables();
22+
}

impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,19 @@
1717

1818
public class DefaultWorkflowPosition implements WorkflowPosition {
1919

20-
private StringBuilder sb = new StringBuilder("");
20+
private StringBuilder sb;
21+
22+
DefaultWorkflowPosition() {
23+
this.sb = new StringBuilder("");
24+
}
25+
26+
private DefaultWorkflowPosition(WorkflowPosition position) {
27+
this.sb = new StringBuilder(position.toString());
28+
}
29+
30+
public DefaultWorkflowPosition copy() {
31+
return new DefaultWorkflowPosition(this);
32+
}
2133

2234
@Override
2335
public WorkflowPosition addIndex(int index) {
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
class DefaultWorkflowPositionFactory implements WorkflowPositionFactory {
19+
20+
private static WorkflowPositionFactory instance = new DefaultWorkflowPositionFactory();
21+
22+
public static WorkflowPositionFactory get() {
23+
return instance;
24+
}
25+
26+
private DefaultWorkflowPositionFactory() {}
27+
28+
@Override
29+
public WorkflowPosition buildPosition() {
30+
return new DefaultWorkflowPosition();
31+
}
32+
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,48 @@
1919
import io.serverlessworkflow.api.types.FlowDirective;
2020
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
2121
import io.serverlessworkflow.api.types.TaskBase;
22+
import java.util.HashMap;
23+
import java.util.Map;
2224

23-
public class TaskContext<T extends TaskBase> {
25+
public class TaskContext<T extends TaskBase> implements ContextAware {
2426

2527
private final JsonNode rawInput;
2628
private final T task;
29+
private final WorkflowPosition position;
2730

2831
private JsonNode input;
2932
private JsonNode output;
3033
private JsonNode rawOutput;
3134
private FlowDirective flowDirective;
35+
private Map<String, Object> contextVariables;
3236

33-
public TaskContext(JsonNode rawInput, T task) {
34-
this.rawInput = rawInput;
37+
public TaskContext(JsonNode input, WorkflowPosition position) {
38+
this.rawInput = input;
39+
this.position = position;
40+
this.task = null;
41+
this.contextVariables = new HashMap<>();
42+
init();
43+
}
44+
45+
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
46+
this.rawInput = input;
47+
this.position = taskContext.position.copy();
48+
this.task = task;
49+
this.flowDirective = task.getThen();
50+
this.contextVariables = new HashMap<>(taskContext.variables());
51+
init();
52+
}
53+
54+
private void init() {
3555
this.input = rawInput;
3656
this.rawOutput = rawInput;
3757
this.output = rawInput;
38-
this.task = task;
39-
this.flowDirective = task.getThen();
4058
}
4159

4260
public void input(JsonNode input) {
4361
this.input = input;
62+
this.rawOutput = input;
63+
this.output = input;
4464
}
4565

4666
public JsonNode input() {
@@ -81,4 +101,12 @@ public FlowDirective flowDirective() {
81101
? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE)
82102
: flowDirective;
83103
}
104+
105+
public Map<String, Object> variables() {
106+
return contextVariables;
107+
}
108+
109+
public WorkflowPosition position() {
110+
return position;
111+
}
84112
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
2626
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
2727
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
28-
2928
import java.util.Collection;
3029
import java.util.Collections;
3130
import java.util.HashSet;
@@ -40,17 +39,20 @@ public class WorkflowApplication implements AutoCloseable {
4039
private final SchemaValidatorFactory schemaValidatorFactory;
4140
private final Collection<WorkflowExecutionListener> listeners;
4241
private final Map<WorkflowId, WorkflowDefinition> definitions;
42+
private final WorkflowPositionFactory positionFactory;
4343

4444
public WorkflowApplication(
4545
TaskExecutorFactory taskFactory,
4646
ExpressionFactory exprFactory,
4747
ResourceLoaderFactory resourceLoaderFactory,
4848
SchemaValidatorFactory schemaValidatorFactory,
49+
WorkflowPositionFactory positionFactory,
4950
Collection<WorkflowExecutionListener> listeners) {
5051
this.taskFactory = taskFactory;
5152
this.exprFactory = exprFactory;
5253
this.resourceLoaderFactory = resourceLoaderFactory;
5354
this.schemaValidatorFactory = schemaValidatorFactory;
55+
this.positionFactory = positionFactory;
5456
this.listeners = listeners;
5557
this.definitions = new ConcurrentHashMap<>();
5658
}
@@ -85,6 +87,7 @@ public static class Builder {
8587
private Collection<WorkflowExecutionListener> listeners;
8688
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
8789
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();
90+
private WorkflowPositionFactory positionFactory = DefaultWorkflowPositionFactory.get();
8891

8992
private Builder() {}
9093

@@ -111,6 +114,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
111114
return this;
112115
}
113116

117+
public Builder withPositionFactory(WorkflowPositionFactory positionFactory) {
118+
this.positionFactory = positionFactory;
119+
return this;
120+
}
121+
114122
public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) {
115123
this.schemaValidatorFactory = factory;
116124
return this;
@@ -122,6 +130,7 @@ public WorkflowApplication build() {
122130
exprFactory,
123131
resourceLoaderFactory,
124132
schemaValidatorFactory,
133+
positionFactory,
125134
listeners == null
126135
? Collections.emptySet()
127136
: Collections.unmodifiableCollection(listeners));
@@ -146,4 +155,8 @@ public void close() throws Exception {
146155
}
147156
definitions.clear();
148157
}
158+
159+
public WorkflowPositionFactory positionFactory() {
160+
return positionFactory;
161+
}
149162
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,49 +19,16 @@
1919
import io.serverlessworkflow.impl.json.JsonUtils;
2020

2121
public class WorkflowContext {
22-
private final WorkflowPosition position;
2322
private final WorkflowDefinition definition;
2423
private final JsonNode input;
25-
private JsonNode current;
2624
private JsonNode context;
2725

28-
private WorkflowContext(
29-
WorkflowPosition position, WorkflowDefinition definition, JsonNode input) {
30-
this.position = position;
26+
WorkflowContext(WorkflowDefinition definition, JsonNode input) {
3127
this.definition = definition;
3228
this.input = input;
33-
this.current = input.deepCopy();
3429
this.context = JsonUtils.mapper().createObjectNode();
3530
}
3631

37-
public static Builder builder(WorkflowDefinition definition, JsonNode input) {
38-
return new Builder(definition, input);
39-
}
40-
41-
public static class Builder {
42-
private WorkflowPosition position = new DefaultWorkflowPosition();
43-
private WorkflowDefinition definition;
44-
private JsonNode input;
45-
46-
private Builder(WorkflowDefinition definition, JsonNode input) {
47-
this.definition = definition;
48-
this.input = input;
49-
}
50-
51-
public Builder position(WorkflowPosition position) {
52-
this.position = position;
53-
return this;
54-
}
55-
56-
public WorkflowContext build() {
57-
return new WorkflowContext(position, definition, input);
58-
}
59-
}
60-
61-
public WorkflowPosition position() {
62-
return position;
63-
}
64-
6532
public JsonNode context() {
6633
return context;
6734
}
@@ -74,14 +41,6 @@ public JsonNode rawInput() {
7441
return input;
7542
}
7643

77-
public void current(JsonNode output) {
78-
this.current = output;
79-
}
80-
81-
public JsonNode current() {
82-
return current;
83-
}
84-
8544
public WorkflowDefinition definition() {
8645
return definition;
8746
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
2929
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
3030
import io.serverlessworkflow.impl.resources.ResourceLoader;
31-
3231
import java.nio.file.Path;
3332
import java.util.Collection;
3433
import java.util.Map;
@@ -47,6 +46,7 @@ public class WorkflowDefinition implements AutoCloseable {
4746
private final ExpressionFactory exprFactory;
4847
private final ResourceLoader resourceLoader;
4948
private final SchemaValidatorFactory schemaValidatorFactory;
49+
private final WorkflowPositionFactory positionFactory;
5050
private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
5151
new ConcurrentHashMap<>();
5252

@@ -56,12 +56,14 @@ private WorkflowDefinition(
5656
TaskExecutorFactory taskFactory,
5757
ResourceLoader resourceLoader,
5858
ExpressionFactory exprFactory,
59-
SchemaValidatorFactory schemaValidatorFactory) {
59+
SchemaValidatorFactory schemaValidatorFactory,
60+
WorkflowPositionFactory positionFactory) {
6061
this.workflow = workflow;
6162
this.listeners = listeners;
6263
this.taskFactory = taskFactory;
6364
this.exprFactory = exprFactory;
6465
this.schemaValidatorFactory = schemaValidatorFactory;
66+
this.positionFactory = positionFactory;
6567
this.resourceLoader = resourceLoader;
6668
if (workflow.getInput() != null) {
6769
Input input = workflow.getInput();
@@ -90,7 +92,8 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
9092
application.taskFactory(),
9193
application.resourceLoaderFactory().getResourceLoader(path),
9294
application.expressionFactory(),
93-
application.validatorFactory());
95+
application.validatorFactory(),
96+
application.positionFactory());
9497
}
9598

9699
public WorkflowInstance execute(Object input) {
@@ -142,6 +145,10 @@ public ResourceLoader resourceLoader() {
142145
return resourceLoader;
143146
}
144147

148+
public WorkflowPositionFactory positionFactory() {
149+
return positionFactory;
150+
}
151+
145152
@Override
146153
public void close() {
147154
// TODO close resourcers hold for uncompleted process instances, if any

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
19-
import java.util.Optional;
2019

2120
@FunctionalInterface
2221
public interface WorkflowFilter {
23-
JsonNode apply(WorkflowContext workflow, Optional<TaskContext<?>> task, JsonNode node);
22+
JsonNode apply(WorkflowContext workflow, TaskContext<?> task, JsonNode node);
2423
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,37 @@
1818
import static io.serverlessworkflow.impl.json.JsonUtils.toJavaValue;
1919

2020
import com.fasterxml.jackson.databind.JsonNode;
21-
import java.util.Optional;
2221

2322
public class WorkflowInstance {
2423
private WorkflowState state;
2524
private WorkflowContext context;
25+
private TaskContext<?> taskContext;
2626

2727
WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
2828
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
29-
context = WorkflowContext.builder(definition, input).build();
29+
context = new WorkflowContext(definition, input);
30+
taskContext = new TaskContext<>(input, definition.positionFactory().buildPosition());
3031
definition
3132
.inputFilter()
32-
.ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current())));
33+
.ifPresent(f -> taskContext.input(f.apply(context, taskContext, input)));
3334
state = WorkflowState.STARTED;
34-
WorkflowUtils.processTaskList(definition.workflow().getDo(), context);
35+
taskContext.rawOutput(
36+
WorkflowUtils.processTaskList(definition.workflow().getDo(), context, taskContext));
3537
definition
3638
.outputFilter()
37-
.ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current())));
38-
definition.outputSchemaValidator().ifPresent(v -> v.validate(context.current()));
39+
.ifPresent(f -> taskContext.output(f.apply(context, taskContext, taskContext.rawOutput())));
40+
definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output()));
3941
}
4042

4143
public WorkflowState state() {
4244
return state;
4345
}
4446

4547
public Object output() {
46-
return toJavaValue(context.current());
48+
return toJavaValue(taskContext.output());
4749
}
4850

4951
public Object outputAsJsonNode() {
50-
return context.current();
52+
return taskContext.output();
5153
}
5254
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public interface WorkflowPosition {
2424
WorkflowPosition addIndex(int index);
2525

2626
WorkflowPosition back();
27+
28+
WorkflowPosition copy();
2729
}

0 commit comments

Comments
 (0)