From 1faf7312dd3116126bc0715bcd68b47f1dec09b8 Mon Sep 17 00:00:00 2001 From: meox3259 Date: Wed, 11 Jun 2025 14:36:05 +0800 Subject: [PATCH 1/6] feat: enable levels to cache sstables to local disk if uploaded to S3 tmp modify tmp commit feat: enable levels to cache sstables to local disk if uploaded to S3 feat: enable levels to cache sstables to local disk if uploaded to S3 fix ci & unit test & format fix ut fix wasm & python ci fix ci remove garbage info & fix token --- benches/common.rs | 5 + bindings/js/src/db.rs | 6 +- bindings/js/src/options.rs | 9 +- bindings/python/example/fusion_storage.py | 4 +- bindings/python/src/options.rs | 9 +- bindings/python/tests/test_s3.py | 6 +- bindings/python/tests/test_table_level.py | 4 +- guide/src/start.md | 4 +- guide/src/usage/advance.md | 2 +- guide/src/usage/tonbo.md | 2 +- src/compaction/leveled.rs | 231 +++++++++++++++++++--- src/compaction/mod.rs | 162 +++++++++++---- src/fs/manager.rs | 25 ++- src/inmem/immutable.rs | 2 +- src/lib.rs | 41 ++-- src/option.rs | 16 +- src/snapshot.rs | 2 +- src/stream/level.rs | 2 +- src/transaction.rs | 8 +- src/version/cleaner.rs | 10 + src/version/mod.rs | 14 +- tests/wasm.rs | 4 +- 22 files changed, 455 insertions(+), 113 deletions(-) diff --git a/benches/common.rs b/benches/common.rs index ceb535ab..8947766c 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -257,30 +257,35 @@ impl BenchDatabase for TonboS3BenchDataBase { 0, fusio::path::Path::from_url_path("/l0").unwrap(), fs_options.clone(), + false, ) .unwrap() .level_path( 1, fusio::path::Path::from_url_path("/l1").unwrap(), fs_options.clone(), + false, ) .unwrap() .level_path( 2, fusio::path::Path::from_url_path("/l2").unwrap(), fs_options.clone(), + false, ) .unwrap() .level_path( 3, fusio::path::Path::from_url_path("/l3").unwrap(), fs_options.clone(), + false, ) .unwrap() .level_path( 4, fusio::path::Path::from_url_path("/l4").unwrap(), fs_options.clone(), + false, ) .unwrap() .disable_wal(); diff --git a/bindings/js/src/db.rs b/bindings/js/src/db.rs index 0284222b..4f4b560b 100644 --- a/bindings/js/src/db.rs +++ b/bindings/js/src/db.rs @@ -349,11 +349,11 @@ mod tests { let option = DbOption::new("write_s3".to_string()) .expect("cannot open DB") - .level_path(0, "js/l0".to_string(), fs_option.clone()) + .level_path(0, "js/l0".to_string(), fs_option.clone(), true) .unwrap() - .level_path(1, "js/l1".to_string(), fs_option.clone()) + .level_path(1, "js/l1".to_string(), fs_option.clone(), false) .unwrap() - .level_path(2, "js/l2".to_string(), fs_option.clone()) + .level_path(2, "js/l2".to_string(), fs_option.clone(), true) .unwrap(); let schema = schema(); diff --git a/bindings/js/src/options.rs b/bindings/js/src/options.rs index 6ffea70d..569e3ba5 100644 --- a/bindings/js/src/options.rs +++ b/bindings/js/src/options.rs @@ -26,7 +26,7 @@ pub struct DbOption { /// build the `DB` storage directory based on the passed path path: String, base_fs: FsOptions, - level_paths: Vec>, + level_paths: Vec>, } #[wasm_bindgen] @@ -57,8 +57,9 @@ impl DbOption { level: usize, path: String, fs_options: FsOptions, + cached: bool, ) -> Result { - self.level_paths[level] = Some((path.to_string(), fs_options)); + self.level_paths[level] = Some((path.to_string(), fs_options, cached)); Ok(self) } } @@ -77,10 +78,10 @@ impl DbOption { .base_fs(self.base_fs.into_fs_options()); for (level, path) in self.level_paths.into_iter().enumerate() { - if let Some((path, fs_options)) = path { + if let Some((path, fs_options, cached)) = path { let path = fs_options.path(path).unwrap(); opt = opt - .level_path(level, path, fs_options.into_fs_options()) + .level_path(level, path, fs_options.into_fs_options(), cached) .unwrap(); } } diff --git a/bindings/python/example/fusion_storage.py b/bindings/python/example/fusion_storage.py index 87b8f079..c2206cc1 100644 --- a/bindings/python/example/fusion_storage.py +++ b/bindings/python/example/fusion_storage.py @@ -21,8 +21,8 @@ async def main(): os.makedirs("db_path/user/l1") option = DbOption(from_filesystem_path("db_path/user")) - option.level_path(0, from_filesystem_path("db_path/user/l0"), FsOptions.Local()) - option.level_path(1, from_filesystem_path("db_path/user/l1"), FsOptions.Local()) + option.level_path(0, from_filesystem_path("db_path/user/l0"), FsOptions.Local(), False) + option.level_path(1, from_filesystem_path("db_path/user/l1"), FsOptions.Local(), False) option.immutable_chunk_num = 1 option.major_threshold_with_sst_size = 3 diff --git a/bindings/python/src/options.rs b/bindings/python/src/options.rs index 886bdbea..dbd19f7a 100644 --- a/bindings/python/src/options.rs +++ b/bindings/python/src/options.rs @@ -38,7 +38,7 @@ pub struct DbOption { path: String, #[pyo3(get, set)] base_fs: FsOptions, - level_paths: Vec>, + level_paths: Vec>, } #[pymethods] @@ -61,11 +61,11 @@ impl DbOption { } } - fn level_path(&mut self, level: usize, path: String, fs_options: FsOptions) -> PyResult<()> { + fn level_path(&mut self, level: usize, path: String, fs_options: FsOptions, cached: bool) -> PyResult<()> { if level >= MAX_LEVEL { ExceedsMaxLevelError::new_err("Exceeds max level"); } - self.level_paths[level] = Some((path, fs_options)); + self.level_paths[level] = Some((path, fs_options, cached)); Ok(()) } } @@ -82,12 +82,13 @@ impl DbOption { .version_log_snapshot_threshold(self.version_log_snapshot_threshold) .base_fs(tonbo::option::FsOptions::from(self.base_fs)); for (level, path) in self.level_paths.into_iter().enumerate() { - if let Some((path, fs_options)) = path { + if let Some((path, fs_options, cached)) = path { opt = opt .level_path( level, Path::from(path), tonbo::option::FsOptions::from(fs_options), + cached, ) .unwrap(); } diff --git a/bindings/python/tests/test_s3.py b/bindings/python/tests/test_s3.py index 4c6c834e..3b924200 100644 --- a/bindings/python/tests/test_s3.py +++ b/bindings/python/tests/test_s3.py @@ -27,9 +27,9 @@ async def test_s3_read_write(): fs_option = FsOptions.S3("wasm-data", credential,"ap-southeast-2",None, None, None) option = DbOption(temp_dir.name) - option.level_path(0, from_url_path("l0"), fs_option) - option.level_path(1, from_url_path("l1"), fs_option) - option.level_path(2, from_url_path("l2"), fs_option) + option.level_path(0, from_url_path("l0"), fs_option, False) + option.level_path(1, from_url_path("l1"), fs_option, False) + option.level_path(2, from_url_path("l2"), fs_option, False) option.immutable_chunk_num = 1 option.major_threshold_with_sst_size = 3 diff --git a/bindings/python/tests/test_table_level.py b/bindings/python/tests/test_table_level.py index 6f4a4d34..7e22856e 100644 --- a/bindings/python/tests/test_table_level.py +++ b/bindings/python/tests/test_table_level.py @@ -36,8 +36,8 @@ async def test_table_level_local(): temp_dir1 = tempfile.TemporaryDirectory() option = DbOption(from_filesystem_path(temp_dir.name)) - option.level_path(0, from_filesystem_path(temp_dir0.name), FsOptions.Local()) - option.level_path(1, from_filesystem_path(temp_dir1.name), FsOptions.Local()) + option.level_path(0, from_filesystem_path(temp_dir0.name), FsOptions.Local(), False) + option.level_path(1, from_filesystem_path(temp_dir1.name), FsOptions.Local(), False) option.immutable_chunk_num = 1 option.major_threshold_with_sst_size = 3 diff --git a/guide/src/start.md b/guide/src/start.md index ef7f23bf..6f0b7e6e 100644 --- a/guide/src/start.md +++ b/guide/src/start.md @@ -219,8 +219,8 @@ let s3_option = FsOptions::S3 { let options = DbOption::new( Path::from_filesystem_path("./db_path/users").unwrap(), &UserSchema, -).level_path(2, "l2", s3_option.clone()) -).level_path(3, "l3", s3_option); +).level_path(2, "l2", s3_option.clone(), false) +).level_path(3, "l3", s3_option, true); ``` In this example, data for level 2 and level 3 will be stored in S3, while all other levels remain on the local disk. If there is data in level 2 and level 3, you can verify and access it in S3: diff --git a/guide/src/usage/advance.md b/guide/src/usage/advance.md index eb27faeb..e5a58fa6 100644 --- a/guide/src/usage/advance.md +++ b/guide/src/usage/advance.md @@ -256,7 +256,7 @@ async fn main() { ]; let schema = DynSchema::new(descs, 0); let options = DbOption::new(Path::from_filesystem_path("s3_path").unwrap(), &schema) - .level_path(2, "l2", fs_option); + .level_path(2, "l2", fs_option, true); let db = DB::::new(options, TokioExecutor::current(), schema) diff --git a/guide/src/usage/tonbo.md b/guide/src/usage/tonbo.md index c8f4c6b6..3422c45f 100644 --- a/guide/src/usage/tonbo.md +++ b/guide/src/usage/tonbo.md @@ -202,7 +202,7 @@ async fn main() { }; let options = DbOption::new(Path::from_filesystem_path("s3_path").unwrap(), &UserSchema) - .level_path(2, "l2", fs_option); + .level_path(2, "l2", fs_option, false); let db = DB::::new(options, TokioExecutor::current(), UserSchema) .await diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 2c079192..9972c7c0 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -1,4 +1,4 @@ -use std::{cmp, collections::Bound, mem, sync::Arc}; +use std::{cmp, collections::Bound, marker::PhantomData, mem, sync::Arc}; use async_lock::{RwLock, RwLockUpgradableReadGuard}; use fusio_parquet::writer::AsyncWriter; @@ -8,6 +8,7 @@ use super::Compactor; use crate::{ compaction::CompactionError, context::Context, + executor::Executor, fs::{generate_file_id, manager::StoreManager, FileId, FileType}, inmem::{immutable::Immutable, mutable::MutableMemTable}, ondisk::sstable::SsTable, @@ -18,19 +19,23 @@ use crate::{ DbOption, DbStorage, }; -pub(crate) struct LeveledCompactor +pub(crate) struct LeveledCompactor where - R: Record, + R: Record + Send + 'static, + E: Executor + Send + Sync + 'static, { option: Arc, schema: Arc>>, ctx: Arc>, record_schema: Arc, + _p: PhantomData, } -impl LeveledCompactor +impl LeveledCompactor where - R: Record, + R: Record + Send + Sync + 'static, + E: Executor + Send + Sync + 'static, + R::Schema: 'static, { pub(crate) fn new( schema: Arc>>, @@ -38,17 +43,19 @@ where option: Arc, ctx: Arc>, ) -> Self { - LeveledCompactor:: { + LeveledCompactor:: { option, schema, ctx, record_schema, + _p: PhantomData, } } pub(crate) async fn check_then_compaction( &mut self, is_manual: bool, + executor: Arc, ) -> Result<(), CompactionError> { let mut guard = self.schema.write().await; @@ -110,6 +117,7 @@ where &mut delete_gens, &guard.record_schema, &self.ctx, + &executor, ) .await?; } @@ -117,7 +125,6 @@ where version_edits.push(VersionEdit::LatestTimeStamp { ts: version_ref.increase_ts(), }); - self.ctx .version_set .apply_edits(version_edits, Some(delete_gens), false) @@ -184,6 +191,39 @@ where } } writer.close().await?; + + let level_0_cached = option.level_fs_cached(0); + if level_0_cached { + let level_0_cache_fs = manager.get_cache(level_0_path); + let mut writer = AsyncArrowWriter::try_new( + AsyncWriter::new( + level_0_cache_fs + .open_options( + &option.table_path(gen, 0), + FileType::Parquet.open_options(true), + ) + .await?, + ), + schema.arrow_schema().clone(), + Some(option.write_parquet_properties.clone()), + )?; + for (file_id, batch) in batches { + if let (Some(batch_min), Some(batch_max)) = batch.scope() { + if matches!(min.as_ref().map(|min| min > batch_min), Some(true) | None) { + min = Some(batch_min.clone()) + } + if matches!(max.as_ref().map(|max| max < batch_max), Some(true) | None) { + max = Some(batch_max.clone()) + } + } + writer.write(batch.as_record_batch()).await?; + if let Some(file_id) = file_id { + wal_ids.push(*file_id); + } + } + writer.close().await?; + } + return Ok(Some(Scope { min: min.ok_or(CompactionError::EmptyLevel)?, max: max.ok_or(CompactionError::EmptyLevel)?, @@ -197,13 +237,14 @@ where #[allow(clippy::too_many_arguments)] async fn major_compaction( version: &Version, - option: &DbOption, + option: &Arc, mut min: &::Key, mut max: &::Key, version_edits: &mut Vec::Key>>, delete_gens: &mut Vec<(FileId, usize)>, - instance: &R::Schema, + instance: &Arc, ctx: &Context, + executor: &Arc, ) -> Result<(), CompactionError> { let mut level = 0; @@ -216,6 +257,7 @@ where Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?; let level_path = option.level_fs_path(level).unwrap_or(&option.base_path); + let level_fs = ctx.manager.get_fs(level_path); let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len()); // This Level @@ -241,7 +283,7 @@ where }); } } else { - let (lower, upper) = Compactor::::full_scope(&meet_scopes_l)?; + let (lower, upper) = Compactor::::full_scope(&meet_scopes_l)?; let level_scan_l = LevelStream::new( version, level, @@ -260,9 +302,12 @@ where inner: level_scan_l, }); } + let level_l_path = option.level_fs_path(level + 1).unwrap_or(&option.base_path); + let level_l_fs = ctx.manager.get_fs(level_l_path); + let level_l_cache = ctx.manager.get_cache(level_l_path); if !meet_scopes_ll.is_empty() { // Next Level - let (lower, upper) = Compactor::::full_scope(&meet_scopes_ll)?; + let (lower, upper) = Compactor::::full_scope(&meet_scopes_ll)?; let level_scan_ll = LevelStream::new( version, level + 1, @@ -272,7 +317,7 @@ where u32::MAX.into(), None, ProjectionMask::all(), - level_fs.clone(), + level_l_fs.clone(), ctx.parquet_lru.clone(), ) .ok_or(CompactionError::EmptyLevel)?; @@ -282,18 +327,19 @@ where }); } - let level_l_path = option.level_fs_path(level + 1).unwrap_or(&option.base_path); - let level_l_fs = ctx.manager.get_fs(level_l_path); - Compactor::::build_tables( + let cached_to_local = option.level_fs_cached(level + 1); + Compactor::::build_tables( option, version_edits, level + 1, streams, instance, level_l_fs, + level_l_cache, + &executor, + cached_to_local, ) .await?; - for scope in meet_scopes_l { version_edits.push(VersionEdit::Remove { level: level as u8, @@ -468,6 +514,7 @@ pub(crate) mod tests { 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), FsOptions::Local, + false, ) .unwrap(); let manager = @@ -552,7 +599,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = LeveledCompactor::::minor_compaction( + let scope = LeveledCompactor::::minor_compaction( &option, None, &vec![ @@ -615,7 +662,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = LeveledCompactor::::minor_compaction( + let scope = LeveledCompactor::::minor_compaction( &option, None, &vec![ @@ -639,11 +686,141 @@ pub(crate) mod tests { } #[tokio::test(flavor = "multi_thread")] - async fn major_compaction() { + async fn major_compaction_cached() { + use fusio::remotes::aws::AwsCredential; + + if option_env!("AWS_ACCESS_KEY_ID").is_none() + || option_env!("AWS_SECRET_ACCESS_KEY").is_none() + { + eprintln!("can not get `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`"); + return; + } + let key_id = std::option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string(); + let secret_key = std::option_env!("AWS_SECRET_ACCESS_KEY") + .unwrap() + .to_string(); + let token = None; + let bucket = std::env::var("BUCKET_NAME").expect("expected s3 bucket not to be empty"); + let region = Some(std::env::var("AWS_REGION").expect("expected s3 region not to be empty")); + + let fs_option = fusio_log::FsOptions::S3 { + bucket, + credential: Some(AwsCredential { + key_id, + secret_key, + token, + }), + endpoint: None, + sign_payload: None, + checksum: None, + region, + }; + let temp_dir = TempDir::new().unwrap(); let temp_dir_l0 = TempDir::new().unwrap(); let temp_dir_l1 = TempDir::new().unwrap(); + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .level_path( + 0, + Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), + FsOptions::Local, + false, + ) + .unwrap() + .level_path( + 1, + Path::from_filesystem_path(temp_dir_l1.path()).unwrap(), + fs_option.clone(), + true, + ) + .unwrap(); + option.major_threshold_with_sst_size = 2; + let option = Arc::new(option); + let manager = Arc::new( + StoreManager::new(option.base_fs.clone(), option.level_paths.clone()).unwrap(), + ); + + manager + .base_fs() + .create_dir_all(&option.version_log_dir_path()) + .await + .unwrap(); + manager + .base_fs() + .create_dir_all(&option.wal_dir_path()) + .await + .unwrap(); + + let ((table_gen_1, table_gen_2, table_gen_3, table_gen_4, _), version) = + build_version(&option, &manager, &Arc::new(TestSchema), false).await; + let ((_, _, _, _, _), _) = + build_version(&option, &manager, &Arc::new(TestSchema), true).await; + + let min = 2.to_string(); + let max = 5.to_string(); + let mut version_edits = Vec::new(); + let (_, clean_sender) = Cleaner::new(option.clone(), manager.clone()); + let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()) + .await + .unwrap(); + let ctx = Context::new( + manager.clone(), + Arc::new(NoCache::default()), + version_set, + TestSchema.arrow_schema().clone(), + ); + + let executor = Arc::new(TokioExecutor::current()); + LeveledCompactor::::major_compaction( + &version, + &option, + &min, + &max, + &mut version_edits, + &mut vec![], + &Arc::new(TestSchema), + &ctx, + &executor, + ) + .await + .unwrap(); + if let VersionEdit::Add { level, scope } = &version_edits[0] { + assert_eq!(*level, 1); + assert_eq!(scope.min, 1.to_string()); + assert_eq!(scope.max, 6.to_string()); + } + assert_eq!( + version_edits[1..5].to_vec(), + vec![ + VersionEdit::Remove { + level: 0, + gen: table_gen_1, + }, + VersionEdit::Remove { + level: 0, + gen: table_gen_2, + }, + VersionEdit::Remove { + level: 1, + gen: table_gen_3, + }, + VersionEdit::Remove { + level: 1, + gen: table_gen_4, + }, + ] + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn major_compaction() { + let temp_dir = TempDir::new().unwrap(); + let temp_dir_l0 = TempDir::new().unwrap(); + let temp_dir_l1 = TempDir::new().unwrap(); let mut option = DbOption::new( Path::from_filesystem_path(temp_dir.path()).unwrap(), &TestSchema, @@ -652,12 +829,14 @@ pub(crate) mod tests { 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), FsOptions::Local, + false, ) .unwrap() .level_path( 1, Path::from_filesystem_path(temp_dir_l1.path()).unwrap(), FsOptions::Local, + false, ) .unwrap(); option.major_threshold_with_sst_size = 2; @@ -678,7 +857,7 @@ pub(crate) mod tests { .unwrap(); let ((table_gen_1, table_gen_2, table_gen_3, table_gen_4, _), version) = - build_version(&option, &manager, &Arc::new(TestSchema)).await; + build_version(&option, &manager, &Arc::new(TestSchema), false).await; let min = 2.to_string(); let max = 5.to_string(); @@ -695,15 +874,17 @@ pub(crate) mod tests { TestSchema.arrow_schema().clone(), ); - LeveledCompactor::::major_compaction( + let executor = Arc::new(TokioExecutor::current()); + LeveledCompactor::::major_compaction( &version, &option, &min, &max, &mut version_edits, &mut vec![], - &TestSchema, + &Arc::new(TestSchema), &ctx, + &executor, ) .await .unwrap(); @@ -843,15 +1024,17 @@ pub(crate) mod tests { version_set, TestSchema.arrow_schema().clone(), ); - LeveledCompactor::::major_compaction( + let executor = Arc::new(TokioExecutor::current()); + LeveledCompactor::::major_compaction( &version, &option, &min, &max, &mut version_edits, &mut vec![], - &TestSchema, + &Arc::new(TestSchema), &ctx, + &executor, ) .await .unwrap(); diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 91e37efe..e6f91f86 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -5,11 +5,13 @@ use fusio::DynFs; use fusio_parquet::writer::AsyncWriter; use futures_util::StreamExt; use leveled::LeveledCompactor; +use log::error; use parquet::arrow::AsyncArrowWriter; use thiserror::Error; use tokio::sync::oneshot; use crate::{ + executor::Executor, fs::{generate_file_id, FileType}, inmem::immutable::{ArrowArrays, Builder}, record::{KeyRef, Record, Schema as RecordSchema}, @@ -20,11 +22,12 @@ use crate::{ DbOption, }; -pub(crate) enum Compactor +pub(crate) enum Compactor where R: Record, + E: Executor + Send + Sync + 'static, { - Leveled(LeveledCompactor), + Leveled(LeveledCompactor), } #[derive(Debug)] @@ -33,26 +36,31 @@ pub enum CompactTask { Flush(Option>), } -impl Compactor +impl Compactor where - R: Record, + R: Record + Send + Sync, + E: Executor + Send + Sync + 'static, { pub(crate) async fn check_then_compaction( &mut self, is_manual: bool, + executor: Arc, ) -> Result<(), CompactionError> { match self { - Compactor::Leveled(leveled) => leveled.check_then_compaction(is_manual).await, + Compactor::Leveled(leveled) => leveled.check_then_compaction(is_manual, executor).await, } } async fn build_tables<'scan>( - option: &DbOption, + option: &Arc, version_edits: &mut Vec::Key>>, level: usize, streams: Vec>, - schema: &R::Schema, + schema: &Arc, fs: &Arc, + cache: &Arc, + executor: &Arc, + cached_to_local: bool, ) -> Result<(), CompactionError> { let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; @@ -61,7 +69,7 @@ where ::Columns::builder(schema.arrow_schema().clone(), 8192); let mut min = None; let mut max = None; - + let (manifest_tx, manifest_cx) = flume::unbounded(); while let Some(result) = Pin::new(&mut stream).next().await { let entry = result?; let key = entry.key(); @@ -73,32 +81,92 @@ where builder.push(key, entry.value()); if builder.written_size() >= option.max_sst_file_size { + let columns = Arc::new(builder.finish(None)); + if cached_to_local { + let option = Arc::clone(option); + let schema = schema.clone(); + let cache = Arc::clone(cache); + let mut min = min.clone(); + let mut max = max.clone(); + let columns = Arc::clone(&columns); + let manifest_tx = manifest_tx.clone(); + executor.spawn(async move { + if let Err(e) = Self::build_table( + &option, + level, + &columns, + &mut min, + &mut max, + &schema, + &cache, + &manifest_tx, + cached_to_local, + ) + .await + { + error!("Build Table Error: {}", e); + } + }); + } Self::build_table( option, - version_edits, level, - &mut builder, + &columns, &mut min, &mut max, - schema, - fs, + &schema, + &fs, + &manifest_tx, + !cached_to_local, ) .await?; } } if builder.written_size() > 0 { + let columns = Arc::new(builder.finish(None)); + if cached_to_local { + let option = Arc::clone(option); + let schema = schema.clone(); + let cache = Arc::clone(cache); + let mut min = min.clone(); + let mut max = max.clone(); + let columns = Arc::clone(&columns); + let manifest_tx = manifest_tx.clone(); + executor.spawn(async move { + if let Err(err) = Self::build_table( + &option, + level, + &columns, + &mut min, + &mut max, + &schema, + &cache, + &manifest_tx, + cached_to_local, + ) + .await + { + error!("Build Table Error: {}", err); + } + }); + } Self::build_table( option, - version_edits, level, - &mut builder, + &columns, &mut min, &mut max, schema, - fs, + &fs, + &manifest_tx, + !cached_to_local, ) .await?; } + drop(manifest_tx); + while let Ok(version_edit) = manifest_cx.recv_async().await { + version_edits.push(version_edit); + } Ok(()) } @@ -119,19 +187,19 @@ where #[allow(clippy::too_many_arguments)] async fn build_table( option: &DbOption, - version_edits: &mut Vec::Key>>, level: usize, - builder: &mut <::Columns as ArrowArrays>::Builder, + columns: &Arc<<::Schema as RecordSchema>::Columns>, min: &mut Option<::Key>, max: &mut Option<::Key>, schema: &R::Schema, fs: &Arc, + manifest_tx: &flume::Sender::Key>>, + modify_manifest: bool, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); debug_assert!(max.is_some()); let gen = generate_file_id(); - let columns = builder.finish(None); let mut writer = AsyncArrowWriter::try_new( AsyncWriter::new( fs.open_options( @@ -145,15 +213,21 @@ where )?; writer.write(columns.as_record_batch()).await?; writer.close().await?; - version_edits.push(VersionEdit::Add { - level: level as u8, - scope: Scope { - min: min.take().ok_or(CompactionError::EmptyLevel)?, - max: max.take().ok_or(CompactionError::EmptyLevel)?, - gen, - wal_ids: None, - }, - }); + + if modify_manifest { + manifest_tx + .send_async(VersionEdit::Add { + level: level as u8, + scope: Scope { + min: min.take().ok_or(CompactionError::EmptyLevel)?, + max: max.take().ok_or(CompactionError::EmptyLevel)?, + gen, + wal_ids: None, + }, + }) + .await + .map_err(|_| CompactionError::ChannelClose)?; + } Ok(()) } } @@ -259,15 +333,33 @@ pub(crate) mod tests { option: &Arc, manager: &StoreManager, schema: &Arc, + test_cached: bool, ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { - let level_0_fs = option - .level_fs_path(0) - .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); - let level_1_fs = option - .level_fs_path(1) - .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); + let level_0_cache = option.level_fs_cached(0); + let level_0_fs = if test_cached && level_0_cache { + option + .level_fs_path(0) + .map(|path| manager.get_cache(path)) + .unwrap_or(manager.base_fs()) + } else { + option + .level_fs_path(0) + .map(|path| manager.get_fs(path)) + .unwrap_or(manager.base_fs()) + }; + + let level_1_cache = option.level_fs_cached(1); + let level_1_fs = if test_cached && level_1_cache { + option + .level_fs_path(1) + .map(|path| manager.get_cache(path)) + .unwrap_or(manager.base_fs()) + } else { + option + .level_fs_path(1) + .map(|path| manager.get_fs(path)) + .unwrap_or(manager.base_fs()) + }; // level 0 let table_gen_1 = generate_file_id(); diff --git a/src/fs/manager.rs b/src/fs/manager.rs index c6eb11d0..a695bb56 100644 --- a/src/fs/manager.rs +++ b/src/fs/manager.rs @@ -1,29 +1,36 @@ use std::{collections::HashMap, sync::Arc}; -use fusio::{disk::LocalFs, dynamic::DynFs, path::Path, Error}; +use fusio::{disk::LocalFs, dynamic::DynFs, fs::FileSystemTag, path::Path, Error}; use fusio_dispatch::FsOptions; pub struct StoreManager { base_fs: Arc, local_fs: Arc, fs_map: HashMap>, + cache_map: HashMap>, } impl StoreManager { pub fn new( base_options: FsOptions, - levels_fs: Vec>, + levels_fs: Vec>, ) -> Result { let mut fs_map = HashMap::with_capacity(levels_fs.len()); - - for (path, fs_options) in levels_fs.into_iter().flatten() { - fs_map.entry(path).or_insert(fs_options.parse()?); + let mut cache_map: HashMap> = HashMap::with_capacity(levels_fs.len()); + for (path, fs_options, enable_local_cache) in levels_fs.into_iter().flatten() { + let fs = fs_options.parse()?; + if enable_local_cache && fs.file_system() == FileSystemTag::S3 { + cache_map + .entry(path.clone()) + .or_insert(Arc::new(LocalFs {})); + } + fs_map.entry(path).or_insert(fs); } let base_fs = base_options.parse()?; - Ok(StoreManager { base_fs, fs_map, + cache_map, local_fs: Arc::new(LocalFs {}), }) } @@ -39,6 +46,12 @@ impl StoreManager { pub fn get_fs(&self, path: &Path) -> &Arc { self.fs_map.get(path).unwrap_or(&self.base_fs) } + + pub fn get_cache(&self, path: &Path) -> &Arc { + self.cache_map + .get(path) + .unwrap_or(self.fs_map.get(path).unwrap_or(&self.base_fs)) + } } // TODO: TestCases diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 3cbd405c..84ce60f5 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -15,7 +15,7 @@ use crate::{ timestamp::{Timestamp, Ts, TsRef, EPOCH}, }; -pub trait ArrowArrays: Sized + Sync { +pub trait ArrowArrays: Sized + Sync + Send { type Record: Record; type Builder: Builder; diff --git a/src/lib.rs b/src/lib.rs index dc8ed384..01a3b139 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -187,7 +187,7 @@ where impl DB where R: Record + Send + Sync, - ::Columns: Send + Sync, + ::Columns: Send + Sync + 'static, E: Executor + Send + Sync + 'static, { /// Open [`DB`] with a [`DbOption`]. This will create a new directory at the @@ -267,7 +267,7 @@ where record_schema.arrow_schema().clone(), )); let mut compactor = match option.compaction_option { - CompactionOption::Leveled => Compactor::Leveled(LeveledCompactor::::new( + CompactionOption::Leveled => Compactor::Leveled(LeveledCompactor::::new( schema.clone(), record_schema, option.clone(), @@ -275,18 +275,26 @@ where )), }; - executor.spawn(async move { + let executor_arc = Arc::new(executor); + executor_arc.spawn(async move { if let Err(err) = cleaner.listen().await { error!("[Cleaner Error]: {}", err) } }); - executor.spawn(async move { + let executor_arc_clone = Arc::clone(&executor_arc); + executor_arc.spawn(async move { while let Ok(task) = task_rx.recv_async().await { if let Err(err) = match task { - CompactTask::Freeze => compactor.check_then_compaction(false).await, + CompactTask::Freeze => { + compactor + .check_then_compaction(false, Arc::clone(&executor_arc_clone)) + .await + } CompactTask::Flush(option_tx) => { - let mut result = compactor.check_then_compaction(true).await; + let mut result = compactor + .check_then_compaction(true, Arc::clone(&executor_arc_clone)) + .await; if let Some(tx) = option_tx { if result.is_ok() { result = tx.send(()).map_err(|_| CompactionError::ChannelClose); @@ -1407,7 +1415,7 @@ pub(crate) mod tests { TestSchema.arrow_schema().clone(), )); let mut compactor = match option.compaction_option { - CompactionOption::Leveled => Compactor::Leveled(LeveledCompactor::::new( + CompactionOption::Leveled => Compactor::Leveled(LeveledCompactor::::new( schema.clone(), record_schema, option.clone(), @@ -1415,17 +1423,26 @@ pub(crate) mod tests { )), }; - executor.spawn(async move { + let executor_arc = Arc::new(executor); + executor_arc.spawn(async move { if let Err(err) = cleaner.listen().await { error!("[Cleaner Error]: {}", err) } }); - executor.spawn(async move { + + let executor_arc_clone = Arc::clone(&executor_arc); + executor_arc.spawn(async move { while let Ok(task) = compaction_rx.recv_async().await { if let Err(err) = match task { - CompactTask::Freeze => compactor.check_then_compaction(false).await, + CompactTask::Freeze => { + compactor + .check_then_compaction(false, Arc::clone(&executor_arc_clone)) + .await + } CompactTask::Flush(option_tx) => { - let mut result = compactor.check_then_compaction(true).await; + let mut result = compactor + .check_then_compaction(true, Arc::clone(&executor_arc_clone)) + .await; if let Some(tx) = option_tx { let channel_result = tx.send(()).map_err(|_| CompactionError::ChannelClose); @@ -1470,7 +1487,7 @@ pub(crate) mod tests { let path_l0 = Path::from_filesystem_path(temp_dir_l0.path()).unwrap(); let mut option = DbOption::new(path, &TestSchema) - .level_path(0, path_l0, FsOptions::Local) + .level_path(0, path_l0, FsOptions::Local, false) .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; diff --git a/src/option.rs b/src/option.rs index 3b4bcab2..f6cc4d83 100644 --- a/src/option.rs +++ b/src/option.rs @@ -30,7 +30,7 @@ pub struct DbOption { pub(crate) clean_channel_buffer: usize, pub(crate) base_path: Path, pub(crate) base_fs: FsOptions, - pub(crate) level_paths: Vec>, + pub(crate) level_paths: Vec>, pub(crate) immutable_chunk_num: usize, pub(crate) immutable_chunk_max_num: usize, pub(crate) level_sst_magnification: usize, @@ -180,11 +180,12 @@ impl DbOption { level: usize, path: Path, fs_options: FsOptions, + cached: bool, ) -> Result { if level >= MAX_LEVEL { return Err(ExceedsMaxLevel); } - self.level_paths[level] = Some((path, fs_options)); + self.level_paths[level] = Some((path, fs_options, cached)); Ok(self) } @@ -213,7 +214,7 @@ impl DbOption { pub(crate) fn table_path(&self, gen: FileId, level: usize) -> Path { self.level_paths[level] .as_ref() - .map(|(path, _)| path) + .map(|(path, _, _)| path) .unwrap_or(&self.base_path) .child(format!("{}.{}", gen, FileType::Parquet)) } @@ -237,7 +238,14 @@ impl DbOption { } pub(crate) fn level_fs_path(&self, level: usize) -> Option<&Path> { - self.level_paths[level].as_ref().map(|(path, _)| path) + self.level_paths[level].as_ref().map(|(path, _, _)| path) + } + + pub(crate) fn level_fs_cached(&self, level: usize) -> bool { + self.level_paths[level] + .as_ref() + .map(|(_, _, cached)| *cached) + .unwrap_or(false) } pub(crate) fn is_threshold_exceeded_major( diff --git a/src/snapshot.rs b/src/snapshot.rs index cee43c04..004ac50a 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -145,7 +145,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema), false).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); diff --git a/src/stream/level.rs b/src/stream/level.rs index f603cd56..e3b1582f 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -253,7 +253,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema), false).await; { let mut level_stream_1 = LevelStream::new( diff --git a/src/transaction.rs b/src/transaction.rs index bf1c342e..bfc8f4cc 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -381,7 +381,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema), false).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -561,7 +561,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema), false).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -658,7 +658,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema), false).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); @@ -836,7 +836,7 @@ mod tests { .await .unwrap(); - let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema)).await; + let (_, version) = build_version(&option, &manager, &Arc::new(TestSchema), false).await; let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) .await .unwrap(); diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 9b1e9c63..5bf34d68 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -69,6 +69,16 @@ impl Cleaner { .map(|path| self.manager.get_fs(path)) .unwrap_or(self.manager.base_fs()); fs.remove(&self.option.table_path(gen, level)).await?; + + let cached_to_local = self.option.level_fs_cached(level); + if cached_to_local { + let fs = self + .option + .level_fs_path(level) + .map(|path| self.manager.get_cache(path)) + .unwrap_or(self.manager.base_fs()); + fs.remove(&self.option.table_path(gen, level)).await?; + } } } } diff --git a/src/version/mod.rs b/src/version/mod.rs index e67d9613..f3e80cf5 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -128,7 +128,12 @@ where .option .level_fs_path(0) .unwrap_or(&self.option.base_path); - let level_0_fs = manager.get_fs(level_0_path); + let level_0_cached = self.option.level_fs_cached(0); + let level_0_fs = if level_0_cached { + manager.get_cache(level_0_path) + } else { + manager.get_fs(level_0_path) + }; for scope in self.level_slice[0].iter().rev() { if !scope.contains(key.value()) { continue; @@ -153,7 +158,12 @@ where .option .level_fs_path(leve) .unwrap_or(&self.option.base_path); - let level_fs = manager.get_fs(level_path); + let level_cached = self.option.level_fs_cached(leve); + let level_fs = if level_cached { + manager.get_cache(level_path) + } else { + manager.get_fs(level_path) + }; if sort_runs.is_empty() { continue; } diff --git a/tests/wasm.rs b/tests/wasm.rs index c5049a34..cb885d0e 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -361,15 +361,17 @@ mod tests { 0, Path::from_url_path("tonbo/l0").unwrap(), fs_option.clone(), + true, ) .unwrap() .level_path( 1, Path::from_url_path("tonbo/l1").unwrap(), fs_option.clone(), + false, ) .unwrap() - .level_path(2, Path::from_url_path("tonbo/l2").unwrap(), fs_option) + .level_path(2, Path::from_url_path("tonbo/l2").unwrap(), fs_option, true) .unwrap() .major_threshold_with_sst_size(3) .level_sst_magnification(1) From 2b2b7d64836908c18e7f19db59ae8a0d34e8b8f6 Mon Sep 17 00:00:00 2001 From: meox3259 Date: Tue, 17 Jun 2025 23:20:27 +0900 Subject: [PATCH 2/6] add ut & fix cache folder --- src/compaction/leveled.rs | 103 +++++++++++++++++++++++++++++++++++++- src/compaction/mod.rs | 40 +++++++++------ src/option.rs | 5 ++ 3 files changed, 131 insertions(+), 17 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 9972c7c0..896e2ee9 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -199,7 +199,7 @@ where AsyncWriter::new( level_0_cache_fs .open_options( - &option.table_path(gen, 0), + &option.cached_table_path(gen), FileType::Parquet.open_options(true), ) .await?, @@ -685,6 +685,103 @@ pub(crate) mod tests { ); } + #[tokio::test(flavor = "multi_thread")] + async fn test_cached_read_path() { + use fusio::remotes::aws::AwsCredential; + + use crate::Projection; + + if option_env!("AWS_ACCESS_KEY_ID").is_none() + || option_env!("AWS_SECRET_ACCESS_KEY").is_none() + { + eprintln!("can not get `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`"); + return; + } + let key_id = std::option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string(); + let secret_key = std::option_env!("AWS_SECRET_ACCESS_KEY") + .unwrap() + .to_string(); + let token = std::option_env!("AWS_SESSION_TOKEN").map(|v| v.to_string()); + let bucket = std::env::var("BUCKET_NAME").expect("expected s3 bucket not to be empty"); + let region = Some(std::env::var("AWS_REGION").expect("expected s3 region not to be empty")); + + let fs_option = fusio_log::FsOptions::S3 { + bucket, + credential: Some(AwsCredential { + key_id, + secret_key, + token, + }), + endpoint: None, + sign_payload: None, + checksum: None, + region, + }; + + let temp_dir = TempDir::new().unwrap(); + let temp_dir_l0 = TempDir::new().unwrap(); + let temp_dir_l1 = TempDir::new().unwrap(); + let temp_dir_l2 = TempDir::new().unwrap(); + + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .level_path( + 0, + Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), + fs_option.clone(), + true, + ) + .unwrap() + .level_path( + 1, + Path::from_filesystem_path(temp_dir_l1.path()).unwrap(), + fs_option.clone(), + true, + ) + .unwrap() + .level_path( + 2, + Path::from_filesystem_path(temp_dir_l2.path()).unwrap(), + fs_option.clone(), + true, + ).unwrap(); + + option.immutable_chunk_num = 1; + option.immutable_chunk_max_num = 1; + option.major_threshold_with_sst_size = 3; + option.level_sst_magnification = 10; + option.max_sst_file_size = 2 * 1024 * 1024; + option.major_default_oldest_table_num = 1; + option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50); + + let db: DB = DB::new(option, TokioExecutor::current(), TestSchema) + .await + .unwrap(); + + let mut items = Vec::new(); + for i in 0..10000 { + items.push(Test { + vstring: i.to_string(), + vu32: i, + vbool: Some(true), + }); + } + + for (_, item) in items.clone().into_iter().enumerate() { + db.write(item, 0.into()).await.unwrap(); + } + + for (i, item) in items.clone().into_iter().enumerate() { + let tx = db.transaction().await; + let key = item.key().to_string(); + let option1 = tx.get(&key, Projection::All).await.unwrap().unwrap(); + + assert_eq!(option1.get().vu32, Some(i as u32)); + } + } + #[tokio::test(flavor = "multi_thread")] async fn major_compaction_cached() { use fusio::remotes::aws::AwsCredential; @@ -699,7 +796,7 @@ pub(crate) mod tests { let secret_key = std::option_env!("AWS_SECRET_ACCESS_KEY") .unwrap() .to_string(); - let token = None; + let token = std::option_env!("AWS_SESSION_TOKEN").map(|v| v.to_string()); let bucket = std::env::var("BUCKET_NAME").expect("expected s3 bucket not to be empty"); let region = Some(std::env::var("AWS_REGION").expect("expected s3 region not to be empty")); @@ -979,6 +1076,7 @@ pub(crate) mod tests { &Arc::new(TestSchema), 0, level_0_fs, + false, ) .await .unwrap(); @@ -989,6 +1087,7 @@ pub(crate) mod tests { &Arc::new(TestSchema), 1, level_1_fs, + false, ) .await .unwrap(); diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index e6f91f86..7402ae93 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -194,19 +194,21 @@ where schema: &R::Schema, fs: &Arc, manifest_tx: &flume::Sender::Key>>, - modify_manifest: bool, + cached_to_local: bool, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); debug_assert!(max.is_some()); let gen = generate_file_id(); + let path = if cached_to_local { + &option.cached_table_path(gen) + } else { + &option.table_path(gen, level) + }; let mut writer = AsyncArrowWriter::try_new( AsyncWriter::new( - fs.open_options( - &option.table_path(gen, level), - FileType::Parquet.open_options(false), - ) - .await?, + fs.open_options(path, FileType::Parquet.open_options(false)) + .await?, ), schema.arrow_schema().clone(), Some(option.write_parquet_properties.clone()), @@ -214,7 +216,7 @@ where writer.write(columns.as_record_batch()).await?; writer.close().await?; - if modify_manifest { + if cached_to_local { manifest_tx .send_async(VersionEdit::Add { level: level as u8, @@ -307,18 +309,21 @@ pub(crate) mod tests { schema: &Arc, level: usize, fs: &Arc, + build_cache: bool, ) -> Result<(), DbError> where R: Record + Send, { let immutable = build_immutable::(option, records, schema, fs).await?; + let path = if build_cache { + &option.cached_table_path(gen) + } else { + &option.table_path(gen, level) + }; let mut writer = AsyncArrowWriter::try_new( AsyncWriter::new( - fs.open_options( - &option.table_path(gen, level), - FileType::Parquet.open_options(false), - ) - .await?, + fs.open_options(path, FileType::Parquet.open_options(false)) + .await?, ), schema.arrow_schema().clone(), None, @@ -333,10 +338,10 @@ pub(crate) mod tests { option: &Arc, manager: &StoreManager, schema: &Arc, - test_cached: bool, + test_cache: bool, ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { let level_0_cache = option.level_fs_cached(0); - let level_0_fs = if test_cached && level_0_cache { + let level_0_fs = if test_cache && level_0_cache { option .level_fs_path(0) .map(|path| manager.get_cache(path)) @@ -349,7 +354,7 @@ pub(crate) mod tests { }; let level_1_cache = option.level_fs_cached(1); - let level_1_fs = if test_cached && level_1_cache { + let level_1_fs = if test_cache && level_1_cache { option .level_fs_path(1) .map(|path| manager.get_cache(path)) @@ -399,6 +404,7 @@ pub(crate) mod tests { schema, 0, level_0_fs, + test_cache, ) .await .unwrap(); @@ -437,6 +443,7 @@ pub(crate) mod tests { schema, 0, level_0_fs, + test_cache, ) .await .unwrap(); @@ -480,6 +487,7 @@ pub(crate) mod tests { schema, 1, level_1_fs, + test_cache, ) .await .unwrap(); @@ -518,6 +526,7 @@ pub(crate) mod tests { schema, 1, level_1_fs, + test_cache, ) .await .unwrap(); @@ -556,6 +565,7 @@ pub(crate) mod tests { schema, 1, level_1_fs, + test_cache, ) .await .unwrap(); diff --git a/src/option.rs b/src/option.rs index f6cc4d83..2afe0c64 100644 --- a/src/option.rs +++ b/src/option.rs @@ -219,6 +219,11 @@ impl DbOption { .child(format!("{}.{}", gen, FileType::Parquet)) } + pub(crate) fn cached_table_path(&self, gen: FileId) -> Path { + self.base_path + .child(format!("{}.{}", gen, FileType::Parquet)) + } + pub(crate) fn wal_dir_path(&self) -> Path { self.base_path.child("wal") } From cf635dd67a880c37ac72d68533027abd770c5830 Mon Sep 17 00:00:00 2001 From: meox3259 Date: Wed, 25 Jun 2025 12:26:36 +0900 Subject: [PATCH 3/6] fix ci & fmt & fix read path --- src/compaction/leveled.rs | 12 +++++++++--- src/compaction/mod.rs | 9 ++++++++- src/lib.rs | 4 ++-- src/version/mod.rs | 11 +++++++---- src/wal/mod.rs | 2 +- 5 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 896e2ee9..39c61d5b 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -200,7 +200,7 @@ where level_0_cache_fs .open_options( &option.cached_table_path(gen), - FileType::Parquet.open_options(true), + FileType::Parquet.open_options(false), ) .await?, ), @@ -746,7 +746,8 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir_l2.path()).unwrap(), fs_option.clone(), true, - ).unwrap(); + ) + .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; @@ -761,7 +762,7 @@ pub(crate) mod tests { .unwrap(); let mut items = Vec::new(); - for i in 0..10000 { + for i in 0..100 { items.push(Test { vstring: i.to_string(), vu32: i, @@ -769,8 +770,13 @@ pub(crate) mod tests { }); } + let mut count = 0; for (_, item) in items.clone().into_iter().enumerate() { db.write(item, 0.into()).await.unwrap(); + count += 1; + if count % 5 == 0 { + db.flush().await.unwrap(); + } } for (i, item) in items.clone().into_iter().enumerate() { diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 7402ae93..b3b49e90 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -9,6 +9,7 @@ use log::error; use parquet::arrow::AsyncArrowWriter; use thiserror::Error; use tokio::sync::oneshot; +use ulid::Ulid; use crate::{ executor::Executor, @@ -82,6 +83,7 @@ where if builder.written_size() >= option.max_sst_file_size { let columns = Arc::new(builder.finish(None)); + let gen = generate_file_id(); if cached_to_local { let option = Arc::clone(option); let schema = schema.clone(); @@ -93,6 +95,7 @@ where executor.spawn(async move { if let Err(e) = Self::build_table( &option, + gen, level, &columns, &mut min, @@ -110,6 +113,7 @@ where } Self::build_table( option, + gen, level, &columns, &mut min, @@ -124,6 +128,7 @@ where } if builder.written_size() > 0 { let columns = Arc::new(builder.finish(None)); + let gen = generate_file_id(); if cached_to_local { let option = Arc::clone(option); let schema = schema.clone(); @@ -135,6 +140,7 @@ where executor.spawn(async move { if let Err(err) = Self::build_table( &option, + gen, level, &columns, &mut min, @@ -152,6 +158,7 @@ where } Self::build_table( option, + gen, level, &columns, &mut min, @@ -187,6 +194,7 @@ where #[allow(clippy::too_many_arguments)] async fn build_table( option: &DbOption, + gen: Ulid, level: usize, columns: &Arc<<::Schema as RecordSchema>::Columns>, min: &mut Option<::Key>, @@ -199,7 +207,6 @@ where debug_assert!(min.is_some()); debug_assert!(max.is_some()); - let gen = generate_file_id(); let path = if cached_to_local { &option.cached_table_path(gen) } else { diff --git a/src/lib.rs b/src/lib.rs index 01a3b139..5a095546 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1468,7 +1468,7 @@ pub(crate) mod tests { fn test_items() -> Vec { let mut items = Vec::new(); - for i in 0..32 { + for i in 0..100 { items.push(Test { vstring: i.to_string(), vu32: i, @@ -1576,7 +1576,7 @@ pub(crate) mod tests { .to_string(); let bucket = std::env::var("BUCKET_NAME").expect("expected s3 bucket not to be empty"); let region = Some(std::env::var("AWS_REGION").expect("expected s3 region not to be empty")); - let token = Some(std::option_env!("AWS_SESSION_TOKEN").unwrap().to_string()); + let token = std::option_env!("AWS_SESSION_TOKEN").map(|v| v.to_string()); let s3_option = FsOptions::S3 { bucket, diff --git a/src/version/mod.rs b/src/version/mod.rs index f3e80cf5..32340165 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -198,11 +198,14 @@ where projection_mask: ProjectionMask, parquet_lru: ParquetLru, ) -> Result>, VersionError> { + let cached = self.option.level_fs_cached(level); + let path = if cached { + self.option.cached_table_path(gen) + } else { + self.option.table_path(gen, level) + }; let file = store - .open_options( - &self.option.table_path(gen, level), - FileType::Parquet.open_options(true), - ) + .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; SsTable::::open(parquet_lru, gen, file) diff --git a/src/wal/mod.rs b/src/wal/mod.rs index fc88a179..8329708d 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -242,7 +242,7 @@ mod tests { let secret_key = std::option_env!("AWS_SECRET_ACCESS_KEY") .unwrap() .to_string(); - let token = Some(std::option_env!("AWS_SESSION_TOKEN").unwrap().to_string()); + let token = std::option_env!("AWS_SESSION_TOKEN").map(|v| v.to_string()); let bucket = std::env::var("BUCKET_NAME").expect("expected s3 bucket not to be empty"); let region = Some(std::env::var("AWS_REGION").expect("expected s3 region not to be empty")); From a9ed61cf2368a31b008aeadf645d12acc139e129 Mon Sep 17 00:00:00 2001 From: meox3259 Date: Wed, 25 Jun 2025 17:09:59 +0900 Subject: [PATCH 4/6] fix ci & fmt --- src/compaction/leveled.rs | 6 +-- src/compaction/mod.rs | 86 ++++++++++++++++++++------------------- src/stream/level.rs | 3 +- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index c0b76d15..b5d4e19e 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -302,7 +302,7 @@ where inner: level_scan_l, }); } - + let level_l_path = option.level_fs_path(level + 1).unwrap_or(&option.base_path); let level_l_fs = ctx.manager.get_fs(level_l_path); let level_l_cache = ctx.manager.get_cache(level_l_path); @@ -747,8 +747,8 @@ pub(crate) mod tests { .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), - fs_option.clone(), - true, + FsOptions::Local, + false, ) .unwrap() .level_path( diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index b3b49e90..ea4a8fe7 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -84,10 +84,10 @@ where if builder.written_size() >= option.max_sst_file_size { let columns = Arc::new(builder.finish(None)); let gen = generate_file_id(); - if cached_to_local { + { let option = Arc::clone(option); let schema = schema.clone(); - let cache = Arc::clone(cache); + let fs = Arc::clone(fs); let mut min = min.clone(); let mut max = max.clone(); let columns = Arc::clone(&columns); @@ -101,9 +101,9 @@ where &mut min, &mut max, &schema, - &cache, + &fs, &manifest_tx, - cached_to_local, + true, ) .await { @@ -111,34 +111,36 @@ where } }); } - Self::build_table( - option, - gen, - level, - &columns, - &mut min, - &mut max, - &schema, - &fs, - &manifest_tx, - !cached_to_local, - ) - .await?; + if cached_to_local { + Self::build_table( + &option, + gen, + level, + &columns, + &mut min, + &mut max, + &schema, + &cache, + &manifest_tx, + false, + ) + .await?; + } } } if builder.written_size() > 0 { let columns = Arc::new(builder.finish(None)); let gen = generate_file_id(); - if cached_to_local { + { let option = Arc::clone(option); let schema = schema.clone(); - let cache = Arc::clone(cache); + let fs = Arc::clone(fs); let mut min = min.clone(); let mut max = max.clone(); let columns = Arc::clone(&columns); let manifest_tx = manifest_tx.clone(); executor.spawn(async move { - if let Err(err) = Self::build_table( + if let Err(e) = Self::build_table( &option, gen, level, @@ -146,29 +148,31 @@ where &mut min, &mut max, &schema, - &cache, + &fs, &manifest_tx, - cached_to_local, + true, ) .await { - error!("Build Table Error: {}", err); + error!("Build Table Error: {}", e); } }); } - Self::build_table( - option, - gen, - level, - &columns, - &mut min, - &mut max, - schema, - &fs, - &manifest_tx, - !cached_to_local, - ) - .await?; + if cached_to_local { + Self::build_table( + &option, + gen, + level, + &columns, + &mut min, + &mut max, + &schema, + &cache, + &manifest_tx, + false, + ) + .await?; + } } drop(manifest_tx); while let Ok(version_edit) = manifest_cx.recv_async().await { @@ -202,15 +206,15 @@ where schema: &R::Schema, fs: &Arc, manifest_tx: &flume::Sender::Key>>, - cached_to_local: bool, + upload_to_s3: bool, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); debug_assert!(max.is_some()); - let path = if cached_to_local { - &option.cached_table_path(gen) - } else { + let path = if upload_to_s3 { &option.table_path(gen, level) + } else { + &option.cached_table_path(gen) }; let mut writer = AsyncArrowWriter::try_new( AsyncWriter::new( @@ -223,7 +227,7 @@ where writer.write(columns.as_record_batch()).await?; writer.close().await?; - if cached_to_local { + if upload_to_s3 { manifest_tx .send_async(VersionEdit::Add { level: level as u8, diff --git a/src/stream/level.rs b/src/stream/level.rs index e3b1582f..80b91ab6 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -119,7 +119,8 @@ where FutureStatus::Init(gen) => { let gen = *gen; self.path = Some(self.option.table_path(gen, self.level)); - + println!("path: {:?}", self.path); + println!("gen: {:?}", gen); let reader = self.fs.open_options( self.path.as_ref().unwrap(), FileType::Parquet.open_options(true), From 587997c24da8af81461d676e5e6ba8198b2bdb86 Mon Sep 17 00:00:00 2001 From: meox3259 Date: Thu, 26 Jun 2025 11:43:41 +0900 Subject: [PATCH 5/6] fix ci --- src/compaction/leveled.rs | 16 +++++--- src/compaction/mod.rs | 78 +++++++++++++++++++-------------------- src/lib.rs | 1 + src/stream/level.rs | 3 +- 4 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index b5d4e19e..2780d19d 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -194,6 +194,8 @@ where let level_0_cached = option.level_fs_cached(0); if level_0_cached { + let mut min = None; + let mut max = None; let level_0_cache_fs = manager.get_cache(level_0_path); let mut writer = AsyncArrowWriter::try_new( AsyncWriter::new( @@ -207,7 +209,7 @@ where schema.arrow_schema().clone(), Some(option.write_parquet_properties.clone()), )?; - for (file_id, batch) in batches { + for (_, batch) in batches { if let (Some(batch_min), Some(batch_max)) = batch.scope() { if matches!(min.as_ref().map(|min| min > batch_min), Some(true) | None) { min = Some(batch_min.clone()) @@ -217,9 +219,6 @@ where } } writer.write(batch.as_record_batch()).await?; - if let Some(file_id) = file_id { - wal_ids.push(*file_id); - } } writer.close().await?; } @@ -780,11 +779,16 @@ pub(crate) mod tests { let mut items = Vec::new(); for i in 0..100 { - items.push(Test { + let item = Test { vstring: i.to_string(), vu32: i, vbool: Some(true), - }); + }; + items.push(item.clone()); + db.write(item, 0.into()).await.unwrap(); + if i % 5 == 0 { + db.flush().await.unwrap(); + } } let mut count = 0; diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index ea4a8fe7..114ea2b8 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -63,6 +63,7 @@ where executor: &Arc, cached_to_local: bool, ) -> Result<(), CompactionError> { + println!("cached_to_local = {}", cached_to_local); let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; // Kould: is the capacity parameter necessary? @@ -84,12 +85,12 @@ where if builder.written_size() >= option.max_sst_file_size { let columns = Arc::new(builder.finish(None)); let gen = generate_file_id(); - { + if cached_to_local { let option = Arc::clone(option); let schema = schema.clone(); - let fs = Arc::clone(fs); let mut min = min.clone(); let mut max = max.clone(); + let cache = Arc::clone(cache); let columns = Arc::clone(&columns); let manifest_tx = manifest_tx.clone(); executor.spawn(async move { @@ -101,9 +102,9 @@ where &mut min, &mut max, &schema, - &fs, + &cache, &manifest_tx, - true, + false, ) .await { @@ -111,32 +112,30 @@ where } }); } - if cached_to_local { - Self::build_table( - &option, - gen, - level, - &columns, - &mut min, - &mut max, - &schema, - &cache, - &manifest_tx, - false, - ) - .await?; - } + Self::build_table( + &option, + gen, + level, + &columns, + &mut min, + &mut max, + &schema, + &fs, + &manifest_tx, + true, + ) + .await?; } } if builder.written_size() > 0 { let columns = Arc::new(builder.finish(None)); let gen = generate_file_id(); - { + if cached_to_local { let option = Arc::clone(option); let schema = schema.clone(); - let fs = Arc::clone(fs); let mut min = min.clone(); let mut max = max.clone(); + let cache = Arc::clone(cache); let columns = Arc::clone(&columns); let manifest_tx = manifest_tx.clone(); executor.spawn(async move { @@ -148,9 +147,9 @@ where &mut min, &mut max, &schema, - &fs, + &cache, &manifest_tx, - true, + false, ) .await { @@ -158,24 +157,22 @@ where } }); } - if cached_to_local { - Self::build_table( - &option, - gen, - level, - &columns, - &mut min, - &mut max, - &schema, - &cache, - &manifest_tx, - false, - ) - .await?; - } + Self::build_table( + &option, + gen, + level, + &columns, + &mut min, + &mut max, + &schema, + &fs, + &manifest_tx, + true, + ) + .await?; } drop(manifest_tx); - while let Ok(version_edit) = manifest_cx.recv_async().await { + while let Ok(version_edit) = manifest_cx.recv() { version_edits.push(version_edit); } Ok(()) @@ -229,7 +226,7 @@ where if upload_to_s3 { manifest_tx - .send_async(VersionEdit::Add { + .send(VersionEdit::Add { level: level as u8, scope: Scope { min: min.take().ok_or(CompactionError::EmptyLevel)?, @@ -238,7 +235,6 @@ where wal_ids: None, }, }) - .await .map_err(|_| CompactionError::ChannelClose)?; } Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 5a095546..079830f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -380,6 +380,7 @@ where /// trigger compaction manually. This will flush the WAL and trigger compaction pub async fn flush(&self) -> Result<(), CommitError> { + println!("fffflush"); let (tx, rx) = oneshot::channel(); let compaction_tx = { self.schema.read().await.compaction_tx.clone() }; compaction_tx diff --git a/src/stream/level.rs b/src/stream/level.rs index 80b91ab6..e3b1582f 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -119,8 +119,7 @@ where FutureStatus::Init(gen) => { let gen = *gen; self.path = Some(self.option.table_path(gen, self.level)); - println!("path: {:?}", self.path); - println!("gen: {:?}", gen); + let reader = self.fs.open_options( self.path.as_ref().unwrap(), FileType::Parquet.open_options(true), From ae23dfe87ac9cd123e6c9eb21add60c64ac09efa Mon Sep 17 00:00:00 2001 From: meox3259 Date: Thu, 26 Jun 2025 17:23:27 +0900 Subject: [PATCH 6/6] fix chan --- src/compaction/mod.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 114ea2b8..f4c2aaad 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -63,7 +63,6 @@ where executor: &Arc, cached_to_local: bool, ) -> Result<(), CompactionError> { - println!("cached_to_local = {}", cached_to_local); let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; // Kould: is the capacity parameter necessary? @@ -86,12 +85,12 @@ where let columns = Arc::new(builder.finish(None)); let gen = generate_file_id(); if cached_to_local { - let option = Arc::clone(option); + let option = option.clone(); let schema = schema.clone(); let mut min = min.clone(); let mut max = max.clone(); - let cache = Arc::clone(cache); - let columns = Arc::clone(&columns); + let cache = cache.clone(); + let columns = columns.clone(); let manifest_tx = manifest_tx.clone(); executor.spawn(async move { if let Err(e) = Self::build_table( @@ -105,6 +104,7 @@ where &cache, &manifest_tx, false, + cached_to_local, ) .await { @@ -123,6 +123,7 @@ where &fs, &manifest_tx, true, + !cached_to_local, ) .await?; } @@ -131,12 +132,12 @@ where let columns = Arc::new(builder.finish(None)); let gen = generate_file_id(); if cached_to_local { - let option = Arc::clone(option); + let option = option.clone(); let schema = schema.clone(); let mut min = min.clone(); let mut max = max.clone(); - let cache = Arc::clone(cache); - let columns = Arc::clone(&columns); + let cache = cache.clone(); + let columns = columns.clone(); let manifest_tx = manifest_tx.clone(); executor.spawn(async move { if let Err(e) = Self::build_table( @@ -150,6 +151,7 @@ where &cache, &manifest_tx, false, + cached_to_local, ) .await { @@ -168,6 +170,7 @@ where &fs, &manifest_tx, true, + !cached_to_local, ) .await?; } @@ -200,10 +203,11 @@ where columns: &Arc<<::Schema as RecordSchema>::Columns>, min: &mut Option<::Key>, max: &mut Option<::Key>, - schema: &R::Schema, + schema: &Arc, fs: &Arc, manifest_tx: &flume::Sender::Key>>, upload_to_s3: bool, + send_manifest: bool, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); debug_assert!(max.is_some()); @@ -224,7 +228,7 @@ where writer.write(columns.as_record_batch()).await?; writer.close().await?; - if upload_to_s3 { + if send_manifest { manifest_tx .send(VersionEdit::Add { level: level as u8,