Skip to content

Commit b860edb

Browse files
authored
Merge pull request #398 from serverlessworkflow/fix-workflow-completed-event-output
Fix `WorkflowCompletedEventV1` by adding a new `output` property
2 parents c44282e + 1200eac commit b860edb

File tree

21 files changed

+81
-28
lines changed

21 files changed

+81
-28
lines changed

src/api/Synapse.Api.Application/Commands/Resources/CreateResourceCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class CreateResourceCommandHandler(IResourceRepository repository)
7979
/// <inheritdoc/>
8080
public virtual async Task<IOperationResult<IResource>> HandleAsync(CreateResourceCommand command, CancellationToken cancellationToken)
8181
{
82-
if (command.Resource.GetName().Trim().EndsWith('-')) command.Resource.Metadata.Name = $"{command.Resource.GetName().Trim()}{Guid.NewGuid().ToString("N")[..15]}";
82+
if (command.Resource.GetName().Trim().EndsWith('-')) command.Resource.Metadata.Name = $"{command.Resource.GetName().Trim()}{Guid.NewGuid().ToString("N")[..12]}";
8383
var resource = await repository.AddAsync(command.Resource, command.Group, command.Version, command.Plural, command.DryRun, cancellationToken);
8484
return new OperationResult<IResource>((int)HttpStatusCode.Created, resource);
8585
}

