Skip to content

Commit 1a36bd5

Browse files
authored
Harden results viewer: Increase num of retries to 60 with constant backoff of 500ms (#1821)
* Increase retries to 60 with constant backoff of 500ms * back to debug
1 parent 1a2a8a0 commit 1a36bd5

File tree

2 files changed

+82
-56
lines changed

2 files changed

+82
-56
lines changed

src/flinkStatementResultsManager.test.ts

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -219,33 +219,6 @@ describe("FlinkStatementResultsViewModel and FlinkStatementResultsManager", () =
219219
});
220220
});
221221

222-
it("should handle StopStatement message with retries", async () => {
223-
// Mock the updateSqlv1Statement to fail with 409 twice then succeed
224-
ctx.flinkSqlStatementsApi.updateSqlv1Statement
225-
.onFirstCall()
226-
.rejects(createResponseError(409, "Conflict", "{}"));
227-
ctx.flinkSqlStatementsApi.updateSqlv1Statement
228-
.onSecondCall()
229-
.rejects(createResponseError(409, "Conflict", "{}"));
230-
ctx.flinkSqlStatementsApi.updateSqlv1Statement.onThirdCall().resolves();
231-
232-
await vm.stopStatement();
233-
234-
await eventually(() => {
235-
assert.equal(ctx.flinkSqlStatementsApi.updateSqlv1Statement.callCount, 3);
236-
});
237-
});
238-
239-
it("should handle StopStatement message with max retries exceeded", async () => {
240-
// Mock the updateSqlv1Statement to always fail with 409
241-
const responseError = createResponseError(409, "Conflict", "{}");
242-
ctx.flinkSqlStatementsApi.updateSqlv1Statement.rejects(responseError);
243-
244-
// Call stop statement and expect it to throw after max retries
245-
await vm.stopStatement();
246-
assert.equal(ctx.flinkSqlStatementsApi.updateSqlv1Statement.callCount, 5);
247-
});
248-
249222
it("should stop polling when statement is not results viewable", async () => {
250223
assert.ok(ctx.manager["_pollingInterval"] as NodeJS.Timeout);
251224

@@ -276,16 +249,57 @@ describe("FlinkStatementResultsViewModel and FlinkStatementResultsManager", () =
276249
});
277250

278251
describe("with fetchResults not running in a setInterval", () => {
252+
let clock: sinon.SinonFakeTimers;
253+
279254
beforeEach(() => {
280255
clearInterval(ctx.manager["_pollingInterval"]);
281256
ctx.manager["_pollingInterval"] = undefined;
282-
283257
ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.resetHistory();
258+
259+
// TODO: Eventually, the idea would be to move this fake timer up to
260+
// the top-level describe's beforeEach.
261+
// See https://github.com/confluentinc/vscode/issues/1807
262+
clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
284263
});
285264

286-
it("should abort in-flight get results when stopping statement", async () => {
287-
// Clear the polling interval so we can control when fetchResults is called
265+
afterEach(() => {
266+
clock.restore();
267+
});
268+
269+
it("should handle StopStatement message with retries", async () => {
270+
// Mock the updateSqlv1Statement to fail with 409 twice then succeed
271+
ctx.flinkSqlStatementsApi.updateSqlv1Statement
272+
.onFirstCall()
273+
.rejects(createResponseError(409, "Conflict", "{}"));
274+
ctx.flinkSqlStatementsApi.updateSqlv1Statement
275+
.onSecondCall()
276+
.rejects(createResponseError(409, "Conflict", "{}"));
277+
ctx.flinkSqlStatementsApi.updateSqlv1Statement.onThirdCall().resolves();
278+
279+
const stopPromise = vm.stopStatement();
280+
281+
await clock.tickAsync(3000);
282+
283+
await stopPromise;
284+
285+
assert.equal(ctx.flinkSqlStatementsApi.updateSqlv1Statement.callCount, 3);
286+
});
287+
288+
it("should handle StopStatement message with max retries exceeded", async () => {
289+
// Mock the updateSqlv1Statement to always fail with 409
290+
const responseError = createResponseError(409, "Conflict", "{}");
291+
ctx.flinkSqlStatementsApi.updateSqlv1Statement.rejects(responseError);
292+
293+
// Call stop statement and expect it to throw after max retries
294+
const stopPromise = vm.stopStatement();
295+
296+
await clock.tickAsync(61 * 500);
288297

298+
await stopPromise;
299+
assert.equal(ctx.flinkSqlStatementsApi.updateSqlv1Statement.callCount, 60);
300+
});
301+
302+
it("should abort in-flight get results when stopping statement", async () => {
289303
// Create a promise that we can reject manually to simulate the aborted request
290304
let rejectRequest: (reason: Error) => void;
291305
const requestPromise = new Promise<GetSqlv1StatementResult200Response>((_resolve, reject) => {
@@ -350,12 +364,17 @@ describe("FlinkStatementResultsViewModel and FlinkStatementResultsManager", () =
350364
});
351365

352366
// Trigger a fetch
353-
await ctx.manager.fetchResults();
367+
const fetchPromise = ctx.manager.fetchResults();
368+
369+
// Advance time to trigger retries
370+
await clock.tickAsync(500);
371+
await clock.tickAsync(500);
372+
await clock.tickAsync(500);
373+
374+
await fetchPromise;
354375

355376
// Verify the request was made 3 times
356-
await eventually(() => {
357-
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 3);
358-
});
377+
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 3);
359378
});
360379

361380
it("should handle fetch results with max retries exceeded", async () => {
@@ -364,13 +383,16 @@ describe("FlinkStatementResultsViewModel and FlinkStatementResultsManager", () =
364383
ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.rejects(responseError);
365384

366385
// Trigger a fetch
367-
await ctx.manager.fetchResults();
386+
const fetchPromise = ctx.manager.fetchResults();
368387

369-
await eventually(() => {
370-
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 5);
371-
// Verify error state is set
372-
assert.ok(ctx.manager["_latestError"]());
373-
});
388+
// Advance time to trigger all retries
389+
await clock.tickAsync(61 * 500);
390+
391+
await fetchPromise;
392+
393+
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 60);
394+
// Verify error state is set
395+
assert.ok(ctx.manager["_latestError"]());
374396
});
375397

376398
it("should not retry on non-409 errors during fetch", async () => {
@@ -379,13 +401,16 @@ describe("FlinkStatementResultsViewModel and FlinkStatementResultsManager", () =
379401
ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.rejects(responseError);
380402

381403
// Trigger a fetch
382-
await ctx.manager.fetchResults();
404+
const fetchPromise = ctx.manager.fetchResults();
383405

384-
await eventually(() => {
385-
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 1);
386-
// Verify error state is set
387-
assert.ok(ctx.manager["_latestError"]());
388-
});
406+
// Advance time to ensure no retries happen
407+
await clock.tickAsync(1000);
408+
409+
await fetchPromise;
410+
411+
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 1);
412+
// Verify error state is set
413+
assert.ok(ctx.manager["_latestError"]());
389414
});
390415

391416
it("should only allow one instance of fetchResults to run at a time", async () => {
@@ -405,11 +430,10 @@ describe("FlinkStatementResultsViewModel and FlinkStatementResultsManager", () =
405430
ctx.manager.fetchResults(),
406431
ctx.manager.fetchResults(),
407432
ctx.manager.fetchResults(),
408-
ctx.manager.fetchResults(),
409433
];
410434

411-
// Wait a bit to ensure all calls have started
412-
await new Promise((resolve) => setTimeout(resolve, 50));
435+
// Advance time to ensure all calls have started
436+
await clock.tickAsync(50);
413437

414438
// Verify only one API call was made
415439
assert.equal(ctx.flinkSqlStatementResultsApi.getSqlv1StatementResult.callCount, 1);

src/flinkStatementResultsManager.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ export class FlinkStatementResultsManager {
209209
const currentResults = this._results();
210210
const pageToken = this.extractPageToken(this._latestResult()?.metadata?.next);
211211

212-
const response = await this.retryWithBackoff(async () => {
212+
const response = await this.retry(async () => {
213213
return await this._flinkStatementResultsSqlApi.getSqlv1StatementResult(
214214
{
215215
environment_id: this.statement.environmentId,
@@ -334,12 +334,15 @@ export class FlinkStatementResultsManager {
334334
);
335335
}
336336

337-
private async retryWithBackoff<T>(
337+
/**
338+
* Retries {@link maxRetries} times with a constant backoff delay of
339+
* {@link backoffMs}. Nothing fancy.
340+
*/
341+
private async retry<T>(
338342
operation: () => Promise<T>,
339343
operationName: string,
340-
maxRetries: number = 5,
341-
initialBackoffMs: number = 100,
342-
maxBackoffMs: number = 10_000,
344+
maxRetries: number = 60,
345+
backoffMs: number = 500,
343346
): Promise<T> {
344347
let lastErr: Error | undefined;
345348
for (let attempt = 0; attempt < maxRetries; attempt++) {
@@ -349,7 +352,6 @@ export class FlinkStatementResultsManager {
349352
lastErr = err as Error;
350353
if (isResponseErrorWithStatus(err, 409)) {
351354
if (attempt < maxRetries - 1) {
352-
const backoffMs = Math.min(initialBackoffMs * Math.pow(2, attempt), maxBackoffMs);
353355
logger.debug(
354356
`Retrying ${operationName} after 409 conflict. Attempt ${attempt + 1}/${maxRetries}. Waiting ${backoffMs}ms`,
355357
);
@@ -369,7 +371,7 @@ export class FlinkStatementResultsManager {
369371
this._getResultsAbortController.abort();
370372

371373
try {
372-
await this.retryWithBackoff(async () => {
374+
await this.retry(async () => {
373375
await this.refreshStatement();
374376
await this._stopStatement();
375377
}, "stop statement");

0 commit comments

Comments
 (0)