Skip to content

Commit 7c2fe7b

Browse files
authored
feat: async iterate readable streams, iterate function (#30)
- async iterator on all bluestream readable streams - `iterate(stream)` any readable stream BREAKING CHANGE: dropped the default export
1 parent de32329 commit 7c2fe7b

13 files changed

+1038
-809
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ cache:
55
notifications:
66
email: false
77
node_js:
8+
- '10'
89
- '9'
910
- '8'
1011
after_success:

lib/index.ts

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { collect } from './collect'
22
import { filter, FilterStream } from './filter'
3+
import { iterate } from './iterate'
34
import { pipe } from './pipe'
45
import { IReadableStreamOptions, read, readFunction, ReadStream } from './read'
56
import { readAsync } from './readAsync'
@@ -29,24 +30,7 @@ export {
2930
WriteStream,
3031
collect,
3132
filter,
32-
map,
33-
pipe,
34-
read,
35-
readAsync,
36-
reduce,
37-
transform,
38-
wait,
39-
write,
40-
}
41-
42-
export default {
43-
FilterStream,
44-
ReadStream,
45-
ReduceStream,
46-
TransformStream,
47-
WriteStream,
48-
collect,
49-
filter,
33+
iterate,
5034
map,
5135
pipe,
5236
read,

lib/iterate.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { Readable } from 'stream'
2+
import { readAsync } from '.'
3+
4+
if ((Symbol as any).asyncIterator === undefined) {
5+
((Symbol as any).asyncIterator) = Symbol.for('asyncIterator')
6+
}
7+
8+
async function* iterateObjectMode (stream) {
9+
let data = true
10+
while (data) {
11+
data = await readAsync(stream, 1)
12+
if (data) {
13+
yield data[0]
14+
}
15+
}
16+
}
17+
18+
async function* iterateBufferMode (stream) {
19+
let data = true
20+
while (data) {
21+
data = await readAsync(stream)
22+
if (data) {
23+
yield data
24+
}
25+
}
26+
}
27+
28+
export function internalIterator (stream: Readable) {
29+
const readableState = (stream as any)._readableState
30+
const objectMode = readableState && readableState.objectMode
31+
if (objectMode) {
32+
return iterateObjectMode(stream)
33+
}
34+
return iterateBufferMode(stream)
35+
}
36+
37+
export function iterate (stream: Readable) {
38+
if (stream[(Symbol as any).asyncIterator]) {
39+
return stream[(Symbol as any).asyncIterator]()
40+
}
41+
const objectMode = (stream as any)._readableState.objectMode
42+
if (objectMode) {
43+
return iterateObjectMode(stream)
44+
}
45+
return iterateBufferMode(stream)
46+
}

lib/read.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Readable, ReadableOptions } from 'stream'
22
import { IBluestream } from './interfaces'
3+
import { internalIterator } from './iterate'
4+
import { readAsync } from './readAsync'
35
import { defer, maybeResume } from './utils'
46

57
async function readHandler (bytes) {
@@ -113,5 +115,9 @@ export class ReadStream extends Readable implements IBluestream {
113115
}
114116
}
115117

118+
ReadStream.prototype[(Symbol as any).asyncIterator] = function () {
119+
return internalIterator(this)
120+
}
121+
116122
export const read =
117123
(opts: IReadableStreamOptions | readFunction = {}, readFn?: readFunction) => new ReadStream(opts, readFn)

lib/readAsync.ts

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ const readOnceAsync = async (stream: Readable, count?: number) => {
66
if (data !== null) {
77
return data
88
}
9+
if ((stream as any)._readableState.ended) {
10+
return null
11+
}
912
return new Promise(resolve => {
1013
stream.once('readable', () => {
1114
const nextData = stream.read(count)
@@ -17,17 +20,10 @@ const readOnceAsync = async (stream: Readable, count?: number) => {
1720
})
1821
}
1922

20-
export const readAsync = async (stream, count) => {
21-
if (!(stream && stream._readableState)) {
22-
throw new TypeError('"stream" is not a readable stream')
23-
}
24-
if (stream._readableState.flowing) {
25-
// tslint:disable-next-line
26-
throw new TypeError('"stream" is in flowing mode, this is probably not what you want as data loss could occur. Please use stream.pause() to pause the stream before calling readAsync.');
27-
}
28-
29-
const objectMode = stream._readableState.objectMode
23+
const internalReadAsync = async (stream: Readable, count?: number) => {
3024
const { resolve, reject, promise } = defer()
25+
const readableState = (stream as any)._readableState
26+
const objectMode = readableState && readableState.objectMode
3127

3228
const cleanup = () => {
3329
stream.removeListener('error', reject)
@@ -57,3 +53,24 @@ export const readAsync = async (stream, count) => {
5753
}
5854
return promise
5955
}
56+
57+
const inflightReads = new WeakMap()
58+
export const readAsync = async (stream: Readable, count?: number) => {
59+
if (!(stream && (stream as any)._readableState)) {
60+
throw new TypeError('"stream" is not a readable stream')
61+
}
62+
if ((stream as any)._readableState.flowing) {
63+
// tslint:disable-next-line
64+
throw new TypeError('"stream" is in flowing mode, this is probably not what you want as data loss could occur. Please use stream.pause() to pause the stream before calling readAsync.');
65+
}
66+
67+
const inflightRead = inflightReads.get(stream)
68+
if (inflightRead) {
69+
const queuedRead = inflightRead.then(() => internalReadAsync(stream, count))
70+
inflightReads.set(stream, queuedRead)
71+
return queuedRead
72+
}
73+
const readOperation = internalReadAsync(stream, count)
74+
inflightReads.set(stream, readOperation)
75+
return readOperation
76+
}

0 commit comments

Comments
 (0)