Skip to content

Commit b27bfa9

Browse files
ldanilek (Convex, Inc.)
authored and committed
[index worker] backfill multiple tables in parallel (#24327)
For cluster migration, instead of backfilling tables one at a time, we can do them in parallel. To avoid the CPU contention that we observed with `try_join`, I'm using the `rt.spawn` pattern instead (the same pattern db_verifier uses, which @sujayakar said looked right). Unfortunately that means cloning a lot more, but these objects look cheap to clone. GitOrigin-RevId: 6dc29f8fa5327a865470dde7289d696c79848c24
1 parent ef5ebd1 commit b27bfa9

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

crates/database/src/index_worker.rs

Lines changed: 35 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ use common::{
5858
RateLimiter,
5959
Runtime,
6060
RuntimeInstant,
61+
SpawnHandle,
6162
},
6263
types::{
6364
DatabaseIndexUpdate,
@@ -130,16 +131,18 @@ pub struct IndexWorker<RT: Runtime> {
130131
persistence_version: PersistenceVersion,
131132
}
132133

134+
#[derive(Clone)]
133135
pub struct IndexWriter<RT: Runtime> {
134136
// Persistence target for writing indexes.
135137
persistence: Arc<dyn Persistence>,
136138
// Reader must have by_id index fully populated.
137139
reader: Arc<dyn PersistenceReader>,
138140
retention_validator: Arc<dyn RetentionValidator>,
139-
rate_limiter: RateLimiter<RT>,
141+
rate_limiter: Arc<RateLimiter<RT>>,
140142
runtime: RT,
141143
}
142144

145+
#[derive(Clone)]
143146
pub enum IndexSelector {
144147
All(IndexRegistry),
145148
Index { name: TabletIndexName, id: IndexId },
@@ -532,7 +535,10 @@ impl<RT: Runtime> IndexWriter<RT> {
532535
persistence,
533536
reader,
534537
retention_validator,
535-
rate_limiter: new_rate_limiter(runtime.clone(), Quota::per_second(*ENTRIES_PER_SECOND)),
538+
rate_limiter: Arc::new(new_rate_limiter(
539+
runtime.clone(),
540+
Quota::per_second(*ENTRIES_PER_SECOND),
541+
)),
536542
runtime,
537543
}
538544
}
@@ -567,15 +573,34 @@ impl<RT: Runtime> IndexWriter<RT> {
567573
) -> anyhow::Result<()> {
568574
// Backfill in two steps: first create index entries for all latest documents,
569575
// then create index entries for all documents in the retention range.
570-
for table_id in index_selector.iterate_tables() {
571-
self.backfill_exact_snapshot_of_table(
572-
snapshot_ts,
573-
&index_selector,
574-
index_metadata,
575-
table_id,
576-
)
577-
.await?;
576+
577+
let (tx, rx) = mpsc::unbounded();
578+
let handles = index_selector.iterate_tables().map(|table_id| {
579+
let index_metadata = index_metadata.clone();
580+
let index_selector = index_selector.clone();
581+
let self_ = (*self).clone();
582+
let tx = tx.clone();
583+
self.runtime
584+
.spawn("index_backfill_table_snapshot", async move {
585+
tx.unbounded_send(
586+
self_
587+
.backfill_exact_snapshot_of_table(
588+
snapshot_ts,
589+
&index_selector,
590+
&index_metadata,
591+
table_id,
592+
)
593+
.await,
594+
)
595+
.unwrap();
596+
})
597+
});
598+
for handle in handles {
599+
handle.into_join_future().await?;
578600
}
601+
tx.close_channel();
602+
let _: Vec<_> = rx.try_collect().await?;
603+
579604
let mut min_backfilled_ts = snapshot_ts;
580605

581606
// Retry until min_snapshot_ts passes min_backfilled_ts, at which point we

0 commit comments

Comments (0)