@@ -128,12 +128,6 @@ export class YSocketIO {
128
128
* @readonly
129
129
*/
130
130
debouncedPersistMap = new Map ( )
131
- /**
132
- * @type {Map<string, Y.Doc> }
133
- * @private
134
- * @readonly
135
- */
136
- debouncedPersistDocMap = new Map ( )
137
131
/**
138
132
* @type {Map<string, number> }
139
133
* @private
@@ -373,8 +367,7 @@ export class YSocketIO {
373
367
const nsp = this . namespaceMap . get ( ns )
374
368
if ( nsp ?. sockets . size === 0 && stream ) {
375
369
this . cleanupNamespace ( ns , stream , DEFAULT_CLEAR_TIMEOUT )
376
- const doc = this . namespaceDocMap . get ( ns )
377
- if ( doc ) this . debouncedPersist ( ns , doc . ydoc , true )
370
+ if ( this . namespaceDocMap . has ( ns ) ) this . debouncedPersist ( ns , true )
378
371
}
379
372
}
380
373
} )
@@ -492,19 +485,17 @@ export class YSocketIO {
492
485
const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL
493
486
if ( changed || shouldPersist || nsp . sockets . size === 0 ) {
494
487
this . namespacePersistentMap . set ( namespace , now )
495
- this . debouncedPersist ( namespace , doc . ydoc , nsp . sockets . size === 0 )
488
+ this . debouncedPersist ( namespace , nsp . sockets . size === 0 )
496
489
}
497
490
this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
498
491
this . namespaceDocMap . set ( namespace , doc )
499
492
}
500
493
501
494
/**
502
495
* @param {string } namespace
503
- * @param {Y.Doc } doc
504
496
* @param {boolean= } immediate
505
497
*/
506
- debouncedPersist ( namespace , doc , immediate = false ) {
507
- this . debouncedPersistDocMap . set ( namespace , doc )
498
+ debouncedPersist ( namespace , immediate = false ) {
508
499
if ( this . debouncedPersistMap . has ( namespace ) ) {
509
500
if ( ! immediate ) return
510
501
clearTimeout ( this . debouncedPersistMap . get ( namespace ) || undefined )
@@ -514,9 +505,17 @@ export class YSocketIO {
514
505
: PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
515
506
const timeout = setTimeout (
516
507
async ( ) => {
508
+ // wait for previous persisting operation if exists
509
+ const prev = this . awaitingPersistMap . get ( namespace )
510
+ if ( prev ?. promise ) await prev . promise
511
+ // delete persist entry to allow queueing next persisting operation
512
+ // we can delete it here because the following until awaiting persist
513
+ // are all synchronize operations
514
+ this . debouncedPersistMap . delete ( namespace )
515
+
517
516
try {
518
517
assert ( this . client )
519
- const doc = this . debouncedPersistDocMap . get ( namespace )
518
+ const doc = this . namespaceDocMap . get ( namespace ) ?. ydoc
520
519
logSocketIO ( `trying to persist ${ namespace } ` )
521
520
if ( ! doc ) return
522
521
if ( this . client . persistWorker ) {
@@ -534,15 +533,14 @@ export class YSocketIO {
534
533
} )
535
534
await promise
536
535
} else {
537
- await this . client . store . persistDoc ( namespace , 'index' , doc )
536
+ const promise = this . client . store . persistDoc ( namespace , 'index' , doc )
537
+ this . awaitingPersistMap . set ( namespace , { promise, resolve : ( ) => { } } )
538
+ await promise
538
539
}
539
540
540
541
await this . client . trimRoomStream ( namespace , 'index' )
541
542
} catch ( e ) {
542
543
console . error ( e )
543
- } finally {
544
- this . debouncedPersistDocMap . delete ( namespace )
545
- this . debouncedPersistMap . delete ( namespace )
546
544
}
547
545
} ,
548
546
timeoutInterval
0 commit comments