@@ -28,7 +28,6 @@ function BufferPeekStream(bytes) {
2828util . inherits ( BufferPeekStream , stream . Transform ) ;
2929module . exports = BufferPeekStream ;
3030
31-
3231BufferPeekStream . prototype . _transform = function _transform ( chunk , enc , callback ) {
3332 // buffer incoming chunks until we have enough for our peek
3433 this . __buffer . push ( chunk ) ;
@@ -56,34 +55,41 @@ BufferPeekStream.prototype.__stopPeeking = function __stopPeeking() {
5655 // unpipe from upstream
5756 this . __src . unpipe ( this ) ;
5857
59- //TODO: find out if this stream can still get data after unpiping under any circumstance
60-
61- var buffer = Buffer . concat ( this . __buffer ) ;
62- var source = this . __src ;
63-
64- // push exactly the number of bytes we wanted to peek
65- this . push ( buffer . slice ( 0 , this . __peekBytes ) ) ;
66- this . push ( null ) ;
67-
68- if ( source . _readableState . ended ) {
58+ if ( this . __src . _readableState . ended ) {
6959 // if the source has ended then we need to modify its state so it'll start flowing again when we
7060 // unshift the data back on. these settings were naively obtained by creating a file read
7161 // stream of a tiny file then leaving it hang for a timeout of 5s and dumping the state
72- source . readable = true ;
73- source . _readableState . ended = true ;
74- source . _readableState . endEmitted = false ;
75- source . _readableState . ranOut = false ;
76- source . _readableState . reading = false ;
77- source . _readableState . calledRead = true ;
78- source . _readableState . sync = false ;
79- source . _readableState . needReadable = false ;
80- source . _readableState . emittedReadable = true ;
81- source . _readableState . readableListening = false ;
82- source . _readableState . readingMore = false ;
62+ this . __src . readable = true ;
63+ this . __src . _readableState . ended = false ;
64+ this . __src . _readableState . endEmitted = false ;
65+ this . __src . _readableState . ranOut = false ;
66+ this . __src . _readableState . reading = false ;
67+ this . __src . _readableState . calledRead = true ;
68+ this . __src . _readableState . sync = false ;
69+ this . __src . _readableState . needReadable = false ;
70+ this . __src . _readableState . emittedReadable = true ;
71+ this . __src . _readableState . readableListening = false ;
72+ this . __src . _readableState . readingMore = false ;
73+
74+ // flush before end is called on this stream
75+ flush . call ( this ) ;
76+ } else {
77+
78+ // flush once we've emptied this streams buffer through the transform function
79+ this . once ( 'drain' , flush . bind ( this ) ) ;
8380 }
8481
85- source . unshift ( buffer ) ;
82+ function flush ( ) {
83+ var buffer = Buffer . concat ( this . __buffer ) ;
8684
87- // we don't need to keep the original buffers
88- this . __buffer = null ;
85+ // push exactly the number of bytes we wanted to peek
86+ this . push ( buffer . slice ( 0 , this . __peekBytes ) ) ;
87+
88+ // put the whole buffer back onto the start of the origin stream
89+ this . __src . unshift ( buffer ) ;
90+ this . __src = null ;
91+ this . __buffer = null ;
92+
93+ this . push ( null ) ;
94+ }
8995} ;
0 commit comments