Skip to content

Commit 40c2953

Browse files
committed
chore: working on chapter 13
1 parent 1b145a7 commit 40c2953

File tree

8 files changed

+328
-0
lines changed

8 files changed

+328
-0
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# 08-task-distribution-redis-streams
2+
3+
This sample demonstrates how to distribute tasks to a set of remote workers
4+
using Redis Streams.
5+
6+
## Dependencies
7+
8+
As a prerequisite for this sample, you first need to install
9+
[Redis](http://redis.io/download) and have it running locally on its default
10+
port.
11+
12+
If you have Docker installed, you can easily run an ephemeral Redis instance
13+
locally with:
14+
15+
```bash
16+
docker run -it -p 6379:6379 redis redis-server --appendonly yes
17+
```
18+
19+
This example requires you to install some third-party dependencies from npm.
20+
21+
If you have `pnpm` installed, you can do that with:
22+
23+
```bash
24+
pnpm install
25+
```
26+
27+
Alternatively, if you prefer to use another package manager, make sure to delete
28+
the `pnpm-lock.yaml` file before using it.
29+
30+
If you want to use `npm`, you can run:
31+
32+
```bash
33+
npm install
34+
```
35+
36+
If you want to use `yarn`, you can run:
37+
38+
```bash
39+
yarn install
40+
```
41+
42+
## Run
43+
44+
To run the various components, run in different terminals the following
45+
commands:
46+
47+
```bash
48+
node worker.js workerA # runs a worker that will process tasks
49+
node worker.js workerB # runs a second worker that will process tasks (you can run as many as you want but make sure to use a different name for each worker)
50+
node collector.js # runs a collector that will collect results from the workers
51+
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b # runs a producer that will send tasks to the workers
52+
```
53+
54+
> [!TIP]
> If you want to test other hashes, you can generate them with the
> following code:
56+
>
57+
> ```js
58+
> import { createHash } from "node:crypto";
59+
> console.log(createHash("sha1").update("your-string-here").digest("hex"));
60+
> ```
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import Redis from 'ioredis' // v5.6.1

// Tails the "results_stream" Redis Stream and prints every result that the
// workers publish. Runs forever; terminate with Ctrl+C.
const redis = new Redis()

// '$' means "only records appended after we start reading"; afterwards we
// resume from the id of the last record we have already printed.
let cursor = '$'
for (;;) {
  // BLOCK 0 waits indefinitely until at least one new record arrives.
  const streams = await redis.xread(
    'BLOCK',
    '0',
    'STREAMS',
    'results_stream',
    cursor
  )
  for (const [, records] of streams) {
    for (const [recordId, [, message]] of records) {
      console.log(`Message from worker: ${message}`)
      cursor = recordId
    }
  }
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
 * Lazily yields brute-force task descriptors as JSON strings.
 *
 * Candidate strings over `alphabet` of length 1..maxWordLength are indexed
 * 1..nVariations; each yielded task covers a contiguous index range of at
 * most `batchSize` candidates.
 *
 * @param {string} searchHash - hex digest the workers are trying to match
 * @param {string} alphabet - characters candidate strings are built from
 * @param {number|string|bigint} maxWordLength - longest candidate length
 * @param {number|string|bigint} batchSize - max candidates per task
 * @yields {string} JSON: { searchHash, alphabet, batchStart, batchEnd }
 */
export function* generateTasks(searchHash, alphabet, maxWordLength, batchSize) {
  const alphabetLength = BigInt(alphabet.length)
  const maxWordLengthBigInt = BigInt(maxWordLength)
  // Hoisted out of the loop: the batch size never changes between batches.
  const batchSizeBigInt = BigInt(batchSize)

  // Total number of candidates: sum of |alphabet|^n for n = 1..maxWordLength.
  let nVariations = 0n
  for (let n = 1n; n <= maxWordLengthBigInt; n++) {
    nVariations += alphabetLength ** n
  }
  console.log(
    `Finding the hashsum source string over ${nVariations} possible variations`
  )

  let batchStart = 1n
  while (batchStart <= nVariations) {
    const expectedBatchEnd = batchStart + batchSizeBigInt - 1n
    // The last batch may be smaller than batchSize.
    const batchEnd =
      expectedBatchEnd > nVariations ? nVariations : expectedBatchEnd
    yield JSON.stringify({
      searchHash,
      alphabet,
      // convert BigInt to string for JSON serialization
      batchStart: batchStart.toString(),
      batchEnd: batchEnd.toString(),
    })

    batchStart = batchEnd + 1n
  }
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "08-task-distribution-redis-streams",
3+
"version": "1.0.0",
4+
"description": "This sample demonstrates how to distribute tasks to a set of remote workers using Redis Streams",
5+
"type": "module",
6+
"scripts": {},
7+
"engines": {
8+
"node": ">=24"
9+
},
10+
"engineStrict": true,
11+
"keywords": [],
12+
"author": "Luciano Mammino and Mario Casciaro",
13+
"license": "MIT",
14+
"dependencies": {
15+
"indexed-string-variation": "^2.1.0",
16+
"ioredis": "^5.6.1"
17+
}
18+
}

13-messaging-and-integration-patterns/08-task-distribution-redis-streams/pnpm-lock.yaml

Lines changed: 108 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { createHash } from 'node:crypto'
import isv from 'indexed-string-variation' // v2.1.0

/**
 * Brute-forces one batch of candidate strings looking for the SHA-1 preimage.
 *
 * Iterates the candidates with indexes batchStart..batchEnd (inclusive) over
 * the task's alphabet, hashing each one with SHA-1.
 *
 * @param {{searchHash: string, alphabet: string, batchStart: string, batchEnd: string}} task
 *   Batch descriptor as produced by generateTasks (bounds are BigInt strings).
 * @returns {string|undefined} the matching source string, or undefined when
 *   the batch contains no match (a progress line is logged in that case).
 */
export function processTask(task) {
  const strings = isv({
    alphabet: task.alphabet,
    from: BigInt(task.batchStart),
    to: BigInt(task.batchEnd),
  })

  // First and last candidates seen, tracked only for progress reporting.
  let first
  let last
  for (const string of strings) {
    // Compare against undefined rather than using truthiness, so an
    // empty-string candidate would still be recorded as the first one.
    if (first === undefined) {
      first = string
    }

    const digest = createHash('sha1').update(string).digest('hex')

    if (digest === task.searchHash) {
      console.log(`>> Found: ${string} => ${digest}`)
      return string
    }
    last = string
  }
  console.log(
    `Processed ${first}..${last} (${task.batchStart}..${task.batchEnd})`
  )
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import Redis from 'ioredis' // v5.6.1
import { generateTasks } from './generateTasks.js'

// Publishes brute-force task batches onto the "tasks_stream" Redis Stream.
// Usage: node producer.js <maxWordLength> <sha1HexDigest>
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

const redisClient = new Redis()

for (const task of generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)) {
  console.log(`Sending task: ${task}`)
  // '*' asks Redis to auto-generate the record id
  await redisClient.xadd('tasks_stream', '*', 'task', task)
}

redisClient.disconnect()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import Redis from 'ioredis' // v5.6.1
import { processTask } from './processTask.js'

// A worker that consumes brute-force tasks from the "tasks_stream" Redis
// Stream through the "workers_group" consumer group.
// Usage: node worker.js <consumerName>
const redisClient = new Redis()
const [, , consumerName] = process.argv

// Create the consumer group (and the stream itself, thanks to MKSTREAM) if
// needed; the command fails with BUSYGROUP when the group already exists.
await redisClient
  .xgroup('CREATE', 'tasks_stream', 'workers_group', '$', 'MKSTREAM')
  .catch(() => console.log('Consumer group already exists'))

// First drain this consumer's pending entries ('0'): records delivered to
// this consumer in a previous run but never acknowledged.
const [[, pendingRecords]] = await redisClient.xreadgroup(
  'GROUP',
  'workers_group',
  consumerName,
  'STREAMS',
  'tasks_stream',
  '0'
)
for (const [recordId, [, rawTask]] of pendingRecords) {
  await processAndAck(recordId, rawTask)
}

// Then block indefinitely on never-delivered records ('>'), one at a time.
for (;;) {
  const [[, newRecords]] = await redisClient.xreadgroup(
    'GROUP',
    'workers_group',
    consumerName,
    'BLOCK',
    '0',
    'COUNT',
    '1',
    'STREAMS',
    'tasks_stream',
    '>'
  )
  for (const [recordId, [, rawTask]] of newRecords) {
    await processAndAck(recordId, rawTask)
  }
}
40+
41+
// Runs a single brute-force task and acknowledges the originating stream
// record. When the task yields a match, the result is also published on the
// "results_stream" stream for the collector to pick up.
async function processAndAck(recordId, rawTask) {
  const task = JSON.parse(rawTask)
  const match = processTask(task)
  if (match) {
    console.log(`Found! => ${match}`)
    await redisClient.xadd('results_stream', '*', 'result', `Found: ${match}`)
  }

  // Ack even when nothing was found: the batch has been fully processed.
  await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

0 commit comments

Comments
 (0)