refactor: new retention policy ByNumOfSnapshotsToKeep #17840

Merged · 3 commits · Apr 29, 2025

1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

5 changes: 1 addition & 4 deletions src/query/ee/src/storages/fuse/operations/handler.rs
@@ -14,8 +14,6 @@

use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
@@ -38,10 +36,9 @@ impl VacuumHandler for RealVacuumHandler {
&self,
table: &dyn Table,
ctx: Arc<dyn TableContext>,
retention_time: DateTime<Utc>,
dry_run: bool,
) -> Result<Option<Vec<String>>> {
do_vacuum(table, ctx, retention_time, dry_run).await
do_vacuum(table, ctx, dry_run).await
}

async fn do_vacuum2(
32 changes: 24 additions & 8 deletions src/query/ee/src/storages/fuse/operations/vacuum_table.rs
@@ -18,7 +18,6 @@ use std::time::Instant;

use chrono::DateTime;
use chrono::Utc;
use databend_common_catalog::table::NavigationPoint;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
@@ -28,6 +27,7 @@ use databend_common_storages_fuse::io::SnapshotLiteExtended;
use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::RetentionPolicy;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::SegmentInfo;

@@ -358,23 +358,39 @@ pub async fn do_dry_run_orphan_files(
pub async fn do_vacuum(
table: &dyn Table,
ctx: Arc<dyn TableContext>,
retention_time: DateTime<Utc>,
dry_run: bool,
) -> Result<Option<Vec<String>>> {
let fuse_table = FuseTable::try_from_table(table)?;
let start = Instant::now();
// First, do purge
let instant = Some(NavigationPoint::TimePoint(retention_time));
let dry_run_limit = if dry_run { Some(DRY_RUN_LIMIT) } else { None };
// Let the table navigate to the point according to the table's retention policy.
let navigation_point = None;
let keep_last_snapshot = true;
let purge_files_opt = fuse_table
.purge(ctx.clone(), instant, dry_run_limit, true, dry_run)
.purge(
ctx.clone(),
navigation_point,
dry_run_limit,
keep_last_snapshot,
dry_run,
)
.await?;
let status = format!("do_vacuum: purged table, cost:{:?}", start.elapsed());
ctx.set_status_info(&status);
let retention = fuse_table.get_data_retention_period(ctx.as_ref())?;
// use min(now - get_retention_period(), retention_time) as gc orphan files retention time
// to protect files that generated by txn which has not been committed being gc.
let retention_time = std::cmp::min(chrono::Utc::now() - retention, retention_time);
let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;

let retention_period = match retention_policy {
RetentionPolicy::ByTimePeriod(retention_period) => retention_period,
RetentionPolicy::ByNumOfSnapshotsToKeep(_) => {
// Technically, we should derive a reasonable retention period from the ByNumOfSnapshotsToKeep policy,
// but it's not worth the effort since VACUUM2 will replace legacy purge and vacuum soon.
// Use the table retention period for now.
fuse_table.get_data_retention_period(ctx.as_ref())?
}
};

let retention_time = chrono::Utc::now() - retention_period;
if let Some(mut purge_files) = purge_files_opt {
let dry_run_limit = dry_run_limit.unwrap();
if purge_files.len() < dry_run_limit {
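
For readers skimming the legacy path, a minimal sketch of the new flow in `do_vacuum`: the cutoff timestamp is derived from the table's retention policy, falling back to the table retention period for snapshot-count policies. The types and names below are illustrative stand-ins, not the actual Databend APIs.

```rust
// Minimal sketch, assuming a RetentionPolicy enum shaped like the one in this PR;
// not the real Databend code.
use chrono::{DateTime, TimeDelta, Utc};

enum RetentionPolicy {
    ByTimePeriod(TimeDelta),
    ByNumOfSnapshotsToKeep(usize),
}

/// `fallback` stands in for `fuse_table.get_data_retention_period()`.
fn orphan_cutoff(policy: &RetentionPolicy, fallback: TimeDelta) -> DateTime<Utc> {
    let period = match policy {
        RetentionPolicy::ByTimePeriod(p) => *p,
        // A snapshot-count policy carries no time bound, so the legacy path
        // falls back to the table's retention period (VACUUM2 will supersede this).
        RetentionPolicy::ByNumOfSnapshotsToKeep(_) => fallback,
    };
    Utc::now() - period
}

fn main() {
    let policy = RetentionPolicy::ByNumOfSnapshotsToKeep(5);
    println!("orphan-file cutoff: {}", orphan_cutoff(&policy, TimeDelta::days(1)));
}
```
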
165 changes: 116 additions & 49 deletions src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs
@@ -16,8 +16,8 @@ use std::collections::HashSet;
use std::sync::Arc;

use chrono::DateTime;
use chrono::Days;
use chrono::Duration;
use chrono::TimeDelta;
use chrono::Utc;
use databend_common_base::base::uuid::Uuid;
use databend_common_catalog::table::Table;
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::SegmentsIO;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::RetentionPolicy;
use databend_storages_common_cache::CacheAccessor;
use databend_storages_common_cache::CacheManager;
use databend_storages_common_cache::LoadParams;
@@ -89,57 +90,93 @@ pub async fn do_vacuum2(
let fuse_table = FuseTable::try_from_table(table)?;
let start = std::time::Instant::now();

let retention_period_in_days = if fuse_table.is_transient() {
0
} else {
ctx.get_settings().get_data_retention_time_in_days()?
};
let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;

let is_vacuum_all = retention_period_in_days == 0;
// By default, do not vacuum all the historical snapshots.
let mut is_vacuum_all = false;
let mut respect_flash_back_with_lvt = None;

let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period_in_days).await? else {
return Ok(vec![]);
};
let snapshots_before_lvt = match retention_policy {
RetentionPolicy::ByTimePeriod(delta_duration) => {
info!("using by ByTimePeriod policy {:?}", delta_duration);
let retention_period = if fuse_table.is_transient() {
// For transient table, keep no history data
TimeDelta::zero()
} else {
delta_duration
};

ctx.set_status_info(&format!(
"set lvt for table {} takes {:?}, lvt: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
lvt
));
// A zero retention period indicates that we should vacuum all the historical snapshots
is_vacuum_all = retention_period.is_zero();

let start = std::time::Instant::now();
let snapshots_before_lvt = if is_vacuum_all {
list_until_prefix(
fuse_table,
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
fuse_table.snapshot_loc().unwrap().as_str(),
true,
None,
)
.await?
} else {
list_until_timestamp(
fuse_table,
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
lvt,
true,
None,
)
.await?
let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else {
return Ok(vec![]);
};

if respect_flash_back {
respect_flash_back_with_lvt = Some(lvt);
}

ctx.set_status_info(&format!(
"set lvt for table {} takes {:?}, lvt: {:?}",
fuse_table.get_table_info().desc,
start.elapsed(),
lvt
));

let snapshots_before_lvt =
collect_gc_candidates_by_retention_period(fuse_table, lvt, is_vacuum_all).await?;
snapshots_before_lvt
}
RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => {
info!(
"using by ByNumOfSnapshotsToKeep policy {:?}",
num_snapshots_to_keep
);
// List the snapshot order by timestamp asc, till the current snapshot(inclusively).
let need_one_more = true;
let mut snapshots = list_until_prefix(
fuse_table,
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
fuse_table.snapshot_loc().unwrap().as_str(),
need_one_more,
None,
)
.await?;

let len = snapshots.len();
if len <= num_snapshots_to_keep {
// Only the current snapshot is there, done
return Ok(vec![]);
}
if num_snapshots_to_keep == 1 {
// Expecting only one snapshot left, which means that we can use the current snapshot
// as gc root, this flag will be propagated to the select_gc_root func later.
is_vacuum_all = true;
}

// When selecting the GC root later, the last snapshot in `snapshots` (after truncation)
// is the candidate, but its commit status is uncertain, so its previous snapshot is used
// as the GC root instead (except in the is_vacuum_all case).

// Therefore, during snapshot truncation, we keep 2 extra snapshots;
// see `select_gc_root` for details.
let num_candidates = len - num_snapshots_to_keep + 2;
snapshots.truncate(num_candidates);
snapshots
}
};

let elapsed = start.elapsed();
ctx.set_status_info(&format!(
"list snapshots before lvt for table {} takes {:?}, snapshots_dir: {:?}, lvt: {:?}, snapshots: {:?}",
"list snapshots for table {} takes {:?}, snapshots_dir: {:?}, snapshots: {:?}",
fuse_table.get_table_info().desc,
elapsed,
fuse_table.meta_location_generator().snapshot_location_prefix(),
lvt,
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
slice_summary(&snapshots_before_lvt)
));
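
A quick worked example of the candidate arithmetic in the `ByNumOfSnapshotsToKeep` branch above, in plain Rust rather than the Databend code: listing 10 snapshots while keeping 3 leaves `10 - 3 + 2 = 9` GC candidates, the two extras accounting for the uncertain-commit snapshot and the predecessor used as GC root.

```rust
// Worked example of the truncation arithmetic; illustrative only.
fn main() {
    let snapshots: Vec<u32> = (1..=10).collect(); // 10 snapshots, oldest first
    let num_snapshots_to_keep = 3;

    // Keep 2 extra entries: the last candidate's commit status is uncertain,
    // so its predecessor serves as the GC root (see select_gc_root).
    let num_candidates = snapshots.len() - num_snapshots_to_keep + 2;

    let mut candidates = snapshots;
    candidates.truncate(num_candidates); // 10 - 3 + 2 = 9
    assert_eq!(candidates, (1..=9).collect::<Vec<u32>>());
}
```
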

@@ -148,9 +185,8 @@
fuse_table,
&snapshots_before_lvt,
is_vacuum_all,
respect_flash_back,
respect_flash_back_with_lvt,
ctx.clone().get_abort_checker(),
lvt,
)
.await?
else {
@@ -341,13 +377,45 @@ pub async fn do_vacuum2(
Ok(files_to_gc)
}

async fn collect_gc_candidates_by_retention_period(
fuse_table: &FuseTable,
lvt: DateTime<Utc>,
is_vacuum_all: bool,
) -> Result<Vec<Entry>> {
let snapshots_before_lvt = if is_vacuum_all {
list_until_prefix(
fuse_table,
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
fuse_table.snapshot_loc().unwrap().as_str(),
true,
None,
)
.await?
} else {
list_until_timestamp(
fuse_table,
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
lvt,
true,
None,
)
.await?
};

Ok(snapshots_before_lvt)
}

/// Try set lvt as min(latest_snapshot.timestamp, now - retention_time).
///
/// Return `None` means we stop vacuumming, but don't want to report error to user.
async fn set_lvt(
fuse_table: &FuseTable,
ctx: &dyn TableContext,
retention: u64,
retention_period: TimeDelta,
) -> Result<Option<DateTime<Utc>>> {
let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
info!(
@@ -366,7 +434,7 @@ async fn set_lvt(
let cat = ctx.get_default_catalog()?;
// safe to unwrap, as we have checked the version is v5
let latest_ts = latest_snapshot.timestamp.unwrap();
let lvt_point_candidate = std::cmp::min(Utc::now() - Days::new(retention), latest_ts);
let lvt_point_candidate = std::cmp::min(Utc::now() - retention_period, latest_ts);

let lvt_point = cat
.set_table_lvt(
@@ -538,14 +606,13 @@ async fn select_gc_root(
fuse_table: &FuseTable,
snapshots_before_lvt: &[Entry],
is_vacuum_all: bool,
respect_flash_back: bool,
respect_flash_back: Option<DateTime<Utc>>,
abort_checker: AbortChecker,
lvt: DateTime<Utc>,
) -> Result<Option<(Arc<TableSnapshot>, Vec<String>, DateTime<Utc>)>> {
let gc_root_path = if is_vacuum_all {
// safe to unwrap, or we should have stopped vacuuming in set_lvt()
fuse_table.snapshot_loc().unwrap()
} else if respect_flash_back {
} else if let Some(lvt) = respect_flash_back {
let latest_location = fuse_table.snapshot_loc().unwrap();
let gc_root = fuse_table
.find(latest_location, abort_checker, |snapshot| {
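
Since `set_lvt` now takes a `chrono::TimeDelta` instead of a whole number of days, sub-day retention periods become representable. Below is a sketch of the LVT candidate formula documented above, `min(latest_snapshot.timestamp, now - retention_period)`, with placeholder values rather than the real catalog plumbing.

```rust
// Sketch of the LVT candidate computation; placeholder inputs, not the real code.
use chrono::{DateTime, TimeDelta, Utc};

fn lvt_candidate(latest_snapshot_ts: DateTime<Utc>, retention_period: TimeDelta) -> DateTime<Utc> {
    // The LVT candidate never runs ahead of the newest snapshot's timestamp.
    std::cmp::min(Utc::now() - retention_period, latest_snapshot_ts)
}

fn main() {
    let latest = Utc::now() - TimeDelta::hours(2);
    // A 90-minute retention is expressible now, which Days::new(retention) could not do.
    println!("lvt candidate: {}", lvt_candidate(latest, TimeDelta::minutes(90)));
}
```
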
1 change: 0 additions & 1 deletion src/query/ee_features/vacuum_handler/Cargo.toml
@@ -10,7 +10,6 @@ edition = { workspace = true }
[dependencies]
async-backtrace = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
databend-common-base = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-exception = { workspace = true }
8 changes: 1 addition & 7 deletions src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
@@ -16,8 +16,6 @@ use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
@@ -35,7 +33,6 @@ pub trait VacuumHandler: Sync + Send {
&self,
table: &dyn Table,
ctx: Arc<dyn TableContext>,
retention_time: DateTime<Utc>,
dry_run: bool,
) -> Result<Option<Vec<String>>>;

@@ -83,12 +80,9 @@ impl VacuumHandlerWrapper {
&self,
table: &dyn Table,
ctx: Arc<dyn TableContext>,
retention_time: DateTime<Utc>,
dry_run: bool,
) -> Result<Option<Vec<String>>> {
self.handler
.do_vacuum(table, ctx, retention_time, dry_run)
.await
self.handler.do_vacuum(table, ctx, dry_run).await
}

#[async_backtrace::framed]
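
For callers, the visible effect of this PR is the slimmer `do_vacuum` signature: no `retention_time` argument. A usage sketch follows; only the signature itself is taken from the diff, while the crate and import paths below are assumptions.

```rust
use std::sync::Arc;

use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
// Crate path assumed for illustration; VacuumHandlerWrapper is the type edited above.
use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;

async fn vacuum_without_explicit_retention(
    handler: &VacuumHandlerWrapper,
    table: &dyn Table,
    ctx: Arc<dyn TableContext>,
) -> Result<Option<Vec<String>>> {
    // Before this PR: handler.do_vacuum(table, ctx, retention_time, dry_run).await
    // Now the cutoff comes from the table's own retention policy.
    handler.do_vacuum(table, ctx, /* dry_run */ false).await
}
```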