Skip to content

Commit 94d8edb

Browse files
committed
Added disable of flush lock to effects and added more retry-policy tests
1 parent bb6a092 commit 94d8edb

File tree

8 files changed

+130
-2
lines changed

8 files changed

+130
-2
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/EffectTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,12 @@ public override Task CaptureEffectWithRetryPolicy()
8989
[TestMethod]
9090
public override Task CaptureEffectWithRetryPolicyWithResult()
9191
=> CaptureEffectWithRetryPolicyWithResult(FunctionStoreFactory.Create());
92+
93+
[TestMethod]
94+
public override Task CaptureEffectWithRetryPolicyWithoutSuspension()
95+
=> CaptureEffectWithRetryPolicyWithoutSuspension(FunctionStoreFactory.Create());
96+
97+
[TestMethod]
98+
public override Task ExceptionPredicateIsUsedForRetryPolicy()
99+
=> ExceptionPredicateIsUsedForRetryPolicy(FunctionStoreFactory.Create());
92100
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,4 +828,86 @@ public async Task CaptureEffectWithRetryPolicyWithResult(Task<IFunctionStore> st
828828

829829
syncedCounter.Current.ShouldBe(3);
830830
}
831+
832+
public abstract Task CaptureEffectWithRetryPolicyWithoutSuspension();
833+
public async Task CaptureEffectWithRetryPolicyWithoutSuspension(Task<IFunctionStore> storeTask)
834+
{
835+
var store = await storeTask;
836+
var flowId = TestFlowId.Create();
837+
using var registry = new FunctionsRegistry(store);
838+
var syncedCounter = new SyncedCounter();
839+
840+
var retryPolicy = RetryPolicy.Create(suspendThreshold: TimeSpan.MaxValue, initialInterval: TimeSpan.FromMilliseconds(100), backoffCoefficient: 1);
841+
var registration = registry.RegisterFunc<string, string>(
842+
flowType: flowId.Type,
843+
async (param, workflow) =>
844+
{
845+
var effect = workflow.Effect;
846+
return await effect.Capture(() =>
847+
{
848+
if (syncedCounter.Current < 2)
849+
{
850+
syncedCounter.Increment();
851+
throw new TimeoutException();
852+
}
853+
854+
syncedCounter.Increment();
855+
return Task.FromResult(param);
856+
}, retryPolicy);
857+
}
858+
);
859+
860+
var result = await registration.Invoke(flowId.Instance, "Hello World!");
861+
result.ShouldBe("Hello World!");
862+
863+
syncedCounter.Current.ShouldBe(3);
864+
}
865+
866+
public abstract Task ExceptionPredicateIsUsedForRetryPolicy();
867+
public async Task ExceptionPredicateIsUsedForRetryPolicy(Task<IFunctionStore> storeTask)
868+
{
869+
var store = await storeTask;
870+
var flowId = TestFlowId.Create();
871+
using var registry = new FunctionsRegistry(store);
872+
var syncedCounter = new SyncedCounter();
873+
874+
var retryPolicy = RetryPolicy.Create(
875+
suspendThreshold: TimeSpan.MaxValue,
876+
initialInterval: TimeSpan.FromMilliseconds(100),
877+
backoffCoefficient: 1,
878+
shouldRetry: e => e is TimeoutException
879+
);
880+
var registration = registry.RegisterParamless(
881+
flowType: flowId.Type,
882+
async workflow =>
883+
{
884+
var effect = workflow.Effect;
885+
await effect.Capture(() =>
886+
{
887+
if (syncedCounter.Current == 0)
888+
{
889+
syncedCounter.Increment();
890+
throw new TimeoutException();
891+
}
892+
else
893+
{
894+
syncedCounter.Increment();
895+
throw new InvalidOperationException();
896+
}
897+
}, retryPolicy);
898+
}
899+
);
900+
901+
try
902+
{
903+
await registration.Invoke(flowId.Instance);
904+
Assert.Fail("Expected InvalidOperationException");
905+
}
906+
catch (FatalWorkflowException e)
907+
{
908+
e.ErrorType.ShouldBe(typeof(InvalidOperationException));
909+
}
910+
911+
syncedCounter.Current.ShouldBe(2);
912+
}
831913
}

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ public async Task<bool> Mark(string id)
4343
public async Task Upsert<T>(string id, T value, bool flush = true) => await Upsert(CreateEffectId(id, EffectType.Effect), value, flush);
4444
internal Task Upsert<T>(EffectId effectId, T value, bool flush) => effectResults.Upsert(effectId, value, flush);
4545

