Add completion callbacks and async handler support to PersistAllAsync/PersistAll #7935

@Aaronontheweb

Problem Statement

The current PersistAllAsync and PersistAll APIs provide no built-in way to determine when all events in a batch have been persisted and their callbacks completed. This forces users to implement fragile workarounds that can fail silently.

Current API Limitations

No completion detection:

var events = new[] { evt1, evt2, evt3 };
PersistAllAsync(events, evt =>
{
    State = State.Apply(evt);
    
    // ❌ How do I know when ALL events are done?
    // ❌ When should I reply to the command sender?
    // ❌ When should I trigger side effects?
});

No async/await support:

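// ❌ No async handler overload - against Action<TEvent> this lambda binds as async void, so nothing awaits it
PersistAllAsync(events, async evt =>
{
    State = State.Apply(evt);
    await SomeAsyncOperation(); // Continuation is neither awaited nor tracked by the persistence pipeline
});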

Common Workarounds and Their Problems

Workaround 1: ReferenceEquals (FRAGILE)

var lastEvent = events[^1];
PersistAllAsync(events, evt =>
{
    State = State.Apply(evt);
    
    if (ReferenceEquals(evt, lastEvent))
    {
        // Execute after all events persisted
        Sender.Tell(new Success());
    }
});

Why it fails:

  • Events may be serialized/deserialized during persistence
  • Collection types might change (array → list)
  • Event wrapping or proxying by persistence layer
  • Fails silently - the completion block never executes, no errors logged
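
The silent failure can be reproduced outside Akka.NET. The sketch below is not the persistence pipeline: RoundTrip is a hypothetical stand-in that assumes the persistence layer hands the callback a deserialized copy of each event, and the string-array events are placeholders.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for a persistence layer that serializes each event
// and hands a deserialized copy back to the callback.
static T RoundTrip<T>(T value) =>
    JsonSerializer.Deserialize<T>(JsonSerializer.Serialize(value))!;

var events = new[]
{
    new[] { "ItemAdded", "sku-1" },
    new[] { "ItemAdded", "sku-2" },
};
var lastEvent = events[^1];
var completed = false;

foreach (var original in events)
{
    var evt = RoundTrip(original);

    // The copy carries the same data but is a different object,
    // so this branch is never taken and nothing reports the miss.
    if (ReferenceEquals(evt, lastEvent))
    {
        completed = true;
    }
}

Console.WriteLine($"completed = {completed}"); // prints "completed = False"
```

Under that assumption the completion branch is skipped without any exception or log entry, which matches the failure mode described above.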

Workaround 2: Manual Counter (VERBOSE)

var eventsList = events.ToList();
var expectedCount = eventsList.Count;
var persistedCount = 0;

PersistAllAsync(events, evt =>
{
    State = State.Apply(evt);
    persistedCount++;
    
    if (persistedCount == expectedCount)
    {
        Sender.Tell(new Success());
    }
});

Why it's problematic:

  • Boilerplate code in every usage
  • Easy to get wrong (forgetting to increment, using the wrong comparison)
  • Mutable state tracking adds complexity
  • Pollutes the business logic
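
Until a built-in overload exists, the counter can at least be written once instead of at every call site. This is a minimal sketch, not part of Akka.NET: WithCompletion is a hypothetical helper name, and the foreach loop stands in for the persistence layer invoking the callback once per event.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: wraps a per-event handler so that onComplete runs once,
// right after the handler has been invoked for the expectedCount-th event.
static Action<TEvent> WithCompletion<TEvent>(
    int expectedCount,
    Action<TEvent> handler,
    Action onComplete)
{
    // An empty batch would never trigger the callback, so this sketch rejects it.
    if (expectedCount <= 0) throw new ArgumentOutOfRangeException(nameof(expectedCount));

    var handled = 0;
    return evt =>
    {
        handler(evt);
        if (++handled == expectedCount) onComplete();
    };
}

var events = new List<string> { "evt1", "evt2", "evt3" };
var applied = new List<string>();
var completions = 0;

var callback = WithCompletion<string>(events.Count, applied.Add, () => completions++);

// Stand-in for the persistence layer calling back once per persisted event.
foreach (var evt in events) callback(evt);

Console.WriteLine($"applied = {applied.Count}, completions = {completions}");
// prints "applied = 3, completions = 1"
```

With such a helper, a call site would pass WithCompletion(...) as the handler to the existing PersistAllAsync(events, handler) overload. The mutable counter still exists, only hidden, which is why a first-class overload remains preferable.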

Real-World Impact

This limitation has caused:

  • Flaky tests due to silent ReferenceEquals failures
  • Ask pattern timeouts when replies are never sent
  • Production bugs where completion logic doesn't execute
  • User complaints about lack of natural async/await support

Proposed Solution

Add completion callback overloads to the batch methods (PersistAllAsync, PersistAll) and async handler support to all persistence methods, while maintaining backward compatibility.

API Design

Batch Methods (PersistAllAsync, PersistAll)

Sync handler + sync completion:

void PersistAllAsync<TEvent>(
    IEnumerable<TEvent> events,
    Action<TEvent> handler,
    Action onComplete)

void PersistAll<TEvent>(
    IEnumerable<TEvent> events,
    Action<TEvent> handler,
    Action onComplete)

Sync handler + async completion:

void PersistAllAsync<TEvent>(
    IEnumerable<TEvent> events,
    Action<TEvent> handler,
    Func<Task> onCompleteAsync)

void PersistAll<TEvent>(
    IEnumerable<TEvent> events,
    Action<TEvent> handler,
    Func<Task> onCompleteAsync)

Async handler + sync completion:

void PersistAllAsync<TEvent>(
    IEnumerable<TEvent> events,
    Func<TEvent, Task> handler,
    Action onComplete)

void PersistAll<TEvent>(
    IEnumerable<TEvent> events,
    Func<TEvent, Task> handler,
    Action onComplete)

Async handler + async completion:

void PersistAllAsync<TEvent>(
    IEnumerable<TEvent> events,
    Func<TEvent, Task> handler,
    Func<Task> onCompleteAsync)

void PersistAll<TEvent>(
    IEnumerable<TEvent> events,
    Func<TEvent, Task> handler,
    Func<Task> onCompleteAsync)

Single-Event Methods (PersistAsync, Persist)

Async handler support:

void PersistAsync<TEvent>(TEvent @event, Func<TEvent, Task> handler)

void Persist<TEvent>(TEvent @event, Func<TEvent, Task> handler)

Note: No completion callbacks for single-event methods since there's no batch to complete.

Usage Examples

Example 1: Simple completion callback

var events = new[] { evt1, evt2, evt3 };

PersistAllAsync(
    events,
    handler: evt => State = State.Apply(evt),
    onComplete: () => Sender.Tell(new Success())
);

Example 2: Async completion for side effects

PersistAllAsync(
    events,
    handler: evt => State = State.Apply(evt),
    onCompleteAsync: async () =>
    {
        await _emailService.SendNotificationAsync();
        await _projectionService.UpdateReadModelAsync();
        Sender.Tell(new Success());
    }
);

Example 3: Async event handlers

PersistAllAsync(
    events,
    handler: async evt =>
    {
        State = State.Apply(evt);
        await _validationService.ValidateEventAsync(evt);
    },
    onComplete: () => Sender.Tell(new Success())
);

Implementation Details

Actor Context Safety with RunTask

All async operations will use RunTask to ensure execution in the actor's context:

// For async handlers
if (handler is Func<TEvent, Task> asyncHandler)
{
    RunTask(() => asyncHandler(evt));
}

// For async completion
if (onCompleteAsync != null)
{
    RunTask(onCompleteAsync);
}

This ensures:

  • Continuations execute within the actor's context rather than on an arbitrary thread
  • Actor state remains consistent
  • No race conditions or concurrency issues

Stashing Semantics Preserved

PersistAllAsync (non-stashing):

  • New commands CAN interleave while event callbacks execute
  • Async operations run via RunTask, but actor continues processing
  • Maintains current behavior

PersistAll (stashing):

  • New commands are STASHED until ALL callbacks complete
  • Includes async handler completions and async completion callbacks
  • Stashing counter tracks pending async operations
  • Maintains current behavior
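
The stashing rule for PersistAll can be pictured with a small simulation. This is an illustration under stated assumptions, not the Eventsourced implementation: pendingInvocations, Receive, and OnInvocationCompleted are hypothetical names, and the completion callback is assumed to count as one more pending invocation alongside the per-event handlers.

```csharp
using System;
using System.Collections.Generic;

var pendingInvocations = 0;          // handlers plus completion callback still outstanding
var stash = new Queue<string>();     // commands held back while the batch is in flight
var processed = new List<string>();  // commands that actually ran, in order

void Receive(string command)
{
    if (pendingInvocations > 0) stash.Enqueue(command);
    else processed.Add(command);
}

void OnInvocationCompleted()
{
    // Unstash only when the last outstanding invocation has finished.
    if (--pendingInvocations > 0) return;
    while (stash.Count > 0) processed.Add(stash.Dequeue());
}

// A batch of two events plus one completion callback: three pending invocations.
pendingInvocations = 3;
Receive("cmd-during-batch");

OnInvocationCompleted(); // handler for evt1 finished
OnInvocationCompleted(); // handler for evt2 finished
var processedBeforeCompletion = processed.Count;

OnInvocationCompleted(); // completion callback finished
Receive("cmd-after-batch");

Console.WriteLine($"before completion: {processedBeforeCompletion}");
Console.WriteLine(string.Join(", ", processed));
// prints "before completion: 0" then "cmd-during-batch, cmd-after-batch"
```

In this model a command that arrives mid-batch is deferred until the completion callback has run, then processed ahead of later commands.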

Ordering Guarantees

  • Events persist in order (unchanged)
  • Per-event handlers invoked in order (unchanged)
  • Completion callback fires AFTER all per-event handlers complete
  • Async operations via RunTask preserve actor context safety
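
The intended ordering can be expressed as a standalone sketch. DispatchBatchAsync is a hypothetical driver written for illustration, not Akka.NET code, and it assumes each async handler is awaited before the next one starts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical driver: awaits each per-event handler in order,
// then awaits the completion callback exactly once.
async Task DispatchBatchAsync<TEvent>(
    IEnumerable<TEvent> events,
    Func<TEvent, Task> handler,
    Func<Task> onCompleteAsync)
{
    foreach (var evt in events)
    {
        await handler(evt);
    }

    await onCompleteAsync();
}

await DispatchBatchAsync(
    new[] { "evt1", "evt2", "evt3" },
    async evt =>
    {
        // The first handler is the slowest, to show that order does not depend on duration.
        await Task.Delay(evt == "evt1" ? 30 : 5);
        log.Add($"handled {evt}");
    },
    async () =>
    {
        await Task.Yield();
        log.Add("complete");
    });

Console.WriteLine(string.Join(", ", log));
// prints "handled evt1, handled evt2, handled evt3, complete"
```

A test for the real overloads could assert the same sequence, with the completion entry appearing once and last.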

Internal Implementation

Modify AsyncHandlerInvocation and StashingHandlerInvocation classes:

  • Add optional completion callback field
  • Support both Action<object> and Func<object, Task> handlers
  • Track last event in batch

Update PeekApplyHandler in Eventsourced.Recovery.cs:

  • Detect when processing last event in batch
  • Invoke completion callback after last handler
  • Use RunTask for async operations

Backward Compatibility

All existing overloads remain unchanged

  • Current PersistAllAsync(events, handler) continues to work
  • No breaking changes to method signatures

No wire format changes

  • Events serialized identically
  • No changes to persistence protocol

Additive only

  • New overloads are additions, not modifications
  • Existing code continues to compile and run

No behavioral changes to existing code

  • Current semantics preserved
  • Stashing vs non-stashing unchanged

Benefits

Eliminates Brittle Workarounds

  • No more ReferenceEquals checks that fail silently
  • No more manual event counting boilerplate
  • Clear, explicit intent

Natural .NET Patterns

  • Native async/await support
  • Idiomatic C# code
  • Better IntelliSense and tooling support

Improved Code Quality

  • Less boilerplate
  • Fewer bugs from manual tracking
  • Easier to read and maintain

Better Developer Experience

  • Addresses long-standing user pain points
  • Aligns with user expectations for async APIs
  • Reduces learning curve

Testing Requirements

New test coverage needed:

  • Completion callback fires exactly once per batch
  • Completion callback fires after all event handlers complete
  • Async handlers execute without blocking the actor
  • Async completion callbacks work correctly
  • Stashing behavior with async operations
  • Error handling in async handlers
  • Interleaving behavior for PersistAllAsync
  • Backward compatibility - existing tests pass unchanged

Target

Milestone: 1.5.56

Related Issues

This addresses user complaints about:

  • Difficulty detecting batch completion
  • Lack of async/await support in persistence
  • Fragile workarounds that fail silently
  • Boilerplate code for common patterns
