From 99c06bda9b4d8c6b4c778bd54c4f3dfd84b1db65 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 23 Apr 2025 10:20:25 +0800 Subject: [PATCH 1/3] new retention policy --- Cargo.lock | 1 - .../deploy/config/databend-query-node-1.toml | 2 +- .../src/storages/fuse/operations/handler.rs | 5 +- .../storages/fuse/operations/vacuum_table.rs | 32 +++- .../fuse/operations/vacuum_table_v2.rs | 170 +++++++++++++----- .../ee_features/vacuum_handler/Cargo.toml | 1 - .../vacuum_handler/src/vacuum_handler.rs | 8 +- .../common/table_option_validation.rs | 14 ++ .../interpreters/interpreter_table_drop.rs | 2 +- .../interpreters/interpreter_table_vacuum.rs | 8 +- src/query/sql/src/planner/plans/ddl/table.rs | 2 +- src/query/storages/fuse/src/constants.rs | 2 + src/query/storages/fuse/src/fuse_table.rs | 23 ++- src/query/storages/fuse/src/lib.rs | 1 + .../common/processors/sink_commit.rs | 98 +++++++--- .../storages/fuse/src/operations/navigate.rs | 96 +++++++--- .../ee/03_ee_vacuum/03_0004_auto_vacuum.test | 78 +++++++- 17 files changed, 409 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1273c81935324..35fac14572c00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,7 +4832,6 @@ version = "0.1.0" dependencies = [ "async-backtrace", "async-trait", - "chrono", "databend-common-base", "databend-common-catalog", "databend-common-exception", diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml index 71b6a58f02070..9ef189e500600 100644 --- a/scripts/ci/deploy/config/databend-query-node-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-1.toml @@ -82,7 +82,7 @@ join_spilling_memory_ratio = 60 [log] [log.file] -level = "DEBUG" +level = "INFO" format = "text" dir = "./.databend/logs_1" limit = 12 # 12 files, 1 file per hour diff --git a/src/query/ee/src/storages/fuse/operations/handler.rs b/src/query/ee/src/storages/fuse/operations/handler.rs index 479dd25b3cc08..48e53d0e2122a 100644 --- a/src/query/ee/src/storages/fuse/operations/handler.rs +++ b/src/query/ee/src/storages/fuse/operations/handler.rs @@ -14,8 +14,6 @@ use std::sync::Arc; -use chrono::DateTime; -use chrono::Utc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::AbortChecker; @@ -38,10 +36,9 @@ impl VacuumHandler for RealVacuumHandler { &self, table: &dyn Table, ctx: Arc, - retention_time: DateTime, dry_run: bool, ) -> Result>> { - do_vacuum(table, ctx, retention_time, dry_run).await + do_vacuum(table, ctx, dry_run).await } async fn do_vacuum2( diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table.rs index f16f766d59054..141f987a8b3e3 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table.rs @@ -18,7 +18,6 @@ use std::time::Instant; use chrono::DateTime; use chrono::Utc; -use databend_common_catalog::table::NavigationPoint; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -28,6 +27,7 @@ use databend_common_storages_fuse::io::SnapshotLiteExtended; use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::FuseTable; +use databend_common_storages_fuse::RetentionPolicy; use databend_storages_common_cache::LoadParams; use 
databend_storages_common_table_meta::meta::SegmentInfo; @@ -358,23 +358,39 @@ pub async fn do_dry_run_orphan_files( pub async fn do_vacuum( table: &dyn Table, ctx: Arc, - retention_time: DateTime, dry_run: bool, ) -> Result>> { let fuse_table = FuseTable::try_from_table(table)?; let start = Instant::now(); // First, do purge - let instant = Some(NavigationPoint::TimePoint(retention_time)); let dry_run_limit = if dry_run { Some(DRY_RUN_LIMIT) } else { None }; + // Let the table navigate to the point according to the table's retention policy. + let navigation_point = None; + let keep_last_snapshot = true; let purge_files_opt = fuse_table - .purge(ctx.clone(), instant, dry_run_limit, true, dry_run) + .purge( + ctx.clone(), + navigation_point, + dry_run_limit, + keep_last_snapshot, + dry_run, + ) .await?; let status = format!("do_vacuum: purged table, cost:{:?}", start.elapsed()); ctx.set_status_info(&status); - let retention = fuse_table.get_data_retention_period(ctx.as_ref())?; - // use min(now - get_retention_period(), retention_time) as gc orphan files retention time - // to protect files that generated by txn which has not been committed being gc. - let retention_time = std::cmp::min(chrono::Utc::now() - retention, retention_time); + let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?; + + let retention_period = match retention_policy { + RetentionPolicy::ByTimePeriod(retention_period) => retention_period, + RetentionPolicy::ByNumOfSnapshotsToKeep(_) => { + // Technically, we should derive a reasonable retention period from the ByNumOfSnapshotsToKeep policy, + // but it's not worth the effort since VACUUM2 will replace legacy purge and vacuum soon. + // Use the table retention period for now. + fuse_table.get_data_retention_period(ctx.as_ref())? + } + }; + + let retention_time = chrono::Utc::now() - retention_period; if let Some(mut purge_files) = purge_files_opt { let dry_run_limit = dry_run_limit.unwrap(); if purge_files.len() < dry_run_limit { diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs index 2eb23de24027b..7fc7908fec832 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs @@ -16,8 +16,8 @@ use std::collections::HashSet; use std::sync::Arc; use chrono::DateTime; -use chrono::Days; use chrono::Duration; +use chrono::TimeDelta; use chrono::Utc; use databend_common_base::base::uuid::Uuid; use databend_common_catalog::table::Table; @@ -32,6 +32,7 @@ use databend_common_storages_fuse::io::MetaReaders; use databend_common_storages_fuse::io::SegmentsIO; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::FuseTable; +use databend_common_storages_fuse::RetentionPolicy; use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::LoadParams; @@ -89,57 +90,91 @@ pub async fn do_vacuum2( let fuse_table = FuseTable::try_from_table(table)?; let start = std::time::Instant::now(); - let retention_period_in_days = if fuse_table.is_transient() { - 0 - } else { - ctx.get_settings().get_data_retention_time_in_days()? - }; + let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?; - let is_vacuum_all = retention_period_in_days == 0; + // Indicates whether to use the current table snapshot as gc root, + // true means vacuum all the historical snapshots. 
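// Illustrative sketch (not part of this change): how the two retention policies
// introduced here collapse into the `is_vacuum_all` flag declared just below.
// The local `RetentionPolicy` mirrors the enum added in fuse_table.rs; the
// helper name `vacuum_all_history` is invented for illustration only.
use chrono::TimeDelta;

enum RetentionPolicy {
    ByTimePeriod(TimeDelta),
    ByNumOfSnapshotsToKeep(usize),
}

fn vacuum_all_history(policy: &RetentionPolicy, is_transient: bool) -> bool {
    match policy {
        // Transient tables keep no history, so their effective period is zero.
        RetentionPolicy::ByTimePeriod(period) => is_transient || period.is_zero(),
        // Keeping exactly one snapshot leaves only the current snapshot alive.
        RetentionPolicy::ByNumOfSnapshotsToKeep(n) => *n == 1,
    }
}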
+ let mut is_vacuum_all = false; + let mut respect_flash_back_with_lvt = None; - let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period_in_days).await? else { - return Ok(vec![]); - }; + let snapshots_before_lvt = match retention_policy { + RetentionPolicy::ByTimePeriod(delta_duration) => { + info!("using by ByTimePeriod policy {:?}", delta_duration); + let retention_period = if fuse_table.is_transient() { + // For transient table, keep no history data + TimeDelta::zero() + } else { + delta_duration + }; - ctx.set_status_info(&format!( - "set lvt for table {} takes {:?}, lvt: {:?}", - fuse_table.get_table_info().desc, - start.elapsed(), - lvt - )); + is_vacuum_all = retention_period.is_zero(); - let start = std::time::Instant::now(); - let snapshots_before_lvt = if is_vacuum_all { - list_until_prefix( - fuse_table, - fuse_table - .meta_location_generator() - .snapshot_location_prefix(), - fuse_table.snapshot_loc().unwrap().as_str(), - true, - None, - ) - .await? - } else { - list_until_timestamp( - fuse_table, - fuse_table - .meta_location_generator() - .snapshot_location_prefix(), - lvt, - true, - None, - ) - .await? + let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else { + return Ok(vec![]); + }; + + if respect_flash_back { + respect_flash_back_with_lvt = Some(lvt); + } + + ctx.set_status_info(&format!( + "set lvt for table {} takes {:?}, lvt: {:?}", + fuse_table.get_table_info().desc, + start.elapsed(), + lvt + )); + + let snapshots_before_lvt = + collect_gc_candidate_by_retention_period(fuse_table, lvt, is_vacuum_all).await?; + snapshots_before_lvt + } + RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => { + info!( + "using by ByNumOfSnapshotsToKeep policy {:?}", + num_snapshots_to_keep + ); + // List the snapshot order by timestamp asc, till the current snapshot(inclusively). + let need_one_more = true; + let mut snapshots = list_until_prefix( + fuse_table, + fuse_table + .meta_location_generator() + .snapshot_location_prefix(), + fuse_table.snapshot_loc().unwrap().as_str(), + need_one_more, + None, + ) + .await?; + + let len = snapshots.len(); + if len <= num_snapshots_to_keep { + // Only the current snapshot is there, done + return Ok(vec![]); + } + if num_snapshots_to_keep == 1 { + // Expecting only one snapshot left, which means that we can use the current snapshot + // as gc root, this flag will be propagated to the select_gc_root func later. + is_vacuum_all = true; + } + + // When selecting the GC root later, the last snapshot in `snapshots` is a candidate, + // but its commit status is uncertain, its previous snapshot is typically used as the GC root, except in the is_vacuum_all case. + // + // Therefore, during snapshot truncation, we keep 2 extra snapshots; see `select_gc_root` for details. 
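// Worked example (illustration only, assuming six snapshots named s1..s6 listed
// oldest-first with the current snapshot last): keeping three snapshots leaves
// `6 - 3 + 2 = 5` GC-root candidates after the truncation performed just below.
fn truncation_example() {
    let mut snapshots = vec!["s1", "s2", "s3", "s4", "s5", "s6"];
    let num_snapshots_to_keep = 3;
    let num_candidates = snapshots.len() - num_snapshots_to_keep + 2;
    snapshots.truncate(num_candidates);
    assert_eq!(snapshots, ["s1", "s2", "s3", "s4", "s5"]);
}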
+ let num_candidates = len - num_snapshots_to_keep + 2; + snapshots.truncate(num_candidates); + snapshots + } }; let elapsed = start.elapsed(); ctx.set_status_info(&format!( - "list snapshots before lvt for table {} takes {:?}, snapshots_dir: {:?}, lvt: {:?}, snapshots: {:?}", + "list snapshots for table {} takes {:?}, snapshots_dir: {:?}, snapshots: {:?}", fuse_table.get_table_info().desc, elapsed, - fuse_table.meta_location_generator().snapshot_location_prefix(), - lvt, + fuse_table + .meta_location_generator() + .snapshot_location_prefix(), slice_summary(&snapshots_before_lvt) )); @@ -148,9 +183,8 @@ pub async fn do_vacuum2( fuse_table, &snapshots_before_lvt, is_vacuum_all, - respect_flash_back, + respect_flash_back_with_lvt, ctx.clone().get_abort_checker(), - lvt, ) .await? else { @@ -341,13 +375,45 @@ pub async fn do_vacuum2( Ok(files_to_gc) } +async fn collect_gc_candidate_by_retention_period( + fuse_table: &FuseTable, + lvt: DateTime, + is_vacuum_all: bool, +) -> Result> { + let snapshots_before_lvt = if is_vacuum_all { + list_until_prefix( + fuse_table, + fuse_table + .meta_location_generator() + .snapshot_location_prefix(), + fuse_table.snapshot_loc().unwrap().as_str(), + true, + None, + ) + .await? + } else { + list_until_timestamp( + fuse_table, + fuse_table + .meta_location_generator() + .snapshot_location_prefix(), + lvt, + true, + None, + ) + .await? + }; + + Ok(snapshots_before_lvt) +} + /// Try set lvt as min(latest_snapshot.timestamp, now - retention_time). /// /// Return `None` means we stop vacuumming, but don't want to report error to user. async fn set_lvt( fuse_table: &FuseTable, ctx: &dyn TableContext, - retention: u64, + retention_period: TimeDelta, ) -> Result>> { let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? 
else { info!( @@ -366,7 +432,7 @@ async fn set_lvt( let cat = ctx.get_default_catalog()?; // safe to unwrap, as we have checked the version is v5 let latest_ts = latest_snapshot.timestamp.unwrap(); - let lvt_point_candidate = std::cmp::min(Utc::now() - Days::new(retention), latest_ts); + let lvt_point_candidate = std::cmp::min(Utc::now() - retention_period, latest_ts); let lvt_point = cat .set_table_lvt( @@ -391,6 +457,7 @@ async fn list_until_prefix( gc_root_meta_ts: Option>, ) -> Result> { info!("list until prefix: {}", until); + eprintln!("list until prefix inside: {}", until); let dal = fuse_table.get_operator_ref(); match dal.info().scheme() { @@ -457,8 +524,10 @@ async fn fs_list_until_prefix( let mut res = Vec::new(); for entry in entries { if entry.path() >= until { + eprintln!("entry path: {} >= until: {}", entry.path(), until); info!("entry path: {} >= until: {}", entry.path(), until); if need_one_more { + eprintln!("kept"); res.push(entry); } break; @@ -538,14 +607,13 @@ async fn select_gc_root( fuse_table: &FuseTable, snapshots_before_lvt: &[Entry], is_vacuum_all: bool, - respect_flash_back: bool, + respect_flash_back: Option>, abort_checker: AbortChecker, - lvt: DateTime, ) -> Result, Vec, DateTime)>> { let gc_root_path = if is_vacuum_all { // safe to unwrap, or we should have stopped vacuuming in set_lvt() fuse_table.snapshot_loc().unwrap() - } else if respect_flash_back { + } else if let Some(lvt) = respect_flash_back { let latest_location = fuse_table.snapshot_loc().unwrap(); let gc_root = fuse_table .find(latest_location, abort_checker, |snapshot| { @@ -580,6 +648,8 @@ async fn select_gc_root( gc_root_path }; + eprintln!("gc root path {}", gc_root_path); + let dal = fuse_table.get_operator_ref(); let gc_root = read_snapshot_from_location(fuse_table, &gc_root_path).await; @@ -637,6 +707,8 @@ async fn select_gc_root( })?; let snapshots_to_gc = gc_candidates[..gc_root_idx].to_vec(); + eprintln!("snapshots to gc {:?}", snapshots_to_gc); + Ok(Some((gc_root, snapshots_to_gc, gc_root_meta_ts))) } Err(e) => { diff --git a/src/query/ee_features/vacuum_handler/Cargo.toml b/src/query/ee_features/vacuum_handler/Cargo.toml index a15240d040824..7119689c57b02 100644 --- a/src/query/ee_features/vacuum_handler/Cargo.toml +++ b/src/query/ee_features/vacuum_handler/Cargo.toml @@ -10,7 +10,6 @@ edition = { workspace = true } [dependencies] async-backtrace = { workspace = true } async-trait = { workspace = true } -chrono = { workspace = true } databend-common-base = { workspace = true } databend-common-catalog = { workspace = true } databend-common-exception = { workspace = true } diff --git a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs index 03b4bc45c7478..2dec82feaf6a7 100644 --- a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs +++ b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs @@ -16,8 +16,6 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; -use chrono::DateTime; -use chrono::Utc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::AbortChecker; @@ -35,7 +33,6 @@ pub trait VacuumHandler: Sync + Send { &self, table: &dyn Table, ctx: Arc, - retention_time: DateTime, dry_run: bool, ) -> Result>>; @@ -83,12 +80,9 @@ impl VacuumHandlerWrapper { &self, table: &dyn Table, ctx: Arc, - retention_time: DateTime, dry_run: bool, ) -> Result>> { - self.handler - .do_vacuum(table, ctx, 
retention_time, dry_run) - .await + self.handler.do_vacuum(table, ctx, dry_run).await } #[async_backtrace::framed] diff --git a/src/query/service/src/interpreters/common/table_option_validation.rs b/src/query/service/src/interpreters/common/table_option_validation.rs index 7f58b09b6bd3d..03d25361b3816 100644 --- a/src/query/service/src/interpreters/common/table_option_validation.rs +++ b/src/query/service/src/interpreters/common/table_option_validation.rs @@ -25,6 +25,7 @@ use databend_common_settings::Settings; use databend_common_sql::BloomIndexColumns; use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD; use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; +use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP; use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS; use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE; use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD; @@ -60,6 +61,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock> = LazyLock::new( r.insert(FUSE_OPT_KEY_FILE_SIZE); r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD); r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS); + r.insert(FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP); r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS); r.insert(OPT_KEY_TABLE_COMPRESSION); @@ -116,6 +118,7 @@ pub static UNSET_TABLE_OPTIONS_WHITE_LIST: LazyLock> = Laz r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD); r.insert(FUSE_OPT_KEY_FILE_SIZE); r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS); + r.insert(FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP); r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH); r }); @@ -182,6 +185,17 @@ pub fn is_valid_data_retention_period( ))); } } + + if let Some(value) = options.get(FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP) { + let new_val = value.parse::()?; + + if new_val < 1 { + return Err(ErrorCode::TableOptionInvalid(format!( + "Invalid value of the table option [{FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP}]: {:?}, it should be larger than 0", + new_val + ))); + } + } Ok(()) } diff --git a/src/query/service/src/interpreters/interpreter_table_drop.rs b/src/query/service/src/interpreters/interpreter_table_drop.rs index 5ca370bd417b0..d7de48bc11ddf 100644 --- a/src/query/service/src/interpreters/interpreter_table_drop.rs +++ b/src/query/service/src/interpreters/interpreter_table_drop.rs @@ -164,7 +164,7 @@ impl Interpreter for DropTableInterpreter { .do_truncate( self.ctx.clone(), &mut build_res.main_pipeline, - TruncateMode::Purge, + TruncateMode::DropAll, ) .await? 
} else { diff --git a/src/query/service/src/interpreters/interpreter_table_vacuum.rs b/src/query/service/src/interpreters/interpreter_table_vacuum.rs index 1711dd2c39383..78a05c04c683a 100644 --- a/src/query/service/src/interpreters/interpreter_table_vacuum.rs +++ b/src/query/service/src/interpreters/interpreter_table_vacuum.rs @@ -115,7 +115,6 @@ impl Interpreter for VacuumTableInterpreter { let catalog_name = self.plan.catalog.clone(); let db_name = self.plan.database.clone(); let tbl_name = self.plan.table.clone(); - let ctx = self.ctx.clone(); let table = self .ctx .get_table(&catalog_name, &db_name, &tbl_name) @@ -125,17 +124,12 @@ impl Interpreter for VacuumTableInterpreter { table.check_mutable()?; let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let duration = fuse_table.get_data_retention_period(ctx.as_ref())?; - - let retention_time = chrono::Utc::now() - duration; - let ctx = self.ctx.clone(); let handler = get_vacuum_handler(); let purge_files_opt = handler .do_vacuum( fuse_table, - ctx, - retention_time, + self.ctx.clone(), self.plan.option.dry_run.is_some(), ) .await?; diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index 7b80bdc6a2791..e682c4623eceb 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -417,7 +417,7 @@ pub enum TruncateMode { // Delete the data, used for delete operation. Delete, // Truncate and purge the historical data. - Purge, + DropAll, } /// Undrop. diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs index 17ee47854b374..e2aa8c54af7d4 100644 --- a/src/query/storages/fuse/src/constants.rs +++ b/src/query/storages/fuse/src/constants.rs @@ -20,6 +20,8 @@ pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold" pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size"; pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours"; +pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str = + "data_retention_num_snapshots_to_keep"; pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids"; diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index e131790f8af81..f35d4ec829760 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -125,6 +125,7 @@ use crate::DEFAULT_ROW_PER_PAGE_FOR_BLOCKING; use crate::FUSE_OPT_KEY_ATTACH_COLUMN_IDS; use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD; use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; +use crate::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP; use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS; use crate::FUSE_OPT_KEY_FILE_SIZE; use crate::FUSE_OPT_KEY_ROW_PER_BLOCK; @@ -468,7 +469,22 @@ impl FuseTable { } } - pub fn get_data_retention_period(&self, ctx: &dyn TableContext) -> Result { + pub fn get_data_retention_policy(&self, ctx: &dyn TableContext) -> Result { + let table_options = &self.table_info.meta.options; + + let policy = + if let Some(v) = table_options.get(FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP) { + let num_snapshot_keep = v.parse::()?; + RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshot_keep) + } else { + let duration = self.get_data_retention_period(ctx)?; + RetentionPolicy::ByTimePeriod(duration) + }; + + Ok(policy) + } + + pub fn get_data_retention_period(&self, ctx: &dyn TableContext) -> Result { let retention_period = if let Some(v) = self 
.table_info .meta @@ -1187,3 +1203,8 @@ impl Table for FuseTable { Ok(len) } } + +pub enum RetentionPolicy { + ByTimePeriod(TimeDelta), + ByNumOfSnapshotsToKeep(usize), +} diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index d6d806214ab31..dc462583286b3 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -46,6 +46,7 @@ pub use fuse_column::FuseTableColumnStatisticsProvider; pub use fuse_part::FuseBlockPartInfo; pub use fuse_part::FuseLazyPartInfo; pub use fuse_table::FuseTable; +pub use fuse_table::RetentionPolicy; pub use fuse_type::segment_format_from_location; pub use fuse_type::FuseSegmentFormat; pub use fuse_type::FuseStorageFormat; diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index e7c7be53ffbe7..b971390b78231 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -90,7 +90,7 @@ pub struct CommitSink { table: Arc, copied_files: Option, snapshot_gen: F, - purge: bool, + purge_mode: Option, retries: u64, max_retry_elapsed: Option, backoff: ExponentialBackoff, @@ -108,6 +108,11 @@ pub struct CommitSink { vacuum_handler: Option>, } +enum PurgeMode { + PurgeAllHistory, + PurgeAccordingToRetention, +} + impl CommitSink where F: SnapshotGenerator + Send + Sync + 'static { @@ -124,9 +129,7 @@ where F: SnapshotGenerator + Send + Sync + 'static deduplicated_label: Option, table_meta_timestamps: TableMetaTimestamps, ) -> Result { - let purge = Self::need_purge(table, &snapshot_gen) - || ctx.get_settings().get_enable_auto_vacuum()?; - + let purge_mode = Self::purge_mode(ctx.as_ref(), table, &snapshot_gen)?; let prefer_vacuum2 = ctx .get_settings() .get_enable_use_vacuum2_to_purge_transient_table_data()?; @@ -149,7 +152,7 @@ where F: SnapshotGenerator + Send + Sync + 'static table: Arc::new(table.clone()), copied_files, snapshot_gen, - purge, + purge_mode, backoff: ExponentialBackoff::default(), retries: 0, max_retry_elapsed, @@ -167,6 +170,21 @@ where F: SnapshotGenerator + Send + Sync + 'static }))) } + fn purge_mode( + ctx: &dyn TableContext, + table: &FuseTable, + snapshot_gen: &F, + ) -> Result> { + let mode = if Self::need_to_purge_all_history(table, snapshot_gen) { + Some(PurgeMode::PurgeAllHistory) + } else if ctx.get_settings().get_enable_auto_vacuum()? { + Some(PurgeMode::PurgeAccordingToRetention) + } else { + None + }; + Ok(mode) + } + fn is_error_recoverable(&self, e: &ErrorCode) -> bool { let code = e.code(); // When prev_snapshot_id is some, means it is an alter table column modification or truncate. 
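// Illustrative sketch (not part of this change) of the decision implemented by
// `purge_mode` above, with the two inputs reduced to plain booleans: purging the
// whole history takes precedence; otherwise the auto-vacuum setting decides
// whether history is trimmed according to the retention policy.
enum PurgeMode {
    PurgeAllHistory,
    PurgeAccordingToRetention,
}

fn choose_purge_mode(purge_all_history: bool, auto_vacuum_enabled: bool) -> Option<PurgeMode> {
    if purge_all_history {
        Some(PurgeMode::PurgeAllHistory)
    } else if auto_vacuum_enabled {
        Some(PurgeMode::PurgeAccordingToRetention)
    } else {
        None
    }
}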
@@ -176,7 +194,7 @@ where F: SnapshotGenerator + Send + Sync + 'static } code == ErrorCode::TABLE_VERSION_MISMATCHED - || (self.purge && code == ErrorCode::STORAGE_NOT_FOUND) + || (self.purge_mode.is_some() && code == ErrorCode::STORAGE_NOT_FOUND) } fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool { @@ -217,7 +235,7 @@ where F: SnapshotGenerator + Send + Sync + 'static Ok(Event::Async) } - fn need_purge(table: &FuseTable, snapshot_gen: &F) -> bool { + fn need_to_purge_all_history(table: &FuseTable, snapshot_gen: &F) -> bool { if table.is_transient() { return true; } @@ -225,7 +243,7 @@ where F: SnapshotGenerator + Send + Sync + 'static snapshot_gen .as_any() .downcast_ref::() - .is_some_and(|gen| matches!(gen.mode(), TruncateMode::Purge)) + .is_some_and(|gen| matches!(gen.mode(), TruncateMode::DropAll)) } fn need_truncate(&self) -> bool { @@ -242,25 +260,51 @@ where F: SnapshotGenerator + Send + Sync + 'static .is_some() } - async fn purge(&self, tbl: &FuseTable) -> Result<()> { + async fn exec_auto_purge(&self, tbl: &FuseTable, purge_mode: &PurgeMode) -> Result<()> { let keep_last_snapshot = true; - let snapshot_files = tbl.list_snapshot_files().await?; - if let Err(e) = tbl - .do_purge(&self.ctx, snapshot_files, None, keep_last_snapshot, false) - .await - { - // Errors of GC, if any, are ignored, since GC task can be picked up - warn!( - "GC of table not success (this is not a permanent error). the error : {}", - e - ); - } else { - info!("GC of table done"); + match purge_mode { + PurgeMode::PurgeAllHistory => { + let snapshot_files = tbl.list_snapshot_files().await?; + if let Err(e) = tbl + .do_purge(&self.ctx, snapshot_files, None, keep_last_snapshot, false) + .await + { + // Errors of GC, if any, are ignored, since GC task can be picked up + warn!( + "GC of table not success (this is not a permanent error). the error : {}", + e + ); + } else { + info!("GC of table done"); + } + } + PurgeMode::PurgeAccordingToRetention => { + let is_dry_run = false; + // Using setting or table option, no customized navigation point + let navigation_point = None; + // Do not limit the number of snapshots that could be removed + let num_snapshot_removal_limit = None; + + let purged_files = tbl + .purge( + self.ctx.clone(), + navigation_point, + num_snapshot_removal_limit, + keep_last_snapshot, + is_dry_run, + ) + .await?; + // `Files::delete_files` should have logged detail information of delete actions. 
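// Illustrative helper (not part of this change, name invented): in this
// non-dry-run path `purge` may return `None`, which the log line below
// reports as zero files cleared.
fn report_cleared(purged_files: Option<Vec<String>>) -> String {
    let cleared = purged_files.map(|files| files.len()).unwrap_or(0);
    format!("auto vacuum, {} files cleared", cleared)
}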
+ info!( + "auto vacuum, {} files cleared", + purged_files.map(|purged| purged.len()).unwrap_or(0) + ); + } } Ok(()) } - async fn vacuum2(&self, tbl: &FuseTable, vacuum_handler: &VacuumHandlerWrapper) { + async fn exec_auto_vacuum2(&self, tbl: &FuseTable, vacuum_handler: &VacuumHandlerWrapper) { warn!( "Vacuuming table: {}, ident: {}", tbl.table_info.name, tbl.table_info.ident @@ -277,7 +321,7 @@ where F: SnapshotGenerator + Send + Sync + 'static } } - async fn clean_history(&self) -> Result<()> { + async fn clean_history(&self, purge_mode: &PurgeMode) -> Result<()> { { let table_info = self.table.get_table_info(); info!( @@ -291,12 +335,12 @@ where F: SnapshotGenerator + Send + Sync + 'static if self.prefer_vacuum2 { if let Some(vacuum_handler) = &self.vacuum_handler { - self.vacuum2(tbl, vacuum_handler.as_ref()).await; + self.exec_auto_vacuum2(tbl, vacuum_handler.as_ref()).await; return Ok(()); } } - self.purge(tbl).await?; + self.exec_auto_purge(tbl, purge_mode).await?; Ok(()) } @@ -508,8 +552,8 @@ where F: SnapshotGenerator + Send + Sync + 'static .await?; } - if self.purge { - self.clean_history().await?; + if let Some(purge_mode) = &self.purge_mode { + self.clean_history(purge_mode).await?; } metrics_inc_commit_mutation_success(); diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 70088b150c7ca..1ec60dd3035dd 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -29,8 +29,10 @@ use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX; use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; use databend_storages_common_table_meta::table::OPT_KEY_SOURCE_TABLE_ID; use futures::TryStreamExt; +use log::info; use opendal::EntryMode; +use crate::fuse_table::RetentionPolicy; use crate::io::MetaReaders; use crate::io::SnapshotHistoryReader; use crate::io::SnapshotsIO; @@ -107,6 +109,20 @@ impl FuseTable { .await } + pub async fn navigate_back_with_limit( + &self, + location: String, + limit: usize, + aborting: AbortChecker, + ) -> Result> { + let mut counter = 0; + self.find(location, aborting, |_snapshot| { + counter += 1; + counter >= limit + }) + .await + } + #[async_backtrace::framed] pub async fn navigate_to_snapshot( &self, @@ -218,9 +234,9 @@ impl FuseTable { pub async fn navigate_for_purge( &self, ctx: &Arc, - instant: Option, + navigation_point: Option, ) -> Result<(Arc, Vec)> { - let retention = self.get_data_retention_period(ctx.as_ref())?; + let retention_policy = self.get_data_retention_policy(ctx.as_ref())?; let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? 
{ snapshot } else { @@ -230,26 +246,60 @@ impl FuseTable { }; assert!(root_snapshot.timestamp.is_some()); - let mut time_point = root_snapshot.timestamp.unwrap() - retention; - let (location, files) = match instant { - Some(NavigationPoint::TimePoint(point)) => { - time_point = std::cmp::min(point, time_point); - self.list_by_time_point(time_point).await + match retention_policy { + RetentionPolicy::ByTimePeriod(time_delta) => { + info!("navigate by time period, {:?}", time_delta); + let mut time_point = root_snapshot.timestamp.unwrap() - time_delta; + let (candidate_snapshot_path, files) = match navigation_point { + Some(NavigationPoint::TimePoint(point)) => { + time_point = std::cmp::min(point, time_point); + self.list_by_time_point(time_point).await + } + Some(NavigationPoint::SnapshotID(snapshot_id)) => { + self.list_by_snapshot_id(snapshot_id.as_str(), time_point) + .await + } + Some(NavigationPoint::StreamInfo(info)) => { + self.list_by_stream(info, time_point).await + } + None => self.list_by_time_point(time_point).await, + }?; + + let table = self + .navigate_to_time_point( + candidate_snapshot_path, + time_point, + ctx.clone().get_abort_checker(), + ) + .await?; + + Ok((table, files)) } - Some(NavigationPoint::SnapshotID(snapshot_id)) => { - self.list_by_snapshot_id(snapshot_id.as_str(), time_point) - .await + RetentionPolicy::ByNumOfSnapshotsToKeep(num) => { + assert!(num > 0); + info!("navigate by number of snapshots, {:?}", num); + let table = self + .navigate_back_with_limit( + self.snapshot_loc().unwrap(), + num, + ctx.clone().get_abort_checker(), + ) + .await?; + + // TODO + let timestamp = table + .read_table_snapshot() + .await? + .unwrap() + .timestamp + .unwrap(); + + let (_candidate_snapshot_path, files) = self.list_by_time_point(timestamp).await?; + + Ok((table, files)) } - Some(NavigationPoint::StreamInfo(info)) => self.list_by_stream(info, time_point).await, - None => self.list_by_time_point(time_point).await, - }?; - - let table = self - .navigate_to_time_point(location, time_point, ctx.clone().get_abort_checker()) - .await?; - - Ok((table, files)) + } } #[async_backtrace::framed] @@ -257,6 +307,10 @@ impl FuseTable { &self, time_point: DateTime, ) -> Result<(String, Vec)> { + let Some(location) = self.snapshot_loc() else { + return Err(ErrorCode::TableHistoricalDataNotFound("No historical data")); + }; + let prefix = format!( "{}/{}/", self.meta_location_generator().prefix(), @@ -272,10 +326,6 @@ impl FuseTable { )); } - let Some(location) = self.snapshot_loc() else { - return Err(ErrorCode::TableHistoricalDataNotFound("No historical data")); - }; - Ok((location, files)) } diff --git a/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0004_auto_vacuum.test b/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0004_auto_vacuum.test index a9e1c0980b67e..12c844112621f 100644 --- a/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0004_auto_vacuum.test +++ b/tests/sqllogictests/suites/ee/03_ee_vacuum/03_0004_auto_vacuum.test @@ -145,12 +145,84 @@ select count() from list_stage(location=> '@stage_av') where name like '%_ss%'; ---- 1 -# --------------------------------------------- -# We do not check default retention period works as expected, i.e. by default table historical data -# will be cleaned according to the retention period settings. 
As tests of streams will check it implicitly +################################################# +# Test autovacuum policy `ByNumSnapshotsToKeep` # +################################################# + +# CASE 1: Create table with data_retention_num_snapshots_to_keep table option +statement ok +create or replace table t (c int) 'fs:///tmp/auto_vacuum_case3/' data_retention_num_snapshots_to_keep = 3; + +statement ok +create or replace stage stage_av url = 'fs:///tmp/auto_vacuum_case3/'; + +statement ok +set enable_auto_vacuum = 1; + +statement ok +insert into t values(1); + +statement ok +insert into t values(2); + +statement ok +insert into t values(3); + +statement ok +insert into t values(4); + + +# Insert 4 time, but only 3 snapshots will be kept +onlyif mysql +query I +select count() from list_stage(location=> '@stage_av') where name like '%_ss%'; +---- +3 + +# CASE 2: Alter table option +statement ok +alter table t set options(data_retention_num_snapshots_to_keep = 1); + +statement ok +insert into t values(5); + +statement ok +optimize table t compact; + +onlyif mysql +query I +select count() from list_stage(location=> '@stage_av') where name like '%_sg%'; +---- +1 + +onlyif mysql +query I +select count() from list_stage(location=> '@stage_av') where name like '%\/_b\/%'; +---- +1 + +onlyif mysql +query I +select count() from list_stage(location=> '@stage_av') where name like '%_ss%'; +---- +1 + +query I +select * from t order by c; +---- +1 +2 +3 +4 +5 + +# CASE 3: Create table with invalid data_retention_num_snapshots_to_keep table option +# data_retention_num_snapshots_to_keep must be greater than 0 +statement error +create or replace table t (c int) 'fs:///tmp/auto_vacuum_case3/' data_retention_num_snapshots_to_keep = 0; statement ok remove @stage_av; From 7539d2cb1f4839fa3b3c80901493687b8b8be073 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 25 Apr 2025 22:54:42 +0800 Subject: [PATCH 2/3] cleanup --- .../ee/src/storages/fuse/operations/vacuum_table_v2.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs index 7fc7908fec832..dcc784e0c0314 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs @@ -457,7 +457,6 @@ async fn list_until_prefix( gc_root_meta_ts: Option>, ) -> Result> { info!("list until prefix: {}", until); - eprintln!("list until prefix inside: {}", until); let dal = fuse_table.get_operator_ref(); match dal.info().scheme() { @@ -524,10 +523,8 @@ async fn fs_list_until_prefix( let mut res = Vec::new(); for entry in entries { if entry.path() >= until { - eprintln!("entry path: {} >= until: {}", entry.path(), until); info!("entry path: {} >= until: {}", entry.path(), until); if need_one_more { - eprintln!("kept"); res.push(entry); } break; @@ -648,8 +645,6 @@ async fn select_gc_root( gc_root_path }; - eprintln!("gc root path {}", gc_root_path); - let dal = fuse_table.get_operator_ref(); let gc_root = read_snapshot_from_location(fuse_table, &gc_root_path).await; @@ -707,8 +702,6 @@ async fn select_gc_root( })?; let snapshots_to_gc = gc_candidates[..gc_root_idx].to_vec(); - eprintln!("snapshots to gc {:?}", snapshots_to_gc); - Ok(Some((gc_root, snapshots_to_gc, gc_root_meta_ts))) } Err(e) => { From 66e88ab7226ba7c3d37823308f4100e9811764ae Mon Sep 17 00:00:00 2001 From: dantengsky Date: Mon, 28 Apr 2025 12:42:05 +0800 Subject: [PATCH 3/3] cleanup --- 
.../deploy/config/databend-query-node-1.toml | 2 +- .../fuse/operations/vacuum_table_v2.rs | 18 ++++++++++-------- .../common/processors/sink_commit.rs | 4 ++++ .../storages/fuse/src/operations/navigate.rs | 2 +- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml index 9ef189e500600..71b6a58f02070 100644 --- a/scripts/ci/deploy/config/databend-query-node-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-1.toml @@ -82,7 +82,7 @@ join_spilling_memory_ratio = 60 [log] [log.file] -level = "INFO" +level = "DEBUG" format = "text" dir = "./.databend/logs_1" limit = 12 # 12 files, 1 file per hour diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs index dcc784e0c0314..85e518afa6b6a 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs @@ -92,8 +92,7 @@ pub async fn do_vacuum2( let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?; - // Indicates whether to use the current table snapshot as gc root, - // true means vacuum all the historical snapshots. + // By default, do not vacuum all the historical snapshots. let mut is_vacuum_all = false; let mut respect_flash_back_with_lvt = None; @@ -107,6 +106,7 @@ pub async fn do_vacuum2( delta_duration }; + // A zero retention period indicates that we should vacuum all the historical snapshots is_vacuum_all = retention_period.is_zero(); let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else { @@ -125,7 +125,7 @@ pub async fn do_vacuum2( )); let snapshots_before_lvt = - collect_gc_candidate_by_retention_period(fuse_table, lvt, is_vacuum_all).await?; + collect_gc_candidates_by_retention_period(fuse_table, lvt, is_vacuum_all).await?; snapshots_before_lvt } RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => { @@ -157,10 +157,12 @@ pub async fn do_vacuum2( is_vacuum_all = true; } - // When selecting the GC root later, the last snapshot in `snapshots` is a candidate, - // but its commit status is uncertain, its previous snapshot is typically used as the GC root, except in the is_vacuum_all case. - // - // Therefore, during snapshot truncation, we keep 2 extra snapshots; see `select_gc_root` for details. + // When selecting the GC root later, the last snapshot in `snapshots` (after truncation) + // is the candidate, but its commit status is uncertain, so its previous snapshot is used + // as the GC root instead (except in the is_vacuum_all case). + + // Therefore, during snapshot truncation, we keep 2 extra snapshots; + // see `select_gc_root` for details. 
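// Simplified restatement (illustration only, not the actual `select_gc_root`):
// among the ascending candidates the newest entry has an uncertain commit
// status, so its predecessor is preferred as the GC root, unless every
// historical snapshot is being vacuumed, in which case the current snapshot
// itself serves as the root.
fn pick_gc_root<'a>(candidates: &[&'a str], current: &'a str, is_vacuum_all: bool) -> Option<&'a str> {
    if is_vacuum_all {
        return Some(current);
    }
    // Need the uncertain newest candidate plus at least one predecessor.
    if candidates.len() < 2 {
        return None;
    }
    Some(candidates[candidates.len() - 2])
}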
let num_candidates = len - num_snapshots_to_keep + 2; snapshots.truncate(num_candidates); snapshots @@ -375,7 +377,7 @@ pub async fn do_vacuum2( Ok(files_to_gc) } -async fn collect_gc_candidate_by_retention_period( +async fn collect_gc_candidates_by_retention_period( fuse_table: &FuseTable, lvt: DateTime, is_vacuum_all: bool, diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index b971390b78231..fc67f74cd955c 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -264,6 +264,8 @@ where F: SnapshotGenerator + Send + Sync + 'static let keep_last_snapshot = true; match purge_mode { PurgeMode::PurgeAllHistory => { + // Purge all history, using current table snapshot as gc root. + let snapshot_files = tbl.list_snapshot_files().await?; if let Err(e) = tbl .do_purge(&self.ctx, snapshot_files, None, keep_last_snapshot, false) @@ -279,6 +281,8 @@ where F: SnapshotGenerator + Send + Sync + 'static } } PurgeMode::PurgeAccordingToRetention => { + // Navigate to the retention point, and purge history before that point + let is_dry_run = false; // Using setting or table option, no customized navigation point let navigation_point = None; diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 1ec60dd3035dd..09e1dd1f5fcdf 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -287,7 +287,7 @@ impl FuseTable { ) .await?; - // TODO + // Safe to unwrap: table snapshot and snapshot timestamp exist, otherwise we should not be here let timestamp = table .read_table_snapshot() .await?