Skip to content

expr/disable worker #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jan 23, 2025
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"test-db": "docker-compose -f ./docker-compose.test.yaml up",
"dist": "tsup",
"lint": "standard && tsc",
"lint:fix": "standard --fix && tsc",
"test": "dotenvx run --env-file=.env -- node tests/index.js",
"test-inspect": "dotenvx run --env-file=.env -- node --inspect-brk tests/index.js",
"preversion": "npm run lint && npm run dist",
Expand Down Expand Up @@ -65,6 +66,7 @@
"redis": "^4.6.12",
"socket.io": "^4.7.5",
"socket.io-client": "^4.8.0",
"toobusy-js": "^0.5.1",
"y-protocols": "^1.0.6",
"yjs": "^13.6.18"
},
Expand All @@ -80,6 +82,7 @@
"@dotenvx/dotenvx": "^1.14.0",
"@redis/client": "^1.6.0",
"@types/node": "^20.11.5",
"@types/toobusy-js": "^0.5.4",
"@types/ws": "^8.5.10",
"concurrently": "^8.2.2",
"standard": "^17.1.0",
Expand Down
17 changes: 17 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 45 additions & 8 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ let ydocUpdateCallback = env.getConf('ydoc-update-callback')
if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') {
ydocUpdateCallback += '/'
}
const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'

/**
* @param {string} a
Expand Down Expand Up @@ -117,20 +118,27 @@ export class Api {
this.redisWorkerGroupName = this.prefix + ':worker'
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
this._destroyed = false
/** @type {import('worker_threads').Worker | null} */
this.persistWorker = null

const addScript = WORKER_DISABLED
? 'redis.call("XADD", KEYS[1], "*", "m", ARGV[1])'
: `
if redis.call("EXISTS", KEYS[1]) == 0 then
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
elseif redis.call("XLEN", KEYS[1]) > 100 then
redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1])
end
redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
`

this.redis = redis.createClient({
url,
// scripting: https://github.com/redis/node-redis/#lua-scripts
scripts: {
addMessage: redis.defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT: `
if redis.call("EXISTS", KEYS[1]) == 0 then
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
elseif redis.call("XLEN", KEYS[1]) > 100 then
redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1])
end
redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
`,
SCRIPT: addScript,
/**
* @param {string} key
* @param {Buffer} message
Expand Down Expand Up @@ -265,6 +273,35 @@ export class Api {
}
}

/**
* @param {string} room
* @param {string} docid
*/
async getRedisLastId (room, docid) {
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
const docMessages = ms.get(room)?.get(docid) || null
return docMessages?.lastId.toString() || '0'
}

/**
* @param {string} room
* @param {string} docid
* @param {boolean} [remove=false]
*/
async trimRoomStream (room, docid, remove = false) {
const roomName = computeRedisRoomStreamName(room, docid, this.prefix)
const redisLastId = await this.getRedisLastId(room, docid)
const lastId = number.parseInt(redisLastId.split('-')[0])
if (remove) {
await this.redis.del(roomName)
} else {
await this.redis.multi()
.xTrim(roomName, 'MINID', lastId - this.redisMinMessageLifetime)
.xDelIfEmpty(roomName)
.exec()
}
}

/**
* @param {Object} opts
* @param {number} [opts.blockTime]
Expand Down
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export * from './server.js'
export * from './storage.js'
export * from './api.js'
export * from './subscriber.js'
export * from './persist-worker-thread.js'
export * from './y-socket-io/index.js'
44 changes: 44 additions & 0 deletions src/persist-worker-thread.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import * as Y from 'yjs'
import * as logging from 'lib0/logging'
import { isMainThread, parentPort } from 'worker_threads'

export class PersistWorkerThread {
/**
* @private
* @readonly
*/
log = logging.createModuleLogger('@y/persist-worker-thread')

/**
* @param {import('./storage.js').AbstractStorage} store
*/
constructor (store) {
if (isMainThread) {
this.log('persist worker cannot run on main thread')
return
}
this.store = store
parentPort?.on('message', this.persist)
}

/**
* @param {{ room: string, docstate: SharedArrayBuffer }} props
*/
persist = async ({ room, docstate }) => {
this.log(`persisting ${room} in worker`)
const state = new Uint8Array(docstate)
const doc = new Y.Doc()
Y.applyUpdateV2(doc, state)
await this.store?.persistDoc(room, 'index', doc)
doc.destroy()
parentPort?.postMessage({ event: 'persisted', room })
}
}

/**
* @param {import('./storage.js').AbstractStorage} store
*/
export function createPersistWorkerThread (store) {
if (isMainThread) throw new Error('cannot create persist worker in main thread')
return new PersistWorkerThread(store)
}
5 changes: 3 additions & 2 deletions src/socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ class YSocketIOServer {
* @param {string} [conf.redisPrefix]
* @param {string} [conf.redisUrl]
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
* @param {import('worker_threads').Worker=} [conf.persistWorker]
*/
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix }) => {
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => {
const app = new YSocketIO(io, { authenticate })
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix })
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
return new YSocketIOServer(app, client, subscriber)
}
2 changes: 2 additions & 0 deletions src/y-socket-io/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ export class SocketIOProvider extends Observable {
* @readonly
*/
onSocketDisconnection = (event) => {
if (event === 'io server disconnect') this.socket.connect()

this.emit('connection-close', [event, this])
this.synced = false
AwarenessProtocol.removeAwarenessStates(
Expand Down
17 changes: 17 additions & 0 deletions src/y-socket-io/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Basically Promise.withResolvers()
* @template T
* @see https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers
*/
export function promiseWithResolvers () {
/** @type {(value: T | PromiseLike<T>) => void} */
let res = () => {}
/** @type {(reason?: Error) => void} */
let rej = () => {}
/** @type {Promise<T>} */
const promise = new Promise((resolve, reject) => {
res = resolve
rej = reject
})
return { promise, resolve: res, reject: rej }
}
Loading
Loading