From a5a067bd8f1001e2e9a978057b41e331be6062ca Mon Sep 17 00:00:00 2001 From: Dirk Baeumer Date: Thu, 10 Oct 2024 15:30:29 +0200 Subject: [PATCH 1/2] Fix message reader dispose --- jsonrpc/src/common/disposable.ts | 37 +++++++++++++++++++++++++++++ jsonrpc/src/common/messageReader.ts | 19 +++++++++------ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/jsonrpc/src/common/disposable.ts b/jsonrpc/src/common/disposable.ts index 97139d349..4c7b776f9 100644 --- a/jsonrpc/src/common/disposable.ts +++ b/jsonrpc/src/common/disposable.ts @@ -17,3 +17,40 @@ export namespace Disposable { }; } } + +export class DisposableStore implements Disposable { + + private isDisposed: boolean; + private readonly disposables: Set; + + constructor() { + this.isDisposed = false; + this.disposables = new Set(); + } + + /** + * Dispose of all registered disposables and mark this object as disposed. + * + * Any future disposables added to this object will be disposed of on `add`. + */ + public dispose(): void { + if (this.isDisposed || this.disposables.size === 0) { + return; + } + try { + this.disposables.forEach(item => item.dispose()); + } finally { + this.isDisposed = true; + this.disposables.clear(); + } + } + + public add(t: T): T { + if (this.isDisposed) { + t.dispose(); + } else { + this.disposables.add(t); + } + return t; + } +} \ No newline at end of file diff --git a/jsonrpc/src/common/messageReader.ts b/jsonrpc/src/common/messageReader.ts index 3e9175515..dff5d4d1e 100644 --- a/jsonrpc/src/common/messageReader.ts +++ b/jsonrpc/src/common/messageReader.ts @@ -11,6 +11,7 @@ import { Message } from './messages'; import { ContentDecoder, ContentTypeDecoder } from './encoding'; import { Disposable } from './api'; import { Semaphore } from './semaphore'; +import { DisposableStore } from './disposable'; /** * A callback that receives each incoming JSON-RPC message. @@ -170,7 +171,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { private readable: RAL.ReadableStream; private options: ResolvedMessageReaderOptions; - private callback!: DataCallback; + private callback: DataCallback | undefined; private nextMessageLength: number; private messageToken: number; @@ -199,16 +200,20 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { } public listen(callback: DataCallback): Disposable { + if (this.callback !== undefined) { + throw new Error('Reader can only listen once.'); + } this.nextMessageLength = -1; this.messageToken = 0; this.partialMessageTimer = undefined; this.callback = callback; - const result = this.readable.onData((data: Uint8Array) => { + const disposables = new DisposableStore(); + disposables.add(this.readable.onData((data: Uint8Array) => { this.onData(data); - }); - this.readable.onError((error: any) => this.fireError(error)); - this.readable.onClose(() => this.fireClose()); - return result; + })); + disposables.add(this.readable.onError((error: any) => this.fireError(error))); + disposables.add(this.readable.onClose(() => this.fireClose())); + return disposables; } private onData(data: Uint8Array): void { @@ -250,7 +255,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { ? await this.options.contentDecoder.decode(body) : body; const message = await this.options.contentTypeDecoder.decode(bytes, this.options); - this.callback(message); + this.callback!(message); }).catch((error) => { this.fireError(error); }); From e2414f141a36d9049fae7009e54ca01e9ebed24d Mon Sep 17 00:00:00 2001 From: Karthik Nadig Date: Thu, 17 Oct 2024 11:21:59 -0700 Subject: [PATCH 2/2] Ensure reader events are hooked up once --- jsonrpc/src/common/messageReader.ts | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/jsonrpc/src/common/messageReader.ts b/jsonrpc/src/common/messageReader.ts index dff5d4d1e..ca57ef197 100644 --- a/jsonrpc/src/common/messageReader.ts +++ b/jsonrpc/src/common/messageReader.ts @@ -180,15 +180,29 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { private _partialMessageTimeout: number; private readSemaphore: Semaphore; + private disposables = new DisposableStore(); public constructor(readable: RAL.ReadableStream, options?: RAL.MessageBufferEncoding | MessageReaderOptions) { super(); this.readable = readable; this.options = ResolvedMessageReaderOptions.fromOptions(options); this.buffer = RAL().messageBuffer.create(this.options.charset); this._partialMessageTimeout = 10000; + this.partialMessageTimer = undefined; this.nextMessageLength = -1; this.messageToken = 0; this.readSemaphore = new Semaphore(1); + + this.disposables.add(this.readable.onData((data: Uint8Array) => { + if(this.callback){this.onData(data);} + })); + this.disposables.add(this.readable.onError((error: any) => this.fireError(error))); + this.disposables.add(this.readable.onClose(() => this.fireClose())); + } + + public dispose(): void { + super.dispose(); + this.disposables.dispose(); + this.callback = undefined; } public set partialMessageTimeout(timeout: number) { @@ -203,17 +217,8 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { if (this.callback !== undefined) { throw new Error('Reader can only listen once.'); } - this.nextMessageLength = -1; - this.messageToken = 0; - this.partialMessageTimer = undefined; this.callback = callback; - const disposables = new DisposableStore(); - disposables.add(this.readable.onData((data: Uint8Array) => { - this.onData(data); - })); - disposables.add(this.readable.onError((error: any) => this.fireError(error))); - disposables.add(this.readable.onClose(() => this.fireClose())); - return disposables; + return Disposable.create(() => this.callback = undefined); } private onData(data: Uint8Array): void {