Skip to content

Commit dc8e040

Browse files
author
Eryk Solecki
committed
Test for multiple disconnections
1 parent c9e1ba6 commit dc8e040

File tree

4 files changed

+569
-131
lines changed

4 files changed

+569
-131
lines changed

packages/host/src/lib/serviceDiscovery/backupingStream.ts

Lines changed: 51 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,73 @@
1-
import { createReadStream, Stats, fstat, fstatSync, Dir, WriteStream, createWriteStream, openSync, watch } from "fs";
1+
import { FileHandle, open } from "fs/promises";
22
import { Duplex, DuplexOptions } from "stream";
33

4-
type BackupingStreamOptions = Pick<DuplexOptions, "encoding">
4+
type BackupingStreamOptions = Pick<DuplexOptions, "encoding" | "highWaterMark">
55

66
class BackupingStream extends Duplex {
7-
readonly backupDir: Dir;
8-
private writeStream: WriteStream;
9-
private bytesRead;
10-
private size: number;
11-
private fd: number;
12-
// private readStream: ReadStream;
7+
private writeHandle: FileHandle;
8+
private readHandle: FileHandle;
9+
bytesWritten: number;
10+
bytesRead: number;
11+
readonly backupFile: string;
1312

14-
constructor(backupDir: Dir, opts?: BackupingStreamOptions) {
15-
super({ ...opts, highWaterMark: 0 });
16-
const backupFilePath = `${backupDir.path}/tmp1`;
17-
18-
this.backupDir = backupDir;
19-
this.fd = openSync(backupFilePath, "w+");
13+
private constructor(backupFile: string, writeHandle: FileHandle,
14+
readHandle: FileHandle, opts?: BackupingStreamOptions) {
15+
super({ ...opts });
16+
this.backupFile = backupFile;
17+
this.writeHandle = writeHandle;
18+
this.bytesWritten = 0;
2019
this.bytesRead = 0;
21-
this.size = fstatSync(this.fd).size;
22-
this.writeStream = createWriteStream("", { fd: this.fd, encoding: opts?.encoding, autoClose: false });
23-
this.attachBackupReadableEvent(this.fd, backupFilePath);
20+
this.readHandle = readHandle;
21+
}
2422

25-
this.on("backupReadable", async () => {
26-
this.pushFromBackup();
27-
});
23+
static async create(backupDir: string, opts?: BackupingStreamOptions) {
24+
const backupFile = `${backupDir}/tmp1`;
25+
const backupWrite = await open(backupFile, "w");
26+
const backupRead = await open(backupFile, "r");
27+
28+
return new BackupingStream(backupFile, backupWrite, backupRead, opts);
2829
}
29-
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null | undefined) => void) {
30-
if (this.canSkipBackup()) {
30+
31+
async _write(chunk: any, encoding: BufferEncoding,
32+
callback: (error?: Error | null | undefined) => void): Promise<void> {
33+
if (this.bytesInBackup() <= 0 && this.readableLength < this.readableHighWaterMark) {
3134
this.push(chunk, encoding);
3235
callback();
3336
return;
3437
}
35-
this.createBackup(chunk, callback);
36-
}
37-
_read(_size: number) { }
38-
39-
canSkipBackup() { return this.readableFlowing === true && this.backupIsEmpty(); }
40-
bytesInBackup() { return this.writeStream.bytesWritten - this.bytesRead; }
41-
backupIsEmpty() { return this.bytesInBackup() <= 0; }
38+
try {
39+
const { bytesWritten } = await this.writeBuckup(chunk, encoding);
4240

43-
createBackup(chunk: any, callback: (error?: Error | null | undefined) => void) {
44-
this.writeStream.write(chunk, callback);
41+
this.bytesWritten += bytesWritten;
42+
callback();
43+
} catch (err: any) {
44+
callback(err);
45+
}
4546
}
47+
async _read(size: number): Promise<void> {
48+
if (this.bytesInBackup() <= 0) return;
4649

47-
private attachBackupReadableEvent(fd: number, backupFilePath: string) {
48-
watch(backupFilePath, { persistent: false }, (event) => {
49-
if (event !== "change") return;
50-
fstat(fd, (err, stats: Stats) => {
51-
if (this.size === stats.size) return;
52-
this.size = stats.size;
53-
this.emit("backupReadable");
54-
});
50+
const { bytesRead, buffer } = await this.readHandle.read({
51+
buffer: Buffer.alloc(size),
52+
position: this.bytesRead,
53+
length: size,
5554
});
55+
56+
this.bytesRead += bytesRead;
57+
this.push(buffer.subarray(0, bytesRead));
5658
}
5759

58-
private pushFromBackup(size?: number) {
59-
const readStream = createReadStream("", { fd: this.fd, autoClose: false, start: this.bytesRead })
60-
.on("readable", () => {
61-
let chunk;
60+
bytesInBackup() { return this.bytesWritten - this.bytesRead; }
61+
62+
writeBuckup(chunk: any, encoding: BufferEncoding) {
63+
if (typeof chunk === "string") {
64+
return this.writeHandle.write(chunk, this.bytesWritten, encoding);
65+
}
66+
return this.writeHandle.write(chunk, 0, undefined, this.bytesWritten);
67+
}
6268

63-
while ((chunk = readStream.read(size)) !== null) {
64-
if (!this.push(chunk)) break;
65-
}
66-
});
69+
async close() {
70+
return Promise.all([this.writeHandle.close(), this.readHandle.close()]);
6771
}
6872
}
6973

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import { createReadStream } from "fs";
2+
import BackupingStream from "../../src/lib/serviceDiscovery/backupingStream";
3+
import { FileHandle, mkdir, open } from "fs/promises";
4+
import { resolve } from "path";
5+
import { PassThrough } from "stream";
6+
import { createInterface } from "readline";
7+
import { EOL } from "os";
8+
9+
const testText = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Duis eget nisl viverra, efficitur eros quis, sodales eros. Suspendisse feugiat ac ipsum non aliquam. Quisque dapibus nisi libero. Fusce euismod lacus vitae eros suscipit semper in non dui. Sed quis porttitor elit. Nulla a sapien mi. Suspendisse tempus vitae lectus a lobortis. Phasellus et gravida purus, et porta mauris. Integer urna nulla, semper nec leo id, ultrices bibendum odio. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Aenean eu iaculis dui. Ut ac dolor aliquet, placerat sapien vitae, hendrerit purus.
10+
11+
Proin porta cursus erat, ut dignissim felis. Suspendisse potenti. Duis et aliquet odio. Aliquam congue blandit dolor non dapibus. Vivamus eros dolor, vulputate vitae arcu vel, pretium porttitor lorem. Vivamus eget quam quis arcu lacinia malesuada id a sem. Maecenas in ligula orci. Sed dapibus, urna vel malesuada dictum, lectus ex convallis ante, eget tempor lorem ante ac lectus. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Suspendisse enim ante, laoreet at urna vel, malesuada vestibulum nulla. Morbi orci metus, imperdiet in mattis a, tincidunt vitae tortor. Vivamus at consectetur mauris.
12+
13+
In eu nulla luctus, lacinia nulla sit amet, condimentum leo. Vivamus lectus ex, venenatis in nulla eu, congue rutrum sapien. Cras suscipit sed elit non blandit. Nullam volutpat facilisis elit at laoreet. Aliquam mattis quis sem eget rutrum. Duis sapien dui, tincidunt nec ipsum id, rutrum pulvinar purus. Maecenas ut euismod purus, vel porta lacus. Vestibulum sit amet nulla sit amet diam rhoncus pharetra nec at purus. Donec mattis diam sit amet nisi tincidunt accumsan. Donec a nunc sed tortor dictum finibus. In ultricies lectus non urna convallis mollis. Pellentesque aliquam metus ante, laoreet porttitor mauris sodales et. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Nullam quis laoreet nisl. Praesent pulvinar, orci sit amet finibus semper, nibh risus laoreet augue, at blandit tellus nulla ut odio.
14+
15+
Vestibulum bibendum est a massa porta, vitae tempus enim malesuada. Pellentesque fringilla leo non vehicula pretium. Ut maximus ipsum at diam egestas, sit amet volutpat erat iaculis. Fusce dictum ipsum quam, sit amet hendrerit elit gravida a. Nam efficitur leo in mollis eleifend. Cras quis lorem pretium, condimentum libero sed, lacinia neque. Vestibulum tincidunt quam ut congue consectetur. Nullam congue varius bibendum. Maecenas consequat arcu quis magna rhoncus posuere. Cras vitae lectus ut lectus pharetra pulvinar varius vel justo.
16+
17+
Ut congue purus ac lorem scelerisque, id congue eros venenatis. Sed rutrum vitae odio vel malesuada. Aliquam vel nibh at ipsum sodales ultricies. Quisque facilisis neque sem, id accumsan leo gravida vel. Vestibulum eros libero, tempus ac arcu vel, tincidunt pulvinar magna. Aliquam dictum ornare magna sed iaculis. Integer vulputate metus at iaculis eleifend. Aenean vestibulum ut sapien id malesuada. Sed eu faucibus lectus, vitae gravida metus. Mauris et iaculis felis, vitae egestas metus. Integer semper, erat in gravida vehicula, justo neque suscipit est, porta dapibus est magna ac metus.`;
18+
19+
let backupDir: string;
20+
let testFilePath: string;
21+
22+
beforeAll(async () => {
23+
backupDir = resolve(process.cwd(), "./dist/test/");
24+
await mkdir(backupDir, { recursive: true });
25+
26+
testFilePath = resolve(process.cwd(), "./test/serviceDiscovery/testFile.txt");
27+
});
28+
29+
test("Basic read/write", async () => {
30+
const backupingStream = await BackupingStream.create(backupDir, { encoding: "ascii" });
31+
32+
const readingFinished = new Promise(res => { backupingStream.on("readable", () => { res(backupingStream.read()); }); });
33+
34+
backupingStream.write(testText);
35+
const result = await readingFinished;
36+
37+
expect(result).toBe(testText);
38+
await backupingStream.close();
39+
});
40+
41+
test("Simple piped read/write", async () => {
42+
const provider = new PassThrough({ encoding: "ascii" });
43+
const consumer = new PassThrough({ encoding: "ascii" });
44+
const backupingStream = await BackupingStream.create(backupDir, { encoding: "ascii" });
45+
46+
provider.pipe(backupingStream).pipe(consumer);
47+
48+
const readPromise = new Promise(res => consumer.on("readable", () => {
49+
res(consumer.read());
50+
}));
51+
52+
provider.write(testText);
53+
const consumerRead = await readPromise;
54+
55+
expect(consumerRead).toBe(testText);
56+
await backupingStream.close();
57+
});
58+
59+
test("Write to backup", async () => {
60+
const backupingStream = await BackupingStream.create(backupDir, { highWaterMark: 0 });
61+
62+
const source = createReadStream(testFilePath);
63+
const readLine = createInterface({ input: source });
64+
65+
let lastLineWritten = new Promise<void>(res => { res(); });
66+
67+
const inputFinished = new Promise<void>((res) => {
68+
readLine
69+
.on("close", res)
70+
.on("line", (input: string) => {
71+
lastLineWritten = new Promise(wrtieRes => {
72+
backupingStream.write(input);
73+
backupingStream.write(EOL, undefined, () => wrtieRes());
74+
});
75+
});
76+
});
77+
78+
await inputFinished;
79+
await lastLineWritten;
80+
81+
const [inputFile, backupFile] = await Promise.all([open(testFilePath), open(backupingStream.backupFile)]);
82+
const [inputBuff, outputBuff] = await Promise.all([inputFile.readFile(), backupFile.readFile()]);
83+
84+
const equals = inputBuff.equals(outputBuff);
85+
86+
expect(equals).toBeTruthy();
87+
await Promise.all([backupingStream.close(), inputFile.close(), backupFile.close()]);
88+
});
89+
90+
const waitForFileToReachSize = async (file: FileHandle, size: number) => {
91+
let timeoutOccured = false;
92+
const timeout = setTimeout(() => { timeoutOccured = true; }, 200);
93+
94+
while (!timeoutOccured) {
95+
const stats = await file.stat();
96+
97+
if (stats.size === size) break;
98+
}
99+
clearTimeout(timeout);
100+
};
101+
102+
test("Pipe to consumer from backup", async () => {
103+
const backupingStream = await BackupingStream.create(backupDir);
104+
105+
const source = createReadStream(testFilePath);
106+
const readLine = createInterface({ input: source });
107+
108+
let lastLineWritten = new Promise<void>(res => { res(); });
109+
110+
const inputFinished = new Promise<void>((res) => {
111+
readLine
112+
.on("close", res)
113+
.on("line", (input: string) => {
114+
lastLineWritten = new Promise(wrtieRes => {
115+
backupingStream.write(input);
116+
backupingStream.write(EOL, undefined, () => wrtieRes());
117+
});
118+
});
119+
});
120+
121+
await inputFinished;
122+
await lastLineWritten;
123+
124+
const outputFilePath = resolve(backupDir, "./testBackupingStream_output.txt");
125+
const outputFile = await open(outputFilePath, "w+");
126+
const consumer = outputFile.createWriteStream();
127+
128+
backupingStream.pipe(consumer);
129+
130+
const [inputFile, resultFile] = await Promise.all([open(testFilePath), open(outputFilePath)]);
131+
132+
const inputFileStat = await inputFile.stat();
133+
134+
await waitForFileToReachSize(outputFile, inputFileStat.size);
135+
136+
const [inputBuff, resultBuff] = await Promise.all([inputFile.readFile(), resultFile.readFile()]);
137+
138+
const equals = inputBuff.equals(resultBuff);
139+
140+
expect(equals).toBeTruthy();
141+
142+
await Promise.all([backupingStream.close(), inputFile.close(), outputFile.close(), resultFile.close()]);
143+
});
144+
145+
test("Multiple disconnections of consumer", async () => {
146+
const outputPath = resolve(backupDir, "./testBackupingStream_long_output.txt");
147+
const [source, output] = await Promise.all([open(testFilePath, "r"), open(outputPath, "w+")]);
148+
const sourceSize = (await source.stat()).size;
149+
150+
const provider = source.createReadStream({ highWaterMark: 50 });
151+
const backupingStream = await BackupingStream.create(backupDir, { highWaterMark: 100 });
152+
const consumer = output.createWriteStream();
153+
154+
const providerEnd = new Promise(res => { provider.on("end", res); });
155+
156+
let readLen = 0;
157+
let piped = true;
158+
let switchLenght = sourceSize / 5;
159+
160+
provider.on("data", (chunk) => {
161+
readLen += chunk.length;
162+
163+
if (readLen > switchLenght) {
164+
// eslint-disable-next-line no-unused-expressions
165+
piped ? backupingStream.unpipe() : backupingStream.pipe(consumer);
166+
piped = !piped;
167+
switchLenght += sourceSize / 5;
168+
}
169+
});
170+
171+
provider.pipe(backupingStream).pipe(consumer);
172+
173+
await providerEnd;
174+
await waitForFileToReachSize(output, sourceSize);
175+
176+
const [sourceBuff, outputBuff] = await Promise.all([source.readFile(), output.readFile()]);
177+
178+
const equals = sourceBuff.equals(outputBuff);
179+
180+
expect(equals).toBeTruthy();
181+
182+
await Promise.all([backupingStream.close(), source.close(), output.close()]);
183+
});

packages/host/test/serviceDiscovery/persistingStream.spec.ts

Lines changed: 0 additions & 84 deletions
This file was deleted.

0 commit comments

Comments
 (0)