Skip to content

Commit 8d5fa0e

Browse files
committed
Fixed suspension issue when append messages to own flow
1 parent 6f91665 commit 8d5fa0e

File tree

6 files changed

+50
-3
lines changed

6 files changed

+50
-3
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,8 @@ public override Task ExecutingFlowIsReExecutedWhenSuspendedAfterInterrupt()
9191
[TestMethod]
9292
public override Task InterruptSuspendedFlows()
9393
=> InterruptSuspendedFlows(FunctionStoreFactory.Create());
94+
95+
[TestMethod]
96+
public override Task AwaitMessageAfterAppendShouldNotCauseSuspension()
97+
=> AwaitMessageAfterAppendShouldNotCauseSuspension(FunctionStoreFactory.Create());
9498
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SuspensionTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,4 +742,35 @@ protected async Task InterruptSuspendedFlows(Task<IFunctionStore> storeTask)
742742

743743
unhandledExceptionHandler.ShouldNotHaveExceptions();
744744
}
745+
746+
public abstract Task AwaitMessageAfterAppendShouldNotCauseSuspension();
747+
protected async Task AwaitMessageAfterAppendShouldNotCauseSuspension(Task<IFunctionStore> storeTask)
748+
{
749+
var store = await storeTask;
750+
var id = TestFlowId.Create();
751+
var (flowType, flowInstance) = id;
752+
753+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
754+
using var functionsRegistry = new FunctionsRegistry
755+
(
756+
store,
757+
new Settings(unhandledExceptionHandler.Catch)
758+
);
759+
760+
var registration = functionsRegistry.RegisterFunc<string, string>(
761+
flowType,
762+
inner: async Task<string> (param, workflow) =>
763+
{
764+
var messages = workflow.Messages;
765+
await messages.AppendMessage(param);
766+
767+
return await messages.FirstOfType<string>();
768+
}
769+
);
770+
771+
var result = await registration.Invoke("SomeInstance", "Hello World");
772+
result.ShouldBe("Hello World");
773+
774+
unhandledExceptionHandler.ShouldNotHaveExceptions();
775+
}
745776
}

Core/Cleipnir.ResilientFunctions/Messaging/MessagesPullerAndEmitter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ public async Task PullEvents(TimeSpan maxSinceLastSynced)
8484
try
8585
{
8686
var storedMessages = _initialMessages ?? await _messageStore.GetMessages(_storedId, _skip);
87-
_initialMessages = null;
87+
if (_initialMessages != null && maxSinceLastSynced == TimeSpan.Zero)
88+
storedMessages = storedMessages.Concat(await _messageStore.GetMessages(_storedId, _skip)).ToList();
8889

90+
_initialMessages = null;
8991
_lastSynced = _utcNow();
9092
_skip += storedMessages.Count;
9193

@@ -118,7 +120,5 @@ public async Task PullEvents(TimeSpan maxSinceLastSynced)
118120

119121
throw eventHandlingException;
120122
}
121-
122-
return;
123123
}
124124
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ public override Task InterruptSuspendedFlows()
7979
[TestMethod]
8080
public override Task ChildIsCreatedWithParentsId()
8181
=> ChildIsCreatedWithParentsId(FunctionStoreFactory.Create());
82+
83+
[TestMethod]
84+
public override Task AwaitMessageAfterAppendShouldNotCauseSuspension()
85+
=> AwaitMessageAfterAppendShouldNotCauseSuspension(FunctionStoreFactory.Create());
8286
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,8 @@ public override Task InterruptSuspendedFlows()
8080
[TestMethod]
8181
public override Task ChildIsCreatedWithParentsId()
8282
=> ChildIsCreatedWithParentsId(FunctionStoreFactory.Create());
83+
84+
[TestMethod]
85+
public override Task AwaitMessageAfterAppendShouldNotCauseSuspension()
86+
=> AwaitMessageAfterAppendShouldNotCauseSuspension(FunctionStoreFactory.Create());
8387
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/SuspensionTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,8 @@ public override Task ExecutingFlowIsReExecutedWhenSuspendedAfterInterrupt()
8080
[TestMethod]
8181
public override Task InterruptSuspendedFlows()
8282
=> InterruptSuspendedFlows(FunctionStoreFactory.Create());
83+
84+
[TestMethod]
85+
public override Task AwaitMessageAfterAppendShouldNotCauseSuspension()
86+
=> AwaitMessageAfterAppendShouldNotCauseSuspension(FunctionStoreFactory.Create());
8387
}

0 commit comments

Comments
 (0)