src/api/Synapse.Api.Application/Commands/Resources/Generic/CreateResourceCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class CreateResourceCommandHandler<TResource>(IResourceRepository reposit
4646
/// <inheritdoc/>
4747
public virtual async Task<IOperationResult<TResource>> HandleAsync(CreateResourceCommand<TResource> command, CancellationToken cancellationToken)
4848
{
49-
if (command.Resource.GetName().Trim().EndsWith('-')) command.Resource.Metadata.Name = $"{command.Resource.GetName().Trim()}{Guid.NewGuid().ToString("N")[..15]}";
49+
if (command.Resource.GetName().Trim().EndsWith('-')) command.Resource.Metadata.Name = $"{command.Resource.GetName().Trim()}{Guid.NewGuid().ToString("N")[..12]}";
5050
var resource = await repository.AddAsync(command.Resource, false, cancellationToken).ConfigureAwait(false);
5151
return new OperationResult<TResource>((int)HttpStatusCode.Created, resource);
5252
}

src/core/Synapse.Core.Infrastructure.Containers.Docker/DockerContainerPlatform.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public virtual async Task<IContainer> CreateAsync(ContainerProcessDefinition def
9393
}
9494
var parameters = new CreateContainerParameters()
9595
{
96+
Name = $"{System.Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name)}-{definition.Image}-{Guid.NewGuid().ToString("N")[..6].ToLowerInvariant()}",
9697
Image = definition.Image,
9798
Cmd = string.IsNullOrWhiteSpace(definition.Command) ? null : ["/bin/sh", "-c", definition.Command],
9899
Env = definition.Environment?.Select(e => $"{e.Key}={e.Value}").ToList(),

src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/KubernetesContainer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ protected virtual async Task WaitForReadyAsync(CancellationToken cancellationTok
107107
/// <inheritdoc/>
108108
public virtual async Task WaitForExitAsync(CancellationToken cancellationToken = default)
109109
{
110-
var response = this.Kubernetes.CoreV1.ListNamespacedPodWithHttpMessagesAsync(this.Pod.Namespace(), fieldSelector: $"metadata.name={Pod.Name()}", cancellationToken: cancellationToken);
110+
var response = this.Kubernetes.CoreV1.ListNamespacedPodWithHttpMessagesAsync(this.Pod.Namespace(), fieldSelector: $"metadata.name={Pod.Name()}", watch: true, cancellationToken: cancellationToken);
111111
await foreach (var (_, item) in response.WatchAsync<V1Pod, V1PodList>(cancellationToken: cancellationToken).ConfigureAwait(false))
112112
{
113-
if (item.Status.Phase != "Succeeded" || item.Status.Phase != "Failed") continue;
113+
if (item.Status.Phase != "Succeeded" && item.Status.Phase != "Failed") continue;
114114
var containerStatus = item.Status.ContainerStatuses.FirstOrDefault();
115115
this.ExitCode = containerStatus?.State.Terminated?.ExitCode ?? -1;
116116
break;

src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/KubernetesContainerPlatform.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,22 @@ public virtual Task<IContainer> CreateAsync(ContainerProcessDefinition definitio
7777
if (this.Kubernetes == null) throw new NullReferenceException("The KubernetesContainerPlatform has not been properly initialized");
7878
var pod = new V1Pod()
7979
{
80+
Metadata = new()
81+
{
82+
NamespaceProperty = $"{System.Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace)}",
83+
Name = $"{System.Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name)}-{definition.Image}-{Guid.NewGuid().ToString("N")[..6].ToLowerInvariant()}"
84+
},
8085
Spec = new()
8186
{
87+
RestartPolicy = "Never",
8288
Containers =
8389
[
84-
new()
90+
new(definition.Image)
8591
{
8692
Image = definition.Image,
8793
ImagePullPolicy = this.Options.ImagePullPolicy,
8894
Command = string.IsNullOrWhiteSpace(definition.Command) ? null : ["/bin/sh", "-c", definition.Command],
89-
Env = definition.Environment?.Select(e => new V1EnvVar(e.Key, e.Value)).ToList(),
90-
RestartPolicy = "Never"
95+
Env = definition.Environment?.Select(e => new V1EnvVar(e.Key, e.Value)).ToList()
9196
}
9297
]
9398
}

src/core/Synapse.Core/Events/Workflows/WorkflowCompletedEventV1.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,10 @@ public record WorkflowCompletedEventV1
3232
[DataMember(Name = "completedAt", Order = 2), JsonPropertyName("completedAt"), JsonPropertyOrder(2), YamlMember(Alias = "completedAt", Order = 2)]
3333
public DateTimeOffset CompletedAt { get; set; } = DateTimeOffset.Now;
3434

35+
/// <summary>
36+
/// Gets/sets the workflow instance's output
37+
/// </summary>
38+
[DataMember(Name = "output", Order = 3), JsonPropertyName("output"), JsonPropertyOrder(3), YamlMember(Alias = "output", Order = 3)]
39+
public object? Output { get; set; }
40+
3541
}

src/core/Synapse.Core/Resources/Document.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public record Document
2323

2424
/// <inheritdoc/>
2525
[DataMember(Name = "id", Order = 1), JsonPropertyName("id"), JsonPropertyOrder(1), YamlMember(Alias = "id", Order = 1)]
26-
public string Id { get; set; } = Guid.NewGuid().ToString("N")[..15];
26+
public string Id { get; set; } = Guid.NewGuid().ToString("N")[..12];
2727

2828
/// <summary>
2929
/// Gets/sets the document's name

src/core/Synapse.Core/Resources/NativeRuntimeConfiguration.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public record NativeRuntimeConfiguration
2828
/// Gets the default path to the runner executable file
2929
/// </summary>
3030
public const string DefaultExecutable = "Synapse.Runner";
31+
/// <summary>
32+
/// Gets the default path to the directory that contains the secrets made available to runners
33+
/// </summary>
34+
public static readonly string DefaultSecretsDirectory = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile), ".synapse", "secrets");
3135

3236
/// <summary>
3337
/// Initializes a new <see cref="NativeRuntimeConfiguration"/>
@@ -47,6 +51,8 @@ public NativeRuntimeConfiguration()
4751
if (!File.Exists(filePath)) throw new FileNotFoundException("The runner executable file does not exist or cannot be found", filePath);
4852
this.Executable = env;
4953
}
54+
env = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Native.SecretsDirectory);
55+
if (!string.IsNullOrWhiteSpace(env)) this.SecretsDirectory = env;
5056
}
5157

5258
/// <summary>
@@ -61,5 +67,10 @@ public NativeRuntimeConfiguration()
6167
[DataMember(Order = 2, Name = "executable"), JsonPropertyOrder(2), JsonPropertyName("executable"), YamlMember(Order = 2, Alias = "executable")]
6268
public virtual string Executable { get; set; } = DefaultExecutable;
6369

70+
/// <summary>
71+
/// Gets/sets the path to the directory that contains the secrets made available to runners
72+
/// </summary>
73+
[DataMember(Order = 3, Name = "secretsDirectory"), JsonPropertyOrder(3), JsonPropertyName("secretsDirectory"), YamlMember(Order = 3, Alias = "secretsDirectory")]
74+
public virtual string SecretsDirectory { get; set; } = DefaultSecretsDirectory;
6475

6576
}

src/core/Synapse.Core/Resources/TaskInstance.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public record TaskInstance
2424
/// Gets the task's id
2525
/// </summary>
2626
[DataMember(Name = "id", Order = 1), JsonPropertyName("id"), JsonPropertyOrder(1), YamlMember(Alias = "id", Order = 1)]
27-
public virtual string Id { get; set; } = Guid.NewGuid().ToString("N")[..15];
27+
public virtual string Id { get; set; } = Guid.NewGuid().ToString("N")[..12];
2828

2929
/// <summary>
3030
/// Gets the task's name, if any

