Skip to content

Commit c81de26

Browse files
committed
use single phase of resolver abortSignal cancellation
Previously, we used a lazily created abortSignal per resolver so as to shift triggering the abortSignal for a resolver that returned a streamed asyncIterable from the original executor to the stream. This is expensive and difficult to reason about and has been replaced by a single phase of cancellation when the entire execution finishes. Note: when executing a subscription, we are still able to cancel resolver abortSignals after execution of each subscription event.
1 parent b52b05e commit c81de26

File tree

5 files changed

+65
-91
lines changed

5 files changed

+65
-91
lines changed

src/execution/Executor.ts

Lines changed: 38 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -217,42 +217,41 @@ export class Executor<
217217
> {
218218
validatedExecutionArgs: ValidatedExecutionArgs;
219219
finished: boolean;
220-
initialResponseAbortController: AbortController | undefined;
221-
resolverAbortControllers: Map<Path, AbortController>;
222220
collectedErrors: CollectedErrors;
221+
internalAbortController: AbortController;
222+
removeExternalAbortListener: (() => void) | undefined;
223+
resolverAbortController: AbortController | undefined;
224+
sharedResolverAbortSignal: AbortSignal;
223225

224-
constructor(validatedExecutionArgs: ValidatedExecutionArgs) {
226+
constructor(
227+
validatedExecutionArgs: ValidatedExecutionArgs,
228+
sharedResolverAbortSignal?: AbortSignal,
229+
) {
225230
this.validatedExecutionArgs = validatedExecutionArgs;
226231
this.finished = false;
227-
this.resolverAbortControllers = new Map();
228232
this.collectedErrors = new CollectedErrors();
233+
this.internalAbortController = new AbortController();
234+
235+
if (sharedResolverAbortSignal === undefined) {
236+
this.resolverAbortController = new AbortController();
237+
this.sharedResolverAbortSignal = this.resolverAbortController.signal;
238+
} else {
239+
this.sharedResolverAbortSignal = sharedResolverAbortSignal;
240+
}
229241
}
230242

231243
executeQueryOrMutationOrSubscriptionEvent(): PromiseOrValue<
232244
ExecutionResult | TAlternativeInitialResponse
233245
> {
234-
const abortController = (this.initialResponseAbortController =
235-
new AbortController());
236-
237-
const validatedExecutionArgs = this.validatedExecutionArgs;
238-
const externalAbortSignal = validatedExecutionArgs.externalAbortSignal;
239-
let removeAbortListener: (() => void) | undefined;
246+
const externalAbortSignal = this.validatedExecutionArgs.externalAbortSignal;
240247
if (externalAbortSignal) {
241-
if (externalAbortSignal.aborted) {
242-
throw new Error(externalAbortSignal.reason);
243-
}
248+
externalAbortSignal.throwIfAborted();
244249
const onExternalAbort = () => this.cancel(externalAbortSignal.reason);
245-
removeAbortListener = () =>
250+
this.removeExternalAbortListener = () =>
246251
externalAbortSignal.removeEventListener('abort', onExternalAbort);
247252
externalAbortSignal.addEventListener('abort', onExternalAbort);
248253
}
249254

250-
const onFinish = () => {
251-
removeAbortListener?.();
252-
this.finish();
253-
abortController.signal.throwIfAborted();
254-
};
255-
256255
try {
257256
const {
258257
schema,
@@ -261,7 +260,7 @@ export class Executor<
261260
operation,
262261
variableValues,
263262
hideSuggestions,
264-
} = validatedExecutionArgs;
263+
} = this.validatedExecutionArgs;
265264

266265
const { operation: operationType, selectionSet } = operation;
267266

@@ -293,62 +292,40 @@ export class Executor<
293292
if (isPromise(result)) {
294293
const promise = result.then(
295294
(data) => {
296-
onFinish();
295+
this.finish();
297296
return this.buildResponse(data);
298297
},
299298
(error: unknown) => {
300-
onFinish();
299+
this.finish();
301300
this.collectedErrors.add(error as GraphQLError, undefined);
302301
return this.buildResponse(null);
303302
},
304303
);
305-
return externalAbortSignal
306-
? cancellablePromise(promise, abortController.signal)
307-
: promise;
304+
return cancellablePromise(promise, this.internalAbortController.signal);
308305
}
309-
onFinish();
306+
this.finish();
310307
return this.buildResponse(result);
311308
} catch (error) {
309+
this.finish();
312310
this.collectedErrors.add(error as GraphQLError, undefined);
313-
onFinish();
314311
return this.buildResponse(null);
315312
}
316313
}
317314

318-
cancel(reason: unknown): void {
315+
cancel(reason?: unknown): void {
319316
if (!this.finished) {
320-
this.initialResponseAbortController?.abort(reason);
321-
this.finish(reason);
317+
this.finish();
318+
this.internalAbortController.abort(reason);
319+
this.resolverAbortController?.abort(reason);
322320
}
323321
}
324322

325-
finish(reason?: unknown): void {
323+
finish(): void {
326324
if (!this.finished) {
327325
this.finished = true;
328-
this.triggerResolverAbortSignals(reason);
329-
}
330-
}
331-
332-
triggerResolverAbortSignals(reason?: unknown): void {
333-
const { resolverAbortControllers } = this;
334-
const finishReason =
335-
reason ?? new Error('Execution has already completed.');
336-
for (const abortController of resolverAbortControllers.values()) {
337-
abortController.abort(finishReason);
326+
this.removeExternalAbortListener?.();
338327
}
339-
}
340-
341-
getAbortSignal(path: Path): AbortSignal {
342-
const resolverAbortSignal = this.resolverAbortControllers.get(path)?.signal;
343-
if (resolverAbortSignal !== undefined) {
344-
return resolverAbortSignal;
345-
}
346-
const abortController = new AbortController();
347-
this.resolverAbortControllers.set(path, abortController);
348-
if (this.finished) {
349-
abortController.abort(new Error('Execution has already completed.'));
350-
}
351-
return abortController.signal;
328+
this.internalAbortController.signal.throwIfAborted();
352329
}
353330

354331
/**
@@ -358,6 +335,7 @@ export class Executor<
358335
buildResponse(
359336
data: ObjMap<unknown> | null,
360337
): ExecutionResult | TAlternativeInitialResponse {
338+
this.resolverAbortController?.abort();
361339
const errors = this.collectedErrors.errors;
362340
return errors.length ? { errors, data } : { data };
363341
}
@@ -542,7 +520,7 @@ export class Executor<
542520
toNodes(fieldDetailsList),
543521
parentType,
544522
path,
545-
() => this.getAbortSignal(path),
523+
() => this.sharedResolverAbortSignal,
546524
);
547525

548526
// Get the resolve function, regardless of if its result is normal or abrupt (error).
@@ -571,7 +549,6 @@ export class Executor<
571549
path,
572550
result,
573551
positionContext,
574-
true,
575552
);
576553
}
577554

@@ -587,22 +564,13 @@ export class Executor<
587564
if (isPromise(completed)) {
588565
// Note: we don't rely on a `catch` method, but we do expect "thenable"
589566
// to take a second callback for the error case.
590-
return completed.then(
591-
(resolved) => {
592-
this.resolverAbortControllers.delete(path);
593-
return resolved;
594-
},
595-
(rawError: unknown) => {
596-
this.resolverAbortControllers.delete(path);
597-
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
598-
return null;
599-
},
600-
);
567+
return completed.then(undefined, (rawError: unknown) => {
568+
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
569+
return null;
570+
});
601571
}
602-
this.resolverAbortControllers.delete(path);
603572
return completed;
604573
} catch (rawError) {
605-
this.resolverAbortControllers.delete(path);
606574
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
607575
return null;
608576
}
@@ -755,7 +723,6 @@ export class Executor<
755723
path: Path,
756724
result: Promise<unknown>,
757725
positionContext: TPositionContext | undefined,
758-
isFieldValue?: boolean,
759726
): Promise<unknown> {
760727
try {
761728
const resolved = await result;
@@ -774,14 +741,8 @@ export class Executor<
774741
if (isPromise(completed)) {
775742
completed = await completed;
776743
}
777-
if (isFieldValue) {
778-
this.resolverAbortControllers.delete(path);
779-
}
780744
return completed;
781745
} catch (rawError) {
782-
if (isFieldValue) {
783-
this.resolverAbortControllers.delete(path);
784-
}
785746
this.handleFieldError(rawError, returnType, fieldDetailsList, path);
786747
return null;
787748
}

src/execution/incremental/IncrementalExecutor.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,10 @@ export class IncrementalExecutor<
264264

265265
constructor(
266266
validatedExecutionArgs: ValidatedExecutionArgs,
267+
sharedResolverAbortSignal?: AbortSignal,
267268
deferUsageSet?: DeferUsageSet,
268269
) {
269-
super(validatedExecutionArgs);
270+
super(validatedExecutionArgs, sharedResolverAbortSignal);
270271
this.deferUsageSet = deferUsageSet;
271272
this.groups = [];
272273
this.tasks = [];
@@ -276,7 +277,11 @@ export class IncrementalExecutor<
276277
createSubExecutor(
277278
deferUsageSet?: DeferUsageSet,
278279
): IncrementalExecutor<TExperimental> {
279-
return new IncrementalExecutor(this.validatedExecutionArgs, deferUsageSet);
280+
return new IncrementalExecutor(
281+
this.validatedExecutionArgs,
282+
this.sharedResolverAbortSignal,
283+
deferUsageSet,
284+
);
280285
}
281286

282287
override cancel(reason?: unknown): void {
@@ -296,20 +301,21 @@ export class IncrementalExecutor<
296301
override buildResponse(
297302
data: ObjMap<unknown> | null,
298303
): ExecutionResult | TExperimental {
299-
const errors = this.collectedErrors.errors;
300304
const work = this.getIncrementalWork();
301305
const { tasks, streams } = work;
302306
if (tasks?.length === 0 && streams?.length === 0) {
303-
return errors.length ? { errors, data } : { data };
307+
return super.buildResponse(data);
304308
}
305309

310+
const errors = this.collectedErrors.errors;
306311
invariant(data !== null);
307312
const incrementalPublisher = new IncrementalPublisher();
308313
return incrementalPublisher.buildResponse(
309314
data,
310315
errors,
311316
work,
312317
this.validatedExecutionArgs.externalAbortSignal,
318+
() => this.resolverAbortController?.abort(),
313319
) as TExperimental;
314320
}
315321

@@ -509,6 +515,7 @@ export class IncrementalExecutor<
509515
(resolved) =>
510516
this.buildExecutionGroupResult(deliveryGroups, path, resolved),
511517
(error: unknown) => {
518+
this.cancel();
512519
throw error;
513520
},
514521
);

src/execution/incremental/IncrementalPublisher.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export class IncrementalPublisher {
4747
errors: ReadonlyArray<GraphQLError>,
4848
work: IncrementalWork,
4949
abortSignal: AbortSignal | undefined,
50+
onFinished: () => void,
5051
): ExperimentalIncrementalExecutionResults {
5152
const { initialGroups, initialStreams, events } = createWorkQueue<
5253
ExecutionGroupValue,
@@ -61,12 +62,13 @@ export class IncrementalPublisher {
6162
});
6263
}
6364

64-
let onWorkQueueFinished: (() => void) | undefined;
6565
if (abortSignal) {
6666
abortSignal.addEventListener('abort', abort);
67-
onWorkQueueFinished = () =>
68-
abortSignal.removeEventListener('abort', abort);
6967
}
68+
const onWorkQueueFinished = () => {
69+
onFinished();
70+
abortSignal?.removeEventListener('abort', abort);
71+
};
7072

7173
const pending = this._toPendingResults(initialGroups, initialStreams);
7274

@@ -78,7 +80,7 @@ export class IncrementalPublisher {
7880
mapAsyncIterable(events, (batch) =>
7981
this._handleBatch(batch, onWorkQueueFinished),
8082
),
81-
() => onWorkQueueFinished?.(),
83+
() => onWorkQueueFinished(),
8284
);
8385

8486
return {

src/execution/legacyIncremental/BranchingIncrementalExecutor.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,27 +89,29 @@ export class BranchingIncrementalExecutor extends IncrementalExecutor<Experiment
8989
): IncrementalExecutor<ExperimentalIncrementalExecutionResults> {
9090
return new BranchingIncrementalExecutor(
9191
this.validatedExecutionArgs,
92+
this.sharedResolverAbortSignal,
9293
deferUsageSet,
9394
);
9495
}
9596

9697
override buildResponse(
9798
data: ObjMap<unknown> | null,
9899
): ExecutionResult | ExperimentalIncrementalExecutionResults {
99-
const errors = this.collectedErrors.errors;
100100
const work = this.getIncrementalWork();
101101
const { tasks, streams } = work;
102102
if (tasks?.length === 0 && streams?.length === 0) {
103-
return errors.length ? { errors, data } : { data };
103+
return super.buildResponse(data);
104104
}
105105

106+
const errors = this.collectedErrors.errors;
106107
invariant(data !== null);
107108
const incrementalPublisher = new BranchingIncrementalPublisher();
108109
return incrementalPublisher.buildResponse(
109110
data,
110111
errors,
111112
work,
112113
this.validatedExecutionArgs.externalAbortSignal,
114+
() => this.resolverAbortController?.abort(),
113115
);
114116
}
115117

src/execution/legacyIncremental/BranchingIncrementalPublisher.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export class BranchingIncrementalPublisher {
4444
errors: ReadonlyArray<GraphQLError>,
4545
work: IncrementalWork,
4646
abortSignal: AbortSignal | undefined,
47+
onFinished: () => void,
4748
): ExperimentalIncrementalExecutionResults {
4849
const { initialStreams, events } = createWorkQueue<
4950
ExecutionGroupValue,
@@ -62,12 +63,13 @@ export class BranchingIncrementalPublisher {
6263
});
6364
}
6465

65-
let onWorkQueueFinished: (() => void) | undefined;
6666
if (abortSignal) {
6767
abortSignal.addEventListener('abort', abort);
68-
onWorkQueueFinished = () =>
69-
abortSignal.removeEventListener('abort', abort);
7068
}
69+
const onWorkQueueFinished = () => {
70+
onFinished();
71+
abortSignal?.removeEventListener('abort', abort);
72+
};
7173

7274
const initialResult: InitialIncrementalExecutionResult = errors.length
7375
? { errors, data, hasNext: true }
@@ -77,7 +79,7 @@ export class BranchingIncrementalPublisher {
7779
mapAsyncIterable(events, (batch) =>
7880
this._handleBatch(batch, onWorkQueueFinished),
7981
),
80-
() => onWorkQueueFinished?.(),
82+
() => onWorkQueueFinished(),
8183
);
8284

8385
return {

0 commit comments

Comments
 (0)