@@ -4,6 +4,7 @@ use std::{
         BTreeMap,
         BTreeSet,
     },
+    ops::Deref,
     sync::Arc,
 };
@@ -16,6 +17,7 @@ use common::{
16
17
IndexKeyBytes ,
17
18
} ,
18
19
interval:: Interval ,
20
+ knobs:: DOCUMENTS_IN_MEMORY ,
19
21
pause:: PauseClient ,
20
22
persistence:: {
21
23
new_static_repeatable_recent,
@@ -50,6 +52,7 @@ use futures::{
 use futures_async_stream::try_stream;
 use value::{
     InternalDocumentId,
+    InternalId,
     TableId,
 };
@@ -175,7 +178,7 @@ impl<RT: Runtime> TableIterator<RT> {
         // (b) have key > cursor.
         // We insert skipped documents into future pages of the index walk when we get
         // to them.
-        let mut skipped_keys = BTreeMap::new();
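+        // IterationDocuments holds at most *DOCUMENTS_IN_MEMORY full documents,
+        // keeping only IDs past that cap (see its definition at the end of this diff).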
+        let mut skipped_keys = IterationDocuments::default();

         loop {
             self.pause_client.wait("before_index_page").await;
@@ -196,25 +199,23 @@ impl<RT: Runtime> TableIterator<RT> {
             let page: BTreeMap<_, _> = page
                 .into_iter()
                 .filter(|(_, ts, _)| *ts <= *self.snapshot_ts)
-                .map(|(index_key, ts, doc)| (index_key, (ts, doc)))
+                .map(|(index_key, ts, doc)| (index_key, (ts, IterationDocument::Full(doc))))
                 .collect();

             // 2. Find any keys for documents that were skipped by this
             // page or will be skipped by future pages.
             // These documents are returned with index keys and revisions as
             // they existed at snapshot_ts.
-            skipped_keys.extend(
-                self.fetch_skipped_keys(
-                    table_id,
-                    &indexed_fields,
-                    page_start.as_ref(),
-                    *end_ts,
-                    new_end_ts,
-                    rate_limiter,
-                )
-                .await?
-                .into_iter(),
-            );
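+            // Skipped keys are appended through `&mut skipped_keys` so they go
+            // through IterationDocuments' eviction policy instead of being
+            // collected into a separate map first.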
+            self.fetch_skipped_keys(
+                table_id,
+                &indexed_fields,
+                page_start.as_ref(),
+                *end_ts,
+                new_end_ts,
+                rate_limiter,
+                &mut skipped_keys,
+            )
+            .await?;
             if let Some((first_skipped_key, _)) = skipped_keys.iter().next() {
                 // Check all skipped ids are after the old cursor,
                 // which ensures the yielded output is in index key order.
@@ -225,59 +226,44 @@ impl<RT: Runtime> TableIterator<RT> {
             // the current page.
             let page_skipped_keys = {
                 let mut page_skipped_keys = BTreeMap::new();
-                while let Some(first_skipped_key) = skipped_keys.first_entry()
-                    && cursor_has_walked(Some(page_end), first_skipped_key.key())
+                while let Some(first_skipped_key) = skipped_keys.keys().next()
+                    && cursor_has_walked(Some(page_end), first_skipped_key)
                 {
-                    let (key, value) = first_skipped_key.remove_entry();
+                    let (key, value) = skipped_keys
+                        .remove(&first_skipped_key.clone())
+                        .context("skipped_keys should be nonempty")?;
                     page_skipped_keys.insert(key, value);
                 }
                 page_skipped_keys
             };
-            // Note we already fetched these documents earlier when calculating
-            // skipped_keys, but we would rather not hold them in memory. Since
-            // skipped documents are relatively rare, an extra fetch
-            // occasionally is worth it for the memory savings.
-            let page_skipped_docs: BTreeMap<_, _> = self
-                .load_revisions_at_snapshot_ts(stream::iter(
-                    page_skipped_keys.into_values().map(Ok),
-                ))
-                .map_ok(|(doc, ts)| {
-                    (
-                        doc.index_key(&indexed_fields, self.persistence.version())
-                            .into_bytes(),
-                        (ts, doc),
-                    )
-                })
-                .try_collect()
-                .await?;
             // Merge index walk and skipped keys into BTreeMap, which sorts by index key.
-            let merged_page: BTreeMap<_, _> = page.into_iter().chain(page_skipped_docs).collect();
+            let merged_page =
+                IterationDocuments::new(page.into_iter().chain(page_skipped_keys).collect());

             // Sanity check output.
             let all_ids: BTreeSet<_> = merged_page
-                .iter()
-                .map(|(_, (_, doc))| doc.id().internal_id())
+                .values()
+                .map(|(_, doc)| doc.internal_id())
                 .collect();
             anyhow::ensure!(
                 all_ids.len() == merged_page.len(),
                 "duplicate id in table iterator {merged_page:?}"
             );
             anyhow::ensure!(
-                merged_page
-                    .iter()
-                    .all(|(_, (ts, _))| *ts <= *self.snapshot_ts),
+                merged_page.values().all(|(ts, _)| *ts <= *self.snapshot_ts),
                 "document after snapshot in table iterator {merged_page:?}"
             );
             anyhow::ensure!(
-                merged_page.iter().all(|(key, _)| {
+                merged_page.keys().all(|key| {
                     !cursor_has_walked(page_start.as_ref(), key)
                         && cursor_has_walked(Some(page_end), key)
                 }),
                 "document outside page in table iterator {merged_page:?}"
             );

-            for (key, (revision_ts, revision)) in merged_page {
-                yield (key, revision_ts, revision);
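+            // Entries evicted down to IDs are re-fetched here in batches, so the
+            // page still streams out in index-key order.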
+            let mut merged_page_docs = self.reload_revisions_at_snapshot_ts(merged_page);
+            while let Some((key, ts, doc)) = merged_page_docs.try_next().await? {
+                yield (key, ts, doc);
             }
             if matches!(page_end, CursorPosition::End) {
                 // If we are done, all skipped_keys would be put in this final page.
@@ -301,22 +287,22 @@ impl<RT: Runtime> TableIterator<RT> {
         start_ts: Timestamp,
         end_ts: RepeatableTimestamp,
         rate_limiter: &RateLimiter<RT>,
-    ) -> anyhow::Result<BTreeMap<IndexKeyBytes, InternalDocumentId>> {
+        output: &mut IterationDocuments,
+    ) -> anyhow::Result<()> {
         let reader = self.persistence.clone();
         let persistence_version = reader.version();
         let skipped_revs = self.walk_document_log(table_id, start_ts, end_ts, rate_limiter);
         let revisions_at_snapshot = self.load_revisions_at_snapshot_ts(skipped_revs);
         pin_mut!(revisions_at_snapshot);
-        let mut skipped_keys = BTreeMap::new();
-        while let Some((doc, _)) = revisions_at_snapshot.try_next().await? {
+        while let Some((doc, ts)) = revisions_at_snapshot.try_next().await? {
             let index_key = doc
                 .index_key(indexed_fields, persistence_version)
                 .into_bytes();
             if !cursor_has_walked(lower_bound, &index_key) {
-                skipped_keys.insert(index_key, doc.id_with_table_id());
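+                // `output` keeps the full document only while under its in-memory
+                // cap; otherwise it records just the ID for a later re-fetch.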
+                output.insert(index_key, ts, doc);
             }
         }
-        Ok(skipped_keys)
+        Ok(())
     }

     #[try_stream(ok = InternalDocumentId, error = anyhow::Error)]
@@ -403,8 +389,9 @@ impl<RT: Runtime> TableIterator<RT> {
         Ok((documents_in_page, ts))
     }

-    // Load the revisions of documents visible at `self.snapshot_ts`, skipping
-    // documents that don't exist then.
+    /// Load the revisions of documents visible at `self.snapshot_ts`.
+    /// Documents are yielded in the same order as input, skipping duplicates
+    /// and documents that didn't exist at the snapshot.
     #[try_stream(ok = (ResolvedDocument, Timestamp), error = anyhow::Error)]
     async fn load_revisions_at_snapshot_ts<'a>(
         &'a self,
@@ -425,16 +412,138 @@ impl<RT: Runtime> TableIterator<RT> {
         pin_mut!(id_chunks);

         while let Some(chunk) = id_chunks.try_next().await? {
-            let ids_to_load = chunk.into_iter().map(|id| (id, ts_succ)).collect();
-            let old_revisions = repeatable_persistence
+            let ids_to_load = chunk.iter().map(|id| (*id, ts_succ)).collect();
+            let mut old_revisions = repeatable_persistence
                 .previous_revisions(ids_to_load)
                 .await?;
-            for (_, (revision_ts, revision)) in old_revisions.into_iter() {
-                if let Some(revision) = revision {
+            // Yield in the same order as the input, skipping duplicates and
+            // missing documents.
+            for id in chunk {
+                if let Some((revision_ts, Some(revision))) = old_revisions.remove(&(id, ts_succ)) {
                     yield (revision, revision_ts);
-                }
+                };
+            }
+        }
+    }
+
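+    /// Load the snapshot revisions for `(id, index key)` pairs, re-attaching
+    /// each loaded document to the index key it was recorded under.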
+    #[try_stream(boxed, ok = (IndexKeyBytes, Timestamp, ResolvedDocument), error = anyhow::Error)]
+    async fn load_index_entries_at_snapshot_ts(
+        &self,
+        entries: Vec<(InternalDocumentId, IndexKeyBytes)>,
+    ) {
+        let ids: Vec<_> = entries.iter().map(|(id, _)| *id).collect();
+        let mut key_by_id: BTreeMap<_, _> = entries.into_iter().collect();
+        let revisions = self.load_revisions_at_snapshot_ts(stream::iter(ids.into_iter().map(Ok)));
+        pin_mut!(revisions);
+        while let Some((doc, ts)) = revisions.try_next().await? {
+            let key = key_by_id
+                .remove(&doc.id_with_table_id())
+                .context("key_by_id missing")?;
+            yield (key, ts, doc);
+        }
+    }
+
+    /// Like `load_revisions_at_snapshot_ts` but doesn't need to fetch
+    /// if the IterationDocument has the Full document.
+    #[try_stream(boxed, ok = (IndexKeyBytes, Timestamp, ResolvedDocument), error = anyhow::Error)]
+    async fn reload_revisions_at_snapshot_ts(&self, documents: IterationDocuments) {
+        let mut current_batch = Vec::new();
+        for (key, (ts, doc)) in documents.into_iter() {
+            match doc {
+                IterationDocument::Full(doc) => {
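+                    // Flush the pending ID-only batch first so output stays in
+                    // index-key order, then yield the already-loaded document.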
+                    let mut flush = self.load_index_entries_at_snapshot_ts(current_batch);
+                    while let Some((key, ts, doc)) = flush.try_next().await? {
+                        yield (key, ts, doc);
+                    }
+                    current_batch = Vec::new();
+                    yield (key, ts, doc);
+                },
+                IterationDocument::Id(id) => {
+                    current_batch.push((id, key));
+                },
             }
         }
+        let mut flush = self.load_index_entries_at_snapshot_ts(current_batch);
+        while let Some((key, ts, doc)) = flush.try_next().await? {
+            yield (key, ts, doc);
+        }
+    }
+}
+
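+/// A buffered page entry: either the full document, or just its ID once the
+/// in-memory document cap has been reached.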
+#[derive(Debug)]
+enum IterationDocument {
+    Full(ResolvedDocument),
+    Id(InternalDocumentId),
+}
+
+impl IterationDocument {
+    fn internal_id(&self) -> InternalId {
+        match self {
+            Self::Full(doc) => doc.internal_id(),
+            Self::Id(id) => id.internal_id(),
+        }
+    }
+}
+
+/// To avoid storing too many documents in memory, we evict the document values,
+/// leaving only the IDs.
+#[derive(Default, Debug)]
+struct IterationDocuments {
+    count_full: usize,
+    docs: BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>,
+}
+
+impl IterationDocuments {
+    fn new(docs: BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>) -> Self {
+        Self {
+            count_full: docs
+                .values()
+                .filter(|(_, doc)| matches!(doc, IterationDocument::Full(_)))
+                .count(),
+            docs,
+        }
+    }
+
+    fn insert(&mut self, index_key: IndexKeyBytes, ts: Timestamp, doc: ResolvedDocument) {
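+        // Keep the full document while under the DOCUMENTS_IN_MEMORY knob;
+        // past the cap, store only the ID so the document can be re-fetched later.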
+        if self.count_full < *DOCUMENTS_IN_MEMORY {
+            self.docs
+                .insert(index_key, (ts, IterationDocument::Full(doc)));
+            self.count_full += 1;
+        } else {
+            self.docs.insert(
+                index_key,
+                (ts, IterationDocument::Id(doc.id_with_table_id())),
+            );
+        }
+    }
+
+    fn remove(
+        &mut self,
+        index_key: &IndexKeyBytes,
+    ) -> Option<(IndexKeyBytes, (Timestamp, IterationDocument))> {
+        let removed = self.docs.remove_entry(index_key);
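+        // Dropping a Full entry frees a slot for future full documents.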
+        if let Some((_, (_, IterationDocument::Full(_)))) = &removed {
+            self.count_full -= 1;
+        }
+        removed
+    }
+}
+
+impl IntoIterator for IterationDocuments {
+    type IntoIter =
+        <BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)> as IntoIterator>::IntoIter;
+    type Item = (IndexKeyBytes, (Timestamp, IterationDocument));
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.docs.into_iter()
+    }
+}
+
+impl Deref for IterationDocuments {
+    type Target = BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.docs
     }
 }