Skip to content

Commit 57af75f

Browse files
authored
Merge pull request #9060 from lichuang/last_snapshot_bug
fix: update last_snapshot_hint file when purge
2 parents 99511d8 + 080fce6 commit 57af75f

File tree

8 files changed

+79
-13
lines changed

8 files changed

+79
-13
lines changed

src/query/service/tests/it/storages/fuse/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#![allow(clippy::too_many_arguments)]
1516
mod io;
1617
mod meta;
1718
mod misc;

src/query/service/tests/it/storages/fuse/operations/optimize.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ async fn do_purge_test(
8383
segment_count,
8484
block_count,
8585
index_count,
86+
Some(()),
8687
)
87-
.await;
88+
.await?;
8889
history_should_have_item(&fixture, case_name, snapshot_count).await?;
8990

9091
if let Some((snapshot_count, table_statistic_count, segment_count, block_count, index_count)) =
@@ -101,8 +102,9 @@ async fn do_purge_test(
101102
segment_count,
102103
block_count,
103104
index_count,
105+
Some(()),
104106
)
105-
.await;
107+
.await?;
106108

107109
history_should_have_item(&fixture, case_name, snapshot_count).await?;
108110
};

src/query/service/tests/it/storages/fuse/operations/purge_drop.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ async fn test_fuse_snapshot_truncate_in_drop_all_stmt() -> Result<()> {
5858
0, // 0 segments
5959
0, // 0 blocks
6060
0, // 0 index
61+
None,
6162
)
62-
.await;
63+
.await?;
6364
Ok(())
6465
}

src/query/service/tests/it/storages/fuse/operations/purge_truncate.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,17 @@ async fn test_fuse_truncate_purge_stmt() -> Result<()> {
3434

3535
let expected_index_count = 2;
3636
// there should be some data there: 2 snapshot, 2 segment, 2 block
37-
check_data_dir(&fixture, "truncate_purge", 2, 0, 2, 2, expected_index_count).await;
37+
check_data_dir(
38+
&fixture,
39+
"truncate_purge",
40+
2,
41+
0,
42+
2,
43+
2,
44+
expected_index_count,
45+
Some(()),
46+
)
47+
.await?;
3848

3949
// let's truncate
4050
let qry = format!("truncate table {}.{} purge", db, tbl);
@@ -57,7 +67,8 @@ async fn test_fuse_truncate_purge_stmt() -> Result<()> {
5767
0,
5868
0,
5969
0,
70+
Some(()),
6071
)
61-
.await;
72+
.await?;
6273
Ok(())
6374
}

src/query/service/tests/it/storages/fuse/table_test_fixture.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::VecDeque;
16+
use std::str;
1617
use std::sync::Arc;
1718

