diff --git a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj index 698aa3c87..5a56b1552 100644 --- a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj +++ b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj @@ -44,7 +44,7 @@ - + diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj index d9b461d81..b0083eb23 100644 --- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj +++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj @@ -42,8 +42,8 @@ - - + + diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj index 77eeac22f..093aad1a3 100644 --- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj +++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj @@ -31,9 +31,9 @@ - - - + + + diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj index 0808cab49..70079e456 100644 --- a/src/cli/Synapse.Cli/Synapse.Cli.csproj +++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj @@ -29,11 +29,11 @@ - - + + - + diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj index 90a9fdfcd..a2ee6abe0 100644 --- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj +++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj @@ -51,7 +51,7 @@ - + diff --git a/src/core/Synapse.Core/Resources/CorrelationContext.cs b/src/core/Synapse.Core/Resources/CorrelationContext.cs index a46ba1a56..ccd3d732a 100644 --- a/src/core/Synapse.Core/Resources/CorrelationContext.cs +++ b/src/core/Synapse.Core/Resources/CorrelationContext.cs @@ -37,13 +37,19 @@ public record CorrelationContext /// /// Gets a key/value mapping of the context's correlation keys /// - [DataMember(Name = "keys", Order = 2), JsonPropertyName("keys"), JsonPropertyOrder(2), YamlMember(Alias = "keys", Order = 2)] + [DataMember(Name = "keys", Order = 3), JsonPropertyName("keys"), JsonPropertyOrder(3), YamlMember(Alias = "keys", Order = 3)] public virtual EquatableDictionary Keys { get; set; } = []; /// /// Gets a key/value mapping of all correlated events, with the key being the index of the matched correlation filter /// - [DataMember(Name = "events", Order = 3), JsonPropertyName("events"), JsonPropertyOrder(3), YamlMember(Alias = "events", Order = 3)] + [DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)] public virtual EquatableDictionary Events { get; set; } = []; + /// + /// Gets the offset that serves as the index of the event being processed by the consumer, if streaming has been enabled for the correlation associated with the context. + /// + [DataMember(Name = "offset", Order = 5), JsonPropertyName("offset"), JsonPropertyOrder(5), YamlMember(Alias = "offset", Order = 5)] + public virtual uint? Offset { get; set; } + } \ No newline at end of file diff --git a/src/core/Synapse.Core/Resources/CorrelationSpec.cs b/src/core/Synapse.Core/Resources/CorrelationSpec.cs index fe56f5063..544ce215f 100644 --- a/src/core/Synapse.Core/Resources/CorrelationSpec.cs +++ b/src/core/Synapse.Core/Resources/CorrelationSpec.cs @@ -46,10 +46,16 @@ public record CorrelationSpec [DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)] public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!; + /// + /// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete + /// + [DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)] + public virtual bool Stream { get; set; } + /// /// Gets/sets an object used to configure the correlation's outcome /// - [DataMember(Name = "outcome", Order = 5), JsonPropertyName("outcome"), JsonPropertyOrder(5), YamlMember(Alias = "outcome", Order = 5)] + [DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)] public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!; } diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index 63e4011b5..76c0e3b63 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -66,11 +66,11 @@ - + - + diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs index 2a445a71a..852608349 100644 --- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs +++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs @@ -334,7 +334,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte { var index = updatedResource.Status.Contexts.IndexOf(existingContext); updatedResource.Status.Contexts.Remove(existingContext); - if (!completed) updatedResource.Status.Contexts.Insert(index, context); + updatedResource.Status.Contexts.Insert(index, context); } if (completed) { diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj index d67a4e3cf..af3a358d7 100644 --- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj +++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj @@ -33,8 +33,8 @@ - - + + diff --git a/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj b/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj index 4bcda49da..285011f3a 100644 --- a/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj +++ b/src/dashboard/Synapse.Dashboard.StateManagement/Synapse.Dashboard.StateManagement.csproj @@ -9,9 +9,9 @@ - - - + + + diff --git a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj index 158649aba..4559655e9 100644 --- a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj +++ b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj @@ -13,9 +13,9 @@ - - - + + + diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs index 60c97a5fa..a70c4c474 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs @@ -156,7 +156,7 @@ protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowIn } catch(Exception ex) { - this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex); + this.Logger.LogError("An error occurred while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex); } } diff --git a/src/operator/Synapse.Operator/Synapse.Operator.csproj b/src/operator/Synapse.Operator/Synapse.Operator.csproj index 0374b6e36..e11fe1559 100644 --- a/src/operator/Synapse.Operator/Synapse.Operator.csproj +++ b/src/operator/Synapse.Operator/Synapse.Operator.csproj @@ -51,8 +51,8 @@ - - + + diff --git a/src/runner/Synapse.Runner/IStreamedCloudEvent.cs b/src/runner/Synapse.Runner/IStreamedCloudEvent.cs new file mode 100644 index 000000000..7ad3f56de --- /dev/null +++ b/src/runner/Synapse.Runner/IStreamedCloudEvent.cs @@ -0,0 +1,39 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Runner; + +/// +/// Defines the fundamentals of an object used to wrap a streamed +/// +public interface IStreamedCloudEvent +{ + + /// + /// Gets the streamed + /// + CloudEvent Event { get; } + + /// + /// Gets the position of the within its originating stream + /// + uint Offset { get; } + + /// + /// Acknowledges that the has been successfully processed + /// + /// A + /// A new awaitable + Task AckAsync(CancellationToken cancellationToken = default); + +} diff --git a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs index 501b7c4ce..adb47df53 100644 --- a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs @@ -20,6 +20,8 @@ using Synapse.Events.Tasks; using Synapse.Events.Workflows; using System.Net.Mime; +using System.Reactive.Disposables; +using System.Reactive.Threading.Tasks; namespace Synapse.Runner.Services; @@ -304,11 +306,125 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa this.Logger.LogInformation("The workflow's execution has been resumed."); } + /// + public virtual async Task> StreamAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(task); + if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task)); + if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task doesn't use streaming, the {nameof(CorrelateAsync)} method must be used instead"); + if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return Observable.Empty(); + var @namespace = task.Workflow.Instance.GetNamespace()!; + var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}"; + Correlation? correlation = null; + try { correlation = await this.Api.Correlations.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false); } + catch { } + if (correlation == null) + { + correlation = await this.Api.Correlations.CreateAsync(new() + { + Metadata = new() + { + Namespace = @namespace, + Name = name, + Labels = new Dictionary() + { + { SynapseDefaults.Resources.Labels.WorkflowInstance, this.Instance.GetQualifiedName() } + } + }, + Spec = new() + { + Source = new ResourceReference(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()), + Lifetime = CorrelationLifetime.Ephemeral, + Events = listenTask.Listen.To, + Stream = true, + Expressions = task.Workflow.Definition.Evaluate ?? new(), + Outcome = new() + { + Correlate = new() + { + Instance = task.Workflow.Instance.GetQualifiedName(), + Task = task.Instance.Reference.OriginalString + } + } + } + }, cancellationToken).ConfigureAwait(false); + } + var taskCompletionSource = new TaskCompletionSource(); + var cancellationTokenRegistration = cancellationToken.Register(() => taskCompletionSource.TrySetCanceled()); + var correlationSubscription = this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken) + .ToObservable() + .Where(e => e.Type == ResourceWatchEventType.Updated) + .Select(e => e.Resource.Status?.Correlation?.Contexts) + .Scan((Previous: (EquatableDictionary?)null, Current: (EquatableDictionary?)null), (accumulator, current) => (accumulator.Current ?? [], current)) + .Where(v => v.Current?.Count > v.Previous?.Count) //ensures we are not handling changes in a circular loop: if length of current is smaller than previous, it means a context has been processed + .Subscribe(value => + { + var patch = JsonPatchUtility.CreateJsonPatchFromDiff(value.Previous, value.Current); + var patchOperation = patch.Operations.FirstOrDefault(o => o.Op == OperationType.Add && o.Path[0] == task.Instance.Reference.OriginalString); + if (patchOperation == null) return; + context = this.JsonSerializer.Deserialize(patchOperation.Value!)!; + taskCompletionSource.SetResult(context); + }); + var endOfStream = false; + var stopObservable = taskCompletionSource.Task.ToObservable(); + var stopSubscription = stopObservable.Take(1).Subscribe(_ => endOfStream = true); + return Observable.Create(observer => + { + var subscription = Observable.Using( + () => new CompositeDisposable + { + cancellationTokenRegistration, + correlationSubscription + }, + disposable => this.Api.Correlations.MonitorAsync(correlation.GetName(), correlation.GetNamespace()!, cancellationToken) + .ToObservable() + .Where(e => e.Type == ResourceWatchEventType.Updated) + .Select(e => e.Resource.Status?.Contexts?.FirstOrDefault()) + .Where(c => c != null) + .SelectMany(c => + { + var acknowledgedOffset = c!.Offset.HasValue ? (int)c.Offset.Value : 0; + return c.Events.Values + .Skip(acknowledgedOffset) + .Select((evt, index) => new + { + ContextId = c.Id, + Event = evt, + Offset = (uint)(acknowledgedOffset + index + 1) + }); + }) + .Distinct(e => e.Offset) + .Select(e => new StreamedCloudEvent(e.Event, e.Offset, async (offset, token) => + { + var original = await this.Api.Correlations.GetAsync(name, @namespace, token).ConfigureAwait(false); + var updated = original.Clone()!; + var context = updated.Status?.Contexts.FirstOrDefault(c => c.Id == e.ContextId); + if (context == null) + { + this.Logger.LogError("Failed to find a context with the specified id '{contextId}' in correlation '{name}.{@namespace}'", e.ContextId, name, @namespace); + throw new Exception($"Failed to find a context with the specified id '{e.ContextId}' in correlation '{name}.{@namespace}'"); + } + context.Offset = offset; + var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); + await this.Api.Correlations.PatchStatusAsync(name, @namespace, new Patch(PatchType.JsonPatch, patch), cancellationToken: token).ConfigureAwait(false); + }))) + .Subscribe(e => + { + observer.OnNext(e); + if (endOfStream) observer.OnCompleted(); + }, + ex => observer.OnError(ex), + () => observer.OnCompleted()); + return new CompositeDisposable(subscription, stopSubscription); + }); + } + /// public virtual async Task CorrelateAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(task); if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task)); + if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead"); if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return context; var @namespace = task.Workflow.Instance.GetNamespace()!; var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}"; diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs index 9f0843f2d..3159cca90 100644 --- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs @@ -84,6 +84,36 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger protected object? MessageHeaders { get; set; } + /// + /// Gets the 's subscription + /// + protected IDisposable? Subscription { get; set; } + + /// + /// Gets/sets the position of the within its originating stream + /// + protected uint? Offset { get; set; } + + /// + /// Gets the path for the specified message + /// + /// The offset of the message to get the path for + /// The path for the specified message + protected virtual string GetPathFor(uint offset) => $"{nameof(AsyncApiSubscriptionDefinition.Foreach).ToCamelCase()}/{offset}/{nameof(ForTaskDefinition.Do).ToCamelCase()}"; + + /// + protected override async Task CreateTaskExecutorAsync(TaskInstance task, TaskDefinition definition, IDictionary contextData, IDictionary? arguments = null, CancellationToken cancellationToken = default) + { + var executor = await base.CreateTaskExecutorAsync(task, definition, contextData, arguments, cancellationToken).ConfigureAwait(false); + executor.SubscribeAsync + ( + _ => System.Threading.Tasks.Task.CompletedTask, + async ex => await this.OnMessageProcessingErrorAsync(executor, this.CancellationTokenSource!.Token).ConfigureAwait(false), + async () => await this.OnMessageProcessingCompletedAsync(executor, this.CancellationTokenSource!.Token).ConfigureAwait(false) + ); + return executor; + } + /// protected override async Task DoInitializeAsync(CancellationToken cancellationToken) { @@ -224,8 +254,127 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)); return (message, keepGoing); })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); - var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false); - await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + if (this.AsyncApi.Subscription.Foreach == null) + { + var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false); + await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + } + else + { + this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync); + } + } + + /// + /// Handles the streaming of a + /// + /// The streamed + /// A new awaitable + protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation"); + if (this.AsyncApi.Subscription.Foreach?.Do != null) + { + var taskDefinition = new DoTaskDefinition() + { + Do = this.AsyncApi.Subscription.Foreach.Do, + Metadata = + [ + new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false) + ] + }; + var arguments = this.GetExpressionEvaluationArguments(); + var messageData = message as object; + if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(fromExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Subscription.Foreach.Output.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + if (this.AsyncApi.Subscription.Foreach.Export?.As is string toExpression) + { + var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(toExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; + await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + else if (this.AsyncApi.Subscription.Foreach.Export?.As != null) + { + var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(this.AsyncApi.Subscription.Foreach.Export.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; + await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + var offset = this.Offset ?? 0; + if (!this.Offset.HasValue) this.Offset = 0; + arguments ??= new Dictionary(); + arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!; + arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset; + var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, this.GetPathFor(offset), this.Task.Input, null, this.Task, false, this.CancellationTokenSource!.Token).ConfigureAwait(false); + var taskExecutor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + await taskExecutor.ExecuteAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + this.Offset++; + } + } + + /// + /// Handles an that occurred while streaming messages + /// + /// The to handle + /// A new awaitable + protected virtual Task OnStreamingErrorAsync(Exception ex) => this.SetErrorAsync(new Error() + { + Type = ServerlessWorkflow.Sdk.ErrorType.Communication, + Title = ServerlessWorkflow.Sdk.ErrorTitle.Communication, + Status = ServerlessWorkflow.Sdk.ErrorStatus.Communication, + Detail = ex.Message, + Instance = this.Task.Instance.Reference + }, this.CancellationTokenSource!.Token); + + /// + /// Handles the completion of the message streaming + /// + /// A new awaitable + protected virtual async Task OnStreamingCompletedAsync() + { + var last = await this.Task.GetSubTasksAsync(this.CancellationTokenSource!.Token).OrderBy(t => t.CreatedAt).LastOrDefaultAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + var output = (object?)null; + if (last != null && !string.IsNullOrWhiteSpace(last.OutputReference)) output = (await this.Task.Workflow.Documents.GetAsync(last.OutputReference, this.CancellationTokenSource.Token).ConfigureAwait(false)).Content; + await this.SetResultAsync(output, this.Task.Definition.Then, this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + + /// + /// Handles an that occurred while processing a streamed message + /// + /// The service used to execute the message processing that has faulted + /// A + /// A new awaitable + protected virtual async Task OnMessageProcessingErrorAsync(ITaskExecutor executor, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(executor); + var error = executor.Task.Instance.Error ?? throw new NullReferenceException(); + this.Executors.Remove(executor); + await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false); + } + + /// + /// Handles the completion of a message's processing + /// + /// The service used to execute the message processing that has completed + /// A + /// A new awaitable + protected virtual async Task OnMessageProcessingCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(executor); + this.Executors.Remove(executor); + if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false); + } + + /// + protected override ValueTask DisposeAsync(bool disposing) + { + if (disposing) this.Subscription?.Dispose(); + return base.DisposeAsync(disposing); + } + + /// + protected override void Dispose(bool disposing) + { + if (disposing) this.Subscription?.Dispose(); + base.Dispose(disposing); } } \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs index 8520c263d..5c6f617fe 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs @@ -11,6 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia; +using Neuroglia.Data.Expressions; + namespace Synapse.Runner.Services.Executors; /// @@ -33,11 +36,165 @@ public class ListenTaskExecutor(IServiceProvider serviceProvider, ILogger protected IDisposable? Subscription { get; set; } + /// + /// Gets the path for the specified event + /// + /// The offset of the event to get the path for + /// The path for the specified event + protected virtual string GetPathFor(uint offset) => $"{nameof(ListenTaskDefinition.Foreach).ToCamelCase()}/{offset - 1}/{nameof(ForTaskDefinition.Do).ToCamelCase()}"; + + /// + protected override async Task CreateTaskExecutorAsync(TaskInstance task, TaskDefinition definition, IDictionary contextData, IDictionary? arguments = null, CancellationToken cancellationToken = default) + { + var executor = await base.CreateTaskExecutorAsync(task, definition, contextData, arguments, cancellationToken).ConfigureAwait(false); + executor.SubscribeAsync + ( + _ => System.Threading.Tasks.Task.CompletedTask, + async ex => await this.OnEventProcessingErrorAsync(executor, this.CancellationTokenSource!.Token).ConfigureAwait(false), + async () => await this.OnEventProcessingCompletedAsync(executor, this.CancellationTokenSource!.Token).ConfigureAwait(false) + ); + return executor; + } + /// protected override async Task DoExecuteAsync(CancellationToken cancellationToken) { - var context = await this.Task.CorrelateAsync(cancellationToken).ConfigureAwait(false); - await this.SetResultAsync(context, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + if (this.Task.Definition.Foreach == null) + { + var context = await this.Task.CorrelateAsync(cancellationToken).ConfigureAwait(false); + await this.SetResultAsync(context, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + } + else + { + if(this.Task.Definition.Foreach.Do != null) + { + var task = await this.Task.GetSubTasksAsync(cancellationToken).OrderBy(t => t.CreatedAt).LastOrDefaultAsync(cancellationToken).ConfigureAwait(false); + if (task != null && task.IsOperative) + { + var taskDefinition = new DoTaskDefinition() + { + Do = this.Task.Definition.Foreach.Do, + Metadata = + [ + new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false) + ] + }; + var arguments = this.GetExpressionEvaluationArguments(); + var taskExecutor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + await taskExecutor.ExecuteAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + } + var events = await this.Task.StreamAsync(cancellationToken).ConfigureAwait(false); + this.Subscription = events.SubscribeAsync(this.OnStreamingEventAsync, this.OnStreamingErrorAsync, this.OnStreamingCompletedAsync); + } + } + + /// + /// Handles the streaming of a + /// + /// The streamed + /// A new awaitable + protected virtual async Task OnStreamingEventAsync(IStreamedCloudEvent e) + { + if (this.Task.Definition.Foreach!.Do != null) + { + var taskDefinition = new DoTaskDefinition() + { + Do = this.Task.Definition.Foreach.Do, + Metadata = + [ + new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false) + ] + }; + var arguments = this.GetExpressionEvaluationArguments(); + var eventData = e.Event as object; + if (this.Task.Definition.Foreach.Output?.As is string fromExpression) eventData = await this.Task.Workflow.Expressions.EvaluateAsync(fromExpression, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + else if (this.Task.Definition.Foreach.Output?.As != null) eventData = await this.Task.Workflow.Expressions.EvaluateAsync(this.Task.Definition.Foreach.Output.As!, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + if (this.Task.Definition.Foreach.Export?.As is string toExpression) + { + var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(toExpression, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; + await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + else if (this.Task.Definition.Foreach.Export?.As != null) + { + var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(this.Task.Definition.Foreach.Export.As!, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; + await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + arguments ??= new Dictionary(); + arguments[this.Task.Definition.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = eventData!; + arguments[this.Task.Definition.Foreach.At ?? RuntimeExpressions.Arguments.Index] = e.Offset - 1; + var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, this.GetPathFor(e.Offset), this.Task.Input, null, this.Task, false, this.CancellationTokenSource!.Token).ConfigureAwait(false); + var taskExecutor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + await taskExecutor.ExecuteAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + await e.AckAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + + /// + /// Handles an that occurred while streaming events + /// + /// The to handle + /// A new awaitable + protected virtual Task OnStreamingErrorAsync(Exception ex) => this.SetErrorAsync(new Error() + { + Type = ErrorType.Communication, + Title = ErrorTitle.Communication, + Status = (ushort)ErrorStatus.Communication, + Detail = ex.Message, + Instance = this.Task.Instance.Reference + }, this.CancellationTokenSource!.Token); + + /// + /// Handles the completion of the event streaming + /// + /// A new awaitable + protected virtual async Task OnStreamingCompletedAsync() + { + var last = await this.Task.GetSubTasksAsync(this.CancellationTokenSource!.Token).OrderBy(t => t.CreatedAt).LastOrDefaultAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + var output = (object?)null; + if (last != null && !string.IsNullOrWhiteSpace(last.OutputReference)) output = (await this.Task.Workflow.Documents.GetAsync(last.OutputReference, this.CancellationTokenSource.Token).ConfigureAwait(false)).Content; + await this.SetResultAsync(output, this.Task.Definition.Then, this.CancellationTokenSource!.Token).ConfigureAwait(false); + } + + /// + /// Handles an that occurred while processing a streamed event + /// + /// The service used to execute the event processing that has faulted + /// A + /// A new awaitable + protected virtual async Task OnEventProcessingErrorAsync(ITaskExecutor executor, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(executor); + var error = executor.Task.Instance.Error ?? throw new NullReferenceException(); + this.Executors.Remove(executor); + await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false); + } + + /// + /// Handles the completion of an event's processing + /// + /// The service used to execute the event processing that has completed + /// A + /// A new awaitable + protected virtual async Task OnEventProcessingCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(executor); + this.Executors.Remove(executor); + if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false); + } + + /// + protected override ValueTask DisposeAsync(bool disposing) + { + if (disposing) this.Subscription?.Dispose(); + return base.DisposeAsync(disposing); + } + + /// + protected override void Dispose(bool disposing) + { + if (disposing) this.Subscription?.Dispose(); + base.Dispose(disposing); } } diff --git a/src/runner/Synapse.Runner/Services/Interfaces/ITaskExecutionContext.cs b/src/runner/Synapse.Runner/Services/Interfaces/ITaskExecutionContext.cs index ad21051a3..ad73026ab 100644 --- a/src/runner/Synapse.Runner/Services/Interfaces/ITaskExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/Interfaces/ITaskExecutionContext.cs @@ -68,6 +68,13 @@ public interface ITaskExecutionContext /// A new awaitable Task ExecuteAsync(CancellationToken cancellationToken = default); + /// + /// Streams events + /// + /// A + /// The resulting + Task> StreamAsync(CancellationToken cancellationToken = default); + /// /// Begins correlating events /// diff --git a/src/runner/Synapse.Runner/Services/Interfaces/IWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/Interfaces/IWorkflowExecutionContext.cs index a41c55386..6942b413e 100644 --- a/src/runner/Synapse.Runner/Services/Interfaces/IWorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/Interfaces/IWorkflowExecutionContext.cs @@ -137,6 +137,14 @@ public interface IWorkflowExecutionContext /// A new awaitable Task ResumeAsync(CancellationToken cancellationToken = default); + /// + /// Streams the events defined by the specified task + /// + /// The execution of the task to stream events for + /// A + /// A new used to stream events + Task> StreamAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default); + /// /// Begins correlating the events defined by the specified task /// diff --git a/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs index 39e5b7cc9..aaa555249 100644 --- a/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/StandAloneWorkflowExecutionContext.cs @@ -287,6 +287,9 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa this.Logger.LogInformation("The workflow's execution has been resumed."); } + /// + public virtual Task> StreamAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default) => throw new NotSupportedException("Event streaming is not supported in stand-alone execution mode"); + /// public virtual Task CorrelateAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default) => throw new NotSupportedException("Event correlation is not supported in stand-alone execution mode"); diff --git a/src/runner/Synapse.Runner/Services/TaskExecutionContext.cs b/src/runner/Synapse.Runner/Services/TaskExecutionContext.cs index 169509fac..bc20b1f71 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutionContext.cs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - namespace Synapse.Runner.Services; /// @@ -68,6 +67,9 @@ public virtual async Task ExecuteAsync(CancellationToken cancellationToken = def this.Instance = await this.Workflow.StartAsync(this.Instance, cancellationToken).ConfigureAwait(false); } + /// + public virtual Task> StreamAsync(CancellationToken cancellationToken) => this.Workflow.StreamAsync(this, cancellationToken); + /// public virtual Task CorrelateAsync(CancellationToken cancellationToken = default) => this.Workflow.CorrelateAsync(this, cancellationToken); diff --git a/src/runner/Synapse.Runner/StreamedCloudEvent.cs b/src/runner/Synapse.Runner/StreamedCloudEvent.cs new file mode 100644 index 000000000..69fd5e849 --- /dev/null +++ b/src/runner/Synapse.Runner/StreamedCloudEvent.cs @@ -0,0 +1,40 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Synapse.Runner; + +/// +/// Represents the default implementation of the interface +/// +/// The streamed +/// The position of the within its originating stream +/// A delegate used to ack that the has been successfully processed +public class StreamedCloudEvent(CloudEvent e, uint offset, Func ackDelegate) + : IStreamedCloudEvent +{ + + /// + public virtual CloudEvent Event { get; } = e; + + /// + public virtual uint Offset { get; } = offset; + + /// + /// Gets a delegate used to ack that the has been successfully processed + /// + protected Func AckDelegate { get; } = ackDelegate; + + /// + public virtual Task AckAsync(CancellationToken cancellationToken = default) => this.AckDelegate(this.Offset, cancellationToken); + +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index 176f1413e..31b8025af 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -54,9 +54,9 @@ - - - + + + diff --git a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/NamespacesControllerTests.cs b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/NamespacesControllerTests.cs index a84673e53..968cf8bda 100644 --- a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/NamespacesControllerTests.cs +++ b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/NamespacesControllerTests.cs @@ -36,7 +36,7 @@ public async Task Create_Namespace_Should_Work() var response = await client.PostAsJsonAsync(this.Path, resource); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); (await response.Content.ReadFromJsonAsync()).Should().NotBeNull(); } @@ -84,7 +84,7 @@ public async Task Delete_Namespace_Should_Work() var response = await client.DeleteAsync($"{this.Path}/{resource.GetName()}"); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); (await response.Content.ReadFromJsonAsync()).Should().NotBeNull(); } diff --git a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowDataControllerTests.cs b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowDataControllerTests.cs index 857e0c3e9..26d8037d9 100644 --- a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowDataControllerTests.cs +++ b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowDataControllerTests.cs @@ -43,7 +43,7 @@ public async Task Create_Data_Document_Should_Work() var result = await response.Content.ReadFromJsonAsync(); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); //result.Should().BeEquivalentTo(document); } diff --git a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowInstancesControllerTests.cs b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowInstancesControllerTests.cs index f9a8cf70c..6ea3cb999 100644 --- a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowInstancesControllerTests.cs +++ b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowInstancesControllerTests.cs @@ -37,7 +37,7 @@ public async Task Create_WorkflowInstance_Should_Work() var response = await client.PostAsJsonAsync(this.Path, resource); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); (await response.Content.ReadFromJsonAsync()).Should().NotBeNull(); } @@ -88,7 +88,7 @@ public async Task Delete_WorkflowInstance_Should_Work() var response = await client.DeleteAsync($"{this.Path}/{resource.GetNamespace()}/{resource.GetName()}"); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); (await response.Content.ReadFromJsonAsync()).Should().NotBeNull(); } diff --git a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowsControllerTests.cs b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowsControllerTests.cs index 93d7f1bd3..84db2eb31 100644 --- a/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowsControllerTests.cs +++ b/tests/Synapse.IntegrationTests/Cases/Api/Controllers/WorkflowsControllerTests.cs @@ -37,7 +37,7 @@ public async Task Create_Workflow_Should_Work() var response = await client.PostAsJsonAsync(this.Path, resource); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); (await response.Content.ReadFromJsonAsync()).Should().NotBeNull(); } @@ -88,7 +88,7 @@ public async Task Delete_Workflow_Should_Work() var response = await client.DeleteAsync($"{this.Path}/{resource.GetNamespace()}/{resource.GetName()}"); //assert - response.Should().BeSuccessful(); + response.IsSuccessStatusCode.Should().BeTrue(); (await response.Content.ReadFromJsonAsync()).Should().NotBeNull(); } diff --git a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj index 153c360c1..8d28aa017 100644 --- a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj +++ b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj @@ -14,10 +14,10 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + - + diff --git a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj index 7006de666..33ad199ea 100644 --- a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj +++ b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj @@ -14,16 +14,16 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + - - + +