Skip to content

Commit be474b2

Browse files
authored
Merge pull request #1070 from neuroglia-io/feat-act-upon-streaming
Add streaming capabilities to both the `listen` task and to the `asyncapi` (subscribe operation) call
2 parents a0e15df + 21e0ece commit be474b2

File tree

5 files changed

+208
-6
lines changed

5 files changed

+208
-6
lines changed

dsl-reference.md

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@
5858
+ [Container Lifetime](#container-lifetime)
5959
+ [Process Result](#process-result)
6060
+ [AsyncAPI Server](#asyncapi-server)
61-
+ [AsyncAPI Message](#asyncapi-message)
61+
+ [AsyncAPI Outbound Message](#asyncapi-outbound-message)
6262
+ [AsyncAPI Subscription](#asyncapi-subscription)
63+
+ [Subscription Iterator](#subscription-iterator)
6364

6465
## Abstract
6566

@@ -311,7 +312,7 @@ The [AsyncAPI Call](#asyncapi-call) enables workflows to interact with external
311312
| operation | `string` | `yes` | A reference to the AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call.<br>*Used only in case the referenced document uses AsyncAPI `v3.0.0`.* |
312313
| server | [`asyncApiServer`](#asyncapi-server) | `no` | An object used to configure to the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) on.<br>If not set, default to the first [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) matching the operation's channel. |
313314
| protocol | `string` | `no` | The [protocol](https://www.asyncapi.com/docs/reference/specification/v3.0.0#definitionsProtocol) to use to select the target [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject). <br>Ignored if `server` has been set.<br>*Supported values are: `amqp`, `amqp1`, `anypointmq`, `googlepubsub`, `http`, `ibmmq`, `jms`, `kafka`, `mercure`, `mqtt`, `mqtt5`, `nats`, `pulsar`, `redis`, `sns`, `solace`, `sqs`, `stomp` and `ws`* |
314-
| message | [`asyncApiMessage`](#asyncapi-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
315+
| message | [`asyncApiMessage`](#asyncapi-outbound-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
315316
| subscription | [`asyncApiSubscription`](#asyncapi-subscription) | `no` | An object used to configure the subscription to messages consumed using the target operation.<br>*Required if `message` has not been set.* |
316317
| authentication | `string`<br>[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |
317318

@@ -650,7 +651,19 @@ Provides a mechanism for workflows to await and react to external events, enabli
650651

651652
| Name | Type | Required | Description|
652653
|:--|:---:|:---:|:---|
653-
| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the event(s) the workflow must listen to. |
654+
| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the [event(s)](https://cloudevents.io/) the workflow must listen to. |
655+
| listen.read | `string` | `no` | Specifies how [events](https://cloudevents.io/) are read during the listen operation.<br>*Supported values are:*<br>*- `data`: Reads the [event's](https://cloudevents.io/) data.*<br>*- `envelope`: Reads the [event's](https://cloudevents.io/) envelope, including its [context attributes](https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#context-attributes).*<br>*- `raw`: Reads the [event's](https://cloudevents.io/) raw data.*<br>*Defaults to `data`.*|
656+
| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [event](https://cloudevents.io/). |
657+
658+
> [!NOTE]
659+
> A `listen` task produces a sequentially ordered array of all the [events](https://cloudevents.io/) it has consumed, and potentially transformed using `foreach.output.as`.
660+
661+
> [!NOTE]
662+
> When `foreach` is set, the configured operations for a [events](https://cloudevents.io/) must complete before moving on to the next one. As a result, consumed [events](https://cloudevents.io/) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.
663+
664+
> [!WARNING]
665+
> [Events](https://cloudevents.io/) consumed by an `until` clause should not be included in the task's output. These [events](https://cloudevents.io/) are used solely to determine when the until condition has been met, and they do not contribute to the result or data produced by the task itself
666+
654667

655668
##### Examples
656669

@@ -2038,7 +2051,7 @@ do:
20382051
bar: baz
20392052
```
20402053

2041-
### AsyncAPI Message
2054+
### AsyncAPI Outbound Message
20422055

20432056
Configures an AsyncAPI message to publish.
20442057

@@ -2073,6 +2086,29 @@ do:
20732086
bar: baz
20742087
```
20752088

2089+
### AsyncAPI Inbound Message
2090+
2091+
Configures an AsyncAPI message consumed by a subscription.
2092+
2093+
#### Properties
2094+
2095+
| Name | Type | Required | Description |
2096+
|:-------|:------:|:----------:|:--------------|
2097+
| payload | `object` | `no` | The message's payload, if any. |
2098+
| headers | `object` | `no` | The message's headers, if any. |
2099+
| correlationId | `string` | `no` | The message's correlation id, if any. |
2100+
2101+
#### Examples
2102+
2103+
```yaml
2104+
payload:
2105+
greetings: Hello, World!
2106+
headers:
2107+
foo: bar
2108+
bar: baz
2109+
correlationid: '123456'
2110+
```
2111+
20762112
### AsyncAPI Subscription
20772113

20782114
Configures a subscription to an AsyncAPI operation.
@@ -2081,8 +2117,15 @@ Configures a subscription to an AsyncAPI operation.
20812117

20822118
| Name | Type | Required | Description |
20832119
|:-------|:------:|:----------:|:--------------|
2084-
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed messages. |
2120+
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed [messages](#asyncapi-inbound-message). |
20852121
| consume | [`subscriptionLifetime`](#asyncapi-subscription-lifetime) | `yes` | An object used to configure the subscription's lifetime. |
2122+
| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [message](#asyncapi-inbound-message). |
2123+
2124+
> [!NOTE]
2125+
> An AsyncAPI subscribe operation call produces a sequentially ordered array of all the [messages](#asyncapi-inbound-message) it has consumed, and potentially transformed using `foreach.output.as`.
2126+
2127+
> [!NOTE]
2128+
> When `foreach` is set, the configured operations for a [message](#asyncapi-inbound-message) must complete before moving on to the next one. As a result, consumed [messages](#asyncapi-inbound-message) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.
20862129

20872130
#### Examples
20882131

@@ -2115,7 +2158,7 @@ Configures the lifetime of an AsyncAPI subscription
21152158
#### Properties
21162159

21172160
| Name | Type | Required | Description |
2118-
|:-------|:------:|:----------:|:--------------|
2161+
|:-----|:----:|:--------:|:------------|
21192162
| amount | `integer` | `no` | The amount of messages to consume.<br>*Required if `while` and `until` have not been set.* |
21202163
| for | [`duration`](#duration) | `no` | The [`duration`](#duration) that defines for how long to consume messages. |
21212164
| while | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine whether or not to keep consuming messages.<br>*Required if `amount` and `until` have not been set.* |
@@ -2143,4 +2186,54 @@ do:
21432186
until: '${ ($context.messages | length) == 5 }'
21442187
for:
21452188
seconds: 10
2189+
```
2190+
2191+
### Subscription Iterator
2192+
2193+
Configures the iteration over each item (event or message) consumed by a subscription. It encapsulates configuration for processing tasks, output formatting, and export behavior for every item encountered.
2194+
2195+
#### Properties
2196+
2197+
| Name | Type | Required | Description |
2198+
|:-----|:----:|:--------:|:------------|
2199+
| item | `string` | `no` | The name of the variable used to store the current item being enumerated.<br>*Defaults to `item`.* |
2200+
| at | `string` | `no` | The name of the variable used to store the index of the current item being enumerated.<br>*Defaults to `index`.* |
2201+
| do | [`map[string, task][]`](#task) | `no` | The tasks to perform for each consumed item. |
2202+
| output | [`output`](#output) | `no` | An object, if any, used to customize the item's output and to document its schema. |
2203+
| export | [`export`](#export) | `no` | An object, if any, used to customize the content of the workflow context. |
2204+
2205+
#### Examples
2206+
2207+
```yaml
2208+
document:
2209+
dsl: '1.0.0-alpha5'
2210+
namespace: test
2211+
name: asyncapi-example
2212+
version: '0.1.0'
2213+
do:
2214+
- subscribeToChatInboxUntil:
2215+
call: asyncapi
2216+
with:
2217+
document:
2218+
endpoint: https://fake.com/docs/asyncapi.json
2219+
operation: chat-inbox
2220+
protocol: http
2221+
subscription:
2222+
filter: ${ . == $workflow.input.chat.roomId }
2223+
consume:
2224+
until: '${ ($context.messages | length) == 5 }'
2225+
for:
2226+
seconds: 10
2227+
foreach:
2228+
item: message
2229+
at: index
2230+
do:
2231+
- emitEvent:
2232+
emit:
2233+
event:
2234+
with:
2235+
source: https://serverlessworkflow.io/samples
2236+
type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
2237+
data:
2238+
message: '${ $message }'
21462239
```
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: examples
4+
name: bearer-auth
5+
version: '0.1.0'
6+
do:
7+
- getNotifications:
8+
call: asyncapi
9+
with:
10+
document:
11+
endpoint: https://fake.com/docs/asyncapi.json
12+
operation: getNotifications
13+
subscription:
14+
filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
15+
consume:
16+
while: '${ true }'
17+
foreach:
18+
item: message
19+
do:
20+
- publishCloudEvent:
21+
emit:
22+
event:
23+
with:
24+
source: https://serverlessworkflow.io/samples
25+
type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
26+
data:
27+
message: '${ $message }'
28+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-to-all-read-envelope
5+
version: '0.1.0'
6+
do:
7+
- callDoctor:
8+
listen:
9+
to:
10+
all:
11+
- with:
12+
type: com.fake-hospital.vitals.measurements.temperature
13+
data: ${ .temperature > 38 }
14+
- with:
15+
type: com.fake-hospital.vitals.measurements.bpm
16+
data: ${ .bpm < 60 or .bpm > 100 }
17+
read: envelope
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
document:
2+
dsl: '1.0.0-alpha1'
3+
namespace: test
4+
name: listen-to-any-while-foreach
5+
version: '0.1.0'
6+
do:
7+
- listenToGossips:
8+
listen:
9+
to:
10+
any: []
11+
until: '${ false }'
12+
foreach:
13+
item: event
14+
at: i
15+
do:
16+
- postToChatApi:
17+
call: http
18+
with:
19+
method: post
20+
endpoint: https://fake-chat-api.com/room/{roomId}
21+
body:
22+
event: ${ $event }

schema/workflow.yaml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,17 @@ $defs:
558558
$ref: '#/$defs/eventConsumptionStrategy'
559559
title: ListenTo
560560
description: Defines the event(s) to listen to.
561+
read:
562+
type: string
563+
enum: [ data, envelope, raw ]
564+
default: data
565+
title: ListenAndReadAs
566+
description: Specifies how events are read during the listen operation.
561567
required: [ to ]
568+
foreach:
569+
$ref: '#/$defs/subscriptionIterator'
570+
title: ListenIterator
571+
description: Configures the iterator, if any, for processing consumed event(s).
562572
raiseTask:
563573
type: object
564574
$ref: '#/$defs/taskBase'
@@ -1710,6 +1720,10 @@ $defs:
17101720
$ref: '#/$defs/asyncApiMessageConsumptionPolicy'
17111721
title: AsyncApiMessageConsumptionPolicy
17121722
description: An object used to configure the subscription's message consumption policy.
1723+
foreach:
1724+
$ref: '#/$defs/subscriptionIterator'
1725+
title: AsyncApiSubscriptionIterator
1726+
description: Configures the iterator, if any, for processing consumed messages(s).
17131727
required: [ consume ]
17141728
asyncApiMessageConsumptionPolicy:
17151729
type: object
@@ -1740,3 +1754,31 @@ $defs:
17401754
title: AsyncApiMessageConsumptionPolicyUntil
17411755
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
17421756
required: [ until ]
1757+
subscriptionIterator:
1758+
type: object
1759+
title: SubscriptionIterator
1760+
description: Configures the iteration over each item (event or message) consumed by a subscription.
1761+
unevaluatedProperties: false
1762+
properties:
1763+
item:
1764+
type: string
1765+
title: SubscriptionIteratorItem
1766+
description: The name of the variable used to store the current item being enumerated.
1767+
default: item
1768+
at:
1769+
type: string
1770+
title: SubscriptionIteratorIndex
1771+
description: The name of the variable used to store the index of the current item being enumerated.
1772+
default: index
1773+
do:
1774+
$ref: '#/$defs/taskList'
1775+
title: SubscriptionIteratorTasks
1776+
description: The tasks to perform for each consumed item.
1777+
output:
1778+
$ref: '#/$defs/output'
1779+
title: SubscriptionIteratorOutput
1780+
description: An object, if any, used to customize the item's output and to document its schema.
1781+
export:
1782+
$ref: '#/$defs/export'
1783+
title: SubscriptionIteratorExport
1784+
description: An object, if any, used to customize the content of the workflow context.

0 commit comments

Comments
 (0)