Skip to content

Commit d274143

Browse files
committed
- Add a new foreach property to both listen task and asyncapi call, used to configure the iterator used to process consumed events/messages
- Add a new `read` property to the `listen` task, used to configure how to read consumed events Signed-off-by: Charles d'Avernas <charles.davernas@neuroglia.io>
1 parent a0e15df commit d274143

5 files changed

+203
-6
lines changed

dsl-reference.md

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
+ [Container Lifetime](#container-lifetime)
5959
+ [Process Result](#process-result)
6060
+ [AsyncAPI Server](#asyncapi-server)
61-
+ [AsyncAPI Message](#asyncapi-message)
61+
+ [AsyncAPI Outbound Message](#asyncapi-outbound-message)
6262
+ [AsyncAPI Subscription](#asyncapi-subscription)
6363

6464
## Abstract
@@ -311,7 +311,7 @@ The [AsyncAPI Call](#asyncapi-call) enables workflows to interact with external
311311
| operation | `string` | `yes` | A reference to the AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call.<br>*Used only in case the referenced document uses AsyncAPI `v3.0.0`.* |
312312
| server | [`asyncApiServer`](#asyncapi-server) | `no` | An object used to configure to the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) on.<br>If not set, default to the first [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) matching the operation's channel. |
313313
| protocol | `string` | `no` | The [protocol](https://www.asyncapi.com/docs/reference/specification/v3.0.0#definitionsProtocol) to use to select the target [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject). <br>Ignored if `server` has been set.<br>*Supported values are: `amqp`, `amqp1`, `anypointmq`, `googlepubsub`, `http`, `ibmmq`, `jms`, `kafka`, `mercure`, `mqtt`, `mqtt5`, `nats`, `pulsar`, `redis`, `sns`, `solace`, `sqs`, `stomp` and `ws`* |
314-
| message | [`asyncApiMessage`](#asyncapi-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
314+
| message | [`asyncApiMessage`](#asyncapi-outbound-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
315315
| subscription | [`asyncApiSubscription`](#asyncapi-subscription) | `no` | An object used to configure the subscription to messages consumed using the target operation.<br>*Required if `message` has not been set.* |
316316
| authentication | `string`<br>[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |
317317

@@ -650,7 +650,15 @@ Provides a mechanism for workflows to await and react to external events, enabli
650650

651651
| Name | Type | Required | Description|
652652
|:--|:---:|:---:|:---|
653-
| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the event(s) the workflow must listen to. |
653+
| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the [event(s)](https://cloudevents.io/) the workflow must listen to. |
654+
| listen.read | `string` | `no` | Specifies how [events](https://cloudevents.io/) are read during the listen operation.<br>*Supported values are:*<br>*- `data`: Reads the [event's](https://cloudevents.io/) data.*<br>*- `envelope`: Reads the [event's](https://cloudevents.io/) envelope, including its [context attributes](https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#context-attributes).*<br>*- `raw`: Reads the [event's](https://cloudevents.io/) raw data.*<br>*Defaults to `data`.*|
655+
| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [event](https://cloudevents.io/). |
656+
657+
> [!NOTE]
658+
> A `listen` task produces a sequentially ordered array of all the [events](https://cloudevents.io/) it has consumed, and potentially transformed using `foreach.output.as`.
659+
660+
> [!NOTE]
661+
> When `foreach` is set, the configured operations for a [events](https://cloudevents.io/) must complete before moving on to the next one. As a result, consumed [events](https://cloudevents.io/) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.
654662

655663
##### Examples
656664

@@ -2038,7 +2046,7 @@ do:
20382046
bar: baz
20392047
```
20402048

2041-
### AsyncAPI Message
2049+
### AsyncAPI Outbound Message
20422050

20432051
Configures an AsyncAPI message to publish.
20442052

@@ -2073,6 +2081,29 @@ do:
20732081
bar: baz
20742082
```
20752083

2084+
### AsyncAPI Inbound Message
2085+
2086+
Configures an AsyncAPI message consumed by a subscription.
2087+
2088+
#### Properties
2089+
2090+
| Name | Type | Required | Description |
2091+
|:-------|:------:|:----------:|:--------------|
2092+
| payload | `object` | `no` | The message's payload, if any. |
2093+
| headers | `object` | `no` | The message's headers, if any. |
2094+
| correlationId | `string` | `no` | The message's correlation id, if any. |
2095+
2096+
#### Examples
2097+
2098+
```yaml
2099+
payload:
2100+
greetings: Hello, World!
2101+
headers:
2102+
foo: bar
2103+
bar: baz
2104+
correlationid: '123456'
2105+
```
2106+
20762107
### AsyncAPI Subscription
20772108

20782109
Configures a subscription to an AsyncAPI operation.
@@ -2081,8 +2112,15 @@ Configures a subscription to an AsyncAPI operation.
20812112

20822113
| Name | Type | Required | Description |
20832114
|:-------|:------:|:----------:|:--------------|
2084-
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed messages. |
2115+
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed [messages](#asyncapi-inbound-message). |
20852116
| consume | [`subscriptionLifetime`](#asyncapi-subscription-lifetime) | `yes` | An object used to configure the subscription's lifetime. |
2117+
| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [message](#asyncapi-inbound-message). |
2118+
2119+
> [!NOTE]
2120+
> An AsyncAPI subscribe operation call produces a sequentially ordered array of all the [messages](#asyncapi-inbound-message) it has consumed, and potentially transformed using `foreach.output.as`.
2121+
2122+
> [!NOTE]
2123+
> When `foreach` is set, the configured operations for a [message](#asyncapi-inbound-message) must complete before moving on to the next one. As a result, consumed [messages](#asyncapi-inbound-message) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.
20862124

20872125
#### Examples
20882126

@@ -2115,7 +2153,7 @@ Configures the lifetime of an AsyncAPI subscription
21152153
#### Properties
21162154

21172155
| Name | Type | Required | Description |
2118-
|:-------|:------:|:----------:|:--------------|
2156+
|:-----|:----:|:--------:|:------------|
21192157
| amount | `integer` | `no` | The amount of messages to consume.<br>*Required if `while` and `until` have not been set.* |
21202158
| for | [`duration`](#duration) | `no` | The [`duration`](#duration) that defines for how long to consume messages. |
21212159
| while | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine whether or not to keep consuming messages.<br>*Required if `amount` and `until` have not been set.* |
@@ -2143,4 +2181,54 @@ do:
21432181
until: '${ ($context.messages | length) == 5 }'
21442182
for:
21452183
seconds: 10
2184+
```
2185+
2186+
### Subscription Iterator
2187+
2188+
Configures the iteration over each item (event or message) consumed by a subscription. It encapsulates configuration for processing tasks, output formatting, and export behavior for every item encountered.
2189+
2190+
#### Properties
2191+
2192+
| Name | Type | Required | Description |
2193+
|:-----|:----:|:--------:|:------------|
2194+
| item | `string` | `no` | The name of the variable used to store the current item being enumerated.<br>*Defaults to `item`.* |
2195+
| at | `string` | `no` | The name of the variable used to store the index of the current item being enumerated.<br>*Defaults to `index`.* |
2196+
| do | [`map[string, task][]`](#task) | `no` | The tasks to perform for each consumed item. |
2197+
| output | [`output`](#output) | `no` | An object, if any, used to customize the item's output and to document its schema. |
2198+
| export | [`export`](#export) | `no` | An object, if any, used to customize the content of the workflow context. |
2199+
2200+
#### Examples
2201+
2202+
```yaml
2203+
document:
2204+
dsl: '1.0.0-alpha5'
2205+
namespace: test
2206+
name: asyncapi-example
2207+
version: '0.1.0'
2208+
do:
2209+
- subscribeToChatInboxUntil:
2210+
call: asyncapi
2211+
with:
2212+
document:
2213+
endpoint: https://fake.com/docs/asyncapi.json
2214+
operation: chat-inbox
2215+
protocol: http
2216+
subscription:
2217+
filter: ${ . == $workflow.input.chat.roomId }
2218+
consume:
2219+
until: '${ ($context.messages | length) == 5 }'
2220+
for:
2221+
seconds: 10
2222+
foreach:
2223+
item: message
2224+
at: index
2225+
do:
2226+
- emitEvent:
2227+
emit:
2228+
event:
2229+
with:
2230+
source: https://serverlessworkflow.io/samples
2231+
type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
2232+
data:
2233+
message: '${ $message }'
21462234
```
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: examples
4+
name: bearer-auth
5+
version: '0.1.0'
6+
do:
7+
- getNotifications:
8+
call: asyncapi
9+
with:
10+
document:
11+
endpoint: https://fake.com/docs/asyncapi.json
12+
operation: getNotifications
13+
subscription:
14+
filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
15+
consume:
16+
while: '${ true }'
17+
foreach:
18+
item: message
19+
do:
20+
- publishCloudEvent:
21+
emit:
22+
event:
23+
with:
24+
source: https://serverlessworkflow.io/samples
25+
type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
26+
data:
27+
message: '${ $message }'
28+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-to-all-read-envelope
5+
version: '0.1.0'
6+
do:
7+
- callDoctor:
8+
listen:
9+
to:
10+
all:
11+
- with:
12+
type: com.fake-hospital.vitals.measurements.temperature
13+
data: ${ .temperature > 38 }
14+
- with:
15+
type: com.fake-hospital.vitals.measurements.bpm
16+
data: ${ .bpm < 60 or .bpm > 100 }
17+
read: envelope
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
document:
2+
dsl: '1.0.0-alpha1'
3+
namespace: test
4+
name: listen-to-any-while-foreach
5+
version: '0.1.0'
6+
do:
7+
- listenToGossips:
8+
listen:
9+
to:
10+
any: []
11+
while: '${ true }'
12+
foreach:
13+
item: event
14+
at: i
15+
do:
16+
- postToChatApi:
17+
call: http
18+
with:
19+
method: post
20+
endpoint: https://fake-chat-api.com/room/{roomId}
21+
body:
22+
event: ${ $event }

schema/workflow.yaml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,17 @@ $defs:
558558
$ref: '#/$defs/eventConsumptionStrategy'
559559
title: ListenTo
560560
description: Defines the event(s) to listen to.
561+
read:
562+
type: string
563+
enum: [ data, envelope, raw ]
564+
default: data
565+
title: ListenAndReadAs
566+
description: Specifies how events are read during the listen operation.
561567
required: [ to ]
568+
foreach:
569+
$ref: '#/$defs/subscriptionIterator'
570+
title: ListenIterator
571+
description: Configures the iterator, if any, for processing consumed event(s).
562572
raiseTask:
563573
type: object
564574
$ref: '#/$defs/taskBase'
@@ -1710,6 +1720,10 @@ $defs:
17101720
$ref: '#/$defs/asyncApiMessageConsumptionPolicy'
17111721
title: AsyncApiMessageConsumptionPolicy
17121722
description: An object used to configure the subscription's message consumption policy.
1723+
foreach:
1724+
$ref: '#/$defs/subscriptionIterator'
1725+
title: AsyncApiSubscriptionIterator
1726+
description: Configures the iterator, if any, for processing consumed messages(s).
17131727
required: [ consume ]
17141728
asyncApiMessageConsumptionPolicy:
17151729
type: object
@@ -1740,3 +1754,31 @@ $defs:
17401754
title: AsyncApiMessageConsumptionPolicyUntil
17411755
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
17421756
required: [ until ]
1757+
subscriptionIterator:
1758+
type: object
1759+
title: SubscriptionIterator
1760+
description: Configures the iteration over each item (event or message) consumed by a subscription.
1761+
unevaluatedProperties: false
1762+
properties:
1763+
item:
1764+
type: string
1765+
title: SubscriptionIteratorItem
1766+
description: The name of the variable used to store the current item being enumerated.
1767+
default: item
1768+
at:
1769+
type: string
1770+
title: SubscriptionIteratorIndex
1771+
description: The name of the variable used to store the index of the current item being enumerated.
1772+
default: index
1773+
do:
1774+
$ref: '#/$defs/taskList'
1775+
title: SubscriptionIteratorTasks
1776+
description: The tasks to perform for each consumed item.
1777+
output:
1778+
$ref: '#/$defs/output'
1779+
title: SubscriptionIteratorOutput
1780+
description: An object, if any, used to customize the item's output and to document its schema.
1781+
export:
1782+
$ref: '#/$defs/export'
1783+
title: SubscriptionIteratorExport
1784+
description: An object, if any, used to customize the content of the workflow context.

0 commit comments

Comments
 (0)