Skip to content

Commit 2a9e5cd

Browse files
author
Eryk Solecki
committed
Topic test many readers, many writers
1 parent 04d763b commit 2a9e5cd

File tree

2 files changed

+108
-31
lines changed

2 files changed

+108
-31
lines changed

packages/host/src/lib/serviceDiscovery/topic.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,27 @@ class Topic extends Duplex implements StreamHandler {
7373
this.setState(WorkState.Flowing)
7474
}
7575

76-
pipe<T extends NodeJS.WritableStream>(destination: T, options: { end?: boolean; } = { end: false }): T {
76+
pipe<T extends StreamWrapper<Writable>>(destination: T, options?: { end?: boolean; }): T
77+
pipe<T extends NodeJS.WritableStream>(destination: T, options?: { end?: boolean; }): T
78+
pipe(destination: StreamWrapper<Writable> | NodeJS.WritableStream, options?: { end?: boolean; }): typeof destination {
79+
if (destination instanceof StreamWrapper<Writable>)
80+
destination = destination.stream();
81+
82+
if (!(destination instanceof Writable))
83+
throw new Error("Streams not extending Writable are not supported");
84+
7785
this.addConsumer(destination);
7886
return super.pipe(destination, options);
7987
};
8088

81-
unpipe(destination?: NodeJS.WritableStream): this {
82-
if (destination) this.removeConsumer(destination)
89+
unpipe(destination?: StreamWrapper<Writable> | NodeJS.WritableStream): this {
90+
if (destination) {
91+
if (destination instanceof StreamWrapper<Writable>)
92+
destination = destination.stream();
93+
if (!(destination instanceof Writable))
94+
throw new Error("Streams not extending Writable are not supported");
95+
this.removeConsumer(destination)
96+
}
8397
else this.removeAllConsumers();
8498

8599
return super.unpipe(destination);
@@ -154,18 +168,12 @@ class Topic extends Duplex implements StreamHandler {
154168
this.emit(TopicEvent.ProvidersChanged);
155169
}
156170

157-
protected addConsumer<T extends NodeJS.WritableStream>(destination: T) {
158-
if (!(destination instanceof Writable))
159-
throw new Error("Streams not extending Writable are not supported");
160-
171+
protected addConsumer<T extends Writable>(destination: T) {
161172
if (!this.addStream(destination, this.consumers)) return
162173
this.updateState();
163174
this.emit(TopicEvent.ConsumersChanged);
164175
}
165-
protected removeConsumer<T extends NodeJS.WritableStream>(destination: T) {
166-
if (!(destination instanceof Writable))
167-
throw new Error("Streams not extending Writable are not supported");
168-
176+
protected removeConsumer<T extends Writable>(destination: T) {
169177
if (!this.removeStream(destination, this.consumers)) return;
170178
this.updateState()
171179
this.emit(TopicEvent.ConsumersChanged);

packages/host/test/serviceDiscovery/topic.spec.ts

Lines changed: 89 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ describe("Consumer management", () => {
6565

6666
test("Automaticly add consumer on pipe", () => {
6767
expect(testTopic.consumers.size).toBe(0);
68-
testTopic.pipe(testConsumer.stream());
68+
testTopic.pipe(testConsumer);
6969
expect(testTopic.consumers.size).toBe(1);
7070
})
7171

7272
test("Add only unique streams on pipe", () => {
73-
testTopic.pipe(testConsumer.stream());
74-
testTopic.pipe(testConsumer.stream());
75-
testTopic.pipe(testConsumer.stream());
73+
testTopic.pipe(testConsumer);
74+
testTopic.pipe(testConsumer);
75+
testTopic.pipe(testConsumer);
7676
expect(testTopic.consumers.size).toBe(1);
7777
})
7878

@@ -81,9 +81,9 @@ describe("Consumer management", () => {
8181
const testConsumer2 = StreamWrapper.create(new Writable({ write: () => { } }), "testWriteStream2", StreamType.Instance, {});
8282
const testConsumer3 = StreamWrapper.create(new Writable({ write: () => { } }), "testWriteStream3", StreamType.Instance, {});
8383

84-
testTopic.pipe(testConsumer1.stream());
85-
testTopic.pipe(testConsumer2.stream());
86-
testTopic.pipe(testConsumer3.stream());
84+
testTopic.pipe(testConsumer1);
85+
testTopic.pipe(testConsumer2);
86+
testTopic.pipe(testConsumer3);
8787

8888
expect(testTopic.consumers.size).toBe(3);
8989
})
@@ -106,17 +106,16 @@ describe("Consumer management", () => {
106106
const testConsumer2 = StreamWrapper.create(new Writable({ write: () => { } }), "testWriteStream2", StreamType.Instance, {});
107107
const testConsumer3 = StreamWrapper.create(new Writable({ write: () => { } }), "testWriteStream3", StreamType.Instance, {});
108108

109-
testTopic.pipe(testConsumer1.stream());
110-
testTopic.pipe(testConsumer2.stream());
111-
testTopic.pipe(testConsumer3.stream());
109+
testTopic.pipe(testConsumer1);
110+
testTopic.pipe(testConsumer2);
111+
testTopic.pipe(testConsumer3);
112112

113113
expect(testTopic.consumers.size).toBe(3);
114114
testTopic.unpipe();
115115
expect(testTopic.consumers.size).toBe(0);
116116
})
117117
})
118118

119-
120119
describe("Event flow", () => {
121120
const waitForEvent = (eventName: string, source: Stream) => {
122121
return new Promise<boolean>((resolve, reject) => {
@@ -182,7 +181,7 @@ describe("Event flow", () => {
182181
testProvider.stream().pipe(testTopic)
183182
expect(testTopic.state()).toBe(WorkState.Waiting);
184183
const eventPromise = waitForEvent(TopicEvent.StateChanged, testTopic);
185-
testTopic.pipe(testConsumer.stream());
184+
testTopic.pipe(testConsumer);
186185
await eventPromise
187186
expect(testTopic.state()).toBe(WorkState.Flowing);
188187

@@ -208,7 +207,7 @@ describe("Event flow", () => {
208207

209208
test("State flowing after resume()", async () => {
210209
let eventPromise = waitForEvent(TopicEvent.StateChanged, testTopic);
211-
testProvider.stream().pipe(testTopic).pipe(testConsumer.stream());
210+
testProvider.stream().pipe(testTopic).pipe(testConsumer);
212211
await eventPromise;
213212
expect(testTopic.state()).toBe(WorkState.Flowing);
214213

@@ -238,16 +237,16 @@ describe("Event flow", () => {
238237
})
239238
test("ConsumersChanged on add", async () => {
240239
const eventPromise = waitForEvent(TopicEvent.ConsumersChanged, testTopic);
241-
testTopic.pipe(testConsumer.stream());
240+
testTopic.pipe(testConsumer);
242241
await eventPromise;
243242
expect(testTopic.consumers.size).toBe(1);
244243
})
245244
test("ConsumersChanged on remove", async () => {
246245
let eventPromise = waitForEvent(TopicEvent.ConsumersChanged, testTopic);
247-
testTopic.pipe(testConsumer.stream());
246+
testTopic.pipe(testConsumer);
248247
await eventPromise;
249248
eventPromise = waitForEvent(TopicEvent.ConsumersChanged, testTopic);
250-
testTopic.unpipe(testConsumer.stream());
249+
testTopic.unpipe(testConsumer);
251250
expect(testTopic.consumers.size).toBe(0);
252251
})
253252
})
@@ -256,6 +255,13 @@ describe("Event flow", () => {
256255
describe("Data flow", () => {
257256
const testText = "Lorem ipsum dolor sit amet, consectetur adipiscing elit.";
258257

258+
const createWaitingPromise = (): [Promise<void>, () => void, (_: any) => void] => {
259+
let res = () => { }
260+
let rej = (reason: any) => { };
261+
const promise = new Promise<void>((resolve, reject) => { res = resolve, rej = reject })
262+
return [promise, res, rej]
263+
}
264+
259265
test("Basic flow", async () => {
260266
const topicFinished = new Promise(resolve => testTopic.on("readable", () => {
261267
resolve(testTopic.read());
@@ -269,7 +275,7 @@ describe("Data flow", () => {
269275
const testProvider = StreamWrapper.create(new PassThrough({ encoding: "ascii" }), "testReadStream", StreamType.Instance, {});
270276
const testConsumer = StreamWrapper.create(new PassThrough({ encoding: "ascii" }), "testWriteStream", StreamType.Instance, {});
271277

272-
testProvider.stream().pipe(testTopic).pipe(testConsumer.stream());
278+
testProvider.stream().pipe(testTopic).pipe(testConsumer);
273279

274280
const readPromise = new Promise(resolve => testConsumer.stream().on("readable", () => {
275281
resolve(testConsumer.stream().read())
@@ -279,10 +285,73 @@ describe("Data flow", () => {
279285

280286
expect(readValue).toBe(testText);
281287
});
282-
test("Many Providers writing", () => {
283-
288+
test("Many Providers writing", async () => {
289+
const [startGeneratingPromise, startGenerating] = createWaitingPromise();
290+
async function* generator(from: number, to: number) {
291+
let i = from;
292+
while (i <= to) {
293+
await startGeneratingPromise;
294+
yield Number(i++).toString()
295+
}
296+
}
297+
298+
const createStreamProvider = (name: string, from: number, to: number): [StreamWrapper<Readable>, Promise<void>] => {
299+
const gen = generator(from, to);
300+
const provider = StreamWrapper.create(Readable.from(gen).setEncoding("ascii"), name, StreamType.Instance, {});
301+
const [streamEndPromise, streamEnd, streamError] = createWaitingPromise();
302+
provider.stream().on("close", streamEnd).on("error", streamError)
303+
return [provider, streamEndPromise];
304+
}
305+
306+
const [provider1, provider1End] = createStreamProvider("TestReadStream1", 1, 10);
307+
const [provider2, provider2End] = createStreamProvider("TestReadStream2", 11, 20);
308+
const [provider3, provider3End] = createStreamProvider("TestReadStream3", 21, 30);
309+
310+
provider1.stream().pipe(testTopic, { end: false });
311+
provider2.stream().pipe(testTopic, { end: false });
312+
provider3.stream().pipe(testTopic, { end: false });
313+
314+
const result: number[] = [];
315+
testTopic.on("readable", () => { result.push(Number(testTopic.read())) })
316+
317+
startGenerating();
318+
await Promise.all([provider1End, provider2End, provider3End]);
319+
result.sort((a: number, b: number) => a - b)
320+
const expectedResult = [...Array(30).keys()].map(val => val + 1);
321+
const match = result.length === expectedResult.length && !expectedResult.some((value, index) => result[index] !== value)
322+
expect(match).toBe(true);
284323
})
285-
test("Many Consumers reading", () => {
324+
test("Many Consumers reading", async () => {
325+
const consumer1 = StreamWrapper.create(new PassThrough({ encoding: "ascii" }), "TestWriteStream1", StreamType.Instance, {});
326+
const consumer2 = StreamWrapper.create(new PassThrough({ encoding: "ascii" }), "TestWriteStream1", StreamType.Instance, {});
327+
const consumer3 = StreamWrapper.create(new PassThrough({ encoding: "ascii" }), "TestWriteStream1", StreamType.Instance, {});
328+
329+
let result = ["", "", ""];
330+
const [readed1Promise, readed1] = createWaitingPromise();
331+
const [readed2Promise, readed2] = createWaitingPromise();
332+
const [readed3Promise, readed3] = createWaitingPromise();
333+
334+
consumer1.stream().on("readable", () => {
335+
result[0] = consumer1.stream().read();
336+
readed1();
337+
})
338+
consumer2.stream().on("readable", () => {
339+
result[1] = consumer2.stream().read();
340+
readed2();
341+
})
342+
consumer3.stream().on("readable", () => {
343+
result[2] = consumer3.stream().read();
344+
readed3();
345+
})
286346

347+
testTopic.pipe(consumer1);
348+
testTopic.pipe(consumer2);
349+
testTopic.pipe(consumer3);
350+
testTopic.write(testText);
351+
352+
await Promise.all([readed1Promise, readed2Promise, readed3Promise]);
353+
expect(result[0]).toBe(testText);
354+
expect(result[1]).toBe(testText);
355+
expect(result[2]).toBe(testText);
287356
})
288357
})

0 commit comments

Comments
 (0)