src/core/Synapse.Core/Synapse.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
<Copyright>Copyright © 2024-Present The Synapse Authors. All Rights Reserved.</Copyright>
1616
<RepositoryUrl>https://github.com/serverlessworkflow/synapse</RepositoryUrl>
1717
<RepositoryType>git</RepositoryType>
18+
<PackageId>Synapse</PackageId>
1819
<PackageProjectUrl>https://github.com/serverlessworkflow/synapse</PackageProjectUrl>
1920
<PackageTags>synapse core</PackageTags>
2021
<IsPackable>true</IsPackable>

src/core/Synapse.Core/SynapseDefaults.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,14 @@ public static class Runner
562562
/// Gets the environment variable used to configure whether or not runners should publish lifecycle events
563563
/// </summary>
564564
public const string LifecycleEvents = Prefix + "LIFECYCLE_EVENTS";
565+
/// <summary>
566+
/// Gets the environment variable used to configure the runner's namespace
567+
/// </summary>
568+
public const string Namespace = Prefix + "NAMESPACE";
569+
/// <summary>
570+
/// Gets the environment variable used to configure the runner's name
571+
/// </summary>
572+
public const string Name = Prefix + "NAME";
565573

566574
}
567575

@@ -740,6 +748,10 @@ public static class Native
740748
/// Gets the environment variable used to configure the path to the runner's executable file
741749
/// </summary>
742750
public const string Executable = Prefix + "EXECUTABLE";
751+
/// <summary>
752+
/// Gets the environment variable used to configure the directory that contains the secrets made available to runners
753+
/// </summary>
754+
public const string SecretsDirectory = Prefix + "SECRETS_DIRECTORY";
743755

744756
}
745757

src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
121121
this.Logger.LogInformation("Creating a new correlation context...");
122122
context = new CorrelationContext()
123123
{
124-
Id = Guid.NewGuid().ToString("N")[..15],
124+
Id = Guid.NewGuid().ToString("N")[..12],
125125
Events = [new(filter.Key, e)],
126126
Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
127127
};
@@ -150,7 +150,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
150150
this.Logger.LogInformation("Creating a new correlation context...");
151151
context = new CorrelationContext()
152152
{
153-
Id = Guid.NewGuid().ToString("N")[..15],
153+
Id = Guid.NewGuid().ToString("N")[..12],
154154
Events = [new(filter.Key, e)],
155155
Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
156156
};

src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ namespace Synapse.Runner.Services.Executors;
2424
/// <param name="context">The current <see cref="ITaskExecutionContext"/></param>
2525
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
2626
/// <param name="serializer">The service used to serialize/deserialize objects to/from JSON</param>
27-
public class ContainerProcessExecutor(IServiceProvider serviceProvider, ILogger<ContainerProcessExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, IContainerPlatform containers, ITaskExecutionContext<RunTaskDefinition> context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer)
27+
/// <param name="options">The service used to access the current <see cref="RunnerOptions"/></param>
28+
public class ContainerProcessExecutor(IServiceProvider serviceProvider, ILogger<ContainerProcessExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory,
29+
IContainerPlatform containers, ITaskExecutionContext<RunTaskDefinition> context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, IOptions<RunnerOptions> options)
2830
: TaskExecutor<RunTaskDefinition>(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer)
2931
{
3032

@@ -38,6 +40,11 @@ public class ContainerProcessExecutor(IServiceProvider serviceProvider, ILogger<
3840
/// </summary>
3941
protected ContainerProcessDefinition ProcessDefinition => this.Task.Definition.Run.Container!;
4042

43+
/// <summary>
44+
/// Gets the current <see cref="RunnerOptions"/>
45+
/// </summary>
46+
protected RunnerOptions Options { get; } = options.Value;
47+
4148
/// <summary>
4249
/// Gets the <see cref="IContainer"/> to run
4350
/// </summary>
@@ -56,7 +63,8 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
5663
{
5764
await this.Container!.StartAsync(cancellationToken).ConfigureAwait(false);
5865
await this.Container.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
59-
var standardOutput = (this.Container.StandardOutput == null ? null : await this.Container.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false))?.Trim()[8..];
66+
var standardOutput = (this.Container.StandardOutput == null ? null : await this.Container.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false))?.Trim();
67+
if (this.Options.Containers.Platform == ContainerPlatform.Docker) standardOutput = standardOutput?[8..];
6068
var standardError = (this.Container.StandardError == null ? null : await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false))?.Trim();
6169
var result = standardOutput; //todo: do something with return data encoding (ex: plain-text, json);
6270
await this.SetResultAsync(result, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);

