Skip to content

Commit 272c9ab

Browse files
OlliVleo
authored andcommitted
Added support for stream pause and resume callbacks (#5)
* Implement stream pausing callback support * Update examples and add an example for pausing * Cleaned up the code
1 parent e03d53f commit 272c9ab

File tree

4 files changed

+58
-9
lines changed

4 files changed

+58
-9
lines changed

examples/first.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
#!/bin/sh
2-
// >&/dev/null;exec node --harmony_async_await $0 $@
1+
#!/usr/bin/env node
32

4-
const Sema = require('./index.js')
3+
const Sema = require('../index.js')
54

65
function getRnd (min, max) {
76
min = Math.ceil(min)

examples/second.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
#!/bin/sh
2-
// >&/dev/null;exec node --harmony_async_await $0 $@
1+
#!/usr/bin/env node
32

4-
const Sema = require('./index.js')
3+
const Sema = require('../index.js')
54
const redis = require('promise-redis')
65

76
async function f () {

examples/third.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/usr/bin/env node
2+
3+
// Usage ./third.js < VERY_LARGE_FILE
4+
5+
const Sema = require('../index.js')
6+
const readline = require('readline')
7+
8+
const rl = readline.createInterface({
9+
input: process.stdin,
10+
output: process.stdout,
11+
terminal: false
12+
})
13+
14+
function pause() {
15+
console.log('Pausing the stream')
16+
}
17+
18+
function resume() {
19+
console.log('Resuming the stream')
20+
}
21+
22+
const s = new Sema(5, { pauseFn: pause, resumeFn: resume })
23+
async function parse(line) {
24+
await s.v()
25+
26+
console.log(line)
27+
28+
s.p()
29+
}
30+
31+
rl.on('line', (line) => {
32+
parse(line).catch(console.error)
33+
})

index.js

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,33 @@ function defaultInit () {
1111
}
1212

1313
class Sema {
14-
constructor (nr, { initFn, capacity }) {
14+
constructor (nr, { initFn, pauseFn, resumeFn, capacity }) {
1515
initFn = initFn || defaultInit
1616
capacity = capacity || 10
1717

18+
if (pauseFn ^ resumeFn) {
19+
throw new Error('pauseFn and resumeFn must be both set for pausing')
20+
}
21+
1822
this.nrTokens = nr
1923
this.free = new Deque(nr)
2024
this.waiting = new Deque(capacity)
2125
this.releaseEmitter = new ReleaseEmitter()
2226
this.noTokens = initFn === defaultInit
27+
this.pauseFn = pauseFn
28+
this.resumeFn = resumeFn
2329

2430
this.releaseEmitter.on('release', (token) => {
2531
const p = this.waiting.shift()
2632
if (p) {
2733
p.resolve(token)
2834
} else {
29-
this.free.push(token)
35+
if (this.resumeFn && this.paused) {
36+
this.paused = false
37+
this.resumeFn()
38+
}
39+
40+
this.free.push(token)
3041
}
3142
})
3243

@@ -37,10 +48,17 @@ class Sema {
3748

3849
async v () {
3950
let token = this.free.pop()
40-
if (token)
51+
52+
if (token) {
4153
return token
54+
}
4255

4356
return new Promise((resolve, reject) => {
57+
if (this.pauseFn && !this.paused) {
58+
this.paused = true
59+
this.pauseFn()
60+
}
61+
4462
this.waiting.push({ resolve, reject })
4563
})
4664
}

0 commit comments

Comments
 (0)