Skip to content

Commit 81e8b52

Browse files
authored
Merge pull request #9119 from lichuang/purge_ts_files
fix: purge table statistic files
2 parents 69f83a6 + f939ac9 commit 81e8b52

File tree

7 files changed

+150
-13
lines changed

7 files changed

+150
-13
lines changed

docs/doc/14-sql-commands/00-ddl/20-table/60-optimize-table.md

+6-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ title: OPTIMIZE TABLE
55
The objective of optimizing a table in Databend is to compact or purge its historical data in your object storage. This helps save storage space and improve query efficiency.
66

77
:::caution
8-
Databend's Time Travel feature relies on historical data. If you purge historical data from a table with the command `OPTIMIZE TABLE <your_table> PURGE` or `OPTIMIZE TABLE <your_table> ALL`, the table will not be eligible for time travel. The command removes all snapshots (except the most recent one) and their associated segments and block files.
8+
Databend's Time Travel feature relies on historical data. If you purge historical data from a table with the command `OPTIMIZE TABLE <your_table> PURGE` or `OPTIMIZE TABLE <your_table> ALL`, the table will not be eligible for time travel. The command removes all snapshots (except the most recent one) and their associated segments, block files, and table statistic files.
99
:::
1010

11-
## What are Snapshot, Segment, and Block?
11+
## What are Snapshot, Segment, Block, and Table Statistic File?
1212

1313
Snapshot, segment, and block are the concepts Databend uses for data storage. Databend uses them to construct a hierarchical structure for storing table data.
1414

@@ -20,6 +20,8 @@ A snapshot is a JSON file that does not save the table's data but indicate the s
2020

2121
A segment is a JSON file that organizes the storage blocks (at least 1, at most 1,000) where the data is stored. If you run [FUSE_SEGMENT](../../../15-sql-functions/111-system-functions/fuse_segment.md) against a snapshot with the snapshot ID, you can find which segments are referenced by the snapshot.
2222

23+
A table statistic file is a JSON file that saves table statistics, such as the number of distinct values in each table column.
24+
2325
Databends saves actual table data in parquet files and considers each parquet file as a block. If you run [FUSE_BLOCK](../../../15-sql-functions/111-system-functions/fuse_block.md) against a snapshot with the snapshot ID, you can find which blocks are referenced by the snapshot.
2426

2527
Databend creates a unique ID for each database and table for storing the snapshot, segment, and block files and saves them to your object storage in the path `<bucket_name>/[root]/<db_id>/<table_id>/`. Each snapshot, segment, and block file is named with a UUID (32-character lowercase hexadecimal string).
@@ -29,6 +31,7 @@ Databend creates a unique ID for each database and table for storing the snapsho
2931
| Snapshot | JSON | `<32bitUUID>_<version>.json` | `<bucket_name>/[root]/<db_id>/<table_id>/_ss/` |
3032
| Segment | JSON | `<32bitUUID>_<version>.json` | `<bucket_name>/[root]/<db_id>/<table_id>/_sg/` |
3133
| Block | parquet | `<32bitUUID>_<version>.parquet` | `<bucket_name>/[root]/<db_id>/<table_id>/_b/` |
34+
| Table statistic | JSON | `<32bitUUID>_<version>.json` | `<bucket_name>/[root]/<db_id>/<table_id>/_ts/` |
3235

3336
## Table Optimization Considerations
3437

