Skip to content

Commit 424b0a9

Browse files
committed
complete change in approach
- necessitated a rewrite, but many bugs with small binary streams fixed - added tests
1 parent 543a8ac commit 424b0a9

File tree

6 files changed

+192
-140
lines changed

6 files changed

+192
-140
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# buffer-peek-stream changelog
22

3+
## 0.2.0 (2014/11/12)
4+
5+
- rewrite using a completely different approach because 0.1.x approach was too fragile and often
6+
broke with edge cases
7+
- api backwards incompatible with 0.1 - will bump to 1.0.0 only once api is stable
8+
39
## 0.1.2 (2014/11/07)
410

511
- fix data being emitted out of order on a large peeked stream

buffer-peek-stream.js

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
var stream = require('stream');
2+
var util = require('util');
3+
4+
function peek(source, bytes, callback) {
5+
if (!callback) return peek(source, undefined, bytes);
6+
7+
var dest = new BufferPeekStream({peekBytes: bytes});
8+
9+
dest.once('peek', function (buffer) {
10+
callback(null, buffer, dest);
11+
});
12+
13+
return source.pipe(dest);
14+
}
15+
peek.BufferPeekStream = BufferPeekStream;
16+
17+
module.exports = peek;
18+
19+
20+
function BufferPeekStream(opts) {
21+
if (!opts) opts = {};
22+
23+
opts.highWaterMark = opts.peekBytes || 65536;
24+
25+
stream.Transform.call(this, opts);
26+
27+
this._peekState = {
28+
buffer: [],
29+
bytes: 0,
30+
maxBytes: opts.peekBytes || 65536,
31+
peeked: false
32+
};
33+
}
34+
35+
util.inherits(BufferPeekStream, stream.Transform);
36+
37+
38+
BufferPeekStream.prototype._transform = function _transform(chunk, enc, callback) {
39+
var state = this._peekState;
40+
41+
// buffer incoming chunks until we have enough for our peek
42+
state.buffer.push(chunk);
43+
state.bytes += chunk.length;
44+
45+
// get more?
46+
if (state.bytes >= state.maxBytes) _peek(this, callback);
47+
else callback();
48+
};
49+
50+
51+
BufferPeekStream.prototype._flush = function _flush(callback) {
52+
if (this._peekState.peeked) callback();
53+
else _peek(this, callback);
54+
};
55+
56+
57+
function _peek(stream, callback) {
58+
var state = stream._peekState;
59+
60+
var buffer = Buffer.concat(state.buffer);
61+
62+
// emit exactly the number of bytes we wanted to peek
63+
stream.emit('peek', buffer.slice(0, state.maxBytes));
64+
65+
stream.push(buffer);
66+
67+
state.buffer = null;
68+
state.bytes = null;
69+
state.peeked = true;
70+
71+
stream._transform = passthrough;
72+
73+
callback();
74+
}
75+
76+
function passthrough(chunk, enc, callback) {
77+
this.push(chunk);
78+
callback();
79+
}

lib/buffer-peek-stream.js

-95
This file was deleted.

lib/index.js

-40
This file was deleted.

package.json

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
{
22
"name": "buffer-peek-stream",
3-
"version": "0.1.3",
3+
"version": "0.2.0",
44
"description": "Transform stream that lets you inspect the start of a ReadStream before deciding what to do with it",
5-
"main": "lib/index.js",
5+
"main": "buffer-peek-stream.js",
66
"scripts": {
7-
"test": "mocha test.js"
7+
"test": "node_modules/mocha/bin/mocha test.js"
88
},
99
"repository": {
1010
"type": "git",
1111
"url": "https://github.com/seangarner/node-buffer-peek-stream.git"
1212
},
1313
"keywords": [
1414
"stream",
15+
"streams",
1516
"peek",
1617
"parse",
1718
"buffer",
@@ -24,8 +25,11 @@
2425
},
2526
"homepage": "https://github.com/seangarner/node-buffer-peek-stream",
2627
"devDependencies": {
27-
"chai": "^1.9.2",
28+
"chai": "^1.10.0",
29+
"concat-stream": "^1.4.6",
30+
"dev-null-stream": "0.0.1",
2831
"mocha": "^2.0.1",
29-
"randomstring": "^1.0.3"
32+
"randstream": "^0.3.2",
33+
"truncate-stream": "^1.0.1"
3034
}
3135
}

