Skip to content

Commit fd830b3

Browse files
committed
feat: improve sequence activity handling
1 parent 9f70ef8 commit fd830b3

File tree

17 files changed

+452
-296
lines changed

17 files changed

+452
-296
lines changed

docs/changelog/500.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ In the following sections, you'll find more a more detailed and canonical list o
100100
* Add `IgnoreNoMatchingSubscribersError` setting to MQTT producer endpoints, to prevent throwing when no subscriber is consuming the produced message
101101
* Basic support for MQTT 5 request-response
102102
* Add <xref:Silverback.Messaging.Subscribers.ConsumerNameFilterAttribute> to filter the subscribers by consumer name (replaces <xref:Silverback.Messaging.Subscribers.KafkaGroupIdFilterAttribute> and <xref:Silverback.Messaging.Subscribers.MqttClientIdFilterAttribute>)
103-
* Add new built-in (de-)serializers to support raw `string`, `Stream`, `byte[]` and `JsonDocument` (see <xref:serialization> and <xref:deserialization>)
103+
* Add new built-in (de-)serializers to support raw `string`, `Stream` and `byte[]` (see <xref:serialization> and <xref:deserialization>)
104+
* Improve [Activity](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity) handling for sequences (batch processing, chunking, etc.)
104105
* Upgrade to [Confluent.Kafka 2.8.0](https://github.com/confluentinc/confluent-kafka-dotnet/releases/tag/v2.2.0)
105106
* Upgrade to [MQTTnet 5.0.1.1416](https://github.com/chkr1011/MQTTnet/releases/tag/v5.0.1.1416)
106107

src/Silverback.Core/Messaging/Messages/IMessageStreamEnumerable.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,30 @@ namespace Silverback.Messaging.Messages;
1313
internal interface IMessageStreamEnumerable
1414
{
1515
/// <summary>
16-
/// Adds the specified message to the stream. The returned <see cref="ValueTask" /> will complete only when the
17-
/// message has actually been pulled and processed.
16+
/// Adds the specified message to the stream. The returned <see cref="ValueTask" /> will complete only when the message has actually
17+
/// been pulled and processed.
1818
/// </summary>
1919
/// <param name="message">
2020
/// The message to be added.
2121
/// </param>
22+
/// <param name="onPullAction">
23+
/// An action to be executed when the message is pulled.
24+
/// </param>
25+
/// <param name="onPullActionArgument">
26+
/// The argument to be passed to the <paramref name="onPullAction" />.
27+
/// </param>
2228
/// <param name="cancellationToken">
2329
/// A <see cref="CancellationToken" /> used to cancel the operation.
2430
/// </param>
2531
/// <returns>
2632
/// A <see cref="Task" /> representing the asynchronous operation. The <see cref="Task" /> will complete
2733
/// only when the message has actually been pulled and processed.
2834
/// </returns>
29-
Task PushAsync(object message, CancellationToken cancellationToken = default);
35+
Task PushAsync(object message, Action<object?>? onPullAction, object? onPullActionArgument, CancellationToken cancellationToken);
3036

3137
/// <summary>
32-
/// Aborts the ongoing enumeration and the pending calls to <see cref="PushAsync" />, then marks the
33-
/// stream as complete. Calling this method will cause an <see cref="OperationCanceledException" /> to be
34-
/// thrown by the enumerator and the <see cref="PushAsync" /> method.
38+
/// Aborts the ongoing enumeration and the pending calls to <see cref="PushAsync" />, then marks the stream as complete. Calling this
39+
/// method will cause an <see cref="OperationCanceledException" /> to be thrown by the enumerator and the <see cref="PushAsync" /> method.
3540
/// </summary>
3641
void Abort();
3742

src/Silverback.Core/Messaging/Messages/MessageStreamEnumerable`1.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,16 @@ internal sealed class MessageStreamEnumerable<TMessage> : IMessageStreamEnumerab
3737

3838
private bool _isComplete;
3939

40-
Task IMessageStreamEnumerable.PushAsync(object message, CancellationToken cancellationToken) =>
41-
PushAsync((TMessage)message, cancellationToken);
40+
private Action<object?>? _onPullAction;
4241

43-
/// <inheritdoc cref="IMessageStreamEnumerable.PushAsync(object,System.Threading.CancellationToken)" />
42+
private object? _onPullActionArgument;
43+
44+
Task IMessageStreamEnumerable.PushAsync(object message, Action<object?>? onPullAction, object? onPullActionArgument, CancellationToken cancellationToken) =>
45+
PushAsync((TMessage)message, onPullAction, onPullActionArgument, cancellationToken);
46+
47+
/// <inheritdoc cref="IMessageStreamEnumerable.PushAsync(object,Action{object},object,CancellationToken)" />
4448
[SuppressMessage("ReSharper", "InconsistentlySynchronizedField", Justification = "The lock is important to avoid multiple complete/abort, here is not important")]
45-
public async Task PushAsync(TMessage message, CancellationToken cancellationToken = default)
49+
public async Task PushAsync(TMessage message, Action<object?>? onPullAction = null, object? onPullActionArgument = null, CancellationToken cancellationToken = default)
4650
{
4751
Check.NotNull(message, nameof(message));
4852

@@ -55,6 +59,8 @@ public async Task PushAsync(TMessage message, CancellationToken cancellationToke
5559

5660
_current = message;
5761
_hasCurrent = true;
62+
_onPullAction = onPullAction;
63+
_onPullActionArgument = onPullActionArgument;
5864
SafelyRelease(_readSemaphore);
5965

6066
await _processedSemaphore.WaitAsync(linkedTokenSource.Token).ConfigureAwait(false);
@@ -137,6 +143,8 @@ private IEnumerable<TMessage> GetEnumerable()
137143
if (!_hasCurrent || _current == null)
138144
continue;
139145

146+
_onPullAction?.Invoke(_onPullActionArgument);
147+
140148
yield return _current;
141149
}
142150
}
@@ -149,6 +157,8 @@ private async IAsyncEnumerable<TMessage> GetAsyncEnumerable([EnumeratorCancellat
149157
if (!_hasCurrent || _current == null)
150158
continue;
151159

160+
_onPullAction?.Invoke(_onPullActionArgument);
161+
152162
yield return _current;
153163
}
154164
}

src/Silverback.Core/Messaging/Messages/MessageStreamProvider.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ public abstract ILazyMessageStreamEnumerable<object> CreateLazyStream(
4040
public abstract void AbortIfPending();
4141

4242
/// <summary>
43-
/// Aborts the ongoing enumerations and the pending calls to
44-
/// <see cref="MessageStreamProvider{TMessage}.PushAsync(TMessage,System.Threading.CancellationToken)" />, then marks the
45-
/// stream as complete. Calling this method will cause an <see cref="OperationCanceledException" /> to be
46-
/// thrown by the enumerators and the <see cref="MessageStreamProvider{TMessage}.PushAsync(TMessage,System.Threading.CancellationToken)" /> method.
43+
/// Aborts the ongoing enumerations and the pending calls to <see cref="MessageStreamProvider{TMessage}.PushAsync" />, then marks the
44+
/// stream as complete. Calling this method will cause an <see cref="OperationCanceledException" /> to be thrown by the enumerators
45+
/// and the <see cref="MessageStreamProvider{TMessage}.PushAsync" /> method.
4746
/// </summary>
4847
public abstract void Abort();
4948

src/Silverback.Core/Messaging/Messages/MessageStreamProvider`1.cs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,33 +37,20 @@ internal sealed class MessageStreamProvider<TMessage> : MessageStreamProvider
3737
public override int StreamsCount => _lazyStreams.Count;
3838

3939
/// <summary>
40-
/// Adds the specified message to the stream. The returned <see cref="Task" /> will complete only when the
41-
/// message has actually been pulled and processed.
40+
/// Adds the specified message to the stream. The returned <see cref="Task" /> will complete only when the message has actually been
41+
/// pulled and processed.
4242
/// </summary>
4343
/// <param name="message">
4444
/// The message to be added.
4545
/// </param>
46-
/// <param name="cancellationToken">
47-
/// A <see cref="CancellationToken" /> used to cancel the operation.
46+
/// <param name="throwIfUnhandled">
47+
/// A boolean value indicating whether an exception must be thrown if no subscriber is handling the message.
4848
/// </param>
49-
/// <returns>
50-
/// A <see cref="Task{TResult}" /> representing the asynchronous operation. The task will complete only
51-
/// when the message has actually been pulled and processed and its result contains the number of
52-
/// <see cref="IMessageStreamEnumerable{TMessage}" /> that have been pushed.
53-
/// </returns>
54-
public Task<int> PushAsync(TMessage message, CancellationToken cancellationToken = default) =>
55-
PushAsync(message, true, cancellationToken);
56-
57-
/// <summary>
58-
/// Adds the specified message to the stream. The returned <see cref="Task" /> will complete only when the
59-
/// message has actually been pulled and processed.
60-
/// </summary>
61-
/// <param name="message">
62-
/// The message to be added.
49+
/// <param name="onPullAction">
50+
/// An action to be executed when the message is pulled.
6351
/// </param>
64-
/// <param name="throwIfUnhandled">
65-
/// A boolean value indicating whether an exception must be thrown if no subscriber is handling the
66-
/// message.
52+
/// <param name="onPullActionArgument">
53+
/// The argument to be passed to the <paramref name="onPullAction" />.
6754
/// </param>
6855
/// <param name="cancellationToken">
6956
/// A <see cref="CancellationToken" /> used to cancel the operation.
@@ -74,14 +61,19 @@ public Task<int> PushAsync(TMessage message, CancellationToken cancellationToken
7461
/// <see cref="IMessageStreamEnumerable{TMessage}" /> that have been pushed.
7562
/// </returns>
7663
[SuppressMessage("ReSharper", "ParameterOnlyUsedForPreconditionCheck.Global", Justification = "False positive")]
77-
public async Task<int> PushAsync(TMessage message, bool throwIfUnhandled, CancellationToken cancellationToken = default)
64+
public async Task<int> PushAsync(
65+
TMessage message,
66+
bool throwIfUnhandled = true,
67+
Action<object?>? onPullAction = null,
68+
object? onPullActionArgument = null,
69+
CancellationToken cancellationToken = default)
7870
{
7971
Check.NotNull(message, nameof(message));
8072

8173
if (_completed || _aborted)
8274
throw new InvalidOperationException("The streams are already completed or aborted.");
8375

84-
List<Task> processingTasks = PushToCompatibleStreams(message, cancellationToken).ToList();
76+
List<Task> processingTasks = PushToCompatibleStreams(message, onPullAction, onPullActionArgument, cancellationToken).ToList();
8577

8678
if (processingTasks.Count > 0)
8779
await Task.WhenAll(processingTasks).ConfigureAwait(false);
@@ -199,6 +191,8 @@ private static LazyMessageStreamEnumerable<TMessageLinked> CreateLazyStreamCore<
199191
private static bool PushIfCompatibleType(
200192
ILazyMessageStreamEnumerable lazyStream,
201193
TMessage message,
194+
Action<object?>? onPullAction,
195+
object? onPullActionArgument,
202196
CancellationToken cancellationToken,
203197
out Task messageProcessingTask)
204198
{
@@ -210,14 +204,14 @@ private static bool PushIfCompatibleType(
210204

211205
if (lazyStream.MessageType.IsInstanceOfType(message))
212206
{
213-
messageProcessingTask = lazyStream.GetOrCreateStream().PushAsync(message, cancellationToken);
207+
messageProcessingTask = lazyStream.GetOrCreateStream().PushAsync(message, onPullAction, onPullActionArgument, cancellationToken);
214208
return true;
215209
}
216210

217211
if (message is IEnvelope { Message: not null } envelope &&
218212
lazyStream.MessageType.IsInstanceOfType(envelope.Message))
219213
{
220-
messageProcessingTask = lazyStream.GetOrCreateStream().PushAsync(envelope.Message, cancellationToken);
214+
messageProcessingTask = lazyStream.GetOrCreateStream().PushAsync(envelope.Message, onPullAction, onPullActionArgument, cancellationToken);
221215
return true;
222216
}
223217

@@ -228,7 +222,7 @@ private static bool PushIfCompatibleType(
228222
private static bool IsFiltered(IReadOnlyCollection<IMessageFilter>? filters, object message) =>
229223
filters != null && filters.Count != 0 && !filters.All(filter => filter.MustProcess(message));
230224

231-
private IEnumerable<Task> PushToCompatibleStreams(TMessage message, CancellationToken cancellationToken)
225+
private IEnumerable<Task> PushToCompatibleStreams(TMessage message, Action<object?>? onPullAction, object? onPullActionArgument, CancellationToken cancellationToken)
232226
{
233227
foreach (ILazyMessageStreamEnumerable? lazyStream in _lazyStreams)
234228
{
@@ -237,7 +231,7 @@ private IEnumerable<Task> PushToCompatibleStreams(TMessage message, Cancellation
237231
if (cancellationToken.IsCancellationRequested)
238232
yield break;
239233

240-
if (PushIfCompatibleType(lazyStream, message, cancellationToken, out Task processingTask))
234+
if (PushIfCompatibleType(lazyStream, message, onPullAction, onPullActionArgument, cancellationToken, out Task processingTask))
241235
{
242236
yield return processingTask;
243237
}

src/Silverback.Integration/Messaging/Diagnostics/ActivityExtensions.cs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ namespace Silverback.Messaging.Diagnostics;
1212

1313
internal static class ActivityExtensions
1414
{
15-
public static void AddBaggageRange(
16-
this Activity activity,
17-
IEnumerable<KeyValuePair<string, string>> baggageItems)
15+
public static void AddBaggageRange(this Activity activity, IEnumerable<KeyValuePair<string, string>> baggageItems)
1816
{
1917
Check.NotNull(activity, nameof(activity));
2018
Check.NotNull(baggageItems, nameof(baggageItems));
@@ -25,6 +23,16 @@ public static void AddBaggageRange(
2523
}
2624
}
2725

26+
public static void ClearBaggage(this Activity activity)
27+
{
28+
Check.NotNull(activity, nameof(activity));
29+
30+
foreach (KeyValuePair<string, string?> pair in activity.Baggage)
31+
{
32+
activity.SetBaggage(pair.Key, null);
33+
}
34+
}
35+
2836
public static void SetMessageHeaders(this Activity activity, MessageHeaderCollection headers)
2937
{
3038
if (activity.Id == null)
@@ -36,16 +44,10 @@ public static void SetMessageHeaders(this Activity activity, MessageHeaderCollec
3644

3745
string? traceState = activity.TraceStateString;
3846
if (traceState != null)
39-
{
4047
headers.Add(DefaultMessageHeaders.TraceState, traceState);
41-
}
4248

4349
if (activity.Baggage.Any())
44-
{
45-
headers.Add(
46-
DefaultMessageHeaders.TraceBaggage,
47-
ActivityBaggageSerializer.Serialize(activity.Baggage));
48-
}
50+
headers.Add(DefaultMessageHeaders.TraceBaggage, ActivityBaggageSerializer.Serialize(activity.Baggage));
4951
}
5052

5153
public static void SetTraceIdAndState(this Activity activity, string? traceId, string? traceState)
@@ -61,7 +63,7 @@ public static void SetTraceIdAndState(this Activity activity, string? traceId, s
6163
}
6264
}
6365

64-
public static void AddEndpointName(this Activity activity, string endpointName) =>
66+
public static void SetEndpointName(this Activity activity, string endpointName) =>
6567
activity.SetTag(ActivityTagNames.MessageDestination, endpointName);
6668

6769
public static Activity? StartWithTraceId(
@@ -72,15 +74,10 @@ public static void AddEndpointName(this Activity activity, string endpointName)
7274
string? traceState)
7375
{
7476
if (!activitySource.HasListeners())
75-
{
7677
return null;
77-
}
78-
79-
if (traceId != null && ActivityContext.TryParse(traceId, traceState, out ActivityContext context))
80-
{
81-
return activitySource.StartActivity(name, activityKind, context);
82-
}
8378

84-
return activitySource.StartActivity(name, activityKind);
79+
return traceId != null && ActivityContext.TryParse(traceId, traceState, out ActivityContext context)
80+
? activitySource.StartActivity(name, activityKind, context)
81+
: activitySource.StartActivity(name, activityKind);
8582
}
8683
}

0 commit comments

Comments
 (0)