1
+ import { Duplex , PassThrough } from "stream" ;
2
+ import { ContentType } from "./contentType" ;
3
+ import { ReadableState , StreamOrigin , WorkState , WritableState } from "./streamHandler" ;
4
+ import Topic , { TopicEvent , TopicStreamOptions } from "./topic" ;
5
+ import TopicName from "./topicName" ;
6
+ import { TopicState } from "./topicHandler" ;
7
+
8
+ class PersistentTopic extends Topic {
9
+ // inReadStream: Duplex
10
+ persistentSequence : Duplex ;
11
+ // outWriteStream: Duplex
12
+
13
+ constructor ( id : TopicName , contentType : ContentType , origin : StreamOrigin , options ?: TopicStreamOptions ) {
14
+ super ( id , contentType , origin , options ) ;
15
+
16
+ // this.inReadStream = new PassThrough({ highWaterMark: 0 })
17
+ this . persistentSequence = new PassThrough ( ) ;
18
+ // this.outWriteStream = new PassThrough({ highWaterMark: 0 })
19
+
20
+ // this.inReadStream.pipe(this.persistentSequence).pipe(this.outWriteStream)
21
+
22
+ this . persistentSequence . on ( "readable" , ( ) => { this . pushFromOutStream ( ) } )
23
+
24
+ this . persistentSequence . on ( "drain" , ( ) => this . updateState ( ) )
25
+ this . persistentSequence . on ( "pause" , ( ) => this . updateState ( ) )
26
+ this . persistentSequence . on ( "resume" , ( ) => this . updateState ( ) )
27
+ this . persistentSequence . on ( "error" , ( ) => this . updateState ( ) )
28
+ }
29
+ protected attachEventListeners ( ) {
30
+ this . on ( "pipe" , this . addProvider )
31
+ this . on ( "unpipe" , this . removeProvider )
32
+ this . on ( TopicEvent . ProvidersChanged , ( ) => this . updateState ( ) )
33
+ this . on ( TopicEvent . ConsumersChanged , ( ) => this . updateState ( ) )
34
+ }
35
+ state ( ) : TopicState {
36
+ if ( this . persistentSequence . errored ) return WorkState . Error ;
37
+ if ( this . persistentSequence . isPaused ( ) || this . providers . size === 0 || this . consumers . size === 0 ) return ReadableState . Pause ;
38
+ if ( this . persistentSequence . writableNeedDrain ) return WritableState . Drain ;
39
+ return WorkState . Flowing ;
40
+ }
41
+
42
+ _write ( chunk : any , encoding : BufferEncoding , callback : ( error ?: Error | null | undefined ) => void ) : void {
43
+ this . persistentSequence . write ( chunk , encoding , callback )
44
+ }
45
+ _read ( size : number ) : void {
46
+ this . pushFromOutStream ( size ) ;
47
+ }
48
+ private pushFromOutStream ( size ?: number ) {
49
+ let chunk ;
50
+ while ( null !==
51
+ ( chunk = this . persistentSequence . read ( size ) ) ) {
52
+ if ( ! this . push ( chunk ) ) break ;
53
+ }
54
+ }
55
+ }
56
+
57
+ export default PersistentTopic ;
58
+
59
+
60
+ // class Topic extends Duplex implements TopicBase {
61
+ // protected _options: TopicOptions;
62
+ // protected _origin: StreamOrigin;
63
+ // protected _state: TopicState;
64
+ // providers: Providers
65
+ // consumers: Consumers
66
+ // protected inReadStream: PassThrough
67
+ // protected outWriteStream: PassThrough
68
+
69
+ // id() { return this._options.name.toString() };
70
+ // options() { return this._options }
71
+ // type() { return StreamType.Topic }
72
+ // state(): TopicState {
73
+ // if (this.errored) return WorkState.Error;
74
+ // if (this.isPaused() || this.providers.size === 0 || this.consumers.size === 0) return ReadableState.Pause;
75
+ // if (this.inReadStream.writableNeedDrain || this.outWriteStream.writableNeedDrain) return WritableState.Drain;
76
+ // return WorkState.Flowing;
77
+ // }
78
+ // origin() { return this._origin }
79
+
80
+ // constructor(name: TopicName, contentType: string, origin: StreamOrigin, options?: Options) {
81
+ // super(options);
82
+ // this._origin = origin;
83
+ // this._state = ReadableState.Pause;
84
+ // this._options = { name, contentType }
85
+ // this.providers = new Map();
86
+ // this.consumers = new Map();
87
+
88
+ // this.inReadStream = new PassThrough({ ...options, highWaterMark: 0 });
89
+ // this.outWriteStream = new PassThrough({ ...options, highWaterMark: 0 });
90
+
91
+ // this.inReadStream.pipe(this.outWriteStream)
92
+
93
+ // this.inReadStream
94
+ // .on("error", (err: Error) => this.destroy(err))
95
+ // .on("pause", () => this.pause())
96
+ // .on("resume", () => this.resume())
97
+ // .on("drain", () => this.updateState())
98
+
99
+ // this.outWriteStream
100
+ // .on("error", (err: Error) => this.destroy(err))
101
+ // .on("readable", () => this.pushFromOutStream())
102
+ // .on("drain", () => this.updateState())
103
+ // .on("end", () => { this.push(null) })
104
+ // .on("finish", () => { throw new Error(`Unexpected error: topic ${this.id()} finished on outWriteStream`) })
105
+
106
+ // this.on("pipe", this.addProvider)
107
+ // this.on("unpipe", this.removeProvider)
108
+ // }
109
+
110
+ // updateState() {
111
+ // const currentState = this.state();
112
+ // if (this._state === currentState) return;
113
+ // this._state = currentState;
114
+ // this.emit(TopicEvent.StateChanged, currentState);
115
+ // }
116
+
117
+ // pipe<T extends WritableStreamWrapper<Writable>>(destination: T, options?: { end?: boolean; }): T
118
+ // pipe<T extends NodeJS.WritableStream>(destination: T, options?: { end?: boolean; }): T
119
+ // pipe(destination: WritableStreamWrapper<Writable> | NodeJS.WritableStream, options?: { end?: boolean; }): typeof destination {
120
+ // if (destination instanceof WritableStreamWrapper<Writable>)
121
+ // destination = destination.stream();
122
+
123
+ // if (!(destination instanceof Writable))
124
+ // throw new Error("Streams not extending Writable are not supported");
125
+
126
+ // this.addConsumer(destination);
127
+ // return super.pipe(destination, options);
128
+ // };
129
+
130
+ // unpipe(destination?: WritableStreamWrapper<Writable> | NodeJS.WritableStream): this {
131
+ // if (destination) {
132
+ // if (destination instanceof WritableStreamWrapper<Writable>)
133
+ // destination = destination.stream();
134
+ // if (!(destination instanceof Writable))
135
+ // throw new Error("Streams not extending Writable are not supported");
136
+ // this.removeConsumer(destination)
137
+ // }
138
+ // else this.removeAllConsumers();
139
+
140
+ // return super.unpipe(destination);
141
+ // };
142
+
143
+ // _read(size: number): void {
144
+ // this.pushFromOutStream(size);
145
+ // }
146
+ // setEncoding(encoding: BufferEncoding): this {
147
+ // this.inReadStream.setEncoding(encoding);
148
+ // this.outWriteStream.setEncoding(encoding);
149
+ // return this;
150
+ // }
151
+
152
+ // _write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null | undefined) => void): void {
153
+ // this.inReadStream.write(chunk, encoding, callback)
154
+ // }
155
+ // setDefaultEncoding(encoding: BufferEncoding): this {
156
+ // this.inReadStream.setDefaultEncoding(encoding);
157
+ // this.outWriteStream.setDefaultEncoding(encoding);
158
+ // return this;
159
+ // }
160
+
161
+ // end(cb?: (() => void) | undefined): this;
162
+ // end(chunk: any, cb?: (() => void) | undefined): this;
163
+ // end(chunk: any, encoding?: BufferEncoding | undefined, cb?: (() => void) | undefined): this;
164
+ // end(chunk?: unknown, encoding?: unknown, cb?: unknown): this {
165
+ // throw new Error(`Topics are not supporting end() method`)
166
+ // }
167
+
168
+ // cork(): void {
169
+ // this.inReadStream.cork();
170
+ // super.cork();
171
+ // }
172
+ // uncork(): void {
173
+ // this.inReadStream.uncork();
174
+ // super.uncork();
175
+ // }
176
+ // destroy(error?: Error | undefined): this {
177
+ // if (!this.inReadStream.errored)
178
+ // this.inReadStream.destroy(error);
179
+ // super.destroy(error);
180
+ // this.updateState();
181
+ // return this;
182
+ // }
183
+
184
+ // pause(): this {
185
+ // if (!this.inReadStream.isPaused())
186
+ // this.inReadStream.pause();
187
+ // super.pause();
188
+ // this.updateState();
189
+ // return this;
190
+ // }
191
+ // resume(): this {
192
+ // if (this.inReadStream.isPaused())
193
+ // this.inReadStream.resume();
194
+ // super.resume();
195
+ // this.updateState();
196
+ // return this;
197
+ // }
198
+
199
+ // protected addProvider<T extends NodeJS.ReadableStream>(source: T) {
200
+ // if (!(source instanceof Readable))
201
+ // throw new Error("Streams not extending Readable are not supported");
202
+
203
+ // if (!this.addStream(source, this.providers)) return
204
+ // this.emit(TopicEvent.ProvidersChanged);
205
+ // }
206
+ // protected removeProvider<T extends NodeJS.ReadableStream>(source: T) {
207
+ // if (!(source instanceof Readable))
208
+ // throw new Error("Streams not extending Readable are not supported");
209
+
210
+ // if (!this.removeStream(source, this.providers)) return;
211
+ // this.emit(TopicEvent.ProvidersChanged);
212
+ // }
213
+
214
+ // protected addConsumer<T extends Writable>(destination: T) {
215
+ // if (!this.addStream(destination, this.consumers)) return
216
+ // this.emit(TopicEvent.ConsumersChanged);
217
+ // }
218
+ // protected removeConsumer<T extends Writable>(destination: T) {
219
+ // if (!this.removeStream(destination, this.consumers)) return;
220
+ // this.emit(TopicEvent.ConsumersChanged);
221
+ // }
222
+ // protected removeAllConsumers() { this.consumers.clear(); }
223
+
224
+ // private addStream(stream: Writable, destination: Consumers): boolean
225
+ // private addStream(stream: Readable, destination: Providers): boolean
226
+ // private addStream(stream: Writable | Readable, destination: Consumers | Providers) {
227
+ // let streamHandler: StreamHandler;
228
+ // if (stream instanceof Topic) streamHandler = stream;
229
+ // else if (stream instanceof Readable) streamHandler = ReadableStreamWrapper.retrive(stream);
230
+ // else if (stream instanceof Writable) streamHandler = WritableStreamWrapper.retrive(stream);
231
+ // else throw new Error("Unsupported stream type");
232
+
233
+ // const streamExist = destination.has(stream);
234
+ // if (streamExist) return false;
235
+ // destination.set(stream, streamHandler);
236
+ // this.updateState();
237
+ // return true;
238
+ // }
239
+
240
+ // private removeStream(stream: Writable, destination: Consumers): boolean
241
+ // private removeStream(stream: Readable, destination: Providers): boolean
242
+ // private removeStream(stream: Writable | Readable, destination: Consumers | Providers) {
243
+ // const removed = destination.delete(stream);
244
+ // if (removed) this.updateState();
245
+ // return removed;
246
+ // }
247
+
248
+ // private pushFromOutStream(size?: number) {
249
+ // let chunk;
250
+ // while (null !==
251
+ // (chunk = this.outWriteStream.read(size))) {
252
+ // if (!this.push(chunk)) break;
253
+ // }
254
+ // }
255
+ // }
0 commit comments