Skip to content

Commit 19c26cc

Browse files
Fix engine progress calls (#923)
1 parent 42f97e4 commit 19c26cc

File tree

4 files changed

+112
-84
lines changed

4 files changed

+112
-84
lines changed

packages/engine/src/lib/handler/flow-executor.ts

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logger, runWithTemporaryContext } from '@openops/server-shared';
1+
import { runWithTemporaryContext } from '@openops/server-shared';
22
import {
33
Action,
44
ActionType,
@@ -57,7 +57,7 @@ export const flowExecutor = {
5757
? output.setVerdict(ExecutionVerdict.SUCCEEDED, output.verdictResponse)
5858
: output;
5959

60-
sendProgress(newContext, constants);
60+
await sendProgress(newContext, constants);
6161

6262
return newContext.toResponse();
6363
},
@@ -103,7 +103,7 @@ export const flowExecutor = {
103103
duration: stepEndTime - stepStartTime,
104104
});
105105

106-
sendProgress(flowExecutionContext, constants);
106+
await sendProgress(flowExecutionContext, constants);
107107

108108
if (flowExecutionContext.verdict !== ExecutionVerdict.RUNNING) {
109109
break;
@@ -121,13 +121,9 @@ export const flowExecutor = {
121121
function sendProgress(
122122
flowExecutionContext: FlowExecutorContext,
123123
constants: EngineConstants,
124-
): void {
125-
progressService
126-
.sendUpdate({
127-
engineConstants: constants,
128-
flowExecutorContext: flowExecutionContext,
129-
})
130-
.catch((error) => {
131-
logger.error('Error sending progress update', error);
132-
});
124+
): Promise<void> {
125+
return progressService.sendUpdate({
126+
engineConstants: constants,
127+
flowExecutorContext: flowExecutionContext,
128+
});
133129
}

packages/engine/src/lib/services/progress.service.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1+
import { makeHttpRequest } from '@openops/common';
12
import { hashUtils, logger } from '@openops/server-shared';
23
import { UpdateRunProgressRequest } from '@openops/shared';
34
import { Mutex } from 'async-mutex';
5+
import { AxiosHeaders } from 'axios';
46
import { EngineConstants } from '../handler/context/engine-constants';
57
import { FlowExecutorContext } from '../handler/context/flow-execution-context';
68
import { throwIfExecutionTimeExceeded } from '../timeout-validator';
79

8-
let lastRequestHash: string | undefined = undefined;
10+
const MAX_RETRIES = 3;
911

12+
let lastRequestHash: string | undefined = undefined;
1013
const lock = new Mutex();
1114

1215
export const progressService = {
@@ -42,6 +45,10 @@ const sendUpdateRunRequest = async (
4245
progressUpdateType: engineConstants.progressUpdateType,
4346
};
4447

48+
logger.debug(
49+
`Sending progress update for ${request.runId} ${request.runDetails.status}`,
50+
);
51+
4552
// Request deduplication using hash comparison
4653
const requestHash = hashUtils.hashObject(request, (key, value) => {
4754
if (key === 'duration') return undefined;
@@ -54,14 +61,28 @@ const sendUpdateRunRequest = async (
5461

5562
lastRequestHash = requestHash;
5663

57-
await fetch(url.toString(), {
58-
method: 'POST',
59-
headers: {
60-
'Content-Type': 'application/json',
61-
Authorization: `Bearer ${engineConstants.engineToken}`,
62-
},
63-
body: JSON.stringify(request),
64-
});
64+
try {
65+
await makeHttpRequest(
66+
'POST',
67+
url.toString(),
68+
new AxiosHeaders({
69+
'Content-Type': 'application/json',
70+
Authorization: `Bearer ${engineConstants.engineToken}`,
71+
}),
72+
request,
73+
{
74+
retries: MAX_RETRIES,
75+
retryDelay: (retryCount: number) => {
76+
return (retryCount + 1) * 200; // 200ms, 400ms, 600ms
77+
},
78+
},
79+
);
80+
} catch (error) {
81+
logger.error(
82+
`Progress update failed after ${MAX_RETRIES} retries for status ${request.runDetails.status} on run ${request.runId}`,
83+
{ error },
84+
);
85+
}
6586
};
6687

6788
type UpdateStepProgressParams = {

packages/engine/test/services/progress.service.test.ts

Lines changed: 69 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ jest.mock('../../src/lib/timeout-validator', () => ({
1515
throwIfExecutionTimeExceeded: jest.fn(),
1616
}));
1717

18-
const mockThrowIfExecutionTimeExceeded = throwIfExecutionTimeExceeded as jest.MockedFunction<typeof throwIfExecutionTimeExceeded>;
18+
jest.mock('@openops/common', () => ({
19+
makeHttpRequest: jest.fn(),
20+
}));
1921

20-
global.fetch = jest.fn();
22+
const mockThrowIfExecutionTimeExceeded = throwIfExecutionTimeExceeded as jest.MockedFunction<typeof throwIfExecutionTimeExceeded>;
23+
const mockMakeHttpRequest = require('@openops/common').makeHttpRequest as jest.MockedFunction<any>;
2124

2225
describe('Progress Service', () => {
2326
const mockParams = {
@@ -45,10 +48,11 @@ describe('Progress Service', () => {
4548
// Reset the timeout mock to not throw by default
4649
mockThrowIfExecutionTimeExceeded.mockReset();
4750

48-
(global.fetch as jest.Mock).mockResolvedValue({
49-
ok: true,
50-
status: 200,
51-
});
51+
mockMakeHttpRequest.mockResolvedValue({});
52+
53+
// Reset the global lastRequestHash by calling with unique params
54+
// This ensures no deduplication issues between tests
55+
jest.clearAllMocks();
5256
});
5357

5458
afterEach(() => {
@@ -68,14 +72,19 @@ describe('Progress Service', () => {
6872
await progressService.sendUpdate(successParams);
6973

7074
expect(successParams.flowExecutorContext.toResponse).toHaveBeenCalled();
71-
expect(global.fetch).toHaveBeenCalledWith(
75+
expect(mockMakeHttpRequest).toHaveBeenCalledWith(
76+
'POST',
7277
'http://localhost:3000/v1/engine/update-run',
78+
expect.any(Object),
7379
expect.objectContaining({
74-
method: 'POST',
75-
headers: {
76-
'Content-Type': 'application/json',
77-
Authorization: 'Bearer test-token',
78-
},
80+
executionCorrelationId: 'test-correlation-id-success',
81+
runId: 'test-run-id',
82+
workerHandlerId: 'test-handler-id',
83+
progressUpdateType: 'WEBHOOK_RESPONSE',
84+
}),
85+
expect.objectContaining({
86+
retries: 3,
87+
retryDelay: expect.any(Function),
7988
})
8089
);
8190
});
@@ -109,11 +118,12 @@ describe('Progress Service', () => {
109118

110119
await progressService.sendUpdate(uniqueParams);
111120

112-
expect(global.fetch).toHaveBeenCalledTimes(1);
113-
const fetchCall = (global.fetch as jest.Mock).mock.calls[0];
114-
const [url, options] = fetchCall;
115-
const requestBody = JSON.parse(options.body);
121+
expect(mockMakeHttpRequest).toHaveBeenCalledTimes(1);
122+
const call = mockMakeHttpRequest.mock.calls[0];
123+
const [method, url, _, requestBody] = call;
116124

125+
expect(method).toBe('POST');
126+
expect(url).toBe('http://localhost:3000/v1/engine/update-run');
117127
expect(requestBody).toEqual(
118128
expect.objectContaining({
119129
executionCorrelationId: 'test-correlation-id-payload',
@@ -142,10 +152,9 @@ describe('Progress Service', () => {
142152

143153
await progressService.sendUpdate(paramsWithoutHandlerId);
144154

145-
expect(global.fetch).toHaveBeenCalledTimes(1);
146-
const fetchCall = (global.fetch as jest.Mock).mock.calls[0];
147-
const [url, options] = fetchCall;
148-
const requestBody = JSON.parse(options.body);
155+
expect(mockMakeHttpRequest).toHaveBeenCalledTimes(1);
156+
const call = mockMakeHttpRequest.mock.calls[0];
157+
const [_, __, ___, requestBody] = call;
149158

150159
expect(requestBody.workerHandlerId).toBe(null);
151160
});
@@ -162,7 +171,7 @@ describe('Progress Service', () => {
162171
await progressService.sendUpdate(duplicateParams);
163172
await progressService.sendUpdate(duplicateParams);
164173

165-
expect(global.fetch).toHaveBeenCalledTimes(1);
174+
expect(mockMakeHttpRequest).toHaveBeenCalledTimes(1);
166175
});
167176

168177
it('should send different requests when content changes', async () => {
@@ -185,7 +194,7 @@ describe('Progress Service', () => {
185194
await progressService.sendUpdate(params1);
186195
await progressService.sendUpdate(params2);
187196

188-
expect(global.fetch).toHaveBeenCalledTimes(2);
197+
expect(mockMakeHttpRequest).toHaveBeenCalledTimes(2);
189198
});
190199

191200
it('should deduplicate requests with different durations but same content', async () => {
@@ -224,7 +233,7 @@ describe('Progress Service', () => {
224233
await progressService.sendUpdate(params1);
225234
await progressService.sendUpdate(params2);
226235

227-
expect(global.fetch).toHaveBeenCalledTimes(1);
236+
expect(mockMakeHttpRequest).toHaveBeenCalledTimes(1);
228237
expect(params1.flowExecutorContext.toResponse).toHaveBeenCalledTimes(1);
229238
expect(params2.flowExecutorContext.toResponse).toHaveBeenCalledTimes(1);
230239
});
@@ -265,47 +274,11 @@ describe('Progress Service', () => {
265274
await progressService.sendUpdate(params1);
266275
await progressService.sendUpdate(params2);
267276

268-
expect(global.fetch).toHaveBeenCalledTimes(2);
277+
expect(mockMakeHttpRequest).toHaveBeenCalledTimes(2);
269278
expect(params1.flowExecutorContext.toResponse).toHaveBeenCalledTimes(1);
270279
expect(params2.flowExecutorContext.toResponse).toHaveBeenCalledTimes(1);
271280
});
272281

273-
it('should use mutex for thread safety', async () => {
274-
const concurrentParams = {
275-
...mockParams,
276-
engineConstants: {
277-
...mockParams.engineConstants,
278-
executionCorrelationId: 'test-correlation-id-concurrent',
279-
},
280-
};
281-
282-
// Make multiple concurrent requests
283-
const promises = [
284-
progressService.sendUpdate(concurrentParams),
285-
progressService.sendUpdate(concurrentParams),
286-
progressService.sendUpdate(concurrentParams),
287-
];
288-
289-
await Promise.all(promises);
290-
291-
// Due to mutex locking and request deduplication, should only make one request
292-
expect(global.fetch).toHaveBeenCalledTimes(1);
293-
});
294-
295-
it('should handle fetch errors gracefully', async () => {
296-
const errorParams = {
297-
...mockParams,
298-
engineConstants: {
299-
...mockParams.engineConstants,
300-
executionCorrelationId: 'test-correlation-id-error',
301-
},
302-
};
303-
304-
(global.fetch as jest.Mock).mockRejectedValue(new Error('Network error'));
305-
306-
await expect(progressService.sendUpdate(errorParams)).rejects.toThrow('Network error');
307-
});
308-
309282
it('should construct correct URL', async () => {
310283
const paramsWithDifferentUrl = {
311284
...mockParams,
@@ -318,13 +291,15 @@ describe('Progress Service', () => {
318291

319292
await progressService.sendUpdate(paramsWithDifferentUrl);
320293

321-
expect(global.fetch).toHaveBeenCalledWith(
294+
expect(mockMakeHttpRequest).toHaveBeenCalledWith(
295+
'POST',
322296
'https://api.example.com/v1/engine/update-run',
297+
expect.any(Object),
298+
expect.any(Object),
323299
expect.any(Object)
324300
);
325301
});
326302

327-
328303
it('should throw error when execution time is exceeded', async () => {
329304
const timeoutParams = {
330305
...mockParams,
@@ -341,8 +316,39 @@ describe('Progress Service', () => {
341316

342317
await expect(progressService.sendUpdate(timeoutParams)).rejects.toThrow('Execution time exceeded');
343318
expect(mockThrowIfExecutionTimeExceeded).toHaveBeenCalledTimes(1);
344-
expect(global.fetch).not.toHaveBeenCalled();
319+
expect(mockMakeHttpRequest).not.toHaveBeenCalled();
345320
expect(timeoutParams.flowExecutorContext.toResponse).not.toHaveBeenCalled();
346321
});
322+
323+
it('should use correct retry configuration', async () => {
324+
const retryParams = {
325+
...mockParams,
326+
engineConstants: {
327+
...mockParams.engineConstants,
328+
executionCorrelationId: 'test-correlation-id-retry',
329+
},
330+
};
331+
332+
await progressService.sendUpdate(retryParams);
333+
334+
expect(mockMakeHttpRequest).toHaveBeenCalledWith(
335+
'POST',
336+
'http://localhost:3000/v1/engine/update-run',
337+
expect.any(Object),
338+
expect.any(Object),
339+
expect.objectContaining({
340+
retries: 3,
341+
retryDelay: expect.any(Function),
342+
})
343+
);
344+
345+
// Test the retry delay function
346+
const call = mockMakeHttpRequest.mock.calls[0];
347+
const [_, __, ___, ____, options] = call;
348+
349+
expect(options.retryDelay(0)).toBe(200); // 1st retry: 200ms
350+
expect(options.retryDelay(1)).toBe(400); // 2nd retry: 400ms
351+
expect(options.retryDelay(2)).toBe(600); // 3rd retry: 600ms
352+
});
347353
});
348354
});

packages/server/api/src/app/workers/engine-controller.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
} from '@fastify/type-provider-typebox';
55
import {
66
GetRunForWorkerRequest,
7+
logger,
78
SharedSystemProp,
89
system,
910
UpdateFailureCountRequest,
@@ -125,6 +126,10 @@ export const flowEngineWorker: FastifyPluginAsyncTypebox = async (app) => {
125126
);
126127
}
127128

129+
logger.debug(
130+
`Updating run ${runId} to ${getTerminalStatus(runDetails.status)}`,
131+
);
132+
128133
const populatedRun = await flowRunService.updateStatus({
129134
flowRunId: runId,
130135
status: getTerminalStatus(runDetails.status),

0 commit comments

Comments
 (0)