src/runner/Synapse.Runner/Services/SecretsManager.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
5151
try
5252
{
5353
var path = string.IsNullOrWhiteSpace(this.Options.Secrets.Directory)
54-
? RunnerSecretsOptions.DefaultDirectory
55-
: this.Options.Secrets.Directory;
54+
? RunnerSecretsOptions.DefaultDirectory
55+
: this.Options.Secrets.Directory;
5656
var directory = new DirectoryInfo(path);
5757
if (!directory.Exists) directory.Create();
5858
foreach (var file in directory.GetFiles())
@@ -72,14 +72,14 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
7272
}
7373
catch (Exception ex)
7474
{
75-
this.Logger.LogWarning("Skipped loading secret '{secretFile}': an exception occurred while deserializing the secret object: {ex}", file.Name, ex.ToString());
75+
this.Logger.LogWarning("Skipped loading secret '{secretFile}': an exception occurred while deserializing the secret object: {ex}", file.Name, ex.Message);
7676
continue;
7777
}
7878
}
7979
}
8080
catch(Exception ex)
8181
{
82-
this.Logger.LogWarning("Failed to load secrets because there are none or because they are improperly configured. Error: {ex}", ex);
82+
this.Logger.LogWarning("Failed to load secrets because there are none or because they are improperly configured. Error: {ex}", ex.Message);
8383
}
8484
return Task.CompletedTask;
8585
}

src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,8 @@ await this.Api.Events.PublishAsync(new CloudEvent()
577577
Data = new WorkflowCompletedEventV1()
578578
{
579579
Name = this.Instance.GetQualifiedName(),
580-
CompletedAt = run?.EndedAt ?? DateTimeOffset.Now
580+
CompletedAt = run?.EndedAt ?? DateTimeOffset.Now,
581+
Output = this.Output
581582
}
582583
}, cancellationToken).ConfigureAwait(false);
583584
await this.Api.Events.PublishAsync(new CloudEvent()

src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
7878
this.Logger.LogDebug("Creating a new Docker container for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName());
7979
if (this.Docker == null) await this.InitializeAsync(cancellationToken).ConfigureAwait(false);
8080
var container = this.Runner.Runtime.Docker!.ContainerTemplate.Clone()!;
81+
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace, workflowInstance.GetNamespace()!);
82+
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name, $"{workflowInstance.GetName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}");
8183
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Runner.Api.Uri.OriginalString);
8284
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.Runner.ContainerPlatform);
8385
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.Runner.PublishLifecycleEvents ?? true).ToString());
@@ -111,7 +113,7 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
111113
];
112114
var parameters = new CreateContainerParameters(container)
113115
{
114-
Name = $"{workflowInstance.GetQualifiedName()}-{Guid.NewGuid().ToString("N")[..15].ToLowerInvariant()}",
116+
Name = $"{workflowInstance.GetQualifiedName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}",
115117
HostConfig = hostConfig
116118
};
117119
var result = await this.Docker!.Containers.CreateContainerAsync(parameters, cancellationToken).ConfigureAwait(false);

src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
8787
var workflowDefinition = workflow.Spec.Versions.Get(workflowInstance.Spec.Definition.Version) ?? throw new NullReferenceException($"Failed to find version '{workflowInstance.Spec.Definition.Version}' of workflow '{workflow.GetQualifiedName()}'");
8888
var pod = this.Runner.Runtime.Kubernetes!.PodTemplate.Clone()!;
8989
pod.Metadata ??= new();
90-
pod.Metadata.Name = $"{workflowInstance.GetQualifiedName()}-{Guid.NewGuid().ToString("N")[..15].ToLowerInvariant()}";
90+
pod.Metadata.Name = $"{workflowInstance.GetQualifiedName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}";
9191
if (!string.IsNullOrWhiteSpace(this.Runner.Runtime.Kubernetes.Namespace)) pod.Metadata.NamespaceProperty = this.Runner.Runtime.Kubernetes.Namespace;
9292
if (pod.Spec == null || pod.Spec.Containers == null || !pod.Spec.Containers.Any()) throw new InvalidOperationException("The specified Kubernetes runtime pod template is not valid");
9393
var volumeMounts = new List<V1VolumeMount>();
@@ -119,6 +119,8 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
119119
foreach (var container in pod.Spec.Containers)
120120
{
121121
container.Env ??= [];
122+
container.Env.Add(new(SynapseDefaults.EnvironmentVariables.Runner.Namespace, valueFrom: new(fieldRef: new("metadata.namespace"))));
123+
container.Env.Add(new(SynapseDefaults.EnvironmentVariables.Runner.Name, valueFrom: new(fieldRef: new("metadata.name"))));
122124
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Runner.Api.Uri.OriginalString);
123125
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.Runner.ContainerPlatform);
124126
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.Runner.PublishLifecycleEvents ?? true).ToString());

0 commit comments

Comments
 (0)