@@ -69,7 +72,7 @@ OPTIMIZE TABLE [database.]table_name [ PURGE | COMPACT | ALL | STATISTIC ] [SEGM
6972

7073
- `OPTIMIZE TABLE <table_name> PURGE`
7174

72-
Purges the historical data of table. Only the latest snapshot (including the segments and blocks referenced by this snapshot) will be kept.
75+
Purges the historical data of the table. Only the latest snapshot (including the segments, blocks, and table statistic files referenced by this snapshot) will be kept.
7376

7477
- `OPTIMIZE TABLE <table_name> COMPACT [LIMIT <segment_count>]`
7578

src/query/service/tests/it/storages/fuse/operations/optimize.rs

+29
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,33 @@ async fn test_fuse_snapshot_optimize_statistic() -> Result<()> {
4848
.await
4949
}
5050

51+
#[tokio::test]
52+
async fn test_fuse_snapshot_optimize_statistic_purge() -> Result<()> {
53+
let fixture = TestFixture::new().await;
54+
let db = fixture.default_db_name();
55+
let tbl = fixture.default_table_name();
56+
let case_name = "optimize_statistic_purge";
57+
do_insertions(&fixture).await?;
58+
59+
// optimize statistics twice
60+
for i in 0..1 {
61+
let qry = format!("optimize table {}.{} statistic", db, tbl);
62+
63+
let ctx = fixture.ctx();
64+
execute_command(ctx, &qry).await?;
65+
66+
check_data_dir(&fixture, case_name, 3, 1 + i, 2, 2, 2, Some(()), None).await?;
67+
}
68+
69+
// After compact, all the count will become 1
70+
let qry = format!("optimize table {}.{} all", db, tbl);
71+
execute_command(fixture.ctx().clone(), &qry).await?;
72+
73+
check_data_dir(&fixture, case_name, 1, 1, 1, 1, 1, Some(()), Some(())).await?;
74+
75+
Ok(())
76+
}
77+
5178
#[tokio::test]
5279
async fn test_fuse_snapshot_optimize_all() -> Result<()> {
5380
do_purge_test("explicit pure", "all", 1, 0, 1, 1, 1, None).await
@@ -84,6 +111,7 @@ async fn do_purge_test(
84111
block_count,
85112
index_count,
86113
Some(()),
114+
None,
87115
)
88116
.await?;
89117
history_should_have_item(&fixture, case_name, snapshot_count).await?;
@@ -103,6 +131,7 @@ async fn do_purge_test(
103131
block_count,
104132
index_count,
105133
Some(()),
134+
None,
106135
)
107136
.await?;
108137

src/query/service/tests/it/storages/fuse/operations/purge_drop.rs

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ async fn test_fuse_snapshot_truncate_in_drop_all_stmt() -> Result<()> {
5959
0, // 0 blocks
6060
0, // 0 index
6161
None,
62+
None,
6263
)
6364
.await?;
6465
Ok(())

src/query/service/tests/it/storages/fuse/operations/purge_truncate.rs

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ async fn test_fuse_truncate_purge_stmt() -> Result<()> {
4343
2,
4444
expected_index_count,
4545
Some(()),
46+
None,
4647
)
4748
.await?;
4849

@@ -68,6 +69,7 @@ async fn test_fuse_truncate_purge_stmt() -> Result<()> {
6869
0,
6970
0,
7071
Some(()),
72+
None,
7173
)
7274
.await?;
7375
Ok(())

src/query/service/tests/it/storages/fuse/table_test_fixture.rs

+23
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ pub async fn check_data_dir(
466466
block_count: u32,
467467
index_count: u32,
468468
check_last_snapshot: Option<()>,
469+
check_table_statistic_file: Option<()>,
469470
) -> Result<()> {
470471
let data_path = match &GlobalConfig::instance().storage.params {
471472
StorageParams::Fs(v) => v.root.clone(),
@@ -478,6 +479,7 @@ pub async fn check_data_dir(
478479
let mut b_count = 0;
479480
let mut i_count = 0;
480481
let mut last_snapshot_loc = "".to_string();
482+
let mut table_statistic_files = vec![];
481483
let prefix_snapshot = FUSE_TBL_SNAPSHOT_PREFIX;
482484
let prefix_snapshot_statistics = FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
483485
let prefix_segment = FUSE_TBL_SEGMENT_PREFIX;
@@ -501,6 +503,7 @@ pub async fn check_data_dir(
501503
i_count += 1;
502504
} else if path.starts_with(prefix_snapshot_statistics) {
503505
ts_count += 1;
506+
table_statistic_files.push(entry_path.to_string());
504507
} else if path.starts_with(prefix_last_snapshot_hint) && check_last_snapshot.is_some() {
505508
let content = fixture
506509
.ctx
@@ -553,6 +556,26 @@ pub async fn check_data_dir(
553556
);
554557
}
555558

559+
if check_table_statistic_file.is_some() {
560+
let table = fixture.latest_default_table().await?;
561+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
562+
let snapshot_opt = fuse_table.read_table_snapshot().await?;
563+
assert!(snapshot_opt.is_some());
564+
let snapshot = snapshot_opt.unwrap();
565+
let ts_location_opt = snapshot.table_statistics_location.clone();
566+
assert!(ts_location_opt.is_some());
567+
let ts_location = ts_location_opt.unwrap();
568+
println!(
569+
"ts_location_opt: {:?}, table_statistic_files: {:?}",
570+
ts_location, table_statistic_files
571+
);
572+
assert!(
573+
table_statistic_files
574+
.iter()
575+
.any(|e| e.contains(&ts_location))
576+
);
577+
}
578+
556579
Ok(())
557580
}
558581

src/query/storages/fuse/fuse/src/io/snapshots.rs

+24-2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,20 @@ impl SnapshotsIO {
102102
.map_err(|e| ErrorCode::StorageOther(format!("read snapshots failure, {}", e)))
103103
}
104104

105+
// Read all the table statistic files by the root file(exclude the root file).
106+
// limit: read how many table statistic files
107+
pub async fn read_table_statistic_files(
108+
&self,
109+
root_ts_file: &str,
110+
limit: Option<usize>,
111+
) -> Result<Vec<String>> {
112+
// Get all file list.
113+
if let Some(prefix) = Self::get_s3_prefix_from_file(root_ts_file) {
114+
return self.get_files(&prefix, limit, Some(root_ts_file)).await;
115+
}
116+
Ok(vec![])
117+
}
118+
105119
// Read all the snapshots by the root file.
106120
// limit: read how many snapshot files
107121
// with_segment_locations: if true will get the segments of the snapshot
@@ -123,7 +137,7 @@ impl SnapshotsIO {
123137
let mut snapshot_files = vec![];
124138
let mut segment_locations = HashSet::new();
125139
if let Some(prefix) = Self::get_s3_prefix_from_file(&root_snapshot_file) {
126-
snapshot_files = self.get_files(&prefix, limit).await?;
140+
snapshot_files = self.get_files(&prefix, limit, None).await?;
127141
}
128142

129143
// 1. Get all the snapshot by chunks.
@@ -199,14 +213,22 @@ impl SnapshotsIO {
199213
Ok((snapshot_chain, segment_locations))
200214
}
201215

202-
async fn get_files(&self, prefix: &str, limit: Option<usize>) -> Result<Vec<String>> {
216+
async fn get_files(
217+
&self,
218+
prefix: &str,
219+
limit: Option<usize>,
220+
exclude_file: Option<&str>,
221+
) -> Result<Vec<String>> {
203222
let data_accessor = self.operator.clone();
204223

205224
let mut file_list = vec![];
206225
let mut ds = data_accessor.object(prefix).list().await?;
207226
while let Some(de) = ds.try_next().await? {
208227
match de.mode().await? {
209228
ObjectMode::FILE => {
229+
if exclude_file.is_some() && Some(de.path()) == exclude_file {
230+
continue;
231+
}
210232
let location = de.path().to_string();
211233
let modified = de.last_modified().await?;
212234
file_list.push((location, modified));

src/query/storages/fuse/fuse/src/operations/gc.rs

+65-8
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,20 @@ impl FuseTable {
5858
// 1. Root snapshot.
5959
let mut segments_referenced_by_root = HashSet::new();
6060
let mut locations_referenced_by_root = Default::default();
61-
let (root_snapshot_id, root_snapshot_ts) = if let Some(root_snapshot) = snapshot_opt {
62-
let segments = root_snapshot.segments.clone();
63-
locations_referenced_by_root = self.get_block_locations(ctx.clone(), &segments).await?;
64-
segments_referenced_by_root = HashSet::from_iter(segments);
65-
(root_snapshot.snapshot_id, root_snapshot.timestamp)
66-
} else {
67-
(SnapshotId::new_v4(), None)
68-
};
61+
let (root_snapshot_id, root_snapshot_ts, root_ts_location_opt) =
62+
if let Some(ref root_snapshot) = snapshot_opt {
63+
let segments = root_snapshot.segments.clone();
64+
locations_referenced_by_root =
65+
self.get_block_locations(ctx.clone(), &segments).await?;
66+
segments_referenced_by_root = HashSet::from_iter(segments);
67+
(
68+
root_snapshot.snapshot_id,
69+
root_snapshot.timestamp,
70+
root_snapshot.table_statistics_location.clone(),
71+
)
72+
} else {
73+
(SnapshotId::new_v4(), None, None)
74+
};
6975

7076
// 2. Get all snapshot(including root snapshot).
7177
let mut all_snapshot_lites = vec![];
@@ -108,6 +114,7 @@ impl FuseTable {
108114
// 3. Find.
109115
let mut snapshots_to_be_purged = HashSet::new();
110116
let mut segments_to_be_purged = HashSet::new();
117+
let ts_to_be_purged: Vec<String> = vec![]; // BUG(review): this binding is immutable and never assigned — step 3.3 below introduces a new `let ts_to_be_purged` inside its own block scope, shadowing this one, so step 6 always sees an empty list and purges no table statistic files. Make this `mut` and assign to it in step 3.3 instead of re-declaring.
111118

112119
// 3.1 Find all the snapshots need to be deleted.
113120
{
@@ -131,6 +138,29 @@ impl FuseTable {
131138
}
132139
}
133140

141+
// 3.3 Find all the table statistic files need to be deleted
142+
{
143+
if let Some(root_ts_location) = root_ts_location_opt {
144+
let start = Instant::now();
145+
let snapshots_io = SnapshotsIO::create(
146+
ctx.clone(),
147+
self.operator.clone(),
148+
self.snapshot_format_version().await?,
149+
);
150+
let ts_to_be_purged = snapshots_io
151+
.read_table_statistic_files(&root_ts_location, None)
152+
.await?;
153+
let status_ts_scan_count = ts_to_be_purged.len();
154+
let status_ts_scan_cost = start.elapsed().as_secs();
155+
let status = format!(
156+
"gc: scan table statistic files:{} takes:{} sec.",
157+
status_ts_scan_count, status_ts_scan_cost,
158+
);
159+
self.data_metrics.set_status(&status);
160+
info!(status);
161+
}
162+
}
163+
134164
let chunk_size = ctx.get_settings().get_max_storage_io_requests()? as usize;
135165

136166
// 4. Purge segments&blocks by chunk size
@@ -240,6 +270,33 @@ impl FuseTable {
240270
}
241271
}
242272

273+
// 6. Purge table statistic files
274+
{
275+
let mut status_purged_count = 0;
276+
let status_need_purged_count = ts_to_be_purged.len();
277+
let start = Instant::now();
278+
for chunk in ts_to_be_purged.chunks(chunk_size) {
279+
let mut ts_locations_to_be_purged = HashSet::new();
280+
for file in chunk {
281+
ts_locations_to_be_purged.insert(file.clone());
282+
}
283+
self.try_purge_location_files(ctx.clone(), ts_locations_to_be_purged)
284+
.await?;
285+
// Refresh status.
286+
{
287+
status_purged_count += chunk.len();
288+
let status = format!(
289+
"gc: table statistic files need to be purged:{}, have purged:{}, take:{} sec",
290+
status_need_purged_count,
291+
status_purged_count,
292+
start.elapsed().as_secs()
293+
);
294+
self.data_metrics.set_status(&status);
295+
info!(status);
296+
}
297+
}
298+
}
299+
243300
Ok(())
244301
}
245302

0 commit comments

Comments
 (0)