Skip to content

Commit dc7eb06

Browse files
committed
fix(Runner): Fixed TaskExecutor implementations to not dispose of child executors
fix(Runner): Fixed the TaskExecutor to catch uncaught initialization and execution exceptions, thus enabling faulting the processed task Signed-off-by: Charles d'Avernas <charles.davernas@neuroglia.io>
1 parent 1674e65 commit dc7eb06

File tree

7 files changed

+52
-18
lines changed

7 files changed

+52
-18
lines changed

src/core/Synapse.Core/SynapseDefaults.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
using Synapse.Resources;
1514
using Neuroglia.Data.Infrastructure.ResourceOriented;
15+
using Synapse.Resources;
1616
using System.Diagnostics;
1717
using System.Reflection;
1818

src/runner/Synapse.Runner/AuthorizationInfo.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
using Docker.DotNet.Models;
1514
using Neuroglia.Data.Infrastructure.ResourceOriented;
1615
using ServerlessWorkflow.Sdk.Models.Authentication;
1716
using System.Text;

src/runner/Synapse.Runner/Services/Executors/ForTaskExecutor.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ protected virtual async Task OnIterationFaultAsync(ITaskExecutor executor, Cance
105105
ArgumentNullException.ThrowIfNull(executor);
106106
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
107107
this.Executors.Remove(executor);
108-
await executor.DisposeAsync().ConfigureAwait(false);
109108
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
110109
}
111110

@@ -123,7 +122,6 @@ protected virtual async Task OnIterationCompletedAsync(ITaskExecutor executor, C
123122
var output = executor.Task.Output!;
124123
this.Executors.Remove(executor);
125124
if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false);
126-
await executor.DisposeAsync().ConfigureAwait(false);
127125
var index = int.Parse(last.Reference.OriginalString.Split('/', StringSplitOptions.RemoveEmptyEntries)[^2]) + 1;
128126
if (index == this.Collection.Count)
129127
{

src/runner/Synapse.Runner/Services/Executors/ForkTaskExecutor.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,9 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell
8080
using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
8181
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
8282
this.Executors.Remove(executor);
83-
await executor.DisposeAsync().ConfigureAwait(false);
8483
foreach (var subExecutor in this.Executors)
8584
{
8685
await subExecutor.CancelAsync(cancellationToken).ConfigureAwait(false);
87-
await subExecutor.DisposeAsync().ConfigureAwait(false);
8886
this.Executors.Remove(executor);
8987
}
9088
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
@@ -102,11 +100,7 @@ protected virtual async Task OnSubTaskCompletedAsync(ITaskExecutor executor, Can
102100
using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
103101
if (this.Task.Instance.Status != TaskInstanceStatus.Running)
104102
{
105-
if (this.Executors.Remove(executor))
106-
{
107-
await executor.CancelAsync(cancellationToken).ConfigureAwait(false);
108-
await executor.DisposeAsync().ConfigureAwait(false);
109-
}
103+
if (this.Executors.Remove(executor)) await executor.CancelAsync(cancellationToken).ConfigureAwait(false);
110104
}
111105
if (this.Task.Definition.Fork.Compete == true)
112106
{
@@ -115,7 +109,6 @@ protected virtual async Task OnSubTaskCompletedAsync(ITaskExecutor executor, Can
115109
{
116110
this.Executors.Remove(concurrentTaskExecutor);
117111
await concurrentTaskExecutor.CancelAsync(cancellationToken).ConfigureAwait(false);
118-
await concurrentTaskExecutor.DisposeAsync().ConfigureAwait(false);
119112
}
120113
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
121114
}

src/runner/Synapse.Runner/Services/Executors/FunctionCallExecutor.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell
115115
ArgumentNullException.ThrowIfNull(executor);
116116
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
117117
this.Executors.Remove(executor);
118-
await executor.DisposeAsync().ConfigureAwait(false);
119118
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
120119
}
121120

src/runner/Synapse.Runner/Services/TaskExecutor.cs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,34 @@ public abstract class TaskExecutor<TDefinition>(IServiceProvider serviceProvider
115115
/// <inheritdoc/>
116116
public virtual async Task InitializeAsync(CancellationToken cancellationToken = default)
117117
{
118-
await this.DoInitializeAsync(cancellationToken).ConfigureAwait(false);
119-
await this.Task.InitializeAsync(cancellationToken).ConfigureAwait(false);
120-
this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Initialized));
118+
try
119+
{
120+
await this.DoInitializeAsync(cancellationToken).ConfigureAwait(false);
121+
await this.Task.InitializeAsync(cancellationToken).ConfigureAwait(false);
122+
this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Initialized));
123+
}
124+
catch(HttpRequestException ex)
125+
{
126+
await this.SetErrorAsync(new Error()
127+
{
128+
Type = ErrorType.Communication,
129+
Title = ErrorTitle.Communication,
130+
Status = ex.StatusCode.HasValue ? (ushort)ex.StatusCode : (ushort)ErrorStatus.Communication,
131+
Detail = ex.Message,
132+
Instance = this.Task.Instance.Reference
133+
}, cancellationToken).ConfigureAwait(false);
134+
}
135+
catch(Exception ex)
136+
{
137+
await this.SetErrorAsync(new Error()
138+
{
139+
Type = ErrorType.Runtime,
140+
Title = ErrorTitle.Runtime,
141+
Status = ErrorStatus.Runtime,
142+
Detail = ex.Message,
143+
Instance = this.Task.Instance.Reference
144+
}, cancellationToken).ConfigureAwait(false);
145+
}
121146
}
122147

123148
/// <summary>
@@ -171,6 +196,28 @@ await this.SetErrorAsync(new()
171196
await this.TaskCompletionSource.Task.ConfigureAwait(false);
172197
}
173198
catch (OperationCanceledException) { }
199+
catch (HttpRequestException ex)
200+
{
201+
await this.SetErrorAsync(new Error()
202+
{
203+
Type = ErrorType.Communication,
204+
Title = ErrorTitle.Communication,
205+
Status = ex.StatusCode.HasValue ? (ushort)ex.StatusCode : (ushort)ErrorStatus.Communication,
206+
Detail = ex.Message,
207+
Instance = this.Task.Instance.Reference
208+
}, cancellationToken).ConfigureAwait(false);
209+
}
210+
catch (Exception ex)
211+
{
212+
await this.SetErrorAsync(new Error()
213+
{
214+
Type = ErrorType.Runtime,
215+
Title = ErrorTitle.Runtime,
216+
Status = ErrorStatus.Runtime,
217+
Detail = ex.Message,
218+
Instance = this.Task.Instance.Reference
219+
}, cancellationToken).ConfigureAwait(false);
220+
}
174221
}
175222

176223
/// <summary>
@@ -198,7 +245,6 @@ protected virtual async Task BeforeExecuteAsync(CancellationToken cancellationTo
198245
}
199246
input = executor.Task.Output ?? new();
200247
this.Executors.Remove(executor);
201-
await executor.DisposeAsync().ConfigureAwait(false);
202248
}
203249
}
204250

src/runner/Synapse.Runner/Services/WorkflowExecutor.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ protected virtual async Task OnTaskCompletedAsync(ITaskExecutor executor, Cancel
272272
};
273273
var completedTask = executor.Task;
274274
this.Executors.Remove(executor);
275-
await executor.DisposeAsync().ConfigureAwait(false);
276275
if (nextDefinition == null)
277276
{
278277
await this.SetResultAsync(completedTask.Output, cancellationToken).ConfigureAwait(false);

0 commit comments

Comments
 (0)