Skip to content

Commit 5d94fa5

Browse files
goffrieConvex, Inc.
authored andcommitted
Record a metric for staleness of query invalidations (#39205)
This PR plumbs the timestamp of the invalidating write, if known, all the way through `extend_validity`. Then, in the sync worker, we can measure the real time delay between the write and the subscribed re-executing query. We can monitor this to make sure it is a reasonable value. GitOrigin-RevId: 2210b600a96656f489e685553a49f11abfda7fcd
1 parent 52acb66 commit 5d94fa5

File tree

13 files changed

+266
-110
lines changed

13 files changed

+266
-110
lines changed

crates/application/src/api.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -575,14 +575,25 @@ impl<RT: Runtime> SubscriptionClient for ApplicationSubscriptionClient<RT> {
575575
}
576576
}
577577

578+
pub enum ExtendValidityResult {
579+
/// The subscription's validity can be extended to the requested timestamp.
580+
Extended,
581+
/// The subscription may no longer be valid at the requested timestamp.
582+
/// This result can be returned spuriously even if there were no conflicting
583+
/// writes.
584+
Invalid {
585+
/// The earliest conflicting timestamp, if known. This is not guaranteed
586+
/// to be known.
587+
invalid_ts: Option<Timestamp>,
588+
},
589+
}
590+
578591
#[async_trait]
579592
pub trait SubscriptionTrait: Send + Sync {
580593
fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<()>>;
581594

582-
// Returns true if the subscription validity can be extended to new_ts. Note
583-
// that extend_validity might return false even if the subscription can be
584-
// extended, but will never return true if it can't.
585-
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<bool>;
595+
/// See comments on [`ExtendValidityResult`]
596+
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<ExtendValidityResult>;
586597
}
587598

