Skip to content

Commit 99c06bd

Browse files
committed
new retention policy
1 parent b73a9eb commit 99c06bd

File tree

17 files changed

+409
-134
lines changed

17 files changed

+409
-134
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scripts/ci/deploy/config/databend-query-node-1.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ join_spilling_memory_ratio = 60
8282
[log]
8383

8484
[log.file]
85-
level = "DEBUG"
85+
level = "INFO"
8686
format = "text"
8787
dir = "./.databend/logs_1"
8888
limit = 12 # 12 files, 1 file per hour

src/query/ee/src/storages/fuse/operations/handler.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
use std::sync::Arc;
1616

17-
use chrono::DateTime;
18-
use chrono::Utc;
1917
use databend_common_base::base::GlobalInstance;
2018
use databend_common_catalog::table::Table;
2119
use databend_common_catalog::table_context::AbortChecker;
@@ -38,10 +36,9 @@ impl VacuumHandler for RealVacuumHandler {
3836
&self,
3937
table: &dyn Table,
4038
ctx: Arc<dyn TableContext>,
41-
retention_time: DateTime<Utc>,
4239
dry_run: bool,
4340
) -> Result<Option<Vec<String>>> {
44-
do_vacuum(table, ctx, retention_time, dry_run).await
41+
do_vacuum(table, ctx, dry_run).await
4542
}
4643

