diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj
index e1837226..d9b461d8 100644
--- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj
+++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj
@@ -43,7 +43,7 @@
-
+
diff --git a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
index 132ec7a1..06d31262 100644
--- a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
+++ b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
@@ -45,7 +45,7 @@
-
+
diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
index 67605767..77eeac22 100644
--- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
+++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
@@ -35,7 +35,7 @@
-
+
diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj
index 8ceee191..0808cab4 100644
--- a/src/cli/Synapse.Cli/Synapse.Cli.csproj
+++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj
@@ -33,7 +33,7 @@
-
+
diff --git a/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs b/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs
index 41f919bb..6fc558d3 100644
--- a/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs
+++ b/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs
@@ -15,9 +15,8 @@
using Neuroglia.Serialization;
using ServerlessWorkflow.Sdk;
using System.Net;
-using System.Xml.Schema;
using System.Xml;
-using Avro.Generic;
+using System.Xml.Schema;
namespace Synapse.Core.Infrastructure.Services;
diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj
index fedb0392..90a9fdfc 100644
--- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj
+++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj
@@ -51,7 +51,7 @@
-
+
diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj
index e1deacf5..63e4011b 100644
--- a/src/core/Synapse.Core/Synapse.Core.csproj
+++ b/src/core/Synapse.Core/Synapse.Core.csproj
@@ -70,7 +70,7 @@
-
+
diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
index 7046016b..d67a4e3c 100644
--- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
+++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
@@ -42,8 +42,8 @@
-
-
+
+
diff --git a/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs b/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs
index 45d2acd3..027730a6 100644
--- a/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs
+++ b/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs
@@ -226,7 +226,7 @@ protected INodeViewModel BuildTaskNode(TaskNodeRenderingContext context)
protected virtual NodeViewModel BuildCallTaskNode(TaskNodeRenderingContext context)
{
ArgumentNullException.ThrowIfNull(context);
- var content = string.Empty;
+ string content; ;
string callType;
switch (context.TaskDefinition.Call.ToLower())
{
@@ -234,7 +234,7 @@ protected virtual NodeViewModel BuildCallTaskNode(TaskNodeRenderingContext
-
+
diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs
index 292bd663..48662b4a 100644
--- a/src/runner/Synapse.Runner/Program.cs
+++ b/src/runner/Synapse.Runner/Program.cs
@@ -12,6 +12,9 @@
// limitations under the License.
using Moq;
+using Neuroglia.AsyncApi;
+using Neuroglia.AsyncApi.Client;
+using Neuroglia.AsyncApi.Client.Bindings;
using Neuroglia.Serialization.Xml;
using NReco.Logging.File;
using ServerlessWorkflow.Sdk.IO;
@@ -97,6 +100,8 @@
services.AddServerlessWorkflowIO();
services.AddNodeJSScriptExecutor();
services.AddPythonScriptExecutor();
+ services.AddAsyncApi();
+ services.AddAsyncApiClient(options => options.AddAllBindingHandlers());
services.AddSingleton();
services.AddSingleton(provider => provider.GetRequiredService());
services.AddSingleton(provider => provider.GetRequiredService());
diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
new file mode 100644
index 00000000..9f0843f2
--- /dev/null
+++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
@@ -0,0 +1,231 @@
+// Copyright © 2024-Present The Synapse Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"),
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Neuroglia;
+using Neuroglia.AsyncApi;
+using Neuroglia.AsyncApi.Client;
+using Neuroglia.AsyncApi.Client.Services;
+using Neuroglia.AsyncApi.IO;
+using Neuroglia.AsyncApi.v3;
+using Neuroglia.Data.Expressions;
+
+namespace Synapse.Runner.Services.Executors;
+
+///
+/// Represents an used to execute AsyncAPI s using an
+///
+/// The current
+/// The service used to perform logging
+/// The service used to create s
+/// The service used to create s
+/// The current
+/// The service used to provide implementations
+/// The service used to serialize/deserialize objects to/from JSON
+/// The service used to create s
+/// The service used to read s
+/// The service used to create s
+public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory,
+ ITaskExecutionContext context, Core.Infrastructure.Services.ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, IHttpClientFactory httpClientFactory, IAsyncApiDocumentReader asyncApiDocumentReader, IAsyncApiClientFactory asyncApiClientFactory)
+ : TaskExecutor(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer)
+{
+
+ ///
+ /// Gets the service used to create s
+ ///
+ protected IHttpClientFactory HttpClientFactory { get; } = httpClientFactory;
+
+ ///
+ /// Gets the service used to read s
+ ///
+ protected IAsyncApiDocumentReader AsyncApiDocumentReader { get; } = asyncApiDocumentReader;
+
+ ///
+ /// Gets the service used to create s
+ ///
+ protected IAsyncApiClientFactory AsyncApiClientFactory { get; } = asyncApiClientFactory;
+
+ ///
+ /// Gets the definition of the AsyncAPI call to perform
+ ///
+ protected AsyncApiCallDefinition? AsyncApi { get; set; }
+
+ ///
+ /// Gets/sets the that defines the AsyncAPI operation to call
+ ///
+ protected V3AsyncApiDocument? Document { get; set; }
+
+ ///
+ /// Gets the to call
+ ///
+ protected KeyValuePair Operation { get; set; }
+
+ ///
+ /// Gets an object used to describe the credentials, if any, used to authenticate a user agent with the AsyncAPI application
+ ///
+ protected AuthorizationInfo? Authorization { get; set; }
+
+ ///
+ /// Gets/sets the payload, if any, of the message to publish, in case the 's has been set to
+ ///
+ protected object? MessagePayload { get; set; }
+
+ ///
+ /// Gets/sets the headers, if any, of the message to publish, in case the 's has been set to
+ ///
+ protected object? MessageHeaders { get; set; }
+
+ ///
+ protected override async Task DoInitializeAsync(CancellationToken cancellationToken)
+ {
+ this.AsyncApi = (AsyncApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(AsyncApiCallDefinition))!;
+ using var httpClient = this.HttpClientFactory.CreateClient();
+ await httpClient.ConfigureAuthenticationAsync(this.AsyncApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false);
+ var uriString = StringFormatter.NamedFormat(this.AsyncApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary());
+ if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
+ if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The AsyncAPI endpoint URI cannot be null or empty");
+ if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI");
+ using var request = new HttpRequestMessage(HttpMethod.Get, uriString);
+ using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
+ if (!response.IsSuccessStatusCode)
+ {
+ var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
+ this.Logger.LogInformation("Failed to retrieve the AsyncAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", uri, response.StatusCode);
+ this.Logger.LogDebug("Response content:\r\n{responseContent}", responseContent ?? "None");
+ response.EnsureSuccessStatusCode();
+ }
+ using var responseStream = await response.Content!.ReadAsStreamAsync(cancellationToken)!;
+ var document = await this.AsyncApiDocumentReader.ReadAsync(responseStream, cancellationToken).ConfigureAwait(false);
+ if (document is not V3AsyncApiDocument v3Document) throw new NotSupportedException("Synapse only supports AsyncAPI v3.0.0 at the moment");
+ this.Document = v3Document;
+ if (string.IsNullOrWhiteSpace(this.AsyncApi.Operation)) throw new NullReferenceException("The 'operation' parameter must be set when performing an AsyncAPI v3 call");
+ var operationId = this.AsyncApi.Operation;
+ if (operationId.IsRuntimeExpression()) operationId = await this.Task.Workflow.Expressions.EvaluateAsync(operationId, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
+ if (string.IsNullOrWhiteSpace(operationId)) throw new NullReferenceException("The operation ref cannot be null or empty");
+ this.Operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId);
+ if (this.Operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'");
+ if (this.AsyncApi.Authentication != null) this.Authorization = await AuthorizationInfo.CreateAsync(this.AsyncApi.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false);
+ switch (this.Operation.Value.Action)
+ {
+ case V3OperationAction.Receive:
+ await this.BuildMessagePayloadAsync(cancellationToken).ConfigureAwait(false);
+ await this.BuildMessageHeadersAsync(cancellationToken).ConfigureAwait(false);
+ break;
+ case V3OperationAction.Send: break;
+ default: throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported");
+ }
+ }
+
+ ///
+ /// Builds the payload, if any, of the message to publish, in case the 's has been set to
+ ///
+ /// A
+ /// A new awaitable
+ protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancellationToken = default)
+ {
+ if (this.AsyncApi == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
+ if (this.Task.Input == null) this.MessagePayload = new { };
+ if (this.AsyncApi.Message?.Payload == null) return;
+ var arguments = this.GetExpressionEvaluationArguments();
+ if (this.Authorization != null)
+ {
+ arguments ??= new Dictionary();
+ arguments.Add("authorization", this.Authorization);
+ }
+ this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync