@@ -322,69 +322,84 @@ impl SubscriptionManager {
322
322
let from_ts = self . processed_ts . succ ( ) ?;
323
323
324
324
let mut to_notify = BTreeSet :: new ( ) ;
325
- let mut buffer = IndexKeyBuffer :: new ( ) ;
326
- self . log . for_each ( from_ts, next_ts, |_, writes| {
327
- for ( _, document_change) in writes {
328
- // We're applying a mutation to the document so if it already exists
329
- // we need to remove it before writing the new version.
330
- if let Some ( ref old_document) = document_change. old_document {
331
- self . overlapping (
332
- old_document,
333
- & mut to_notify,
334
- self . persistence_version ,
335
- & mut buffer,
336
- ) ;
325
+ {
326
+ let _timer = metrics:: subscriptions_log_iterate_timer ( ) ;
327
+ let mut buffer = IndexKeyBuffer :: new ( ) ;
328
+ let mut log_len = 0 ;
329
+ let mut num_writes = 0 ;
330
+ self . log . for_each ( from_ts, next_ts, |_, writes| {
331
+ log_len += 1 ;
332
+ num_writes += writes. len ( ) ;
333
+ for ( _, document_change) in writes {
334
+ // We're applying a mutation to the document so if it already exists
335
+ // we need to remove it before writing the new version.
336
+ if let Some ( ref old_document) = document_change. old_document {
337
+ self . overlapping (
338
+ old_document,
339
+ & mut to_notify,
340
+ self . persistence_version ,
341
+ & mut buffer,
342
+ ) ;
343
+ }
344
+ // If we're doing anything other than deleting the document then
345
+ // we'll also need to insert a new value.
346
+ if let Some ( ref new_document) = document_change. new_document {
347
+ self . overlapping (
348
+ new_document,
349
+ & mut to_notify,
350
+ self . persistence_version ,
351
+ & mut buffer,
352
+ ) ;
353
+ }
337
354
}
338
- // If we're doing anything other than deleting the document then
339
- // we'll also need to insert a new value.
340
- if let Some ( ref new_document) = document_change. new_document {
341
- self . overlapping (
342
- new_document,
343
- & mut to_notify,
344
- self . persistence_version ,
345
- & mut buffer,
346
- ) ;
355
+ } ) ?;
356
+ metrics:: log_subscriptions_log_length ( log_len) ;
357
+ metrics:: log_subscriptions_log_writes ( num_writes) ;
358
+ }
359
+
360
+ {
361
+ let _timer = metrics:: subscriptions_invalidate_timer ( ) ;
362
+ // First, do a pass where we advance all of the valid subscriptions.
363
+ for ( subscriber_id, subscriber) in & mut self . subscribers {
364
+ if !to_notify. contains ( & subscriber_id) {
365
+ subscriber
366
+ . sender
367
+ . valid_ts
368
+ . store ( i64:: from ( next_ts) , Ordering :: SeqCst ) ;
347
369
}
348
370
}
349
- } ) ?;
350
-
351
- // First, do a pass where we advance all of the valid subscriptions.
352
- for ( subscriber_id, subscriber) in & mut self . subscribers {
353
- if !to_notify. contains ( & subscriber_id) {
354
- subscriber
355
- . sender
356
- . valid_ts
357
- . store ( i64:: from ( next_ts) , Ordering :: SeqCst ) ;
371
+ // Then, invalidate all the remaining subscriptions.
372
+ let num_subscriptions_invalidated = to_notify. len ( ) ;
373
+ let should_splay_invalidations =
374
+ num_subscriptions_invalidated > * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD ;
375
+ if should_splay_invalidations {
376
+ tracing:: info!(
377
+ "Splaying subscription invalidations since there are {} subscriptions to \
378
+ invalidate. The threshold is {}",
379
+ num_subscriptions_invalidated,
380
+ * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD
381
+ ) ;
358
382
}
359
- }
360
- // Then, invalidate all the remaining subscriptions.
361
- let num_subscriptions_invalidated = to_notify. len ( ) ;
362
- let should_splay_invalidations =
363
- num_subscriptions_invalidated > * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD ;
364
- if should_splay_invalidations {
365
- tracing:: info!(
366
- "Splaying subscription invalidations since there are {} subscriptions to \
367
- invalidate. The threshold is {}",
368
- num_subscriptions_invalidated,
369
- * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD
370
- ) ;
371
- }
372
- for subscriber_id in to_notify {
373
- let delay = should_splay_invalidations. then ( || {
374
- Duration :: from_millis ( rand:: random_range (
375
- 0 ..=num_subscriptions_invalidated as u64
376
- * * SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER ,
377
- ) )
378
- } ) ;
379
- self . _remove ( subscriber_id, delay) ;
380
- }
381
- log_subscriptions_invalidated ( num_subscriptions_invalidated) ;
383
+ for subscriber_id in to_notify {
384
+ let delay = should_splay_invalidations. then ( || {
385
+ Duration :: from_millis ( rand:: random_range (
386
+ 0 ..=num_subscriptions_invalidated as u64
387
+ * * SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER ,
388
+ ) )
389
+ } ) ;
390
+ self . _remove ( subscriber_id, delay) ;
391
+ }
392
+ log_subscriptions_invalidated ( num_subscriptions_invalidated) ;
382
393
383
- assert ! ( self . processed_ts <= next_ts) ;
384
- self . processed_ts = next_ts;
394
+ assert ! ( self . processed_ts <= next_ts) ;
395
+ self . processed_ts = next_ts;
396
+ }
385
397
386
398
// Enforce retention after we have processed the subscriptions.
387
- self . log . enforce_retention_policy ( next_ts) ;
399
+ {
400
+ let _timer = metrics:: subscriptions_log_enforce_retention_timer ( ) ;
401
+ self . log . enforce_retention_policy ( next_ts) ;
402
+ }
388
403
389
404
Ok ( ( ) )
390
405
} )
0 commit comments