4744
async fn do_vacuum2(

src/query/ee/src/storages/fuse/operations/vacuum_table.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::time::Instant;
1818

1919
use chrono::DateTime;
2020
use chrono::Utc;
21-
use databend_common_catalog::table::NavigationPoint;
2221
use databend_common_catalog::table::Table;
2322
use databend_common_catalog::table_context::TableContext;
2423
use databend_common_exception::ErrorCode;
@@ -28,6 +27,7 @@ use databend_common_storages_fuse::io::SnapshotLiteExtended;
2827
use databend_common_storages_fuse::io::SnapshotsIO;
2928
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
3029
use databend_common_storages_fuse::FuseTable;
30+
use databend_common_storages_fuse::RetentionPolicy;
3131
use databend_storages_common_cache::LoadParams;
3232
use databend_storages_common_table_meta::meta::SegmentInfo;
3333

@@ -358,23 +358,39 @@ pub async fn do_dry_run_orphan_files(
358358
pub async fn do_vacuum(
359359
table: &dyn Table,
360360
ctx: Arc<dyn TableContext>,
361-
retention_time: DateTime<Utc>,
362361
dry_run: bool,
363362
) -> Result<Option<Vec<String>>> {
364363
let fuse_table = FuseTable::try_from_table(table)?;
365364
let start = Instant::now();
366365
// First, do purge
367-
let instant = Some(NavigationPoint::TimePoint(retention_time));
368366
let dry_run_limit = if dry_run { Some(DRY_RUN_LIMIT) } else { None };
367+
// Let the table navigate to the point according to the table's retention policy.
368+
let navigation_point = None;
369+
let keep_last_snapshot = true;
369370
let purge_files_opt = fuse_table
370-
.purge(ctx.clone(), instant, dry_run_limit, true, dry_run)
371+
.purge(
372+
ctx.clone(),
373+
navigation_point,
374+
dry_run_limit,
375+
keep_last_snapshot,
376+
dry_run,
377+
)
371378
.await?;
372379
let status = format!("do_vacuum: purged table, cost:{:?}", start.elapsed());
373380
ctx.set_status_info(&status);
374-
let retention = fuse_table.get_data_retention_period(ctx.as_ref())?;
375-
// use min(now - get_retention_period(), retention_time) as gc orphan files retention time
376-
// to protect files that generated by txn which has not been committed being gc.
377-
let retention_time = std::cmp::min(chrono::Utc::now() - retention, retention_time);
381+
let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;
382+
383+
let retention_period = match retention_policy {
384+
RetentionPolicy::ByTimePeriod(retention_period) => retention_period,
385+
RetentionPolicy::ByNumOfSnapshotsToKeep(_) => {
386+
// Technically, we should derive a reasonable retention period from the ByNumOfSnapshotsToKeep policy,
387+
// but it's not worth the effort since VACUUM2 will replace legacy purge and vacuum soon.
388+
// Use the table retention period for now.
389+
fuse_table.get_data_retention_period(ctx.as_ref())?
390+
}
391+
};
392+
393+
let retention_time = chrono::Utc::now() - retention_period;
378394
if let Some(mut purge_files) = purge_files_opt {
379395
let dry_run_limit = dry_run_limit.unwrap();
380396
if purge_files.len() < dry_run_limit {

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 121 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use std::collections::HashSet;
1616
use std::sync::Arc;
1717

1818
use chrono::DateTime;
19-
use chrono::Days;
2019
use chrono::Duration;
20+
use chrono::TimeDelta;
2121
use chrono::Utc;
2222
use databend_common_base::base::uuid::Uuid;
2323
use databend_common_catalog::table::Table;
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::io::MetaReaders;
3232
use databend_common_storages_fuse::io::SegmentsIO;
3333
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
3434
use databend_common_storages_fuse::FuseTable;
35+
use databend_common_storages_fuse::RetentionPolicy;
3536
use databend_storages_common_cache::CacheAccessor;
3637
use databend_storages_common_cache::CacheManager;
3738
use databend_storages_common_cache::LoadParams;
@@ -89,57 +90,91 @@ pub async fn do_vacuum2(
8990
let fuse_table = FuseTable::try_from_table(table)?;
9091
let start = std::time::Instant::now();
9192

92-
let retention_period_in_days = if fuse_table.is_transient() {
93-
0
94-
} else {
95-
ctx.get_settings().get_data_retention_time_in_days()?
96-
};
93+
let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;
9794

98-
let is_vacuum_all = retention_period_in_days == 0;
95+
// Indicates whether to use the current table snapshot as gc root,
96+
// true means vacuum all the historical snapshots.
97+
let mut is_vacuum_all = false;
98+
let mut respect_flash_back_with_lvt = None;
9999

100-
let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period_in_days).await? else {
101-
return Ok(vec![]);
102-
};
100+
let snapshots_before_lvt = match retention_policy {
101+
RetentionPolicy::ByTimePeriod(delta_duration) => {
102+
info!("using by ByTimePeriod policy {:?}", delta_duration);
103+
let retention_period = if fuse_table.is_transient() {
104+
// For transient table, keep no history data
105+
TimeDelta::zero()
106+
} else {
107+
delta_duration
108+
};
103109

104-
ctx.set_status_info(&format!(
105-
"set lvt for table {} takes {:?}, lvt: {:?}",
106-
fuse_table.get_table_info().desc,
107-
start.elapsed(),
108-
lvt
109-
));
110+
is_vacuum_all = retention_period.is_zero();
110111

111-
let start = std::time::Instant::now();
112-
let snapshots_before_lvt = if is_vacuum_all {
113-
list_until_prefix(
114-
fuse_table,
115-
fuse_table
116-
.meta_location_generator()
117-
.snapshot_location_prefix(),
118-
fuse_table.snapshot_loc().unwrap().as_str(),
119-
true,
120-
None,
121-
)
122-
.await?
123-
} else {
124-
list_until_timestamp(
125-
fuse_table,
126-
fuse_table
127-
.meta_location_generator()
128-
.snapshot_location_prefix(),
129-
lvt,
130-
true,
131-
None,
132-
)
133-
.await?
112+
let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else {
113+
return Ok(vec![]);
114+
};
115+
116+
if respect_flash_back {
117+
respect_flash_back_with_lvt = Some(lvt);
118+
}
119+
120+
ctx.set_status_info(&format!(
121+
"set lvt for table {} takes {:?}, lvt: {:?}",
122+
fuse_table.get_table_info().desc,
123+
start.elapsed(),
124+
lvt
125+
));
126+
127+
let snapshots_before_lvt =
128+
collect_gc_candidate_by_retention_period(fuse_table, lvt, is_vacuum_all).await?;
129+
snapshots_before_lvt
130+
}
131+
RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => {
132+
info!(
133+
"using by ByNumOfSnapshotsToKeep policy {:?}",
134+
num_snapshots_to_keep
135+
);
136+
// List the snapshots ordered by timestamp ascending, up to the current snapshot (inclusive).
137+
let need_one_more = true;
138+
let mut snapshots = list_until_prefix(
139+
fuse_table,
140+
fuse_table
141+
.meta_location_generator()
142+
.snapshot_location_prefix(),
143+
fuse_table.snapshot_loc().unwrap().as_str(),
144+
need_one_more,
145+
None,
146+
)
147+
.await?;
148+
149+
let len = snapshots.len();
150+
if len <= num_snapshots_to_keep {
151+
// The number of listed snapshots does not exceed the number to keep, nothing to vacuum
152+
return Ok(vec![]);
153+
}
154+
if num_snapshots_to_keep == 1 {
155+
// Expecting only one snapshot left, which means that we can use the current snapshot
156+
// as gc root, this flag will be propagated to the select_gc_root func later.
157+
is_vacuum_all = true;
158+
}
159+
160+
// When selecting the GC root later, the last snapshot in `snapshots` is a candidate,
161+
// but its commit status is uncertain, its previous snapshot is typically used as the GC root, except in the is_vacuum_all case.
162+
//
163+
// Therefore, during snapshot truncation, we keep 2 extra snapshots; see `select_gc_root` for details.
164+
let num_candidates = len - num_snapshots_to_keep + 2;
165+
snapshots.truncate(num_candidates);
166+
snapshots
167+
}
134168
};
135169

136170
let elapsed = start.elapsed();
137171
ctx.set_status_info(&format!(
138-
"list snapshots before lvt for table {} takes {:?}, snapshots_dir: {:?}, lvt: {:?}, snapshots: {:?}",
172+
"list snapshots for table {} takes {:?}, snapshots_dir: {:?}, snapshots: {:?}",
139173
fuse_table.get_table_info().desc,
140174
elapsed,
141-
fuse_table.meta_location_generator().snapshot_location_prefix(),
142-
lvt,
175+
fuse_table
176+
.meta_location_generator()
177+
.snapshot_location_prefix(),
143178
slice_summary(&snapshots_before_lvt)
144179
));
145180

@@ -148,9 +183,8 @@ pub async fn do_vacuum2(
148183
fuse_table,
149184
&snapshots_before_lvt,
150185
is_vacuum_all,
151-
respect_flash_back,
186+
respect_flash_back_with_lvt,
152187
ctx.clone().get_abort_checker(),
153-
lvt,
154188
)
155189
.await?
156190
else {
@@ -341,13 +375,45 @@ pub async fn do_vacuum2(
341375
Ok(files_to_gc)
342376
}
343377

378+
async fn collect_gc_candidate_by_retention_period(
379+
fuse_table: &FuseTable,
380+
lvt: DateTime<Utc>,
381+
is_vacuum_all: bool,
382+
) -> Result<Vec<Entry>> {
383+
let snapshots_before_lvt = if is_vacuum_all {
384+
list_until_prefix(
385+
fuse_table,
386+
fuse_table
387+
.meta_location_generator()
388+
.snapshot_location_prefix(),
389+
fuse_table.snapshot_loc().unwrap().as_str(),
390+
true,
391+
None,
392+
)
393+
.await?
394+
} else {
395+
list_until_timestamp(
396+
fuse_table,
397+
fuse_table
398+
.meta_location_generator()
399+
.snapshot_location_prefix(),
400+
lvt,
401+
true,
402+
None,
403+
)
404+
.await?
405+
};
406+
407+
Ok(snapshots_before_lvt)
408+
}
409+
344410
/// Try to set lvt as min(latest_snapshot.timestamp, now - retention_period).
345411
///
346412
/// Returning `None` means we stop vacuuming, but don't want to report an error to the user.
347413
async fn set_lvt(
348414
fuse_table: &FuseTable,
349415
ctx: &dyn TableContext,
350-
retention: u64,
416+
retention_period: TimeDelta,
351417
) -> Result<Option<DateTime<Utc>>> {
352418
let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
353419
info!(
@@ -366,7 +432,7 @@ async fn set_lvt(
366432
let cat = ctx.get_default_catalog()?;
367433
// safe to unwrap, as we have checked the version is v5
368434
let latest_ts = latest_snapshot.timestamp.unwrap();
369-
let lvt_point_candidate = std::cmp::min(Utc::now() - Days::new(retention), latest_ts);
435+
let lvt_point_candidate = std::cmp::min(Utc::now() - retention_period, latest_ts);
370436

371437
let lvt_point = cat
372438
.set_table_lvt(
@@ -391,6 +457,7 @@ async fn list_until_prefix(
391457
gc_root_meta_ts: Option<DateTime<Utc>>,
392458
) -> Result<Vec<Entry>> {
393459
info!("list until prefix: {}", until);
460+
eprintln!("list until prefix inside: {}", until);
394461
let dal = fuse_table.get_operator_ref();
395462

396463
match dal.info().scheme() {
@@ -457,8 +524,10 @@ async fn fs_list_until_prefix(
457524
let mut res = Vec::new();
458525
for entry in entries {
459526
if entry.path() >= until {
527+
eprintln!("entry path: {} >= until: {}", entry.path(), until);
460528
info!("entry path: {} >= until: {}", entry.path(), until);
461529
if need_one_more {
530+
eprintln!("kept");
462531
res.push(entry);
463532
}
464533
break;
@@ -538,14 +607,13 @@ async fn select_gc_root(
538607
fuse_table: &FuseTable,
539608
snapshots_before_lvt: &[Entry],
540609
is_vacuum_all: bool,
541-
respect_flash_back: bool,
610+
respect_flash_back: Option<DateTime<Utc>>,
542611
abort_checker: AbortChecker,
543-
lvt: DateTime<Utc>,
544612
) -> Result<Option<(Arc<TableSnapshot>, Vec<String>, DateTime<Utc>)>> {
545613
let gc_root_path = if is_vacuum_all {
546614
// safe to unwrap, or we should have stopped vacuuming in set_lvt()
547615
fuse_table.snapshot_loc().unwrap()
548-
} else if respect_flash_back {
616+
} else if let Some(lvt) = respect_flash_back {
549617
let latest_location = fuse_table.snapshot_loc().unwrap();
550618
let gc_root = fuse_table
551619
.find(latest_location, abort_checker, |snapshot| {
@@ -580,6 +648,8 @@ async fn select_gc_root(
580648
gc_root_path
581649
};
582650

651+
eprintln!("gc root path {}", gc_root_path);
652+
583653
let dal = fuse_table.get_operator_ref();
584654
let gc_root = read_snapshot_from_location(fuse_table, &gc_root_path).await;
585655

@@ -637,6 +707,8 @@ async fn select_gc_root(
637707
})?;
638708
let snapshots_to_gc = gc_candidates[..gc_root_idx].to_vec();
639709

710+
eprintln!("snapshots to gc {:?}", snapshots_to_gc);
711+
640712
Ok(Some((gc_root, snapshots_to_gc, gc_root_meta_ts)))
641713
}
642714
Err(e) => {

src/query/ee_features/vacuum_handler/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ edition = { workspace = true }
1010
[dependencies]
1111
async-backtrace = { workspace = true }
1212
async-trait = { workspace = true }
13-
chrono = { workspace = true }
1413
databend-common-base = { workspace = true }
1514
databend-common-catalog = { workspace = true }
1615
databend-common-exception = { workspace = true }

0 commit comments

Comments
 (0)