588599
struct ApplicationSubscription {
@@ -602,10 +613,10 @@ impl SubscriptionTrait for ApplicationSubscription {
602613
}
603614

604615
#[fastrace::trace]
605-
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<bool> {
616+
async fn extend_validity(&self, new_ts: Timestamp) -> anyhow::Result<ExtendValidityResult> {
606617
if new_ts < self.initial_ts {
607618
// new_ts is before the initial subscription timestamp.
608-
return Ok(false);
619+
return Ok(ExtendValidityResult::Invalid { invalid_ts: None });
609620
}
610621

611622
// The inner subscription is periodically updated by the subscription
@@ -614,15 +625,19 @@ impl SubscriptionTrait for ApplicationSubscription {
614625
// Subscription is no longer valid. We could check validity from end_ts
615626
// to new_ts, but this is likely to fail and is potentially unbounded amount of
616627
// work, so we return false here. This is valid per the function contract.
617-
return Ok(false);
628+
return Ok(ExtendValidityResult::Invalid {
629+
invalid_ts: self.inner.invalid_ts(),
630+
});
618631
};
619632

620633
let current_token = Token::new(self.reads.clone(), current_ts);
621-
let Some(_new_token) = self.log.refresh_token(current_token, new_ts)? else {
622-
// Subscription validity can't be extended. Note that returning false
623-
// here also doesn't mean there is a conflict.
624-
return Ok(false);
625-
};
626-
return Ok(true);
634+
Ok(match self.log.refresh_token(current_token, new_ts)? {
635+
Ok(_new_token) => ExtendValidityResult::Extended,
636+
Err(invalid_ts) => {
637+
// Subscription validity can't be extended. Note that returning false
638+
// here also doesn't mean there is a conflict.
639+
ExtendValidityResult::Invalid { invalid_ts }
640+
},
641+
})
627642
}
628643
}

crates/application/src/cache/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -665,8 +665,8 @@ impl<RT: Runtime> CacheManager<RT> {
665665
return Ok(None);
666666
}
667667
result.token = match self.database.refresh_token(result.token, ts).await? {
668-
Some(t) => t,
669-
None => {
668+
Ok(t) => t,
669+
Err(_invalid_ts) => {
670670
tracing::debug!(
671671
"Couldn't refresh cache entry from {} to {}, retrying...",
672672
result.original_ts,

crates/database/benches/subscriptions.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,9 @@ fn bench_query(c: &mut Criterion) {
267267
b.to_async(&rt).iter(|| async {
268268
for (doc_id, doc_index_keys) in documents {
269269
let mut to_notify = BTreeSet::new();
270-
subscription_manager.overlapping(
271-
doc_id,
272-
doc_index_keys,
273-
&mut to_notify,
274-
);
270+
subscription_manager.overlapping(doc_id, doc_index_keys, &mut |id| {
271+
to_notify.insert(id);
272+
});
275273
}
276274
})
277275
},

crates/database/src/database.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1883,14 +1883,14 @@ impl<RT: Runtime> Database<RT> {
18831883
.collect())
18841884
}
18851885

1886-
/// Attempt to pull a token forward to a given timestamp, returning `None`
1886+
/// Attempt to pull a token forward to a given timestamp, returning `Err`
18871887
/// if there have been overlapping writes between the token's original
18881888
/// timestamp and `ts`.
18891889
pub async fn refresh_token(
18901890
&self,
18911891
token: Token,
18921892
ts: Timestamp,
1893-
) -> anyhow::Result<Option<Token>> {
1893+
) -> anyhow::Result<Result<Token, Option<Timestamp>>> {
18941894
let _timer = metrics::refresh_token_timer();
18951895
self.log.refresh_token(token, ts)
18961896
}
@@ -2195,6 +2195,11 @@ fn occ_write_source_string(
21952195
pub struct ConflictingReadWithWriteSource {
21962196
pub(crate) read: ConflictingRead,
21972197
pub(crate) write_source: WriteSource,
2198+
/// The timestamp of the conflicting write.
2199+
///
2200+
/// N.B.: this may be a non-repeatable timestamp, if this conflict occurred
2201+
/// against a pending write!
2202+
pub(crate) write_ts: Timestamp,
21982203
}
21992204

22002205
impl ConflictingReadWithWriteSource {

crates/database/src/reads.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ impl ReadSet {
283283
persistence_version: PersistenceVersion,
284284
) -> Option<ConflictingReadWithWriteSource> {
285285
let mut buffer = IndexKeyBuffer::new();
286-
for (_ts, updates, write_source) in updates {
286+
for (update_ts, updates, write_source) in updates {
287287
for (_, update) in updates {
288288
if let Some(ref document) = update.new_document {
289289
if let Some(conflicting_read) =
@@ -292,6 +292,7 @@ impl ReadSet {
292292
return Some(ConflictingReadWithWriteSource {
293293
read: conflicting_read,
294294
write_source: write_source.clone(),
295+
write_ts: *update_ts,
295296
});
296297
}
297298
}
@@ -302,6 +303,7 @@ impl ReadSet {
302303
return Some(ConflictingReadWithWriteSource {
303304
read: conflicting_read,
304305
write_source: write_source.clone(),
306+
write_ts: *update_ts,
305307
});
306308
}
307309
}
@@ -323,13 +325,14 @@ impl ReadSet {
323325
),
324326
>,
325327
) -> Option<ConflictingReadWithWriteSource> {
326-
for (_ts, updates, write_source) in updates {
328+
for (update_ts, updates, write_source) in updates {
327329
for (id, update) in updates {
328330
if let Some(ref document) = update.new_document_keys {
329331
if let Some(conflicting_read) = self.overlaps_index_keys(*id, document) {
330332
return Some(ConflictingReadWithWriteSource {
331333
read: conflicting_read,
332334
write_source: write_source.clone(),
335+
write_ts: *update_ts,
333336
});
334337
}
335338
}
@@ -338,6 +341,7 @@ impl ReadSet {
338341
return Some(ConflictingReadWithWriteSource {
339342
read: conflicting_read,
340343
write_source: write_source.clone(),
344+
write_ts: *update_ts,
341345
});
342346
}
343347
}

0 commit comments

Comments
 (0)