Skip to content

Commit f6e31c5

Browse files
committed
feat: chapter 11
1 parent 1ff12b8 commit f6e31c5

File tree

11 files changed

+351
-0
lines changed

11 files changed

+351
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# 09-cpu-bound
2+
3+
This sample demonstrates how to run a CPU-bound task in Node.js using
4+
setImmediate or multiple processes.
5+
6+
## Run
7+
8+
To start the server, run:
9+
10+
```bash
11+
node index.js
12+
```
13+
14+
Then try to send one or more request to trigger the subset sum task:
15+
16+
```shell script
17+
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"
18+
```
19+
20+
While a subset sum task is running, you can check the responsiveness of the
21+
server with a command like this:
22+
23+
```shell script
24+
curl -G http://localhost:8000
25+
```
26+
27+
If the subset sum takes too long on your machine or is too fast for checking the
28+
responsiveness of the server, try to reduce or increase the number of items of
29+
the array given as input.
30+
31+
In the file `index.js`, try to swap between the various implementations of the
32+
`SubsetSum` API to compare the differences in the responsiveness of the server.
33+
34+
```js
35+
// index.js
36+
37+
// ...
38+
//import { SubsetSum } from './subsetSum.js'
39+
//import { SubsetSum } from './subsetSumDefer.js'
40+
//import { SubsetSum } from './subsetSumFork.js'
41+
//import { SubsetSum } from './subsetSumThreads.js'
42+
```
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { createServer } from 'node:http'
2+
import { SubsetSum } from './subsetSum.js'
3+
// import { SubsetSum } from './subsetSumDefer.js'
4+
// import { SubsetSum } from './subsetSumFork.js'
5+
// import { SubsetSum } from './subsetSumThreads.js'
6+
7+
createServer((req, res) => {
8+
const url = new URL(req.url, 'http://localhost')
9+
if (url.pathname !== '/subsetSum') {
10+
res.writeHead(200)
11+
return res.end("I'm alive!\n")
12+
}
13+
14+
const data = JSON.parse(url.searchParams.get('data'))
15+
const sum = JSON.parse(url.searchParams.get('sum'))
16+
res.writeHead(200)
17+
const subsetSum = new SubsetSum(sum, data)
18+
subsetSum.on('match', match => {
19+
res.cork()
20+
res.write(`Match: ${JSON.stringify(match)}\n`)
21+
res.uncork()
22+
})
23+
subsetSum.on('end', () => res.end())
24+
subsetSum.start()
25+
}).listen(8000, () => console.log('Server started'))
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"name": "09-cpu-bound",
3+
"version": "1.0.0",
4+
"description": "This sample demonstrates how to run a CPU-bound task in Node.js using setImmediate or multiple processes.",
5+
"type": "module",
6+
"scripts": {},
7+
"engines": {
8+
"node": ">=23"
9+
},
10+
"engineStrict": true,
11+
"keywords": [],
12+
"author": "Luciano Mammino and Mario Casciaro",
13+
"license": "MIT",
14+
"dependencies": {}
15+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { fork } from 'node:child_process'
2+
3+
export class ProcessPool {
4+
constructor(file, poolMax) {
5+
this.file = file
6+
this.poolMax = poolMax
7+
this.pool = []
8+
this.active = []
9+
this.waiting = []
10+
}
11+
12+
acquire() {
13+
return new Promise((resolve, reject) => {
14+
let worker
15+
if (this.pool.length > 0) {
16+
worker = this.pool.pop()
17+
this.active.push(worker)
18+
return resolve(worker)
19+
}
20+
21+
if (this.active.length >= this.poolMax) {
22+
return this.waiting.push({ resolve, reject })
23+
}
24+
25+
worker = fork(this.file)
26+
worker.once('message', message => {
27+
if (message === 'ready') {
28+
this.active.push(worker)
29+
return resolve(worker)
30+
}
31+
worker.kill()
32+
reject(new Error('Improper process start'))
33+
})
34+
worker.once('exit', code => {
35+
console.log(`Worker exited with code ${code}`)
36+
this.active = this.active.filter(w => worker !== w)
37+
this.pool = this.pool.filter(w => worker !== w)
38+
})
39+
})
40+
}
41+
42+
release(worker) {
43+
if (this.waiting.length > 0) {
44+
const { resolve } = this.waiting.shift()
45+
return resolve(worker)
46+
}
47+
this.active = this.active.filter(w => worker !== w)
48+
this.pool.push(worker)
49+
}
50+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { EventEmitter } from 'node:events'
2+
3+
export class SubsetSum extends EventEmitter {
4+
constructor(sum, set) {
5+
super()
6+
this.sum = sum
7+
this.set = set
8+
this.totalSubsets = 0
9+
}
10+
11+
_combine(set, subset) {
12+
for (let i = 0; i < set.length; i++) {
13+
const newSubset = subset.concat(set[i])
14+
this._combine(set.slice(i + 1), newSubset)
15+
this._processSubset(newSubset)
16+
}
17+
}
18+
19+
_processSubset(subset) {
20+
console.log('Subset', ++this.totalSubsets, subset)
21+
const res = subset.reduce((prev, item) => prev + item, 0)
22+
if (res === this.sum) {
23+
this.emit('match', subset)
24+
}
25+
}
26+
27+
start() {
28+
this._combine(this.set, [])
29+
this.emit('end')
30+
}
31+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { EventEmitter } from 'node:events'
2+
3+
export class SubsetSum extends EventEmitter {
4+
constructor(sum, set) {
5+
super()
6+
this.sum = sum
7+
this.set = set
8+
this.totalSubsets = 0
9+
}
10+
11+
_combineInterleaved(set, subset) {
12+
this.runningCombine++
13+
setImmediate(() => {
14+
this._combine(set, subset)
15+
if (--this.runningCombine === 0) {
16+
this.emit('end')
17+
}
18+
})
19+
}
20+
21+
_combine(set, subset) {
22+
for (let i = 0; i < set.length; i++) {
23+
const newSubset = subset.concat(set[i])
24+
this._combineInterleaved(set.slice(i + 1), newSubset)
25+
this._processSubset(newSubset)
26+
}
27+
}
28+
29+
_processSubset(subset) {
30+
console.log('Subset', ++this.totalSubsets, subset)
31+
const res = subset.reduce((prev, item) => prev + item, 0)
32+
if (res === this.sum) {
33+
this.emit('match', subset)
34+
}
35+
}
36+
37+
start() {
38+
this.runningCombine = 0
39+
this._combineInterleaved(this.set, [])
40+
}
41+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { EventEmitter } from 'node:events'
2+
import { join } from 'node:path'
3+
import { ProcessPool } from './processPool.js'
4+
5+
const workerFile = join(
6+
import.meta.dirname,
7+
'workers',
8+
'subsetSumProcessWorker.js'
9+
)
10+
const workers = new ProcessPool(workerFile, 2)
11+
12+
export class SubsetSum extends EventEmitter {
13+
constructor(sum, set) {
14+
super()
15+
this.sum = sum
16+
this.set = set
17+
}
18+
19+
async start() {
20+
const worker = await workers.acquire()
21+
worker.send({ sum: this.sum, set: this.set })
22+
23+
const onMessage = msg => {
24+
if (msg.event === 'end') {
25+
worker.removeListener('message', onMessage)
26+
workers.release(worker)
27+
}
28+
29+
this.emit(msg.event, msg.data)
30+
}
31+
32+
worker.on('message', onMessage)
33+
}
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { EventEmitter } from 'node:events'
2+
import { join } from 'node:path'
3+
import { ThreadPool } from './threadPool.js'
4+
5+
const workerFile = join(
6+
import.meta.dirname,
7+
'workers',
8+
'subsetSumThreadWorker.js'
9+
)
10+
const workers = new ThreadPool(workerFile, 2)
11+
12+
export class SubsetSum extends EventEmitter {
13+
constructor(sum, set) {
14+
super()
15+
this.sum = sum
16+
this.set = set
17+
}
18+
19+
async start() {
20+
const worker = await workers.acquire()
21+
worker.postMessage({ sum: this.sum, set: this.set })
22+
23+
const onMessage = msg => {
24+
if (msg.event === 'end') {
25+
worker.removeListener('message', onMessage)
26+
workers.release(worker)
27+
}
28+
29+
this.emit(msg.event, msg.data)
30+
}
31+
32+
worker.on('message', onMessage)
33+
}
34+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { Worker } from 'node:worker_threads'
2+
3+
export class ThreadPool {
4+
constructor(file, poolMax) {
5+
this.file = file
6+
this.poolMax = poolMax
7+
this.pool = []
8+
this.active = []
9+
this.waiting = []
10+
}
11+
12+
acquire() {
13+
return new Promise((resolve, reject) => {
14+
let worker
15+
if (this.pool.length > 0) {
16+
worker = this.pool.pop()
17+
this.active.push(worker)
18+
return resolve(worker)
19+
}
20+
21+
if (this.active.length >= this.poolMax) {
22+
return this.waiting.push({ resolve, reject })
23+
}
24+
25+
worker = new Worker(this.file)
26+
worker.once('online', () => {
27+
this.active.push(worker)
28+
resolve(worker)
29+
})
30+
worker.once('exit', code => {
31+
console.log(`Worker exited with code ${code}`)
32+
this.active = this.active.filter(w => worker !== w)
33+
this.pool = this.pool.filter(w => worker !== w)
34+
})
35+
})
36+
}
37+
38+
release(worker) {
39+
if (this.waiting.length > 0) {
40+
const { resolve } = this.waiting.shift()
41+
return resolve(worker)
42+
}
43+
this.active = this.active.filter(w => worker !== w)
44+
this.pool.push(worker)
45+
}
46+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { SubsetSum } from '../subsetSum.js'
2+
3+
process.on('message', msg => {
4+
const subsetSum = new SubsetSum(msg.sum, msg.set)
5+
6+
subsetSum.on('match', data => {
7+
process.send({ event: 'match', data: data })
8+
})
9+
10+
subsetSum.on('end', data => {
11+
process.send({ event: 'end', data: data })
12+
})
13+
14+
subsetSum.start()
15+
})
16+
17+
process.send('ready')

0 commit comments

Comments
 (0)