Skip to content

Commit c1d038a

Browse files
authored
Merge pull request #23 from hackmdio/expr/disable-worker
expr/disable worker
2 parents 8741079 + b9d1882 commit c1d038a

File tree

9 files changed

+431
-60
lines changed

9 files changed

+431
-60
lines changed

package.json

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"test-db": "docker-compose -f ./docker-compose.test.yaml up",
1616
"dist": "tsup",
1717
"lint": "standard && tsc",
18+
"lint:fix": "standard --fix && tsc",
1819
"test": "dotenvx run --env-file=.env -- node tests/index.js",
1920
"test-inspect": "dotenvx run --env-file=.env -- node --inspect-brk tests/index.js",
2021
"preversion": "npm run lint && npm run dist",
@@ -65,6 +66,7 @@
6566
"redis": "^4.6.12",
6667
"socket.io": "^4.7.5",
6768
"socket.io-client": "^4.8.0",
69+
"toobusy-js": "^0.5.1",
6870
"y-protocols": "^1.0.6",
6971
"yjs": "^13.6.18"
7072
},
@@ -80,6 +82,7 @@
8082
"@dotenvx/dotenvx": "^1.14.0",
8183
"@redis/client": "^1.6.0",
8284
"@types/node": "^20.11.5",
85+
"@types/toobusy-js": "^0.5.4",
8386
"@types/ws": "^8.5.10",
8487
"concurrently": "^8.2.2",
8588
"standard": "^17.1.0",

pnpm-lock.yaml

+17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/api.js

+45-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ let ydocUpdateCallback = env.getConf('ydoc-update-callback')
1919
if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') {
2020
ydocUpdateCallback += '/'
2121
}
22+
const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
2223

2324
/**
2425
* @param {string} a
@@ -117,20 +118,27 @@ export class Api {
117118
this.redisWorkerGroupName = this.prefix + ':worker'
118119
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
119120
this._destroyed = false
121+
/** @type {import('worker_threads').Worker | null} */
122+
this.persistWorker = null
123+
124+
const addScript = WORKER_DISABLED
125+
? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])'
126+
: `
127+
if redis.call("EXISTS", KEYS[1]) == 0 then
128+
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
129+
elseif redis.call("XLEN", KEYS[1]) > 100 then
130+
redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1])
131+
end
132+
redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
133+
`
134+
120135
this.redis = redis.createClient({
121136
url,
122137
// scripting: https://github.com/redis/node-redis/#lua-scripts
123138
scripts: {
124139
addMessage: redis.defineScript({
125140
NUMBER_OF_KEYS: 1,
126-
SCRIPT: `
127-
if redis.call("EXISTS", KEYS[1]) == 0 then
128-
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
129-
elseif redis.call("XLEN", KEYS[1]) > 100 then
130-
redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1])
131-
end
132-
redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
133-
`,
141+
SCRIPT: addScript,
134142
/**
135143
* @param {string} key
136144
* @param {Buffer} message
@@ -265,6 +273,35 @@ export class Api {
265273
}
266274
}
267275

276+
/**
277+
* @param {string} room
278+
* @param {string} docid
279+
*/
280+
async getRedisLastId (room, docid) {
281+
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
282+
const docMessages = ms.get(room)?.get(docid) || null
283+
return docMessages?.lastId.toString() || '0'
284+
}
285+
286+
/**
287+
* @param {string} room
288+
* @param {string} docid
289+
* @param {boolean} [remove=false]
290+
*/
291+
async trimRoomStream (room, docid, remove = false) {
292+
const roomName = computeRedisRoomStreamName(room, docid, this.prefix)
293+
const redisLastId = await this.getRedisLastId(room, docid)
294+
const lastId = number.parseInt(redisLastId.split('-')[0])
295+
if (remove) {
296+
await this.redis.del(roomName)
297+
} else {
298+
await this.redis.multi()
299+
.xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime)
300+
.xDelIfEmpty(roomName)
301+
.exec()
302+
}
303+
}
304+
268305
/**
269306
* @param {Object} opts
270307
* @param {number} [opts.blockTime]

src/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export * from './server.js'
33
export * from './storage.js'
44
export * from './api.js'
55
export * from './subscriber.js'
6+
export * from './persist-worker-thread.js'
67
export * from './y-socket-io/index.js'

src/persist-worker-thread.js

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import * as Y from 'yjs'
2+
import * as logging from 'lib0/logging'
3+
import { isMainThread, parentPort } from 'worker_threads'
4+
5+
export class PersistWorkerThread {
6+
/**
7+
* @private
8+
* @readonly
9+
*/
10+
log = logging.createModuleLogger('@y/persist-worker-thread')
11+
12+
/**
13+
* @param {import('./storage.js').AbstractStorage} store
14+
*/
15+
constructor (store) {
16+
if (isMainThread) {
17+
this.log('persist worker cannot run on main thread')
18+
return
19+
}
20+
this.store = store
21+
parentPort?.on('message', this.persist)
22+
}
23+
24+
/**
25+
* @param {{ room: string, docstate: SharedArrayBuffer }} props
26+
*/
27+
persist = async ({ room, docstate }) => {
28+
this.log(`persisting ${room} in worker`)
29+
const state = new Uint8Array(docstate)
30+
const doc = new Y.Doc()
31+
Y.applyUpdateV2(doc, state)
32+
await this.store?.persistDoc(room, 'index', doc)
33+
doc.destroy()
34+
parentPort?.postMessage({ event: 'persisted', room })
35+
}
36+
}
37+
38+
/**
39+
* @param {import('./storage.js').AbstractStorage} store
40+
*/
41+
export function createPersistWorkerThread (store) {
42+
if (isMainThread) throw new Error('cannot create persist worker in main thread')
43+
return new PersistWorkerThread(store)
44+
}

src/socketio.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ class YSocketIOServer {
3737
* @param {string} [conf.redisPrefix]
3838
* @param {string} [conf.redisUrl]
3939
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
40+
* @param {import('worker_threads').Worker=} [conf.persistWorker]
4041
*/
41-
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix }) => {
42+
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => {
4243
const app = new YSocketIO(io, { authenticate })
43-
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix })
44+
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
4445
return new YSocketIOServer(app, client, subscriber)
4546
}

src/y-socket-io/client.js

+2
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ export class SocketIOProvider extends Observable {
351351
* @readonly
352352
*/
353353
onSocketDisconnection = (event) => {
354+
if (event === 'io server disconnect') this.socket.connect()
355+
354356
this.emit('connection-close', [event, this])
355357
this.synced = false
356358
AwarenessProtocol.removeAwarenessStates(

src/y-socket-io/utils.js

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/**
2+
* Basically Promise.withResolvers()
3+
* @template T
4+
* @see https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers
5+
*/
6+
export function promiseWithResolvers () {
7+
/** @type {(value: T | PromiseLike<T>) => void} */
8+
let res = () => {}
9+
/** @type {(reason?: Error) => void} */
10+
let rej = () => {}
11+
/** @type {Promise<T>} */
12+
const promise = new Promise((resolve, reject) => {
13+
res = resolve
14+
rej = reject
15+
})
16+
return { promise, resolve: res, reject: rej }
17+
}

0 commit comments

Comments
 (0)