@@ -13,12 +13,14 @@ import { isDeepStrictEqual } from 'util'
13
13
import { User } from './user.js'
14
14
import { createModuleLogger } from 'lib0/logging'
15
15
import toobusy from 'toobusy-js'
16
+ import { promiseWithResolvers } from './utils.js'
16
17
17
18
const logSocketIO = createModuleLogger ( '@y/socket-io/server' )
18
19
const PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-persist-interval' ) || '3000' )
19
20
const MAX_PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-max-persist-interval' ) || '30000' )
20
21
const REVALIDATE_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-server-revalidate-timeout' ) || '60000' )
21
22
const WORKER_DISABLED = env . getConf ( 'y-worker-disabled' ) === 'true'
23
+ const DEFAULT_CLEAR_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-default-clear-timeout' ) || '30000' )
22
24
23
25
process . on ( 'SIGINT' , function ( ) {
24
26
// calling .shutdown allows your process to exit normally
@@ -137,11 +139,17 @@ export class YSocketIO {
137
139
*/
138
140
namespacePersistentMap = new Map ( )
139
141
/**
140
- * @type {Map<string, () => void> }
142
+ * @type {Map<string, { promise: Promise<void>, resolve: () => void } > }
141
143
* @private
142
144
* @readonly
143
145
*/
144
146
awaitingPersistMap = new Map ( )
147
+ /**
148
+ * @type {Map<string, NodeJS.Timeout> }
149
+ * @private
150
+ * @readonly
151
+ */
152
+ awaitingCleanupNamespace = new Map ( )
145
153
146
154
/**
147
155
* YSocketIO constructor.
@@ -213,6 +221,12 @@ export class YSocketIO {
213
221
'index' ,
214
222
redisPrefix
215
223
)
224
+ const prevAwaitCleanup = this . awaitingCleanupNamespace . get ( namespace )
225
+ if ( prevAwaitCleanup ) {
226
+ clearTimeout ( prevAwaitCleanup )
227
+ this . cleanupNamespace ( namespace , stream )
228
+ }
229
+
216
230
if ( ! this . namespaceMap . has ( namespace ) ) {
217
231
this . namespaceMap . set ( namespace , socket . nsp )
218
232
}
@@ -346,13 +360,9 @@ export class YSocketIO {
346
360
if ( ! ns ) continue
347
361
const nsp = this . namespaceMap . get ( ns )
348
362
if ( nsp ?. sockets . size === 0 && stream ) {
349
- this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
350
- this . namespaceStreamMap . delete ( ns )
351
- this . streamNamespaceMap . delete ( stream )
352
- this . namespaceMap . delete ( ns )
353
- this . namespaceDocMap . get ( ns ) ?. ydoc . destroy ( )
354
- this . namespaceDocMap . delete ( ns )
355
- this . namespacePersistentMap . delete ( ns )
363
+ this . cleanupNamespace ( ns , stream , DEFAULT_CLEAR_TIMEOUT )
364
+ const doc = this . namespaceDocMap . get ( ns )
365
+ if ( doc ) this . debouncedPersist ( ns , doc . ydoc , true )
356
366
}
357
367
}
358
368
} )
@@ -398,18 +408,13 @@ export class YSocketIO {
398
408
* @param {Array<Uint8Array> } messages
399
409
*/
400
410
redisMessageSubscriber = async ( stream , messages ) => {
411
+ console . log ( '[DEBUG]' , { stream, messages } )
401
412
const namespace = this . streamNamespaceMap . get ( stream )
402
413
if ( ! namespace ) return
403
414
const nsp = this . namespaceMap . get ( namespace )
404
415
if ( ! nsp ) return
405
416
if ( nsp . sockets . size === 0 && this . subscriber ) {
406
- this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
407
- this . namespaceStreamMap . delete ( namespace )
408
- this . streamNamespaceMap . delete ( stream )
409
- this . namespaceMap . delete ( namespace )
410
- this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
411
- this . namespaceDocMap . delete ( namespace )
412
- this . namespacePersistentMap . delete ( namespace )
417
+ this . cleanupNamespace ( namespace , stream , DEFAULT_CLEAR_TIMEOUT )
413
418
}
414
419
415
420
/** @type {Uint8Array[] } */
@@ -463,9 +468,9 @@ export class YSocketIO {
463
468
const lastPersistCalledAt = this . namespacePersistentMap . get ( namespace ) ?? 0
464
469
const now = Date . now ( )
465
470
const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL
466
- if ( changed || shouldPersist ) {
471
+ if ( changed || shouldPersist || nsp . sockets . size === 0 ) {
467
472
this . namespacePersistentMap . set ( namespace , now )
468
- this . debouncedPersist ( namespace , doc . ydoc )
473
+ this . debouncedPersist ( namespace , doc . ydoc , nsp . sockets . size === 0 )
469
474
}
470
475
this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
471
476
this . namespaceDocMap . set ( namespace , doc )
@@ -474,47 +479,50 @@ export class YSocketIO {
474
479
/**
475
480
* @param {string } namespace
476
481
* @param {Y.Doc } doc
482
+ * @param {boolean= } immediate
477
483
*/
478
- async debouncedPersist ( namespace , doc ) {
484
+ debouncedPersist ( namespace , doc , immediate = false ) {
479
485
this . debouncedPersistDocMap . set ( namespace , doc )
480
- if ( this . debouncedPersistMap . has ( namespace ) ) return
486
+ if ( this . debouncedPersistMap . has ( namespace ) ) {
487
+ if ( ! immediate ) return
488
+ clearTimeout ( this . debouncedPersistMap . get ( namespace ) || undefined )
489
+ }
490
+ const timeoutInterval = immediate
491
+ ? 0
492
+ : PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
481
493
const timeout = setTimeout (
482
494
async ( ) => {
483
495
try {
484
496
assert ( this . client )
485
497
const doc = this . debouncedPersistDocMap . get ( namespace )
486
498
logSocketIO ( `trying to persist ${ namespace } ` )
487
499
if ( ! doc ) return
488
- /** @type {Promise<void> | null } */
489
- let workerPromise = null
490
500
if ( this . client . persistWorker ) {
491
- workerPromise = new Promise ( ( resolve ) => {
492
- assert ( this . client ?. persistWorker )
493
- this . awaitingPersistMap . set ( namespace , resolve )
494
-
495
- const docState = Y . encodeStateAsUpdateV2 ( doc )
496
- const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
497
- buf . set ( docState )
498
- this . client . persistWorker . postMessage ( {
499
- room : namespace ,
500
- docstate : buf
501
- } )
501
+ /** @type { ReturnType<typeof promiseWithResolvers<void>> } */
502
+ const { promise , resolve } = promiseWithResolvers ( )
503
+ assert ( this . client ?. persistWorker )
504
+ this . awaitingPersistMap . set ( namespace , { promise , resolve } )
505
+
506
+ const docState = Y . encodeStateAsUpdateV2 ( doc )
507
+ const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
508
+ buf . set ( docState )
509
+ this . client . persistWorker . postMessage ( {
510
+ room : namespace ,
511
+ docstate : buf
502
512
} )
503
- if ( workerPromise ) {
504
- await workerPromise
505
- }
513
+ await promise
506
514
} else {
507
515
await this . client . store . persistDoc ( namespace , 'index' , doc )
508
516
}
509
- await this . client . trimRoomStream ( namespace , 'index' , true )
517
+ await this . client . trimRoomStream ( namespace , 'index' )
510
518
} catch ( e ) {
511
519
console . error ( e )
512
520
} finally {
513
521
this . debouncedPersistDocMap . delete ( namespace )
514
522
this . debouncedPersistMap . delete ( namespace )
515
523
}
516
524
} ,
517
- PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
525
+ timeoutInterval
518
526
)
519
527
520
528
this . debouncedPersistMap . set ( namespace , timeout )
@@ -608,7 +616,45 @@ export class YSocketIO {
608
616
registerPersistWorkerResolve ( ) {
609
617
if ( ! this . client ?. persistWorker ) return
610
618
this . client . persistWorker . on ( 'message' , ( { event, room } ) => {
611
- if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. ( )
619
+ if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. resolve ( )
612
620
} )
613
621
}
622
+
623
+ /**
624
+ * @param {string } namespace
625
+ * @param {string } stream
626
+ * @param {number= } removeAfterWait
627
+ */
628
+ cleanupNamespace ( namespace , stream , removeAfterWait ) {
629
+ if ( ! removeAfterWait ) {
630
+ this . awaitingCleanupNamespace . delete ( namespace )
631
+ return this . cleanupNamespaceImpl ( namespace , stream )
632
+ }
633
+ if ( this . awaitingCleanupNamespace . has ( namespace ) ) return
634
+
635
+ const timer = setTimeout ( async ( ) => {
636
+ const awaitingPersist = this . awaitingPersistMap . get ( namespace )
637
+ if ( awaitingPersist ) await awaitingPersist . promise
638
+ this . cleanupNamespaceImpl ( namespace , stream )
639
+ this . awaitingCleanupNamespace . delete ( namespace )
640
+ logSocketIO ( `no active connection, namespace: ${ namespace } cleared` )
641
+ } , removeAfterWait )
642
+ this . awaitingCleanupNamespace . set ( namespace , timer )
643
+ }
644
+
645
+ /**
646
+ * @param {string } namespace
647
+ * @param {string } stream
648
+ * @private
649
+ */
650
+ cleanupNamespaceImpl ( namespace , stream ) {
651
+ this . subscriber ?. unsubscribe ( stream , this . redisMessageSubscriber )
652
+ this . namespaceStreamMap . delete ( namespace )
653
+ this . streamNamespaceMap . delete ( stream )
654
+ this . namespaceMap . delete ( namespace )
655
+ this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
656
+ this . namespaceDocMap . delete ( namespace )
657
+ this . namespacePersistentMap . delete ( namespace )
658
+ this . client ?. trimRoomStream ( namespace , 'index' , true )
659
+ }
614
660
}
0 commit comments