1819
use common_ast::ast::Engine;
@@ -30,6 +31,7 @@ use common_sql::plans::CreateDatabasePlan;
3031
use common_sql::plans::CreateTablePlanV2;
3132
use common_storage::StorageFsConfig;
3233
use common_storage::StorageParams;
34+
use common_storages_fuse::FuseTable;
3335
use common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
3436
use common_storages_table_meta::table::OPT_KEY_DATABASE_ID;
3537
use databend_query::interpreters::append2table;
@@ -46,6 +48,7 @@ use databend_query::sql::Planner;
4648
use databend_query::storages::fuse::table_functions::ClusteringInformationTable;
4749
use databend_query::storages::fuse::table_functions::FuseSnapshotTable;
4850
use databend_query::storages::fuse::FUSE_TBL_BLOCK_PREFIX;
51+
use databend_query::storages::fuse::FUSE_TBL_LAST_SNAPSHOT_HINT;
4952
use databend_query::storages::fuse::FUSE_TBL_SEGMENT_PREFIX;
5053
use databend_query::storages::fuse::FUSE_TBL_SNAPSHOT_PREFIX;
5154
use databend_query::storages::fuse::FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
@@ -461,7 +464,8 @@ pub async fn check_data_dir(
461464
segment_count: u32,
462465
block_count: u32,
463466
index_count: u32,
464-
) {
467+
check_last_snapshot: Option<()>,
468+
) -> Result<()> {
465469
let data_path = match fixture.ctx().get_config().storage.params {
466470
StorageParams::Fs(v) => v.root,
467471
_ => panic!("storage type is not fs"),
@@ -472,11 +476,13 @@ pub async fn check_data_dir(
472476
let mut sg_count = 0;
473477
let mut b_count = 0;
474478
let mut i_count = 0;
479+
let mut last_snapshot_loc = "".to_string();
475480
let prefix_snapshot = FUSE_TBL_SNAPSHOT_PREFIX;
476481
let prefix_snapshot_statistics = FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
477482
let prefix_segment = FUSE_TBL_SEGMENT_PREFIX;
478483
let prefix_block = FUSE_TBL_BLOCK_PREFIX;
479484
let prefix_index = FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
485+
let prefix_last_snapshot_hint = FUSE_TBL_LAST_SNAPSHOT_HINT;
480486
for entry in WalkDir::new(root) {
481487
let entry = entry.unwrap();
482488
if entry.file_type().is_file() {
@@ -494,6 +500,14 @@ pub async fn check_data_dir(
494500
i_count += 1;
495501
} else if path.starts_with(prefix_snapshot_statistics) {
496502
ts_count += 1;
503+
} else if path.starts_with(prefix_last_snapshot_hint) && check_last_snapshot.is_some() {
504+
let content = fixture
505+
.ctx
506+
.get_data_operator()?
507+
.object(entry_path)
508+
.read()
509+
.await?;
510+
last_snapshot_loc = str::from_utf8(&content)?.to_string();
497511
}
498512
}
499513
}
@@ -525,6 +539,20 @@ pub async fn check_data_dir(
525539
"case [{}], check index count",
526540
case_name
527541
);
542+
543+
if check_last_snapshot.is_some() {
544+
let table = fixture.latest_default_table().await?;
545+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
546+
let snapshot_loc = fuse_table.snapshot_loc().await?;
547+
let snapshot_loc = snapshot_loc.unwrap();
548+
assert!(last_snapshot_loc.contains(&snapshot_loc));
549+
assert_eq!(
550+
last_snapshot_loc.find(&snapshot_loc),
551+
Some(last_snapshot_loc.len() - snapshot_loc.len())
552+
);
553+
}
554+
555+
Ok(())
528556
}
529557

530558
pub async fn history_should_have_item(

src/query/storages/fuse/fuse/src/operations/commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ impl FuseTable {
443443
}
444444

445445
// Left a hint file which indicates the location of the latest snapshot
446-
async fn write_last_snapshot_hint(
446+
pub async fn write_last_snapshot_hint(
447447
operator: &Operator,
448448
location_generator: &TableMetaLocationGenerator,
449449
last_snapshot_path: String,

src/query/storages/fuse/fuse/src/operations/gc.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,25 @@ impl FuseTable {
8282

8383
let start = Instant::now();
8484
(all_snapshot_lites, all_segment_locations) = snapshots_io
85-
.read_snapshot_lites(root_snapshot_location, None, true, root_snapshot_ts, |x| {
86-
self.data_metrics.set_status(&x);
87-
})
85+
.read_snapshot_lites(
86+
root_snapshot_location.clone(),
87+
None,
88+
true,
89+
root_snapshot_ts,
90+
|x| {
91+
self.data_metrics.set_status(&x);
92+
},
93+
)
8894
.await?;
8995

96+
// try keep a hit file of last snapshot
97+
Self::write_last_snapshot_hint(
98+
&self.operator,
99+
&self.meta_location_generator,
100+
root_snapshot_location,
101+
)
102+
.await;
103+
90104
status_snapshot_scan_count += all_snapshot_lites.len();
91105
status_snapshot_scan_cost += start.elapsed().as_secs();
92106
}

src/query/storages/fuse/fuse/src/operations/truncate.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ impl FuseTable {
5656

5757
let mut new_table_meta = self.table_info.meta.clone();
5858
// update snapshot location
59-
new_table_meta
60-
.options
61-
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), new_snapshot_loc);
59+
new_table_meta.options.insert(
60+
OPT_KEY_SNAPSHOT_LOCATION.to_owned(),
61+
new_snapshot_loc.clone(),
62+
);
6263

6364
// update table statistics, all zeros
6465
new_table_meta.statistics = TableStatistics::default();
@@ -80,6 +81,14 @@ impl FuseTable {
8081
catalog
8182
.truncate_table(&tenant, &db_name, TruncateTableReq { table_id })
8283
.await?;
84+
85+
// try keep a hit file of last snapshot
86+
Self::write_last_snapshot_hint(
87+
&self.operator,
88+
&self.meta_location_generator,
89+
new_snapshot_loc,
90+
)
91+
.await;
8392
}
8493

8594
Ok(())

0 commit comments

Comments
 (0)