@@ -14,6 +14,7 @@ use std::{
14
14
} ,
15
15
Arc ,
16
16
} ,
17
+ time:: Duration ,
17
18
} ;
18
19
19
20
use :: metrics:: Timer ;
@@ -24,7 +25,11 @@ use common::{
24
25
DocumentIndexKeys ,
25
26
} ,
26
27
errors:: report_error,
27
- knobs:: SUBSCRIPTIONS_WORKER_QUEUE_SIZE ,
28
+ knobs:: {
29
+ SUBSCRIPTIONS_WORKER_QUEUE_SIZE ,
30
+ SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER ,
31
+ SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD ,
32
+ } ,
28
33
runtime:: {
29
34
block_in_place,
30
35
Runtime ,
@@ -58,7 +63,10 @@ use tokio::sync::{
58
63
use value:: ResolvedDocumentId ;
59
64
60
65
use crate :: {
61
- metrics,
66
+ metrics:: {
67
+ self ,
68
+ log_subscriptions_invalidated,
69
+ } ,
62
70
reads:: ReadSet ,
63
71
write_log:: {
64
72
LogOwner ,
@@ -116,6 +124,20 @@ impl Drop for SubscriptionSender {
116
124
}
117
125
}
118
126
127
+ impl SubscriptionSender {
128
+ fn drop_with_delay ( self , delay : Option < Duration > ) {
129
+ self . valid_ts . store ( -1 , Ordering :: SeqCst ) ;
130
+ if let Some ( delay) = delay {
131
+ tokio:: spawn ( async move {
132
+ tokio:: time:: sleep ( delay) . await ;
133
+ _ = self . valid_tx . send ( SubscriptionState :: Invalid ) ;
134
+ } ) ;
135
+ } else {
136
+ _ = self . valid_tx . send ( SubscriptionState :: Invalid ) ;
137
+ }
138
+ }
139
+ }
140
+
119
141
enum SubscriptionRequest {
120
142
Subscribe {
121
143
token : Token ,
@@ -314,9 +336,27 @@ impl SubscriptionManager {
314
336
}
315
337
}
316
338
// Then, invalidate all the remaining subscriptions.
339
+ let num_subscriptions_invalidated = to_notify. len ( ) ;
340
+ let should_splay_invalidations =
341
+ num_subscriptions_invalidated > * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD ;
342
+ if should_splay_invalidations {
343
+ tracing:: info!(
344
+ "Splaying subscription invalidations since there are {} subscriptions to \
345
+ invalidate. The threshold is {}",
346
+ num_subscriptions_invalidated,
347
+ * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD
348
+ ) ;
349
+ }
317
350
for subscriber_id in to_notify {
318
- self . _remove ( subscriber_id) ;
351
+ let delay = should_splay_invalidations. then ( || {
352
+ Duration :: from_millis ( rand:: random_range (
353
+ 0 ..=num_subscriptions_invalidated as u64
354
+ * * SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER ,
355
+ ) )
356
+ } ) ;
357
+ self . _remove ( subscriber_id, delay) ;
319
358
}
359
+ log_subscriptions_invalidated ( num_subscriptions_invalidated) ;
320
360
321
361
assert ! ( self . processed_ts <= next_ts) ;
322
362
self . processed_ts = next_ts;
@@ -369,13 +409,14 @@ impl SubscriptionManager {
369
409
if self . get_subscriber ( key) . is_none ( ) {
370
410
return ;
371
411
}
372
- self . _remove ( key. id ) ;
412
+ self . _remove ( key. id , None ) ;
373
413
}
374
414
375
- fn _remove ( & mut self , id : SubscriberId ) {
415
+ fn _remove ( & mut self , id : SubscriberId , delay : Option < Duration > ) {
376
416
let entry = self . subscribers . remove ( id) ;
377
417
self . subscriptions . remove ( id, & entry. reads ) ;
378
418
// dropping `entry.sender` will invalidate the subscription
419
+ entry. sender . drop_with_delay ( delay) ;
379
420
}
380
421
}
381
422
@@ -779,7 +820,7 @@ mod tests {
779
820
. subscribe_for_testing ( token. clone ( ) )
780
821
. unwrap ( ) ;
781
822
subscriptions. push ( subscriber) ;
782
- subscription_manager. _remove ( id) ;
823
+ subscription_manager. _remove ( id, None ) ;
783
824
}
784
825
785
826
assert ! (
@@ -817,7 +858,7 @@ mod tests {
817
858
let ( _subscription, id) = subscription_manager
818
859
. subscribe_for_testing ( token. clone ( ) )
819
860
. unwrap ( ) ;
820
- subscription_manager. _remove ( id) ;
861
+ subscription_manager. _remove ( id, None ) ;
821
862
822
863
assert ! ( notify_subscribed_tokens(
823
864
& mut id_generator,
@@ -891,7 +932,7 @@ mod tests {
891
932
) ;
892
933
}
893
934
for ( _, id) in & subscriptions {
894
- subscription_manager. _remove( * id) ;
935
+ subscription_manager. _remove( * id, None ) ;
895
936
}
896
937
let notifications = notify_subscribed_tokens(
897
938
& mut id_generator,
@@ -916,7 +957,7 @@ mod tests {
916
957
for token in & tokens {
917
958
let ( _subscription, id) = subscription_manager
918
959
. subscribe_for_testing( token. clone( ) ) . unwrap( ) ;
919
- subscription_manager. _remove( id) ;
960
+ subscription_manager. _remove( id, None ) ;
920
961
}
921
962
let notifications = notify_subscribed_tokens(
922
963
& mut id_generator,
0 commit comments