Skip to content

Commit 6e710e3

Browse files
authored
Merge pull request #425: Add RpcRetryOption and use longer retry interval on RESOURCE_EXHAUSTED
2 parents 8cdcc18 + 90a2a1c commit 6e710e3

13 files changed

+415
-40
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"spiral/attributes": "^3.1.4",
3737
"spiral/roadrunner": "^2023.3.12 || ^2024.1",
3838
"spiral/roadrunner-cli": "^2.5",
39-
"spiral/roadrunner-kv": "^4.0",
39+
"spiral/roadrunner-kv": "^4.2",
4040
"spiral/roadrunner-worker": "^3.0",
4141
"symfony/filesystem": "^5.4 || ^6.0 || ^7.0",
4242
"symfony/http-client": "^5.4 || ^6.0 || ^7.0",

psalm-baseline.xml

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<files psalm-version="5.23.1@8471a896ccea3526b26d082f4461eeea467f10a4">
2+
<files psalm-version="5.24.0@462c80e31c34e58cc4f750c656be3927e80e550e">
33
<file src="src/Activity.php">
44
<ImplicitToStringCast>
55
<code><![CDATA[$type]]></code>
@@ -43,15 +43,6 @@
4343
<code><![CDATA[recordHeartbeatByToken]]></code>
4444
</MissingReturnType>
4545
</file>
46-
<file src="src/Client/GRPC/BaseClient.php">
47-
<ArgumentTypeCoercion>
48-
<code><![CDATA[(int)$waitRetry->totalMicroseconds]]></code>
49-
</ArgumentTypeCoercion>
50-
<InvalidArgument>
51-
<code><![CDATA[$retryOption->maximumInterval]]></code>
52-
<code><![CDATA[$waitRetry]]></code>
53-
</InvalidArgument>
54-
</file>
5546
<file src="src/Client/GRPC/Context.php">
5647
<ArgumentTypeCoercion>
5748
<code><![CDATA[$format]]></code>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Temporal\Client\Common;
6+
7+
/**
8+
* Used to throttle code execution in presence of failures using exponential backoff logic.
9+
*
10+
* The formula used to calculate the next sleep interval is:
11+
*
12+
* ```
13+
* jitter = rand(-maxJitterCoefficient, +maxJitterCoefficient)
14+
* wait = min(pow(backoffCoefficient, failureCount - 1) * initialInterval * (1 + jitter), maxInterval)
15+
* ```
16+
*
17+
* Note
18+
* `initialInterval` may be changed in runtime depending on the failure type.
19+
* This means that attempt X can possibly get a shorter throttle than attempt X-1.
20+
*
21+
* Example:
22+
*
23+
* ```php
24+
* $throttler = new BackoffThrottler(maxInterval: 60_000, maxJitterCoefficient: 0.1, backoffCoefficient: 2.0);
25+
*
26+
* // First retry
27+
* // Here 1000 is the initial interval for the RESOURCE_EXHAUSTED exception
28+
* $throttler->calculateSleepTime(failureCount: 1, initialInterval: 1000);
29+
*
30+
* // Second retry
31+
* // Here 500 is the common initial interval for all other exceptions
32+
* $throttler->calculateSleepTime(failureCount: 2, initialInterval: 500);
33+
* ```
34+
*
35+
* @internal
36+
*/
37+
final class BackoffThrottler
38+
{
39+
/**
40+
* @param int $maxInterval Maximum sleep interval in milliseconds. Must be greater than 0.
41+
* @param float $maxJitterCoefficient Maximum jitter to apply. Must be in the range [0.0, 1.0).
42+
* 0.1 means that actual retry time can be +/- 10% of the calculated time.
43+
* @param float $backoffCoefficient Coefficient used to calculate the next retry backoff interval.
44+
* The next retry interval is previous interval multiplied by this coefficient.
45+
* Must be greater than or equal to 1.0.
46+
*/
47+
public function __construct(
48+
private readonly int $maxInterval,
49+
private readonly float $maxJitterCoefficient,
50+
private readonly float $backoffCoefficient,
51+
) {
52+
$maxJitterCoefficient >= 0 && $maxJitterCoefficient < 1 or throw new \InvalidArgumentException(
53+
'$jitterCoefficient must be in the range [0.0, 1.0).',
54+
);
55+
$this->maxInterval > 0 or throw new \InvalidArgumentException('$maxInterval must be greater than 0.');
56+
$this->backoffCoefficient >= 1.0 or throw new \InvalidArgumentException(
57+
'$backoffCoefficient must be greater than 1.',
58+
);
59+
}
60+
61+
/**
62+
* Calculates the next sleep interval in milliseconds.
63+
*
64+
* @param int $failureCount number of failures
65+
* @param int $initialInterval in milliseconds
66+
*
67+
* @return int<0, max>
68+
*
69+
* @psalm-assert int<1, max> $failureCount
70+
* @psalm-assert int<1, max> $initialInterval
71+
*
72+
* @psalm-suppress InvalidOperand
73+
*/
74+
public function calculateSleepTime(int $failureCount, int $initialInterval): int
75+
{
76+
$failureCount > 0 or throw new \InvalidArgumentException('$failureCount must be greater than 0.');
77+
$initialInterval > 0 or throw new \InvalidArgumentException('$initialInterval must be greater than 0.');
78+
79+
// Choose a random number in the range -maxJitterCoefficient ... +maxJitterCoefficient
80+
$jitter = \random_int(-1000, 1000) * $this->maxJitterCoefficient / 1000;
81+
$sleepTime = \min(
82+
\pow($this->backoffCoefficient, $failureCount - 1) * $initialInterval * (1.0 + $jitter),
83+
$this->maxInterval,
84+
);
85+
86+
return \abs((int)$sleepTime);
87+
}
88+
}

src/Client/Common/ClientContextInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public function withTimeout(float $timeout): static;
2020
*/
2121
public function withDeadline(\DateTimeInterface $deadline): static;
2222

23-
public function withRetryOptions(RetryOptions $options): static;
23+
public function withRetryOptions(RpcRetryOptions $options): static;
2424

2525
/**
2626
* A metadata map to send to the server

src/Client/Common/ClientContextTrait.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public function withDeadline(\DateTimeInterface $deadline): static
4747
return $new;
4848
}
4949

50-
public function withRetryOptions(RetryOptions $options): static
50+
public function withRetryOptions(RpcRetryOptions $options): static
5151
{
5252
$new = clone $this;
5353
$context = $new->client->getContext();

src/Client/Common/RpcRetryOptions.php

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Temporal\Client\Common;
6+
7+
use JetBrains\PhpStorm\Pure;
8+
use Temporal\Common\RetryOptions;
9+
use Temporal\Internal\Support\DateInterval;
10+
11+
/**
12+
* @psalm-import-type DateIntervalValue from DateInterval
13+
* @psalm-immutable
14+
*/
15+
final class RpcRetryOptions extends RetryOptions
16+
{
17+
/**
18+
* Interval of the first retry, on congestion related failures (i.e. RESOURCE_EXHAUSTED errors).
19+
*
20+
* If coefficient is 1.0 then it is used for all retries. Defaults to 1000ms.
21+
*/
22+
public ?\DateInterval $congestionInitialInterval = null;
23+
24+
/**
25+
* Maximum amount of jitter to apply.
26+
* Must be lower than 1.
27+
*
28+
* 0.1 means that actual retry time can be +/- 10% of the calculated time.
29+
*/
30+
public float $maximumJitterCoefficient = 0.1;
31+
32+
/**
33+
* Converts {@see RetryOptions} to {@see RpcRetryOptions}.
34+
*
35+
* @internal
36+
*/
37+
public static function fromRetryOptions(RetryOptions $options): self
38+
{
39+
return $options instanceof self ? $options : (new self())
40+
->withInitialInterval($options->initialInterval)
41+
->withBackoffCoefficient($options->backoffCoefficient)
42+
->withMaximumInterval($options->maximumInterval)
43+
->withMaximumAttempts($options->maximumAttempts)
44+
->withNonRetryableExceptions($options->nonRetryableExceptions);
45+
}
46+
47+
/**
48+
* Interval of the first retry, on congestion related failures (i.e. RESOURCE_EXHAUSTED errors).
49+
* If coefficient is 1.0 then it is used for all retries. Defaults to 1000ms.
50+
*
51+
* @param DateIntervalValue|null $interval Interval to wait on first retry, on congestion failures.
52+
* Defaults to 1000ms, which is used if set to {@see null}.
53+
*
54+
* @return self
55+
*
56+
* @psalm-suppress ImpureMethodCall
57+
*/
58+
#[Pure]
59+
public function withCongestionInitialInterval($interval): self
60+
{
61+
$interval === null || DateInterval::assert($interval) or throw new \InvalidArgumentException(
62+
'Invalid interval value.'
63+
);
64+
65+
$self = clone $this;
66+
$self->congestionInitialInterval = DateInterval::parseOrNull($interval, DateInterval::FORMAT_SECONDS);
67+
return $self;
68+
}
69+
70+
/**
71+
* Maximum amount of jitter to apply.
72+
*
73+
* 0.2 means that actual retry time can be +/- 20% of the calculated time.
74+
* Set to 0 to disable jitter. Must be lower than 1.
75+
*
76+
* @param null|float $coefficient Maximum amount of jitter.
77+
* Default will be used if set to {@see null}.
78+
*
79+
* @return self
80+
*/
81+
#[Pure]
82+
public function withMaximumJitterCoefficient(?float $coefficient): self
83+
{
84+
$coefficient === null || ($coefficient >= 0.0 && $coefficient < 1.0) or throw new \InvalidArgumentException(
85+
'Maximum jitter coefficient must be in range [0, 1).'
86+
);
87+
88+
$self = clone $this;
89+
$self->maximumJitterCoefficient = $coefficient ?? 0.1;
90+
return $self;
91+
}
92+
}

src/Client/GRPC/BaseClient.php

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
use Grpc\UnaryCall;
2020
use Temporal\Api\Workflowservice\V1\GetSystemInfoRequest;
2121
use Temporal\Api\Workflowservice\V1\WorkflowServiceClient;
22+
use Temporal\Client\Common\BackoffThrottler;
23+
use Temporal\Client\Common\RpcRetryOptions;
2224
use Temporal\Client\Common\ServerCapabilities;
2325
use Temporal\Client\GRPC\Connection\Connection;
2426
use Temporal\Client\GRPC\Connection\ConnectionInterface;
@@ -272,18 +274,11 @@ protected function invoke(string $method, object $arg, ?ContextInterface $ctx =
272274
private function call(string $method, object $arg, ContextInterface $ctx): object
273275
{
274276
$attempt = 0;
275-
$retryOption = $ctx->getRetryOptions();
276-
277-
$maxInterval = null;
278-
if ($retryOption->maximumInterval !== null) {
279-
$maxInterval = CarbonInterval::create($retryOption->maximumInterval);
280-
}
281-
282-
$waitRetry = $retryOption->initialInterval ?? CarbonInterval::millisecond(500);
283-
$waitRetry = CarbonInterval::create($waitRetry);
277+
$retryOption = RpcRetryOptions::fromRetryOptions($ctx->getRetryOptions());
278+
$initialIntervalMs = $congestionInitialIntervalMs = $throttler = null;
284279

285280
do {
286-
$attempt++;
281+
++$attempt;
287282
try {
288283
$options = $ctx->getOptions();
289284
$deadline = $ctx->getDeadline();
@@ -312,23 +307,40 @@ private function call(string $method, object $arg, ContextInterface $ctx): objec
312307
}
313308

314309
if ($retryOption->maximumAttempts !== 0 && $attempt >= $retryOption->maximumAttempts) {
310+
// Reached maximum attempts
315311
throw $e;
316312
}
317313

318314
if ($ctx->getDeadline() !== null && $ctx->getDeadline() > new DateTimeImmutable()) {
315+
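// Deadline is reached. NOTE(review): the condition above is true while the deadline is still in the future (deadline > now) - confirm the comparison direction.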
319316
throw new TimeoutException('Call timeout has been reached');
320317
}
321318

322-
// wait till the next call
323-
$this->usleep((int)$waitRetry->totalMicroseconds);
324-
325-
$waitRetry = CarbonInterval::millisecond(
326-
$waitRetry->totalMilliseconds * $retryOption->backoffCoefficient
319+
// Init interval values in milliseconds
320+
$initialIntervalMs ??= $retryOption->initialInterval === null
321+
? (int)CarbonInterval::millisecond(50)->totalMilliseconds
322+
: (int)(new CarbonInterval($retryOption->initialInterval))->totalMilliseconds;
323+
$congestionInitialIntervalMs ??= $retryOption->congestionInitialInterval === null
324+
? (int)CarbonInterval::millisecond(1000)->totalMilliseconds
325+
: (int)(new CarbonInterval($retryOption->congestionInitialInterval))->totalMilliseconds;
326+
327+
$throttler ??= new BackoffThrottler(
328+
maxInterval: $retryOption->maximumInterval !== null
329+
? (int)(new CarbonInterval($retryOption->maximumInterval))->totalMilliseconds
330+
: $initialIntervalMs * 200,
331+
maxJitterCoefficient: $retryOption->maximumJitterCoefficient,
332+
backoffCoefficient: $retryOption->backoffCoefficient
327333
);
328334

329-
if ($maxInterval !== null && $maxInterval->totalMilliseconds < $waitRetry->totalMilliseconds) {
330-
$waitRetry = $maxInterval;
331-
}
335+
// Initial interval always depends on the *most recent* failure.
336+
$baseInterval = $e->getCode() === StatusCode::RESOURCE_EXHAUSTED
337+
? $congestionInitialIntervalMs
338+
: $initialIntervalMs;
339+
340+
$wait = $throttler->calculateSleepTime(failureCount: $attempt, initialInterval: $baseInterval);
341+
342+
// wait till the next call
343+
$this->usleep($wait);
332344
}
333345
} while (true);
334346
}

src/Client/GRPC/Context.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Temporal\Client\GRPC;
1313

1414
use Carbon\CarbonInterval;
15+
use Temporal\Client\Common\RpcRetryOptions;
1516
use Temporal\Common\RetryOptions;
1617
use Temporal\Common\SdkVersion;
1718
use Temporal\Internal\Support\DateInterval;
@@ -25,7 +26,7 @@ final class Context implements ContextInterface
2526

2627
private function __construct()
2728
{
28-
$this->retryOptions = RetryOptions::new()
29+
$this->retryOptions = RpcRetryOptions::new()
2930
->withMaximumAttempts(0)
3031
->withInitialInterval(CarbonInterval::millisecond(500));
3132

src/Common/RetryOptions.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public function mergeWith(MethodRetry $retry = null): self
127127
* @psalm-suppress ImpureMethodCall
128128
*
129129
* @param DateIntervalValue|null $interval
130-
* @return self
130+
* @return static
131131
*/
132132
#[Pure]
133133
public function withInitialInterval($interval): self
@@ -143,7 +143,7 @@ public function withInitialInterval($interval): self
143143
* @psalm-suppress ImpureMethodCall
144144
*
145145
* @param float $coefficient
146-
* @return self
146+
* @return static
147147
*/
148148
#[Pure]
149149
public function withBackoffCoefficient(float $coefficient): self
@@ -159,7 +159,7 @@ public function withBackoffCoefficient(float $coefficient): self
159159
* @psalm-suppress ImpureMethodCall
160160
*
161161
* @param DateIntervalValue|null $interval
162-
* @return self
162+
* @return static
163163
*/
164164
#[Pure]
165165
public function withMaximumInterval($interval): self
@@ -175,7 +175,7 @@ public function withMaximumInterval($interval): self
175175
* @psalm-suppress ImpureMethodCall
176176
*
177177
* @param int<0, max> $attempts
178-
* @return self
178+
* @return static
179179
*/
180180
#[Pure]
181181
public function withMaximumAttempts(int $attempts): self
@@ -192,7 +192,7 @@ public function withMaximumAttempts(int $attempts): self
192192
* @psalm-suppress ImpureMethodCall
193193
*
194194
* @param ExceptionsList $exceptions
195-
* @return self
195+
* @return static
196196
*/
197197
#[Pure]
198198
public function withNonRetryableExceptions(array $exceptions): self

src/Workflow/ChildWorkflowOptions.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public function __construct()
181181
* @param CronSchedule|null $cron
182182
* @return $this
183183
*/
184-
public function mergeWith(MethodRetry $retry = null, CronSchedule $cron = null): self
184+
public function mergeWith(?MethodRetry $retry = null, ?CronSchedule $cron = null): self
185185
{
186186
$self = clone $this;
187187

0 commit comments

Comments
 (0)