test.js

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
var fs = require('fs');
2+
var expect = require('chai').expect;
3+
var RandStream = require('randstream');
4+
var TruncateStream = require('truncate-stream');
5+
var DevNullStream = require('dev-null-stream');
6+
var concat = require('concat-stream');
7+
8+
var peek = require('./buffer-peek-stream');
9+
10+
11+
function make(size, mode) {
12+
return (new RandStream({mode: mode || 'pseudo'})).pipe(new TruncateStream({maxBytes: size}));
13+
}
14+
15+
describe('peek', function() {
16+
this.timeout(250);
17+
18+
it('should callback with a buffer', function (done) {
19+
var source = make(50000);
20+
peek(source, 50000, function (err, buffer) {
21+
if (err) return done(err);
22+
expect(buffer).to.be.an.instanceof(Buffer);
23+
done();
24+
}).pipe(new DevNullStream());
25+
});
26+
27+
it('should callback with exactly the number of bytes requested', function (done) {
28+
peek(make(50000), 1000, function (err, buffer) {
29+
if (err) return done(err);
30+
expect(buffer).to.have.lengthOf(1000);
31+
done();
32+
});
33+
});
34+
35+
it('should callback with all bytes when peeking more than is available', function (done) {
36+
peek(make(1000), 5000, function (err, buffer) {
37+
if (err) return done(err);
38+
expect(buffer).to.have.lengthOf(1000);
39+
done();
40+
});
41+
});
42+
43+
it('should callback with a stream which receives all bytes', function (done) {
44+
var source = make(50000);
45+
peek(source, 1000, function (err, buffer, stream) {
46+
if (err) return done(err);
47+
stream.pipe(concat(function (data) {
48+
expect(data).to.have.lengthOf(50000);
49+
done();
50+
}));
51+
});
52+
});
53+
54+
it('should return a stream which receives all bytes', function (done) {
55+
peek(make(5000), 1000, function () {}).pipe(concat(function (data) {
56+
expect(data).to.have.lengthOf(5000);
57+
done();
58+
}));
59+
});
60+
61+
it('should return the same stream as it calls back', function (done) {
62+
var res = peek(make(5000), 1000, function (err, buffer, stream) {
63+
if (err) return done(err);
64+
expect(stream).to.equal(res);
65+
done();
66+
});
67+
});
68+
69+
it('should peek 65536 bytes by default', function (done) {
70+
peek(make(100000), function (err, buffer) {
71+
if (err) return done(err);
72+
expect(buffer).to.have.lengthOf(65536);
73+
done();
74+
});
75+
});
76+
77+
it('should work when peeked more once in a pipeline', function (done) {
78+
peek(make(100000), 50000, function (err, first, stream) {
79+
if (err) return done(err);
80+
expect(first).to.have.lengthOf(50000);
81+
peek(stream, 40000, function (err, second, stream) {
82+
if (err) return done(err);
83+
expect(second).to.have.lengthOf(40000);
84+
expect(second).to.eql(first.slice(0, 40000));
85+
stream.pipe(concat(function (data) {
86+
expect(data).to.have.lengthOf(100000);
87+
expect(first).to.eql(data.slice(0, 50000));
88+
expect(second).to.eql(data.slice(0, 40000));
89+
done();
90+
}));
91+
});
92+
});
93+
});
94+
95+
//TODO: peeking inside gzip data (transform)
96+
97+
//TODO: peeking a http response
98+
});

0 commit comments

Comments
 (0)