46+
/// <summary>
47+
/// Take flush lock - thereby posting any new flushes until the lock is released.
48+
/// Useful for adding multiple effects without risking only some of the effects are persisted.
49+
/// </summary>
50+
/// <returns>A disposable instance needed to release the lock</returns>
51+
internal Task<IDisposable> DisableFlush() => effectResults.DisableFlush();
52+
4653
public async Task<Option<T>> TryGet<T>(string id) => await TryGet<T>(CreateEffectId(id, EffectType.Effect));
4754
internal Task<Option<T>> TryGet<T>(EffectId effectId) => effectResults.TryGet<T>(effectId);
4855
public async Task<T> Get<T>(string id) => await Get<T>(CreateEffectId(id, EffectType.Effect));

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
77
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
8+
using Cleipnir.ResilientFunctions.Helpers.Disposables;
89
using Cleipnir.ResilientFunctions.Reactive.Utilities;
910
using Cleipnir.ResilientFunctions.Storage;
1011

@@ -123,6 +124,12 @@ await FlushOrAddToPending(
123124
delete: false
124125
);
125126
}
127+
128+
public async Task<IDisposable> DisableFlush()
129+
{
130+
await _flushSync.WaitAsync();
131+
return new ActionDisposable(() => _flushSync.Release());
132+
}
126133

127134
public async Task<Option<T>> TryGet<T>(EffectId effectId)
128135
{

Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,11 @@ public async Task<T> Invoke<T>(Func<Task<T>> work, Effect effect, UtcNow utcNow)
131131

132132
delayUntil = utcNow().Add(delay);
133133
iteration += 1;
134-
await effect.Upsert(delayUntilId, delayUntil.Ticks, flush: false);
135-
await effect.Upsert(iterationId, iteration, flush: false);
134+
{
135+
using var @lock = await effect.DisableFlush();
136+
await effect.Upsert(delayUntilId, delayUntil.Ticks, flush: false);
137+
await effect.Upsert(iterationId, iteration, flush: false);
138+
}
136139

137140
if (iteration >= maximumAttempts)
138141
throw;

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/EffectTests.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,11 @@ public override Task CaptureEffectWithRetryPolicy()
8989
public override Task CaptureEffectWithRetryPolicyWithResult()
9090
=> CaptureEffectWithRetryPolicyWithResult(FunctionStoreFactory.Create());
9191

92+
[TestMethod]
93+
public override Task CaptureEffectWithRetryPolicyWithoutSuspension()
94+
=> CaptureEffectWithRetryPolicyWithoutSuspension(FunctionStoreFactory.Create());
95+
96+
[TestMethod]
97+
public override Task ExceptionPredicateIsUsedForRetryPolicy()
98+
=> ExceptionPredicateIsUsedForRetryPolicy(FunctionStoreFactory.Create());
9299
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/EffectTests.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,11 @@ public override Task CaptureEffectWithRetryPolicy()
9090
public override Task CaptureEffectWithRetryPolicyWithResult()
9191
=> CaptureEffectWithRetryPolicyWithResult(FunctionStoreFactory.Create());
9292

93+
[TestMethod]
94+
public override Task CaptureEffectWithRetryPolicyWithoutSuspension()
95+
=> CaptureEffectWithRetryPolicyWithoutSuspension(FunctionStoreFactory.Create());
96+
97+
[TestMethod]
98+
public override Task ExceptionPredicateIsUsedForRetryPolicy()
99+
=> ExceptionPredicateIsUsedForRetryPolicy(FunctionStoreFactory.Create());
93100
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectTests.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,11 @@ public override Task CaptureEffectWithRetryPolicy()
9090
public override Task CaptureEffectWithRetryPolicyWithResult()
9191
=> CaptureEffectWithRetryPolicyWithResult(FunctionStoreFactory.Create());
9292

93+
[TestMethod]
94+
public override Task CaptureEffectWithRetryPolicyWithoutSuspension()
95+
=> CaptureEffectWithRetryPolicyWithoutSuspension(FunctionStoreFactory.Create());
96+
97+
[TestMethod]
98+
public override Task ExceptionPredicateIsUsedForRetryPolicy()
99+
=> ExceptionPredicateIsUsedForRetryPolicy(FunctionStoreFactory.Create());
93100
}

0 commit comments

Comments
 (0)