
Commit 974f475

Support retries for long running RPCs. With this, we will optionally retry a failed long-running file upload/download. (#1147)
* Support retries for long running RPCs
* Update halibut
1 parent 755d870 commit 974f475

12 files changed (+449, -30 lines)


source/Octopus.Tentacle.Client.Tests/RpcCallRetryHandlerFixture.cs

Lines changed: 181 additions & 0 deletions
@@ -7,8 +7,10 @@
 using FluentAssertions;
 using Halibut;
 using Halibut.Exceptions;
+using Halibut.Transport;
 using NUnit.Framework;
 using Octopus.Tentacle.Client.Retries;
+using Octopus.Tentacle.CommonTestUtils.Assertions;
 using Polly.Timeout;

 namespace Octopus.Tentacle.Client.Tests
@@ -687,5 +689,184 @@ private TimeSpan GetMinTimeoutDuration(RpcCallRetryHandler handler)
         {
             return handler.RetryTimeout - handler.RetryIfRemainingDurationAtLeast - retryBackoffBuffer;
         }
+
+        [Test]
+        public async Task MinimumAttempts_WhenSetTo1_DoesNotRetryAfterTimeoutExceeded()
+        {
+            var expectedResult = Guid.NewGuid();
+            var callCount = 0;
+            var onRetryActionCalled = false;
+            var onTimeoutActionCalled = false;
+
+            // Short timeout to ensure it is exceeded, with minimumAttemptsForInterruptedLongRunningCalls = 1
+            var handler = new RpcCallRetryHandler(TimeSpan.FromSeconds(1), minimumAttemptsForInterruptedLongRunningCalls: 1);
+
+            var result = await handler.ExecuteWithRetries(
+                async ct =>
+                {
+                    callCount++;
+
+                    // Delay for 3 seconds to simulate a long-running operation; this must exceed the retry duration.
+                    await Task.Delay(TimeSpan.FromSeconds(3), ct);
+
+                    // Succeed on the first (and only) attempt since minimumAttemptsForInterruptedLongRunningCalls = 1
+                    if (callCount == 1)
+                    {
+                        return expectedResult;
+                    }
+
+                    throw new HalibutClientException($"An error has occurred on attempt {callCount}");
+                },
+                onRetryAction: async (_, _, _, _, _, _) =>
+                {
+                    await Task.CompletedTask;
+                    onRetryActionCalled = true;
+                },
+                onTimeoutAction: async (_, _, _, _) =>
+                {
+                    await Task.CompletedTask;
+                    onTimeoutActionCalled = true;
+                },
+                CancellationToken.None);
+
+            // With minimumAttemptsForInterruptedLongRunningCalls = 1, we should only make one attempt (no retries)
+            result.Should().Be(expectedResult);
+            callCount.Should().Be(1);
+            onRetryActionCalled.Should().BeFalse();
+            onTimeoutActionCalled.Should().BeFalse();
+        }
+
+        [Test]
+        public async Task MinimumAttempts_WhenSetTo2_MakesOneRetryEvenAfterTimeoutExceeded()
+        {
+            var expectedResult = Guid.NewGuid();
+            var callCount = 0;
+            var onRetryActionCalled = false;
+            var onTimeoutActionCalled = false;
+
+            // Short timeout to ensure it is exceeded, with minimumAttemptsForInterruptedLongRunningCalls = 2
+            var handler = new RpcCallRetryHandler(TimeSpan.FromSeconds(5), minimumAttemptsForInterruptedLongRunningCalls: 2);
+
+            var result = await handler.ExecuteWithRetries(
+                async ct =>
+                {
+                    callCount++;
+
+                    // Delay 2 seconds to ensure the cancellation token does not get cancelled.
+                    await Task.Delay(TimeSpan.FromSeconds(2), ct);
+
+                    // Fail on the first attempt, succeed on the second
+                    if (callCount == 1)
+                    {
+                        throw new HalibutClientException($"An error has occurred on attempt {callCount}");
+                    }
+
+                    return expectedResult;
+                },
+                onRetryAction: async (_, _, _, _, _, _) =>
+                {
+                    await Task.CompletedTask;
+                    onRetryActionCalled = true;
+                },
+                onTimeoutAction: async (_, _, _, _) =>
+                {
+                    await Task.CompletedTask;
+                    onTimeoutActionCalled = true;
+                },
+                CancellationToken.None);
+
+            // With minimumAttemptsForInterruptedLongRunningCalls = 2, we should make 2 attempts (1 initial + 1 retry)
+            result.Should().Be(expectedResult);
+            callCount.Should().Be(2);
+            onRetryActionCalled.Should().BeTrue();
+            onTimeoutActionCalled.Should().BeFalse(); // Should succeed before timeout
+        }
+
+        [Test]
+        public async Task MinimumAttempts_WhenSetTo3_MakesTwoRetriesEvenAfterTimeoutExceeded()
+        {
+            var expectedResult = Guid.NewGuid();
+            var callCount = 0;
+            var retryCount = 0;
+            var onTimeoutActionCalled = false;
+
+            // Short timeout to ensure it is exceeded, with minimumAttemptsForInterruptedLongRunningCalls = 3
+            var handler = new RpcCallRetryHandler(TimeSpan.FromSeconds(8), minimumAttemptsForInterruptedLongRunningCalls: 3);
+
+            var result = await handler.ExecuteWithRetries(
+                async ct =>
+                {
+                    callCount++;
+
+                    // Always add a 2 second delay so each attempt takes a meaningful amount of time
+                    await Task.Delay(TimeSpan.FromSeconds(2), ct);
+
+                    // Fail on the first two attempts, succeed on the third
+                    if (callCount <= 2)
+                    {
+                        throw new HalibutClientException($"An error has occurred on attempt {callCount}");
+                    }
+
+                    return expectedResult;
+                },
+                onRetryAction: async (_, _, _, _, _, _) =>
+                {
+                    await Task.CompletedTask;
+                    retryCount++;
+                },
+                onTimeoutAction: async (_, _, _, _) =>
+                {
+                    await Task.CompletedTask;
+                    onTimeoutActionCalled = true;
+                },
+                CancellationToken.None);
+
+            // With minimumAttemptsForInterruptedLongRunningCalls = 3, we should make 3 attempts (1 initial + 2 retries)
+            result.Should().Be(expectedResult);
+            callCount.Should().Be(3);
+            retryCount.Should().Be(2);
+            onTimeoutActionCalled.Should().BeFalse(); // Should succeed before timeout
+        }
+
+        /// <summary>
+        /// A connection error means the Tentacle did not get the request, which means the Tentacle is considered to be offline.
+        /// This shows that we won't exceed the retry duration attempting to connect to an offline Tentacle just to meet the
+        /// minimumAttemptsForInterruptedLongRunningCalls count.
+        /// </summary>
+        [Test]
+        public async Task WhenConfiguredToMakeAMinimumNumberOfAttempts_AndTheFirstAttemptExceedsTheRetryDuration_AndTheFailureIsAConnectingFailure_ARetryIsNotMade()
+        {
+            var callCount = 0;
+
+            // Short timeout to ensure it is exceeded, with a very high minimumAttemptsForInterruptedLongRunningCalls
+            var handler = new RpcCallRetryHandler(TimeSpan.FromSeconds(2), minimumAttemptsForInterruptedLongRunningCalls: 9999);
+
+            var exception = await AssertThrowsAny.Exception(async () =>
+                await handler.ExecuteWithRetries(
+                    async ct =>
+                    {
+                        callCount++;
+
+                        // Must exceed the retry duration
+                        await Task.Delay(TimeSpan.FromSeconds(5), ct);
+
+                        if (callCount == -1) return Guid.NewGuid(); // Never reached; only here to satisfy the return type.
+
+                        throw new HalibutClientException("", new Exception(), ConnectionState.Connecting);
+                    },
+                    onRetryAction: async (_, _, _, _, _, _) =>
+                    {
+                        await Task.CompletedTask;
+                    },
+                    onTimeoutAction: async (_, _, _, _) =>
+                    {
+                        await Task.CompletedTask;
+                    },
+                    CancellationToken.None));
+
+            exception.Should().BeAssignableTo<HalibutClientException>();
+
+            callCount.Should().Be(1);
+        }
     }
 }

source/Octopus.Tentacle.Client/Execution/RpcCallExecutor.cs

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ public async Task<T> ExecuteWithRetries<T>(

                 onErrorAction?.Invoke(lastException);

-                var remainingDurationInSeconds = (int)(totalRetryDuration - elapsedDuration).TotalSeconds;
+                var remainingDurationInSeconds = Math.Max((int)(totalRetryDuration - elapsedDuration).TotalSeconds, 0);
                 logger.Info($"An error occurred communicating with Tentacle. This action will be retried after {(int)sleepDuration.TotalSeconds} seconds. Retry attempt {retryCount}. Retries will be performed for up to {remainingDurationInSeconds} seconds.");
                 logger.Verbose(lastException);
             },
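A minimal sketch of why the clamp matters, with illustrative values only: once the minimum-attempts behaviour lets an attempt run past the retry timeout, the elapsed duration can exceed the total retry duration, and the unclamped cast would presumably log a negative "retries will be performed for up to" value.

    using System;

    class RemainingDurationClampSketch
    {
        static void Main()
        {
            // Illustrative only: the attempt ran 30 seconds past the retry timeout,
            // which can now happen when a minimum number of attempts is configured.
            var totalRetryDuration = TimeSpan.FromMinutes(5);
            var elapsedDuration = totalRetryDuration + TimeSpan.FromSeconds(30);

            var unclamped = (int)(totalRetryDuration - elapsedDuration).TotalSeconds;            // -30
            var clamped = Math.Max((int)(totalRetryDuration - elapsedDuration).TotalSeconds, 0); // 0

            Console.WriteLine($"unclamped: {unclamped}, clamped: {clamped}");
        }
    }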

source/Octopus.Tentacle.Client/Execution/RpcCallExecutorFactory.cs

Lines changed: 2 additions & 2 deletions
@@ -6,9 +6,9 @@ namespace Octopus.Tentacle.Client.Execution
 {
     internal static class RpcCallExecutorFactory
     {
-        internal static RpcCallExecutor Create(TimeSpan retryDuration, ITentacleClientObserver tentacleClientObserver)
+        internal static RpcCallExecutor Create(TimeSpan retryDuration, ITentacleClientObserver tentacleClientObserver, int? minimumAttemptsForInterruptedLongRunningCalls = null)
         {
-            var rpcCallRetryHandler = new RpcCallRetryHandler(retryDuration);
+            var rpcCallRetryHandler = new RpcCallRetryHandler(retryDuration, minimumAttemptsForInterruptedLongRunningCalls);
             var rpcCallExecutor = new RpcCallExecutor(rpcCallRetryHandler, tentacleClientObserver);

             return rpcCallExecutor;

source/Octopus.Tentacle.Client/Retries/RpcCallRetryHandler.cs

Lines changed: 50 additions & 11 deletions
@@ -24,13 +24,27 @@ internal class RpcCallRetryHandler

         public delegate Task OnTimeoutAction(TimeSpan retryTimeout, TimeSpan elapsedDuration, int retryCount, CancellationToken cancellationToken);

-        public RpcCallRetryHandler(TimeSpan retryTimeout)
+        public RpcCallRetryHandler(TimeSpan retryTimeout, int? minimumAttemptsForInterruptedLongRunningCalls = null)
         {
             RetryTimeout = retryTimeout;
+            MinimumAttemptsForInterruptedLongRunningCalls = minimumAttemptsForInterruptedLongRunningCalls;
         }

         public TimeSpan RetryTimeout { get; }

+        /// <summary>
+        /// The minimum number of attempts that will be made for long-running calls
+        /// that are interrupted while executing.
+        /// For example, if a long-running file upload is interrupted after ten minutes of transferring,
+        /// and this is set to 2, we will make another attempt to upload the file even if the RetryTimeout
+        /// has been exceeded.
+        /// If this is set to 9999, and a polling Tentacle did not collect the file upload request from the queue,
+        /// we will not keep making attempts until we have made 9999 of them. This is because the RPC call
+        /// is not considered to be interrupted if it never started, so we won't try 9999 times
+        /// to send a request to a Tentacle that is not connected.
+        /// </summary>
+        public int? MinimumAttemptsForInterruptedLongRunningCalls { get; }
+
         public TimeSpan RetryIfRemainingDurationAtLeast { get; } = TimeSpan.FromSeconds(1);

         public async Task<T> ExecuteWithRetries<T>(
@@ -43,9 +57,18 @@ public async Task<T> ExecuteWithRetries<T>(
             var started = new Stopwatch();
             var nextSleepDuration = TimeSpan.Zero;
             var totalRetryCount = 0;
+            var totalAttemptCount = 0;
+            bool shouldExecuteNextRetryUnderTimeout = true;

             async Task OnRetryAction(Exception exception, TimeSpan sleepDuration, int retryCount, Context context)
             {
+                // If the Tentacle was online (we did not get a connecting exception) AND we have been told
+                // to make a minimum number of attempts, then the next attempt (if any) should be made without a timeout.
+                // However, if the Tentacle is offline, we revert to retries being made under a timeout,
+                // so that we limit how long we try to communicate with an offline Tentacle.
+                // We must re-evaluate this on each failed attempt, since the failure type can change between attempts.
+                shouldExecuteNextRetryUnderTimeout = !(MinimumAttemptsForInterruptedLongRunningCalls.HasValue && !exception.IsConnectionException());
+
                 if (lastException == null)
                 {
                     lastException = exception;
@@ -62,7 +85,7 @@ async Task OnRetryAction(Exception exception, TimeSpan sleepDuration, int retryC
                 var elapsedDuration = started.Elapsed;
                 var remainingRetryDuration = RetryTimeout - elapsedDuration - sleepDuration;

-                if (ShouldRetryWithRemainingDuration(remainingRetryDuration))
+                if (ShouldRetry(remainingRetryDuration, totalAttemptCount, !shouldExecuteNextRetryUnderTimeout))
                 {
                     totalRetryCount = retryCount;

@@ -96,25 +119,37 @@ async Task<T> ExecuteAction(CancellationToken ct)
             {
                 if (isInitialAction)
                 {
+                    totalAttemptCount++;
                     isInitialAction = false;
                     return await action(ct).ConfigureAwait(false);
                 }

                 var remainingRetryDuration = RetryTimeout - started.Elapsed - nextSleepDuration;

-                if (!ShouldRetryWithRemainingDuration(remainingRetryDuration))
+                if (!ShouldRetry(remainingRetryDuration, totalAttemptCount, !shouldExecuteNextRetryUnderTimeout))
                 {
-                    // We are short circuiting as the retry duration has elapsed
+                    // We are short-circuiting as the retry duration has elapsed and the minimum attempts have been satisfied
                     await OnTimeoutAction(null, RetryTimeout, null, null).ConfigureAwait(false);
                     throw new TimeoutRejectedException("The delegate executed asynchronously through TimeoutPolicy did not complete within the timeout.");
                 }

-                var timeoutPolicy = policyBuilder
-                    // Ensure the remaining retry time excludes the elapsed time
-                    .WithRetryTimeout(remainingRetryDuration)
-                    .BuildTimeoutPolicy();
-
-                return await timeoutPolicy.ExecuteAsync(action, ct).ConfigureAwait(false);
+                totalAttemptCount++;
+
+                if (!shouldExecuteNextRetryUnderTimeout)
+                {
+                    // Retry a long-running operation with no timeout
+                    return await action(ct);
+                }
+                else
+                {
+                    var timeoutPolicy = policyBuilder
+                        // Ensure the remaining retry time excludes the elapsed time
+                        .WithRetryTimeout(remainingRetryDuration)
+                        .BuildTimeoutPolicy();
+
+                    return await timeoutPolicy.ExecuteAsync(action, ct).ConfigureAwait(false);
+                }
             }

             try
@@ -135,8 +170,12 @@ async Task<T> ExecuteAction(CancellationToken ct)
                 throw;
             }

-            bool ShouldRetryWithRemainingDuration(TimeSpan remainingRetryDuration)
+            bool ShouldRetry(TimeSpan remainingRetryDuration, int currentAttemptCount, bool retryRegardlessOfTimeout)
             {
+                if (MinimumAttemptsForInterruptedLongRunningCalls.HasValue
+                    && currentAttemptCount < MinimumAttemptsForInterruptedLongRunningCalls.Value
+                    && retryRegardlessOfTimeout) return true;
+
                 return remainingRetryDuration > RetryIfRemainingDurationAtLeast;
             }
         }
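The interaction between the retry timeout and the new minimum-attempts behaviour boils down to one predicate. Below is a minimal, self-contained sketch of that decision; it mirrors the diff above, but the real handler decides "connection failure" via Halibut's IsConnectionException() extension, which is stood in for here by a plain bool.

    using System;

    static class RetryDecisionSketch
    {
        // Mirrors ShouldRetry: retry while time remains, or while we still owe minimum attempts
        // for a call that was interrupted mid-execution (i.e. the failure was not a connection failure).
        static bool ShouldRetry(
            TimeSpan remainingRetryDuration,
            TimeSpan retryIfRemainingDurationAtLeast,
            int? minimumAttempts,
            int currentAttemptCount,
            bool lastFailureWasConnectionFailure)
        {
            var retryRegardlessOfTimeout = minimumAttempts.HasValue && !lastFailureWasConnectionFailure;

            if (minimumAttempts.HasValue
                && currentAttemptCount < minimumAttempts.Value
                && retryRegardlessOfTimeout) return true;

            return remainingRetryDuration > retryIfRemainingDurationAtLeast;
        }

        static void Main()
        {
            var floor = TimeSpan.FromSeconds(1);

            // Timeout exceeded, upload was interrupted mid-transfer, only 1 of 2 attempts made: retry.
            Console.WriteLine(ShouldRetry(TimeSpan.FromSeconds(-5), floor, 2, 1, lastFailureWasConnectionFailure: false)); // True

            // Timeout exceeded, Tentacle never picked up the request (connection failure): do not retry.
            Console.WriteLine(ShouldRetry(TimeSpan.FromSeconds(-5), floor, 9999, 1, lastFailureWasConnectionFailure: true)); // False

            // Timeout not yet exceeded: retry as before, regardless of the new setting.
            Console.WriteLine(ShouldRetry(TimeSpan.FromSeconds(30), floor, null, 1, lastFailureWasConnectionFailure: false)); // True
        }
    }

Run as a console program, the three scenarios print True, False, True, matching the new tests above.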

source/Octopus.Tentacle.Client/Retries/RpcCallRetryPolicyBuilder.cs

Lines changed: 0 additions & 11 deletions
@@ -66,16 +66,5 @@ async Task DefaultOnRetryAction(Exception exception, TimeSpan sleepDuration, int
             await Task.CompletedTask;
         }
     }
-
-        public AsyncPolicy Build()
-        {
-            var timeoutPolicy = BuildTimeoutPolicy();
-
-            var handleAndRetryPolicy = BuildRetryPolicy();
-
-            var policyWrap = Policy.WrapAsync(timeoutPolicy, handleAndRetryPolicy);
-
-            return policyWrap!;
-        }
     }
 }

source/Octopus.Tentacle.Client/TentacleClient.cs

Lines changed: 6 additions & 2 deletions
@@ -79,6 +79,10 @@ internal TentacleClient(

         public TimeSpan OnCancellationAbandonCompleteScriptAfter { get; set; } = TimeSpan.FromMinutes(1);

+        // Created on the fly since, most of the time, we don't need this executor.
+        RpcCallExecutor FileTransferRpcCallExecutor =>
+            RpcCallExecutorFactory.Create(this.clientOptions.RpcRetrySettings.RetryDuration, this.tentacleClientObserver, clientOptions.MinimumAttemptsForInterruptedLongRunningCalls);
+
         public async Task<UploadResult> UploadFile(string fileName, string path, DataStream package, ITentacleClientTaskLog logger, CancellationToken cancellationToken)
         {
             var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();
@@ -94,7 +98,7 @@ async Task<UploadResult> UploadFileAction(CancellationToken ct)

             try
             {
-                return await rpcCallExecutor.Execute(
+                return await FileTransferRpcCallExecutor.Execute(
                     retriesEnabled: clientOptions.RpcRetrySettings.RetriesEnabled,
                     RpcCall.Create<IFileTransferService>(nameof(IFileTransferService.UploadFile)),
                     UploadFileAction,
@@ -129,7 +133,7 @@ async Task<DataStream> DownloadFileAction(CancellationToken ct)

             try
             {
-                return await rpcCallExecutor.Execute(
+                return await FileTransferRpcCallExecutor.Execute(
                     retriesEnabled: clientOptions.RpcRetrySettings.RetriesEnabled,
                     RpcCall.Create<IFileTransferService>(nameof(IFileTransferService.DownloadFile)),
                     DownloadFileAction,

source/Octopus.Tentacle.Client/TentacleClientOptions.cs

Lines changed: 4 additions & 1 deletion
@@ -6,10 +6,13 @@ namespace Octopus.Tentacle.Client
     public class TentacleClientOptions
     {
         public RpcRetrySettings RpcRetrySettings { get; }
+
+        public int? MinimumAttemptsForInterruptedLongRunningCalls { get; }

-        public TentacleClientOptions(RpcRetrySettings rpcRetrySettings)
+        public TentacleClientOptions(RpcRetrySettings rpcRetrySettings, int? minimumAttemptsForInterruptedLongRunningCalls = null)
         {
             RpcRetrySettings = rpcRetrySettings;
+            MinimumAttemptsForInterruptedLongRunningCalls = minimumAttemptsForInterruptedLongRunningCalls;
         }
     }
 }
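A minimal usage sketch of the new option. The TentacleClientOptions constructor is as shown in the diff above; how RpcRetrySettings is built is not part of this change, so the helper below is hypothetical and only stands in for whatever the calling code already does.

    // Assumed: an RpcRetrySettings instance built however the calling code already builds it.
    RpcRetrySettings retrySettings = CreateRetrySettingsAsBefore(); // hypothetical helper

    // New: opt in to a minimum of 2 attempts for long-running calls (file upload/download)
    // that are interrupted mid-execution, even after the retry duration has been exceeded.
    var clientOptions = new TentacleClientOptions(
        retrySettings,
        minimumAttemptsForInterruptedLongRunningCalls: 2);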
