Skip to content

Commit 96aa942

Browse files
committed
use withCleanup to wrap actual generators
1 parent 8861629 commit 96aa942

File tree

3 files changed

+45
-95
lines changed

3 files changed

+45
-95
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,6 @@ export class IncrementalGraph {
9797
while ((completed = this._completedQueue.shift()) !== undefined) {
9898
yield completed;
9999
}
100-
if (this._rootNodes.size === 0) {
101-
for (const resolve of this._nextQueue) {
102-
resolve(undefined);
103-
}
104-
}
105100
}
106101

107102
nextCompletedBatch(): Promise<

src/execution/IncrementalPublisher.ts

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
isCompletedExecutionGroup,
2929
isFailedExecutionGroup,
3030
} from './types.js';
31+
import { withCleanup } from './withCleanup.js';
3132

3233
export function buildIncrementalResponse(
3334
context: IncrementalPublisherContext,
@@ -63,11 +64,13 @@ interface SubsequentIncrementalExecutionResultContext {
6364
* @internal
6465
*/
6566
class IncrementalPublisher {
67+
private _isDone: boolean;
6668
private _context: IncrementalPublisherContext;
6769
private _nextId: number;
6870
private _incrementalGraph: IncrementalGraph;
6971

7072
constructor(context: IncrementalPublisherContext) {
73+
this._isDone = false;
7174
this._context = context;
7275
this._nextId = 0;
7376
this._incrementalGraph = new IncrementalGraph();
@@ -92,10 +95,14 @@ class IncrementalPublisher {
9295
? { errors, data, pending, hasNext: true }
9396
: { data, pending, hasNext: true };
9497

95-
return {
96-
initialResult,
97-
subsequentResults: this._subscribe(),
98-
};
98+
const subsequentResults = withCleanup(this._subscribe(), async () => {
99+
this._isDone = true;
100+
this._context.abortSignalListener?.disconnect();
101+
this._incrementalGraph.abort();
102+
await this._returnAsyncIteratorsIgnoringErrors();
103+
});
104+
105+
return { initialResult, subsequentResults };
99106
}
100107

101108
private _toPendingResults(
@@ -121,22 +128,12 @@ class IncrementalPublisher {
121128
return String(this._nextId++);
122129
}
123130

124-
private _subscribe(): AsyncGenerator<
131+
private async *_subscribe(): AsyncGenerator<
125132
SubsequentIncrementalExecutionResult,
126133
void,
127134
void
128135
> {
129-
let isDone = false;
130-
131-
const _next = async (): Promise<
132-
IteratorResult<SubsequentIncrementalExecutionResult, void>
133-
> => {
134-
if (isDone) {
135-
this._context.abortSignalListener?.disconnect();
136-
await this._returnAsyncIteratorsIgnoringErrors();
137-
return { value: undefined, done: true };
138-
}
139-
136+
while (!this._isDone) {
140137
const context: SubsequentIncrementalExecutionResultContext = {
141138
pending: [],
142139
incremental: [],
@@ -155,7 +152,7 @@ class IncrementalPublisher {
155152
const hasNext = this._incrementalGraph.hasNext();
156153

157154
if (!hasNext) {
158-
isDone = true;
155+
this._isDone = true;
159156
}
160157

161158
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
@@ -172,50 +169,14 @@ class IncrementalPublisher {
172169
subsequentIncrementalExecutionResult.completed = completed;
173170
}
174171

175-
return { value: subsequentIncrementalExecutionResult, done: false };
172+
yield subsequentIncrementalExecutionResult;
173+
break;
176174
}
177175

178176
// eslint-disable-next-line no-await-in-loop
179177
batch = await this._incrementalGraph.nextCompletedBatch();
180178
} while (batch !== undefined);
181-
182-
// TODO: add test for this case
183-
/* c8 ignore next */
184-
this._context.abortSignalListener?.disconnect();
185-
await this._returnAsyncIteratorsIgnoringErrors();
186-
return { value: undefined, done: true };
187-
};
188-
189-
const _return = async (): Promise<
190-
IteratorResult<SubsequentIncrementalExecutionResult, void>
191-
> => {
192-
isDone = true;
193-
this._incrementalGraph.abort();
194-
await this._returnAsyncIterators();
195-
return { value: undefined, done: true };
196-
};
197-
198-
const _throw = async (
199-
error?: unknown,
200-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
201-
isDone = true;
202-
this._incrementalGraph.abort();
203-
await this._returnAsyncIterators();
204-
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
205-
return Promise.reject(error);
206-
};
207-
208-
return {
209-
[Symbol.asyncIterator]() {
210-
return this;
211-
},
212-
next: _next,
213-
return: _return,
214-
throw: _throw,
215-
async [Symbol.asyncDispose]() {
216-
await _return();
217-
},
218-
};
179+
}
219180
}
220181

221182
private _handleCompletedIncrementalData(

src/execution/__tests__/simplePubSub.ts

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import { assert } from 'chai';
1+
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
2+
3+
import { withCleanup } from '../withCleanup.js';
24

35
/**
46
* Create an AsyncIterator from an EventEmitter. Useful for mocking a
@@ -21,57 +23,49 @@ export class SimplePubSub<T> {
2123
}
2224

2325
getSubscriber<R>(transform: (value: T) => R): AsyncGenerator<R, void, void> {
24-
const pullQueue: Array<(result: IteratorResult<R, void>) => void> = [];
26+
let pendingNext: ((result: R) => void) | undefined;
2527
const pushQueue: Array<R> = [];
2628
let listening = true;
2729
this._subscribers.add(pushValue);
2830

2931
const emptyQueue = () => {
3032
listening = false;
3133
this._subscribers.delete(pushValue);
32-
for (const resolve of pullQueue) {
33-
resolve({ value: undefined, done: true });
34+
if (pendingNext) {
35+
pendingNext(undefined as R);
3436
}
35-
pullQueue.length = 0;
37+
pendingNext = undefined;
3638
pushQueue.length = 0;
3739
};
3840

39-
return {
40-
next(): Promise<IteratorResult<R, void>> {
41-
if (!listening) {
42-
return Promise.resolve({ value: undefined, done: true });
43-
}
44-
41+
async function* getSubscriberImpl(): AsyncGenerator<R, void, void> {
42+
// eslint-disable-next-line no-unmodified-loop-condition
43+
while (listening) {
4544
if (pushQueue.length > 0) {
4645
const value = pushQueue[0];
4746
pushQueue.shift();
48-
return Promise.resolve({ value, done: false });
47+
yield value;
48+
continue;
4949
}
50-
return new Promise((resolve) => pullQueue.push(resolve));
51-
},
52-
return(): Promise<IteratorResult<R, void>> {
53-
emptyQueue();
54-
return Promise.resolve({ value: undefined, done: true });
55-
},
56-
throw(error: unknown) {
57-
emptyQueue();
58-
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
59-
return Promise.reject(error);
60-
},
61-
[Symbol.asyncIterator]() {
62-
return this;
63-
},
64-
async [Symbol.asyncDispose]() {
65-
await this.return();
66-
},
67-
};
50+
51+
const { promise, resolve } = promiseWithResolvers<R>();
52+
pendingNext = resolve;
53+
// eslint-disable-next-line no-await-in-loop
54+
const value = await promise;
55+
if (!listening) {
56+
return;
57+
}
58+
yield value;
59+
}
60+
}
61+
62+
return withCleanup(getSubscriberImpl(), emptyQueue);
6863

6964
function pushValue(event: T): void {
7065
const value: R = transform(event);
71-
if (pullQueue.length > 0) {
72-
const receiver = pullQueue.shift();
73-
assert(receiver != null);
74-
receiver({ value, done: false });
66+
if (pendingNext) {
67+
pendingNext(value);
68+
pendingNext = undefined;
7569
} else {
7670
pushQueue.push(value);
7771
}

0 commit comments

Comments
 (0)