Skip to content

Commit a5a067b

Browse files
committed
Fix message reader dispose
1 parent 0671381 commit a5a067b

File tree

2 files changed

+49
-7
lines changed

2 files changed

+49
-7
lines changed

jsonrpc/src/common/disposable.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,40 @@ export namespace Disposable {
1717
};
1818
}
1919
}
20+
21+
export class DisposableStore implements Disposable {
22+
23+
private isDisposed: boolean;
24+
private readonly disposables: Set<Disposable>;
25+
26+
constructor() {
27+
this.isDisposed = false;
28+
this.disposables = new Set<Disposable>();
29+
}
30+
31+
/**
32+
* Dispose of all registered disposables and mark this object as disposed.
33+
*
34+
* Any future disposables added to this object will be disposed of on `add`.
35+
*/
36+
public dispose(): void {
37+
if (this.isDisposed || this.disposables.size === 0) {
38+
return;
39+
}
40+
try {
41+
this.disposables.forEach(item => item.dispose());
42+
} finally {
43+
this.isDisposed = true;
44+
this.disposables.clear();
45+
}
46+
}
47+
48+
public add<T extends Disposable>(t: T): T {
49+
if (this.isDisposed) {
50+
t.dispose();
51+
} else {
52+
this.disposables.add(t);
53+
}
54+
return t;
55+
}
56+
}

jsonrpc/src/common/messageReader.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { Message } from './messages';
1111
import { ContentDecoder, ContentTypeDecoder } from './encoding';
1212
import { Disposable } from './api';
1313
import { Semaphore } from './semaphore';
14+
import { DisposableStore } from './disposable';
1415

1516
/**
1617
* A callback that receives each incoming JSON-RPC message.
@@ -170,7 +171,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader {
170171

171172
private readable: RAL.ReadableStream;
172173
private options: ResolvedMessageReaderOptions;
173-
private callback!: DataCallback;
174+
private callback: DataCallback | undefined;
174175

175176
private nextMessageLength: number;
176177
private messageToken: number;
@@ -199,16 +200,20 @@ export class ReadableStreamMessageReader extends AbstractMessageReader {
199200
}
200201

201202
public listen(callback: DataCallback): Disposable {
203+
if (this.callback !== undefined) {
204+
throw new Error('Reader can only listen once.');
205+
}
202206
this.nextMessageLength = -1;
203207
this.messageToken = 0;
204208
this.partialMessageTimer = undefined;
205209
this.callback = callback;
206-
const result = this.readable.onData((data: Uint8Array) => {
210+
const disposables = new DisposableStore();
211+
disposables.add(this.readable.onData((data: Uint8Array) => {
207212
this.onData(data);
208-
});
209-
this.readable.onError((error: any) => this.fireError(error));
210-
this.readable.onClose(() => this.fireClose());
211-
return result;
213+
}));
214+
disposables.add(this.readable.onError((error: any) => this.fireError(error)));
215+
disposables.add(this.readable.onClose(() => this.fireClose()));
216+
return disposables;
212217
}
213218

214219
private onData(data: Uint8Array): void {
@@ -250,7 +255,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader {
250255
? await this.options.contentDecoder.decode(body)
251256
: body;
252257
const message = await this.options.contentTypeDecoder.decode(bytes, this.options);
253-
this.callback(message);
258+
this.callback!(message);
254259
}).catch((error) => {
255260
this.fireError(error);
256261
});

0 commit comments

Comments
 (0)