diff --git a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj
index 698aa3c87..5a56b1552 100644
--- a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj
+++ b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj
@@ -44,7 +44,7 @@
-
+
diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj
index d9b461d81..b0083eb23 100644
--- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj
+++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj
@@ -42,8 +42,8 @@
-
-
+
+
diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
index 77eeac22f..093aad1a3 100644
--- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
+++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
@@ -31,9 +31,9 @@
-
-
-
+
+
+
diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj
index 0808cab49..70079e456 100644
--- a/src/cli/Synapse.Cli/Synapse.Cli.csproj
+++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj
@@ -29,11 +29,11 @@
-
-
+
+
-
+
diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj
index 90a9fdfcd..a2ee6abe0 100644
--- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj
+++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj
@@ -51,7 +51,7 @@
-
+
diff --git a/src/core/Synapse.Core/Resources/CorrelationContext.cs b/src/core/Synapse.Core/Resources/CorrelationContext.cs
index a46ba1a56..ccd3d732a 100644
--- a/src/core/Synapse.Core/Resources/CorrelationContext.cs
+++ b/src/core/Synapse.Core/Resources/CorrelationContext.cs
@@ -37,13 +37,19 @@ public record CorrelationContext
///
/// Gets a key/value mapping of the context's correlation keys
///
- [DataMember(Name = "keys", Order = 2), JsonPropertyName("keys"), JsonPropertyOrder(2), YamlMember(Alias = "keys", Order = 2)]
+ [DataMember(Name = "keys", Order = 3), JsonPropertyName("keys"), JsonPropertyOrder(3), YamlMember(Alias = "keys", Order = 3)]
public virtual EquatableDictionary Keys { get; set; } = [];
///
/// Gets a key/value mapping of all correlated events, with the key being the index of the matched correlation filter
///
- [DataMember(Name = "events", Order = 3), JsonPropertyName("events"), JsonPropertyOrder(3), YamlMember(Alias = "events", Order = 3)]
+ [DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
public virtual EquatableDictionary Events { get; set; } = [];
+ ///
+ /// Gets the offset that serves as the index of the event being processed by the consumer, if streaming has been enabled for the correlation associated with the context.
+ ///
+ [DataMember(Name = "offset", Order = 5), JsonPropertyName("offset"), JsonPropertyOrder(5), YamlMember(Alias = "offset", Order = 5)]
+ public virtual uint? Offset { get; set; }
+
}
\ No newline at end of file
diff --git a/src/core/Synapse.Core/Resources/CorrelationSpec.cs b/src/core/Synapse.Core/Resources/CorrelationSpec.cs
index fe56f5063..544ce215f 100644
--- a/src/core/Synapse.Core/Resources/CorrelationSpec.cs
+++ b/src/core/Synapse.Core/Resources/CorrelationSpec.cs
@@ -46,10 +46,16 @@ public record CorrelationSpec
[DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!;
+ ///
+ /// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete
+ ///
+ [DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)]
+ public virtual bool Stream { get; set; }
+
///
/// Gets/sets an object used to configure the correlation's outcome
///
- [DataMember(Name = "outcome", Order = 5), JsonPropertyName("outcome"), JsonPropertyOrder(5), YamlMember(Alias = "outcome", Order = 5)]
+ [DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)]
public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!;
}
diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj
index 63e4011b5..76c0e3b63 100644
--- a/src/core/Synapse.Core/Synapse.Core.csproj
+++ b/src/core/Synapse.Core/Synapse.Core.csproj
@@ -66,11 +66,11 @@
-
+
-
+
diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
index 2a445a71a..852608349 100644
--- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
+++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
@@ -334,7 +334,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
{
var index = updatedResource.Status.Contexts.IndexOf(existingContext);
updatedResource.Status.Contexts.Remove(existingContext);
- if (!completed) updatedResource.Status.Contexts.Insert(index, context);
+ updatedResource.Status.Contexts.Insert(index, context);
}
if (completed)
{
diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
index d67a4e3cf..af3a358d7 100644
--- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
+++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
@@ -33,8 +33,8 @@
-
-
+
+
diff --git a/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj b/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj
index 4bcda49da..285011f3a 100644
--- a/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj
+++ b/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj
@@ -9,9 +9,9 @@
-
-
-
+
+
+
diff --git a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
index 158649aba..4559655e9 100644
--- a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
+++ b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
@@ -13,9 +13,9 @@
-
-
-
+
+
+
diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
index 60c97a5fa..a70c4c474 100644
--- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
+++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
@@ -156,7 +156,7 @@ protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowIn
}
catch(Exception ex)
{
- this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
+ this.Logger.LogError("An error occurred while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}
diff --git a/src/operator/Synapse.Operator/Synapse.Operator.csproj b/src/operator/Synapse.Operator/Synapse.Operator.csproj
index 0374b6e36..e11fe1559 100644
--- a/src/operator/Synapse.Operator/Synapse.Operator.csproj
+++ b/src/operator/Synapse.Operator/Synapse.Operator.csproj
@@ -51,8 +51,8 @@
-
-
+
+
diff --git a/src/runner/Synapse.Runner/IStreamedCloudEvent.cs b/src/runner/Synapse.Runner/IStreamedCloudEvent.cs
new file mode 100644
index 000000000..7ad3f56de
--- /dev/null
+++ b/src/runner/Synapse.Runner/IStreamedCloudEvent.cs
@@ -0,0 +1,39 @@
+// Copyright © 2024-Present The Synapse Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"),
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Synapse.Runner;
+
+///
+/// Defines the fundamentals of an object used to wrap a streamed
+///
+public interface IStreamedCloudEvent
+{
+
+ ///
+ /// Gets the streamed
+ ///
+ CloudEvent Event { get; }
+
+ ///
+ /// Gets the position of the within its originating stream
+ ///
+ uint Offset { get; }
+
+ ///
+ /// Acknowledges that the has been successfully processed
+ ///
+ /// A
+ /// A new awaitable
+ Task AckAsync(CancellationToken cancellationToken = default);
+
+}
diff --git a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
index 501b7c4ce..adb47df53 100644
--- a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
+++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
@@ -20,6 +20,8 @@
using Synapse.Events.Tasks;
using Synapse.Events.Workflows;
using System.Net.Mime;
+using System.Reactive.Disposables;
+using System.Reactive.Threading.Tasks;
namespace Synapse.Runner.Services;
@@ -304,11 +306,125 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa
this.Logger.LogInformation("The workflow's execution has been resumed.");
}
+ ///
+ public virtual async Task> StreamAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(task);
+ if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task));
+ if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task doesn't use streaming, the {nameof(CorrelateAsync)} method must be used instead");
+ if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return Observable.Empty();
+ var @namespace = task.Workflow.Instance.GetNamespace()!;
+ var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}";
+ Correlation? correlation = null;
+ try { correlation = await this.Api.Correlations.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false); }
+ catch { }
+ if (correlation == null)
+ {
+ correlation = await this.Api.Correlations.CreateAsync(new()
+ {
+ Metadata = new()
+ {
+ Namespace = @namespace,
+ Name = name,
+ Labels = new Dictionary()
+ {
+ { SynapseDefaults.Resources.Labels.WorkflowInstance, this.Instance.GetQualifiedName() }
+ }
+ },
+ Spec = new()
+ {
+ Source = new ResourceReference(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()),
+ Lifetime = CorrelationLifetime.Ephemeral,
+ Events = listenTask.Listen.To,
+ Stream = true,
+ Expressions = task.Workflow.Definition.Evaluate ?? new(),
+ Outcome = new()
+ {
+ Correlate = new()
+ {
+ Instance = task.Workflow.Instance.GetQualifiedName(),
+ Task = task.Instance.Reference.OriginalString
+ }
+ }
+ }
+ }, cancellationToken).ConfigureAwait(false);
+ }
+ var taskCompletionSource = new TaskCompletionSource();
+ var cancellationTokenRegistration = cancellationToken.Register(() => taskCompletionSource.TrySetCanceled());
+ var correlationSubscription = this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken)
+ .ToObservable()
+ .Where(e => e.Type == ResourceWatchEventType.Updated)
+ .Select(e => e.Resource.Status?.Correlation?.Contexts)
+ .Scan((Previous: (EquatableDictionary?)null, Current: (EquatableDictionary?)null), (accumulator, current) => (accumulator.Current ?? [], current))
+ .Where(v => v.Current?.Count > v.Previous?.Count) //ensures we are not handling changes in a circular loop: if length of current is smaller than previous, it means a context has been processed
+ .Subscribe(value =>
+ {
+ var patch = JsonPatchUtility.CreateJsonPatchFromDiff(value.Previous, value.Current);
+ var patchOperation = patch.Operations.FirstOrDefault(o => o.Op == OperationType.Add && o.Path[0] == task.Instance.Reference.OriginalString);
+ if (patchOperation == null) return;
+ context = this.JsonSerializer.Deserialize(patchOperation.Value!)!;
+ taskCompletionSource.SetResult(context);
+ });
+ var endOfStream = false;
+ var stopObservable = taskCompletionSource.Task.ToObservable();
+ var stopSubscription = stopObservable.Take(1).Subscribe(_ => endOfStream = true);
+ return Observable.Create(observer =>
+ {
+ var subscription = Observable.Using(
+ () => new CompositeDisposable
+ {
+ cancellationTokenRegistration,
+ correlationSubscription
+ },
+ disposable => this.Api.Correlations.MonitorAsync(correlation.GetName(), correlation.GetNamespace()!, cancellationToken)
+ .ToObservable()
+ .Where(e => e.Type == ResourceWatchEventType.Updated)
+ .Select(e => e.Resource.Status?.Contexts?.FirstOrDefault())
+ .Where(c => c != null)
+ .SelectMany(c =>
+ {
+ var acknowledgedOffset = c!.Offset.HasValue ? (int)c.Offset.Value : 0;
+ return c.Events.Values
+ .Skip(acknowledgedOffset)
+ .Select((evt, index) => new
+ {
+ ContextId = c.Id,
+ Event = evt,
+ Offset = (uint)(acknowledgedOffset + index + 1)
+ });
+ })
+ .Distinct(e => e.Offset)
+ .Select(e => new StreamedCloudEvent(e.Event, e.Offset, async (offset, token) =>
+ {
+ var original = await this.Api.Correlations.GetAsync(name, @namespace, token).ConfigureAwait(false);
+ var updated = original.Clone()!;
+ var context = updated.Status?.Contexts.FirstOrDefault(c => c.Id == e.ContextId);
+ if (context == null)
+ {
+ this.Logger.LogError("Failed to find a context with the specified id '{contextId}' in correlation '{name}.{@namespace}'", e.ContextId, name, @namespace);
+ throw new Exception($"Failed to find a context with the specified id '{e.ContextId}' in correlation '{name}.{@namespace}'");
+ }
+ context.Offset = offset;
+ var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
+ await this.Api.Correlations.PatchStatusAsync(name, @namespace, new Patch(PatchType.JsonPatch, patch), cancellationToken: token).ConfigureAwait(false);
+ })))
+ .Subscribe(e =>
+ {
+ observer.OnNext(e);
+ if (endOfStream) observer.OnCompleted();
+ },
+ ex => observer.OnError(ex),
+ () => observer.OnCompleted());
+ return new CompositeDisposable(subscription, stopSubscription);
+ });
+ }
+
///
public virtual async Task CorrelateAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(task);
if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task));
+ if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead");
if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return context;
var @namespace = task.Workflow.Instance.GetNamespace()!;
var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}";
diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
index 9f0843f2d..3159cca90 100644
--- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
+++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
@@ -84,6 +84,36 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger
protected object? MessageHeaders { get; set; }
+ ///
+ /// Gets the 's subscription
+ ///
+ protected IDisposable? Subscription { get; set; }
+
+ ///
+ /// Gets/sets the position of the within its originating stream
+ ///
+ protected uint? Offset { get; set; }
+
+ ///
+ /// Gets the path for the specified message
+ ///
+ /// The offset of the message to get the path for
+ /// The path for the specified message
+ protected virtual string GetPathFor(uint offset) => $"{nameof(AsyncApiSubscriptionDefinition.Foreach).ToCamelCase()}/{offset}/{nameof(ForTaskDefinition.Do).ToCamelCase()}";
+
+ ///
+ protected override async Task CreateTaskExecutorAsync(TaskInstance task, TaskDefinition definition, IDictionary contextData, IDictionary? arguments = null, CancellationToken cancellationToken = default)
+ {
+ var executor = await base.CreateTaskExecutorAsync(task, definition, contextData, arguments, cancellationToken).ConfigureAwait(false);
+ executor.SubscribeAsync
+ (
+ _ => System.Threading.Tasks.Task.CompletedTask,
+ async ex => await this.OnMessageProcessingErrorAsync(executor, this.CancellationTokenSource!.Token).ConfigureAwait(false),
+ async () => await this.OnMessageProcessingCompletedAsync(executor, this.CancellationTokenSource!.Token).ConfigureAwait(false)
+ );
+ return executor;
+ }
+
///
protected override async Task DoInitializeAsync(CancellationToken cancellationToken)
{
@@ -224,8 +254,127 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false));
return (message, keepGoing);
})).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
- var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false);
- await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
+ if (this.AsyncApi.Subscription.Foreach == null)
+ {
+ var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false);
+ await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync);
+ }
+ }
+
+ ///
+ /// Handles the streaming of a
+ ///
+ /// The streamed
+ /// A new awaitable
+ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
+ {
+ if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
+ if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
+ if (this.AsyncApi.Subscription.Foreach?.Do != null)
+ {
+ var taskDefinition = new DoTaskDefinition()
+ {
+ Do = this.AsyncApi.Subscription.Foreach.Do,
+ Metadata =
+ [
+ new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false)
+ ]
+ };
+ var arguments = this.GetExpressionEvaluationArguments();
+ var messageData = message as object;
+ if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync