Skip to content

Commit ff91ee6

Browse files
authored
Merge pull request #22 from hackmdio/feat/add-enable-awareness-options
feat: add `enableAwareness` option
2 parents c1d038a + 941f3a2 commit ff91ee6

File tree

5 files changed

+140
-82
lines changed

5 files changed

+140
-82
lines changed

src/api.js

+16-8
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ const decodeRedisRoomStreamName = (rediskey, expectedPrefix) => {
8484

8585
/**
8686
* @param {import('./storage.js').AbstractStorage} store
87-
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
87+
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
8888
*/
89-
export const createApiClient = async (store, { redisPrefix, redisUrl }) => {
90-
const a = new Api(store, redisPrefix, redisUrl)
89+
export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwareness = true }) => {
90+
const a = new Api(store, redisPrefix, redisUrl, { enableAwareness })
9191
await a.redis.connect()
9292
try {
9393
await a.redis.xGroupCreate(a.redisWorkerStreamName, a.redisWorkerGroupName, '0', { MKSTREAM: true })
@@ -100,10 +100,13 @@ export class Api {
100100
* @param {import('./storage.js').AbstractStorage} store
101101
* @param {string=} prefix
102102
* @param {string=} url
103+
* @param {Object} opts
104+
* @param {boolean=} opts.enableAwareness
103105
*/
104-
constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis')) {
106+
constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis'), { enableAwareness = true } = {}) {
105107
this.store = store
106108
this.prefix = prefix
109+
this.enableAwareness = enableAwareness
107110
this.consumername = random.uuidv4()
108111
/**
109112
* After this timeout, a new worker will pick up the task
@@ -240,8 +243,11 @@ export class Api {
240243
if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`)
241244
const docstate = await this.store.retrieveDoc(room, docid)
242245
const ydoc = new Y.Doc()
243-
const awareness = new awarenessProtocol.Awareness(ydoc)
244-
awareness.setLocalState(null) // we don't want to propagate awareness state
246+
let awareness = null
247+
if (this.enableAwareness) {
248+
awareness = new awarenessProtocol.Awareness(ydoc)
249+
awareness.setLocalState(null) // we don't want to propagate awareness state
250+
}
245251
const now = performance.now()
246252
if (docstate) { Y.applyUpdateV2(ydoc, docstate.doc) }
247253
let changed = false
@@ -257,7 +263,9 @@ export class Api {
257263
break
258264
}
259265
case 1: { // awareness message
260-
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null)
266+
if (this.enableAwareness && awareness) {
267+
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null)
268+
}
261269
break
262270
}
263271
}
@@ -394,7 +402,7 @@ export class Api {
394402

395403
/**
396404
* @param {import('./storage.js').AbstractStorage} store
397-
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
405+
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
398406
*/
399407
export const createWorker = async (store, opts) => {
400408
const a = await createApiClient(store, opts)

src/socketio.js

+9-2
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,16 @@ class YSocketIOServer {
3838
* @param {string} [conf.redisUrl]
3939
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
4040
* @param {import('worker_threads').Worker=} [conf.persistWorker]
41+
* @param {boolean} [conf.enableAwareness]
4142
*/
42-
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => {
43-
const app = new YSocketIO(io, { authenticate })
43+
export const registerYSocketIOServer = async (io, store, {
44+
authenticate,
45+
redisUrl,
46+
redisPrefix,
47+
persistWorker,
48+
enableAwareness = true
49+
}) => {
50+
const app = new YSocketIO(io, { authenticate, enableAwareness })
4451
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
4552
return new YSocketIOServer(app, client, subscriber)
4653
}

src/subscriber.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const run = async subscriber => {
3232

3333
/**
3434
* @param {import('./storage.js').AbstractStorage} store
35-
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
35+
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
3636
*/
3737
export const createSubscriber = async (store, opts) => {
3838
const client = await api.createApiClient(store, opts)

src/y-socket-io/client.js

+94-61
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import { io } from 'socket.io-client'
2323
* @prop {boolean=} autoConnect
2424
* (Optional) This boolean specify if the provider should connect when the instance is created, by default is true
2525
*
26+
* @prop {boolean=} enableAwareness
27+
* (Optional) This boolean enable the awareness functionality, by default is true
28+
*
2629
* @prop {AwarenessProtocol.Awareness=} awareness
2730
* (Optional) An existent awareness, by default is a new AwarenessProtocol.Awareness instance
2831
*
@@ -73,9 +76,14 @@ export class SocketIOProvider extends Observable {
7376
* @public
7477
*/
7578
doc
79+
/**
80+
* Enable awareness
81+
* @type {boolean}
82+
*/
83+
enableAwareness
7684
/**
7785
* The awareness
78-
* @type {AwarenessProtocol.Awareness}
86+
* @type {AwarenessProtocol.Awareness=}
7987
* @public
8088
*/
8189
awareness
@@ -126,7 +134,8 @@ export class SocketIOProvider extends Observable {
126134
doc = new Y.Doc(),
127135
{
128136
autoConnect = true,
129-
awareness = new AwarenessProtocol.Awareness(doc),
137+
enableAwareness = true,
138+
awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined,
130139
resyncInterval = -1,
131140
disableBc = false,
132141
auth = {}
@@ -140,6 +149,8 @@ export class SocketIOProvider extends Observable {
140149
this._url = url
141150
this.roomName = roomName
142151
this.doc = doc
152+
153+
this.enableAwareness = enableAwareness
143154
this.awareness = awareness
144155

145156
this._broadcastChannel = `${url}/${roomName}`
@@ -167,12 +178,13 @@ export class SocketIOProvider extends Observable {
167178

168179
this.initSyncListeners()
169180

170-
this.initAwarenessListeners()
181+
if (this.enableAwareness) {
182+
this.initAwarenessListeners()
183+
awareness?.on('update', this.awarenessUpdate)
184+
}
171185

172186
this.initSystemListeners()
173187

174-
awareness.on('update', this.awarenessUpdate)
175-
176188
if (autoConnect) this.connect()
177189
}
178190

@@ -260,6 +272,8 @@ export class SocketIOProvider extends Observable {
260272
*/
261273
initAwarenessListeners = () => {
262274
this.socket.on('awareness-update', (/** @type {ArrayBuffer} */ update) => {
275+
if (!this.awareness) return
276+
263277
AwarenessProtocol.applyAwarenessUpdate(
264278
this.awareness,
265279
new Uint8Array(update),
@@ -310,7 +324,7 @@ export class SocketIOProvider extends Observable {
310324
Y.applyUpdate(this.doc, new Uint8Array(update), this)
311325
}
312326
)
313-
if (this.awareness.getLocalState() !== null) {
327+
if (this.enableAwareness && this.awareness && this.awareness.getLocalState() !== null) {
314328
this.socket.emit(
315329
'awareness-update',
316330
AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [
@@ -355,13 +369,15 @@ export class SocketIOProvider extends Observable {
355369

356370
this.emit('connection-close', [event, this])
357371
this.synced = false
358-
AwarenessProtocol.removeAwarenessStates(
359-
this.awareness,
360-
Array.from(this.awareness.getStates().keys()).filter(
361-
(client) => client !== this.doc.clientID
362-
),
363-
this
364-
)
372+
if (this.enableAwareness && this.awareness) {
373+
AwarenessProtocol.removeAwarenessStates(
374+
this.awareness,
375+
Array.from(this.awareness.getStates().keys()).filter(
376+
(client) => client !== this.doc.clientID
377+
),
378+
this
379+
)
380+
}
365381
this.emit('status', [{ status: 'disconnected' }])
366382
}
367383

@@ -382,8 +398,10 @@ export class SocketIOProvider extends Observable {
382398
if (this.resyncInterval != null) clearInterval(this.resyncInterval)
383399
this.disconnect()
384400
if (typeof window !== 'undefined') { window.removeEventListener('beforeunload', this.beforeUnloadHandler) } else if (typeof process !== 'undefined') { process.off('exit', this.beforeUnloadHandler) }
385-
this.awareness.off('update', this.awarenessUpdate)
386-
this.awareness.destroy()
401+
if (this.enableAwareness) {
402+
this.awareness?.off('update', this.awarenessUpdate)
403+
this.awareness?.destroy()
404+
}
387405
this.doc.off('update', this.onUpdateDoc)
388406
super.destroy()
389407
}
@@ -429,6 +447,8 @@ export class SocketIOProvider extends Observable {
429447
* @readonly
430448
*/
431449
awarenessUpdate = ({ added, updated, removed }, origin) => {
450+
if (!this.awareness) return
451+
432452
const changedClients = added.concat(updated).concat(removed)
433453
this.socket.emit(
434454
'awareness-update',
@@ -457,6 +477,8 @@ export class SocketIOProvider extends Observable {
457477
* @readonly
458478
*/
459479
beforeUnloadHandler = () => {
480+
if (!this.enableAwareness || !this.awareness) return
481+
460482
AwarenessProtocol.removeAwarenessStates(
461483
this.awareness,
462484
[this.doc.clientID],
@@ -485,21 +507,24 @@ export class SocketIOProvider extends Observable {
485507
{ type: 'sync-step-2', data: Y.encodeStateAsUpdate(this.doc) },
486508
this
487509
)
488-
bc.publish(
489-
this._broadcastChannel,
490-
{ type: 'query-awareness', data: null },
491-
this
492-
)
493-
bc.publish(
494-
this._broadcastChannel,
495-
{
496-
type: 'awareness-update',
497-
data: AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [
498-
this.doc.clientID
499-
])
500-
},
501-
this
502-
)
510+
511+
if (this.enableAwareness && this.awareness) {
512+
bc.publish(
513+
this._broadcastChannel,
514+
{ type: 'query-awareness', data: null },
515+
this
516+
)
517+
bc.publish(
518+
this._broadcastChannel,
519+
{
520+
type: 'awareness-update',
521+
data: AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [
522+
this.doc.clientID
523+
])
524+
},
525+
this
526+
)
527+
}
503528
}
504529

505530
/**
@@ -509,18 +534,20 @@ export class SocketIOProvider extends Observable {
509534
* @readonly
510535
*/
511536
disconnectBc = () => {
512-
bc.publish(
513-
this._broadcastChannel,
514-
{
515-
type: 'awareness-update',
516-
data: AwarenessProtocol.encodeAwarenessUpdate(
517-
this.awareness,
518-
[this.doc.clientID],
519-
new Map()
520-
)
521-
},
522-
this
523-
)
537+
if (this.enableAwareness && this.awareness) {
538+
bc.publish(
539+
this._broadcastChannel,
540+
{
541+
type: 'awareness-update',
542+
data: AwarenessProtocol.encodeAwarenessUpdate(
543+
this.awareness,
544+
[this.doc.clientID],
545+
new Map()
546+
)
547+
},
548+
this
549+
)
550+
}
524551
if (this.bcconnected) {
525552
bc.unsubscribe(this._broadcastChannel, this.onBroadcastChannelMessage)
526553
this.bcconnected = false
@@ -556,27 +583,33 @@ export class SocketIOProvider extends Observable {
556583
Y.applyUpdate(this.doc, new Uint8Array(message.data), this)
557584
break
558585

559-
case 'query-awareness':
560-
bc.publish(
561-
this._broadcastChannel,
562-
{
563-
type: 'awareness-update',
564-
data: AwarenessProtocol.encodeAwarenessUpdate(
565-
this.awareness,
566-
Array.from(this.awareness.getStates().keys())
567-
)
568-
},
569-
this
570-
)
586+
case 'query-awareness': {
587+
if (this.enableAwareness && this.awareness) {
588+
bc.publish(
589+
this._broadcastChannel,
590+
{
591+
type: 'awareness-update',
592+
data: AwarenessProtocol.encodeAwarenessUpdate(
593+
this.awareness,
594+
Array.from(this.awareness.getStates().keys())
595+
)
596+
},
597+
this
598+
)
599+
}
571600
break
572-
573-
case 'awareness-update':
574-
AwarenessProtocol.applyAwarenessUpdate(
575-
this.awareness,
576-
new Uint8Array(message.data),
577-
this
578-
)
601+
}
602+
603+
case 'awareness-update': {
604+
if (this.enableAwareness && this.awareness) {
605+
AwarenessProtocol.applyAwarenessUpdate(
606+
this.awareness,
607+
new Uint8Array(message.data),
608+
this
609+
)
610+
}
579611
break
612+
}
580613

581614
default:
582615
break

0 commit comments

Comments
 (0)