From 579112b8c5a945f4902bf6a22189ba873755bd09 Mon Sep 17 00:00:00 2001 From: 123789456ye <123789456ye@gmail.com> Date: Thu, 14 Aug 2025 18:14:46 +0800 Subject: [PATCH 1/6] feat: add tiered compactor and some metrics test --- Cargo.toml | 1 + src/compaction/leveled.rs | 50 ++ src/compaction/mod.rs | 348 +++++++++++ src/compaction/tiered.rs | 1248 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 36 +- src/option.rs | 10 +- 6 files changed, 1681 insertions(+), 12 deletions(-) create mode 100644 src/compaction/tiered.rs diff --git a/Cargo.toml b/Cargo.toml index d01be270..fefc9e13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,6 +128,7 @@ log = "0.4.22" redb = { version = "2", optional = true } rocksdb = { version = "0.23", optional = true } sled = { version = "0.34", optional = true } +rand = "0.9.2" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.3.1", features = ["wasm_js"] } diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 1c8ac82f..279c2586 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -1264,3 +1264,53 @@ pub(crate) mod tests { } } } + +#[cfg(all(test, feature = "tokio"))] +pub(crate) mod tests_metric { + + use fusio::path::Path; + use tempfile::TempDir; + + use crate::{ + compaction::{ + leveled::LeveledOptions, + tests_metric::{read_write_amplification_measurement, throughput}, + }, + inmem::immutable::tests::TestSchema, + trigger::TriggerType, + DbOption, + }; + + #[tokio::test(flavor = "multi_thread")] + #[ignore] + async fn read_write_amplification_measurement_leveled() { + let temp_dir = TempDir::new().unwrap(); + let leveled_options = LeveledOptions { + major_threshold_with_sst_size: 3, + level_sst_magnification: 4, + ..Default::default() + }; + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .leveled_compaction(leveled_options) + .max_sst_file_size(1024); // Small file size to force multiple files + + read_write_amplification_measurement(option).await; + } + + #[tokio::test(flavor = "multi_thread")] + #[ignore] + async fn throughput_leveled() { + let temp_dir = TempDir::new().unwrap(); + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .leveled_compaction(LeveledOptions::default()); + option.trigger_type = TriggerType::SizeOfMem(1 * 1024 * 1024); + + throughput(option).await; + } +} \ No newline at end of file diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 28d9cfa5..a8e60805 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -1,5 +1,6 @@ pub mod error; pub mod leveled; +pub mod tiered; use std::sync::Arc; @@ -605,3 +606,350 @@ pub(crate) mod tests { ) } } + +#[cfg(all(test, feature = "tokio"))] +pub(crate) mod tests_metric { + + use crate::{ + executor::tokio::TokioExecutor, inmem::immutable::tests::TestSchema, tests::Test, + version::MAX_LEVEL, DbOption, DB, + }; + + pub fn convert_test_ref_to_test( + entry: crate::transaction::TransactionEntry<'_, Test>, + ) -> Option { + match &entry { + crate::transaction::TransactionEntry::Stream(stream_entry) => { + if stream_entry.value().is_some() { + let test_ref = entry.get(); + Some(Test { + vstring: test_ref.vstring.to_string(), + vu32: test_ref.vu32.unwrap_or(0), + vbool: test_ref.vbool, + }) + } else { + None + } + } + crate::transaction::TransactionEntry::Local(_) => { + let test_ref = entry.get(); + Some(Test { + vstring: test_ref.vstring.to_string(), + vu32: test_ref.vu32.unwrap_or(0), + vbool: test_ref.vbool, + }) + } + } + } + + pub(crate) async fn read_write_amplification_measurement(option: DbOption) { + let db: DB = + DB::new(option.clone(), TokioExecutor::default(), TestSchema) + .await + .unwrap(); + + // Track metrics for amplification calculation + let mut total_bytes_written_by_user = 0u64; + let mut compaction_rounds = 0; + + // Insert initial dataset with more substantial data + let initial_records = 1000; + let iter_num = 10; + for i in 0..initial_records * iter_num { + let record = Test { + vstring: format!("this_is_a_longer_key_to_make_files_bigger_{:05}", i), + vu32: i as u32, + vbool: Some(i % 2 == 0), + }; + + // More accurate user data size calculation + let string_bytes = record.vstring.as_bytes().len(); + let u32_bytes = 4; + let bool_bytes = 1; + let record_size = string_bytes + u32_bytes + bool_bytes; + total_bytes_written_by_user += record_size as u64; + + db.insert(record).await.unwrap(); + + if i % initial_records == 0 { + // Force flush and compaction + db.flush().await.unwrap(); + compaction_rounds += 1; + } + } + + // Verify data integrity after all compactions (check a sample of keys) + for i in 0..initial_records * iter_num { + let key = format!("this_is_a_longer_key_to_make_files_bigger_{:05}", i); + let result = db.get(&key, convert_test_ref_to_test).await.unwrap(); + if result.is_some() { + let record = result.unwrap(); + assert_eq!( + record.vu32, i as u32, + "Value should be preserved after compaction" + ); + } else { + panic!("Key {} should exist after compaction", key); + } + } + + // Get final version to measure total file sizes + let final_version = db.ctx.manifest.current().await; + let mut files_per_level = vec![0; MAX_LEVEL]; + + // Verify that total scope.file_size matches total actual file size on disk + let manager = + crate::fs::manager::StoreManager::new(option.base_fs.clone(), vec![]).unwrap(); + let fs = manager.base_fs(); + let mut total_actual_file_size = 0u64; + + for level in 0..MAX_LEVEL { + files_per_level[level] = final_version.level_slice[level].len(); + for scope in &final_version.level_slice[level] { + let file = fs + .open_options( + &option.table_path(scope.gen, level), + crate::fs::FileType::Parquet.open_options(true), + ) + .await + .unwrap(); + let actual_size = file.size().await.unwrap(); + total_actual_file_size += actual_size; + } + } + + // Calculate amplification metrics using actual file sizes + let write_amplification = + total_actual_file_size as f64 / total_bytes_written_by_user as f64; + + // Read amplification estimation (simplified) + // In a real scenario, this would require tracking actual read operations + let estimated_read_amplification = { + let mut read_amp = 0.0; + for level in 0..MAX_LEVEL { + if files_per_level[level] > 0 { + // Level 0 files can overlap, so worst case is reading all files + if level == 0 { + read_amp += files_per_level[level] as f64; + } else { + // For other levels, typically 1 file per level for a point lookup + read_amp += 1.0; + } + } + } + read_amp + }; + + println!("=== Amplification Metrics ==="); + println!("User data written: {} bytes", total_bytes_written_by_user); + println!("Total file size: {} bytes", total_actual_file_size); + println!("Write Amplification: {:.2}x", write_amplification); + println!( + "Estimated Read Amplification: {:.2}x", + estimated_read_amplification + ); + println!("Compaction rounds: {}", compaction_rounds); + + for level in 0..MAX_LEVEL { + if files_per_level[level] > 0 { + println!("Level {}: {} files", level, files_per_level[level]); + } + } + + // Assertions for reasonable amplification + // Write amplification can be less than 1.0 in some cases due to compression + // and the way Parquet stores data efficiently. The important thing is that + // we can measure it and it's non-zero. + assert!( + write_amplification > 0.0, + "Write amplification should be positive" + ); + assert!( + write_amplification < 10.0, + "Write amplification should be reasonable (< 10x)" + ); + assert!( + estimated_read_amplification >= 1.0, + "Read amplification should be at least 1.0" + ); + assert!( + total_actual_file_size > 0, + "Should have written some data to disk" + ); + } + + pub(crate) async fn throughput(option: DbOption) { + use std::time::Instant; + + use futures_util::StreamExt; + use rand::{seq::SliceRandom, SeedableRng}; + + // Create DB with EcoTune compactor using the standard open method + let db: DB = + DB::new(option.clone(), TokioExecutor::default(), TestSchema) + .await + .unwrap(); + + // Test parameters based on EcoTune paper (Section 5.1: 35% Get, 35% Seek, 30% long range + // scans) + let total_operations = 100000; + let insert_ratio = 0.3; // 30% inserts to build up data + let get_ratio = 0.35; // 35% Get operations (point queries) + let seek_ratio = 0.35; // 35% Seek operations + let long_range_ratio = 0.30; // 30% long range scans (paper workload) + + let insert_count = (total_operations as f64 * insert_ratio) as usize; + let query_count = total_operations - insert_count; + let get_count = (query_count as f64 + * (get_ratio / (get_ratio + seek_ratio + long_range_ratio))) + as usize; + let seek_count = (query_count as f64 + * (seek_ratio / (get_ratio + seek_ratio + long_range_ratio))) + as usize; + let long_range_count = query_count - get_count - seek_count; + + println!("EcoTune throughput test with paper proportions:"); + println!("- {} inserts ({:.1}%)", insert_count, insert_ratio * 100.0); + println!( + "- {} Get queries ({:.1}%)", + get_count, + (get_count as f64 / total_operations as f64) * 100.0 + ); + println!( + "- {} Seek queries ({:.1}%)", + seek_count, + (seek_count as f64 / total_operations as f64) * 100.0 + ); + println!( + "- {} long-range scans ({:.1}%)", + long_range_count, + (long_range_count as f64 / total_operations as f64) * 100.0 + ); + + // Create mixed workload operations vector + + let mut operations = Vec::new(); + + // Add insert operations + for i in 0..insert_count { + operations.push(("insert", i)); + } + + // Add get operations + for i in 0..get_count { + operations.push(("get", i)); + } + + // Add seek operations + for i in 0..seek_count { + operations.push(("seek", i)); + } + + // Add long-range scan operations + for i in 0..long_range_count { + operations.push(("long_range", i)); + } + + // Shuffle operations to create mixed workload + let mut rng = rand::rngs::StdRng::seed_from_u64(42); // Fixed seed for reproducibility + operations.shuffle(&mut rng); + + // Execute mixed workload + let mixed_start = Instant::now(); + let mut insert_ops = 0; + let mut successful_queries = 0; + + for (op_type, index) in operations { + match op_type { + "insert" => { + let record = Test { + vstring: format!("test_key_{:06}", index), + vu32: index as u32, + vbool: Some(index % 2 == 0), + }; + db.insert(record).await.unwrap(); + insert_ops += 1; + } + "get" => { + // Use modulo to ensure key exists (only query from inserted keys) + let key = format!("test_key_{:06}", index % insert_ops.max(1)); + let found = db + .get(&key, |entry| match entry { + crate::transaction::TransactionEntry::Stream(stream_entry) => { + Some(stream_entry.value().is_some()) + } + crate::transaction::TransactionEntry::Local(_) => Some(true), + }) + .await + .unwrap(); + if found.unwrap_or(false) { + successful_queries += 1; + } + } + "seek" => { + let key = format!("test_key_{:06}", index % insert_ops.max(1)); + let scan = db + .scan( + (std::ops::Bound::Included(&key), std::ops::Bound::Unbounded), + |entry| match entry { + crate::transaction::TransactionEntry::Stream(_) => true, + crate::transaction::TransactionEntry::Local(_) => true, + }, + ) + .await + .take(1); + let mut scan = std::pin::pin!(scan); + + if let Some(result) = scan.next().await { + if result.is_ok() { + successful_queries += 1; + } + } + } + "long_range" => { + let start_key = format!("test_key_{:06}", index % insert_ops.max(1)); + let scan = db + .scan( + ( + std::ops::Bound::Included(&start_key), + std::ops::Bound::Unbounded, + ), + |entry| match entry { + crate::transaction::TransactionEntry::Stream(_) => true, + crate::transaction::TransactionEntry::Local(_) => true, + }, + ) + .await + .take(100); + let mut scan = std::pin::pin!(scan); + + let mut count = 0; + while let Some(result) = scan.next().await { + if result.is_ok() { + count += 1; + if count >= 100 { + break; + } // Limit to K=100 + } + } + if count > 0 { + successful_queries += 1; + } + } + _ => unreachable!(), + } + } + + let mixed_duration = mixed_start.elapsed(); + let mixed_throughput = total_operations as f64 / mixed_duration.as_secs_f64(); + + // Calculate mixed workload results + println!("Mixed Workload Throughput Results:"); + println!("Overall throughput: {:.2} ops/sec", mixed_throughput); + println!( + "Total operations: {} (inserts: {}, successful queries: {})", + total_operations, insert_ops, successful_queries + ); + println!("Total time: {:.3}s", mixed_duration.as_secs_f64()); + } +} diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs new file mode 100644 index 00000000..f993284e --- /dev/null +++ b/src/compaction/tiered.rs @@ -0,0 +1,1248 @@ +use std::{future::Future, ops::Bound, sync::Arc}; + +use async_trait::async_trait; +use fusio::MaybeSend; +use parquet::arrow::ProjectionMask; +use ulid::Ulid; + +use super::{CompactionError, Compactor}; +use crate::{ + compaction::RecordSchema, + context::Context, + fs::{FileId, FileType}, + inmem::immutable::ImmutableMemTable, + ondisk::sstable::{SsTable, SsTableID}, + record::{self, Record}, + scope::Scope, + stream::{level::LevelStream, ScanStream}, + version::{edit::VersionEdit, TransactionTs, Version, MAX_LEVEL}, + CompactionExecutor, DbOption, +}; + +pub struct TieredTask { + pub input: Vec<(usize, Vec)>, + pub target_tier: usize, +} + +#[derive(Clone, Debug)] +pub struct TieredOptions { + /// Maximum number of tiers + pub max_tiers: usize, + /// Base capacity for tier 0 + pub tier_base_capacity: usize, + /// Growth factor between tiers + pub tier_growth_factor: usize, +} + +impl Default for TieredOptions { + fn default() -> Self { + Self { + max_tiers: 4, + tier_base_capacity: 4, + tier_growth_factor: 4, + } + } +} + +impl TieredOptions { + /// Set maximum number of tiers + pub fn max_tiers(mut self, value: usize) -> Self { + self.max_tiers = value; + self + } + + /// Set tier base capacity + pub fn tier_base_capacity(mut self, value: usize) -> Self { + self.tier_base_capacity = value; + self + } + + /// Set tier growth factor + pub fn tier_growth_factor(mut self, value: usize) -> Self { + self.tier_growth_factor = value; + self + } +} + +pub struct TieredCompactor { + options: TieredOptions, + db_option: Arc, + ctx: Arc>, + record_schema: Arc, +} + +impl TieredCompactor { + pub(crate) fn new( + options: TieredOptions, + record_schema: Arc, + db_option: Arc, + ctx: Arc>, + ) -> Self { + Self { + options, + db_option, + ctx, + record_schema, + } + } +} + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl Compactor for TieredCompactor +where + R: Record, + <::Schema as record::Schema>::Columns: Send + Sync, +{ + async fn check_then_compaction( + &self, + batches: Option< + &[( + Option, + ImmutableMemTable<::Columns>, + )], + >, + recover_wal_ids: Option>, + is_manual: bool, + ) -> Result<(), CompactionError> { + // Perform minor compaction if batches are provided + if let Some(batches) = batches { + if let Some(scope) = Self::minor_compaction( + &self.db_option, + recover_wal_ids, + batches, + &self.record_schema, + &self.ctx.manager, + ) + .await? + { + // Update manifest with new L0 SST + let version_ref = self.ctx.manifest.current().await; + let mut version_edits = vec![VersionEdit::Add { level: 0, scope }]; + version_edits.push(VersionEdit::LatestTimeStamp { + ts: version_ref.increase_ts(), + }); + + self.ctx + .manifest + .update(version_edits, None) + .await + .map_err(|e| CompactionError::Manifest(e))?; + } + } + + // Perform major compaction + self.major_compaction(is_manual).await?; + + Ok(()) + } +} + +impl CompactionExecutor for TieredCompactor +where + R: Record, + ::Columns: Send + Sync, +{ + fn check_then_compaction<'a>( + &'a self, + batches: Option< + &'a [( + Option, + ImmutableMemTable<::Columns>, + )], + >, + recover_wal_ids: Option>, + is_manual: bool, + ) -> impl Future>> + MaybeSend + 'a { + >::check_then_compaction(self, batches, recover_wal_ids, is_manual) + } +} + +impl TieredCompactor +where + R: Record, + <::Schema as RecordSchema>::Columns: Send + Sync, +{ + async fn major_compaction(&self, is_manual: bool) -> Result<(), CompactionError> { + while let Some(tier) = self.should_major_compact().await { + if let Some(task) = self.plan_major(tier).await { + self.execute_major(task).await?; + } else { + break; + } + } + + if is_manual { + self.ctx + .manifest + .rewrite() + .await + .map_err(CompactionError::Manifest)?; + } + + Ok(()) + } + + pub async fn should_major_compact(&self) -> Option { + let version_ref = self.ctx.manifest.current().await; + for tier in 0..MAX_LEVEL - 1 { + if Self::is_tier_full(&self.options, &version_ref, tier) { + return Some(tier); + } + } + None + } + + pub async fn plan_major(&self, tier: usize) -> Option { + let version_ref = self.ctx.manifest.current().await; + let tier_files: Vec = version_ref.level_slice[tier] + .iter() + .map(|scope| scope.gen) + .collect(); + if !tier_files.is_empty() { + return Some(TieredTask { + input: vec![(tier, tier_files)], + target_tier: tier + 1, + }); + } + + None + } + + pub async fn execute_major(&self, task: TieredTask) -> Result<(), CompactionError> { + let version_ref = self.ctx.manifest.current().await; + let mut version_edits = vec![]; + let mut delete_gens = vec![]; + for (source_tier, file_gens) in &task.input { + if file_gens.is_empty() { + continue; + } + + let tier_scopes: Vec<&Scope<_>> = version_ref.level_slice[*source_tier] + .iter() + .filter(|scope| file_gens.contains(&scope.gen)) + .collect(); + if tier_scopes.is_empty() { + continue; + } + let min = tier_scopes.iter().map(|scope| &scope.min).min().unwrap(); + let max = tier_scopes.iter().map(|scope| &scope.max).max().unwrap(); + Self::tier_compaction( + &version_ref, + &self.db_option, + min, + max, + &mut version_edits, + &mut delete_gens, + &self.record_schema, + &self.ctx, + *source_tier, + task.target_tier, + ) + .await?; + break; + } + + if !version_edits.is_empty() { + version_edits.push(VersionEdit::LatestTimeStamp { + ts: version_ref.increase_ts(), + }); + + self.ctx + .manifest + .update(version_edits, Some(delete_gens)) + .await?; + } + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn tier_compaction( + version: &Version, + option: &DbOption, + _min: &::Key, + _max: &::Key, + version_edits: &mut Vec::Key>>, + delete_gens: &mut Vec, + instance: &R::Schema, + ctx: &Context, + source_tier: usize, + target_tier: usize, + ) -> Result<(), CompactionError> { + let source_scopes: Vec<&Scope<_>> = version.level_slice[source_tier].iter().collect(); + + if source_scopes.is_empty() { + return Ok(()); + } + + let source_tier_path = option + .level_fs_path(source_tier) + .unwrap_or(&option.base_path); + let source_tier_fs = ctx.manager.get_fs(source_tier_path); + let target_tier_path = option + .level_fs_path(target_tier) + .unwrap_or(&option.base_path); + let target_tier_fs = ctx.manager.get_fs(target_tier_path); + + let mut streams = Vec::with_capacity(source_scopes.len()); + + if source_tier == 0 { + for scope in source_scopes.iter() { + let file = source_tier_fs + .open_options( + &option.table_path(scope.gen, source_tier), + FileType::Parquet.open_options(true), + ) + .await?; + + streams.push(ScanStream::SsTable { + inner: SsTable::open(ctx.parquet_lru.clone(), scope.gen, file) + .await? + .scan( + (Bound::Unbounded, Bound::Unbounded), + u32::MAX.into(), + None, + ProjectionMask::all(), + None, + instance.primary_key_indices(), + ) + .await?, + }); + } + } else { + let (lower, upper) = as Compactor>::full_scope(&source_scopes)?; + let tier_scan = LevelStream::new( + version, + source_tier, + 0, + source_scopes.len() - 1, + (Bound::Included(lower), Bound::Included(upper)), + u32::MAX.into(), + None, + ProjectionMask::all(), + source_tier_fs.clone(), + ctx.parquet_lru.clone(), + None, + instance.primary_key_indices(), + ) + .ok_or(CompactionError::EmptyLevel)?; + + streams.push(ScanStream::Level { inner: tier_scan }); + } + + as Compactor>::build_tables( + option, + version_edits, + target_tier, + streams, + instance, + target_tier_fs, + ) + .await?; + + // Mark all source tier files for deletion + for scope in source_scopes { + version_edits.push(VersionEdit::Remove { + level: source_tier as u8, + gen: scope.gen, + }); + delete_gens.push(SsTableID::new(scope.gen, source_tier)); + } + + Ok(()) + } + + pub(crate) fn is_tier_full(options: &TieredOptions, version: &Version, tier: usize) -> bool { + let max_tiers = options.max_tiers; + if tier >= max_tiers || tier >= MAX_LEVEL { + return false; + } + + let tier_capacity = Self::tier_capacity(options, tier); + version.level_slice[tier].len() > tier_capacity + } + + fn tier_capacity(options: &TieredOptions, tier: usize) -> usize { + // Base capacity for tier 0 + // let base_capacity = option.tier_base_capacity.unwrap_or(4); + // let growth_factor = option.tier_growth_factor.unwrap_or(4); + + options.tier_base_capacity * options.tier_growth_factor.pow(tier as u32) + } +} + +#[cfg(all(test, feature = "tokio"))] +pub(crate) mod tests { + use std::sync::{atomic::AtomicU32, Arc}; + + use flume::bounded; + use fusio::{path::Path, DynFs}; + use fusio_dispatch::FsOptions; + use parquet_lru::NoCache; + use tempfile::TempDir; + + use crate::{ + compaction::{ + tests::build_parquet_table, + tiered::{TieredCompactor, TieredOptions}, + Compactor, + }, + context::Context, + executor::tokio::TokioExecutor, + fs::{generate_file_id, manager::StoreManager}, + inmem::{ + immutable::{tests::TestSchema, ImmutableMemTable}, + mutable::MutableMemTable, + }, + record::{Record, Schema}, + scope::Scope, + tests::Test, + trigger::{TriggerFactory, TriggerType}, + version::{ + cleaner::Cleaner, edit::VersionEdit, set::VersionSet, timestamp::Timestamp, Version, + MAX_LEVEL, + }, + wal::log::LogType, + DbError, DbOption, DB, + }; + + async fn build_immutable( + option: &DbOption, + records: Vec<(LogType, R, Timestamp)>, + schema: &Arc, + fs: &Arc, + ) -> Result::Columns>, DbError> + where + R: Record + Send, + { + let trigger = TriggerFactory::create(option.trigger_type); + + let mutable = MutableMemTable::new(option, trigger, fs.clone(), schema.clone()).await?; + + for (log_ty, record, ts) in records { + let _ = mutable.insert(log_ty, record, ts).await?; + } + Ok(mutable.into_immutable().await?.1) + } + + #[tokio::test(flavor = "multi_thread")] + async fn minor_compaction() { + let temp_dir = tempfile::tempdir().unwrap(); + let temp_dir_l0 = tempfile::tempdir().unwrap(); + + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .level_path( + 0, + Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), + FsOptions::Local, + ) + .unwrap(); + let manager = + StoreManager::new(option.base_fs.clone(), option.level_paths.clone()).unwrap(); + manager + .base_fs() + .create_dir_all(&option.wal_dir_path()) + .await + .unwrap(); + + let batch_1 = build_immutable::( + &option, + vec![ + ( + LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ], + &Arc::new(TestSchema), + manager.base_fs(), + ) + .await + .unwrap(); + + let batch_2 = build_immutable::( + &option, + vec![ + ( + LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ], + &Arc::new(TestSchema), + manager.base_fs(), + ) + .await + .unwrap(); + + // Minor compaction is the same for both leveled and tiered + let scope = TieredCompactor::::minor_compaction( + &option, + None, + &vec![ + (Some(generate_file_id()), batch_1), + (Some(generate_file_id()), batch_2), + ], + &TestSchema, + &manager, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(scope.min, 1.to_string()); + assert_eq!(scope.max, 6.to_string()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn tier_compaction() { + let temp_dir = TempDir::new().unwrap(); + let temp_dir_t0 = TempDir::new().unwrap(); + let temp_dir_t1 = TempDir::new().unwrap(); + + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .level_path( + 0, + Path::from_filesystem_path(temp_dir_t0.path()).unwrap(), + FsOptions::Local, + ) + .unwrap() + .level_path( + 1, + Path::from_filesystem_path(temp_dir_t1.path()).unwrap(), + FsOptions::Local, + ) + .unwrap(); + + let option = Arc::new(option); + let manager = Arc::new( + StoreManager::new(option.base_fs.clone(), option.level_paths.clone()).unwrap(), + ); + + manager + .base_fs() + .create_dir_all(&option.version_log_dir_path()) + .await + .unwrap(); + manager + .base_fs() + .create_dir_all(&option.wal_dir_path()) + .await + .unwrap(); + + // Create test data for tier 0 (4 files) + let tier_0_fs = manager.get_fs(&option.level_fs_path(0).unwrap_or(&option.base_path)); + + let table_gen_1 = generate_file_id(); + let table_gen_2 = generate_file_id(); + let table_gen_3 = generate_file_id(); + let table_gen_4 = generate_file_id(); + + // Build parquet files for tier 0 + build_parquet_table::( + &option, + table_gen_1, + vec![ + ( + LogType::Full, + Test { + vstring: "1".to_string(), + vu32: 1, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: "2".to_string(), + vu32: 2, + vbool: Some(true), + }, + 0.into(), + ), + ], + &Arc::new(TestSchema), + 0, + &tier_0_fs, + ) + .await + .unwrap(); + + build_parquet_table::( + &option, + table_gen_2, + vec![ + ( + LogType::Full, + Test { + vstring: "3".to_string(), + vu32: 3, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: "4".to_string(), + vu32: 4, + vbool: Some(true), + }, + 0.into(), + ), + ], + &Arc::new(TestSchema), + 0, + &tier_0_fs, + ) + .await + .unwrap(); + + build_parquet_table::( + &option, + table_gen_3, + vec![ + ( + LogType::Full, + Test { + vstring: "5".to_string(), + vu32: 5, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: "6".to_string(), + vu32: 6, + vbool: Some(true), + }, + 0.into(), + ), + ], + &Arc::new(TestSchema), + 0, + &tier_0_fs, + ) + .await + .unwrap(); + + build_parquet_table::( + &option, + table_gen_4, + vec![ + ( + LogType::Full, + Test { + vstring: "7".to_string(), + vu32: 7, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: "8".to_string(), + vu32: 8, + vbool: Some(true), + }, + 0.into(), + ), + ], + &Arc::new(TestSchema), + 0, + &tier_0_fs, + ) + .await + .unwrap(); + + // Create version with tier 0 at capacity (4 files) + let (sender, _) = bounded(1); + let mut version = + Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + + // Add all 4 files to tier 0 + version.level_slice[0].push(Scope { + min: "1".to_string(), + max: "2".to_string(), + gen: table_gen_1, + wal_ids: None, + file_size: 100, + }); + version.level_slice[0].push(Scope { + min: "3".to_string(), + max: "4".to_string(), + gen: table_gen_2, + wal_ids: None, + file_size: 100, + }); + version.level_slice[0].push(Scope { + min: "5".to_string(), + max: "6".to_string(), + gen: table_gen_3, + wal_ids: None, + file_size: 100, + }); + version.level_slice[0].push(Scope { + min: "7".to_string(), + max: "8".to_string(), + gen: table_gen_4, + wal_ids: None, + file_size: 100, + }); + + // Test tier compaction + let min = "1".to_string(); + let max = "8".to_string(); + let mut version_edits = Vec::new(); + let mut delete_gens = Vec::new(); + + let (_, clean_sender) = Cleaner::new(option.clone(), manager.clone()); + let manifest: VersionSet = + VersionSet::new(clean_sender, option.clone(), manager.clone()) + .await + .unwrap(); + let ctx = Context::new( + manager.clone(), + Arc::new(NoCache::default()), + Box::new(manifest), + TestSchema.arrow_schema().clone(), + ); + + TieredCompactor::::tier_compaction( + &version, + &option, + &min, + &max, + &mut version_edits, + &mut delete_gens, + &TestSchema, + &ctx, + 0, // source tier + 1, // target tier + ) + .await + .unwrap(); + + // Verify that new files are added to tier 1 + let add_edits: Vec<_> = version_edits + .iter() + .filter_map(|edit| match edit { + VersionEdit::Add { level, scope } => Some((*level, scope)), + _ => None, + }) + .collect(); + + assert!(!add_edits.is_empty(), "Should have added files to tier 1"); + for (level, scope) in add_edits { + assert_eq!(level, 1, "Files should be added to tier 1"); + assert!(scope.min <= scope.max, "Scope should be valid"); + } + + // Verify that all tier 0 files are marked for removal + let remove_edits: Vec<_> = version_edits + .iter() + .filter_map(|edit| match edit { + VersionEdit::Remove { level, gen } => Some((*level, *gen)), + _ => None, + }) + .collect(); + + assert_eq!(remove_edits.len(), 4, "Should remove all 4 tier 0 files"); + for (level, gen) in remove_edits { + assert_eq!(level, 0, "All removed files should be from tier 0"); + assert!( + [table_gen_1, table_gen_2, table_gen_3, table_gen_4].contains(&gen), + "Removed file should be one of the original tier 0 files" + ); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tier_capacity_and_compaction_planning() { + let temp_dir = TempDir::new().unwrap(); + let option = Arc::new(DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); + + let (sender, _) = bounded(1); + let mut version = + Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + + let options = TieredOptions { + tier_base_capacity: 2, + tier_growth_factor: 2, + max_tiers: 3, + ..Default::default() + }; + + // Test 1: Initially no tiers are full + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 0 + )); + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 1 + )); + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 2 + )); + + // Test 2: Add files to reach capacity (but not exceed) + version.level_slice[0].push(Scope { + min: "0".to_string(), + max: "1".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + version.level_slice[0].push(Scope { + min: "2".to_string(), + max: "3".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + + // Tier 0 should not be full yet (at capacity but not exceeding) + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 0 + )); + + // Test 3: Exceed capacity to trigger compaction planning + version.level_slice[0].push(Scope { + min: "4".to_string(), + max: "5".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + + // Now tier 0 should be full (exceeding capacity of 2) + assert!(TieredCompactor::::is_tier_full(&options, &version, 0)); + + // Test 4: Compaction planning logic + for tier in 0..MAX_LEVEL - 1 { + if TieredCompactor::::is_tier_full(&options, &version, tier) { + assert_eq!(tier, 0); // Only tier 0 should be full + + let tier_files: Vec<_> = version.level_slice[tier] + .iter() + .map(|scope| scope.gen) + .collect(); + assert_eq!(tier_files.len(), 3); // Should have 3 files (exceeding capacity of 2) + break; + } + } + + // Test 5: Multi-tier capacity behavior + // Add 4 files to tier 1 (capacity = 4) + for i in 0..4 { + version.level_slice[1].push(Scope { + min: format!("{}", i * 2 + 10), + max: format!("{}", i * 2 + 11), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + } + + // Tier 1 at capacity but not full + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 1 + )); + + // Exceed tier 1 capacity + version.level_slice[1].push(Scope { + min: "100".to_string(), + max: "101".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + + // Now both tiers should be full + assert!(TieredCompactor::::is_tier_full(&options, &version, 0)); + assert!(TieredCompactor::::is_tier_full(&options, &version, 1)); + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 2 + )); + + // Test 6: Boundary conditions + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 3 + )); + assert!(!TieredCompactor::::is_tier_full( + &options, &version, 4 + )); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tiered_edge_cases() { + let temp_dir = TempDir::new().unwrap(); + let option = Arc::new(DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + )); + + let (sender, _) = bounded(1); + let mut version = + Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + + // Test with max_tiers = 1 (only tier 0) + let options = TieredOptions { + tier_base_capacity: 2, + tier_growth_factor: 2, + max_tiers: 1, + ..Default::default() + }; + + // Add files to tier 0 to exceed capacity + version.level_slice[0].push(Scope { + min: "1".to_string(), + max: "2".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + version.level_slice[0].push(Scope { + min: "3".to_string(), + max: "4".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + version.level_slice[0].push(Scope { + min: "5".to_string(), + max: "6".to_string(), + gen: generate_file_id(), + wal_ids: None, + file_size: 100, + }); + + // With max_tiers = 1, tier 0 is still considered full when exceeding capacity + // but compaction planning should handle the case where there's no target tier + assert!(TieredCompactor::::is_tier_full(&options, &version, 0)); + + // Test with zero capacity + let zero_options = TieredOptions { + tier_base_capacity: 0, + tier_growth_factor: 2, + max_tiers: 4, + ..Default::default() + }; + + assert_eq!(TieredCompactor::::tier_capacity(&zero_options, 0), 0); + assert_eq!(TieredCompactor::::tier_capacity(&zero_options, 1), 0); + + // Test beyond MAX_LEVEL + let (sender2, _) = bounded(1); + let version_empty = + Version::::new(option.clone(), sender2, Arc::new(AtomicU32::default())); + let normal_options = TieredOptions::default(); + assert!(!TieredCompactor::::is_tier_full( + &normal_options, + &version_empty, + MAX_LEVEL + )); + assert!(!TieredCompactor::::is_tier_full( + &normal_options, + &version_empty, + MAX_LEVEL + 1 + )); + + // Test planning with max_tiers = 1 - this should create a task with target_tier = 1 + // even though max_tiers = 1, which might be a design issue + for tier in 0..MAX_LEVEL - 1 { + if TieredCompactor::::is_tier_full(&options, &version, tier) { + assert_eq!(tier, 0); + let tier_files: Vec<_> = version.level_slice[tier] + .iter() + .map(|scope| scope.gen) + .collect(); + assert_eq!(tier_files.len(), 3); + + // The planned target_tier would be 1, which exceeds max_tiers = 1 + let planned_target = tier + 1; + assert_eq!(planned_target, 1); + assert!(planned_target >= options.max_tiers); // This should be caught somewhere + break; + } + } + } + + // Test tiered compaction timing - when tier 0 reaches capacity, it should trigger compaction + #[tokio::test(flavor = "multi_thread")] + async fn test_tiered_compaction_timing() { + // Test different capacity configurations and their compaction behavior + + // Test case 1: Capacity 2 - should trigger compaction + { + let temp_dir = TempDir::new().unwrap(); + let tiered_options = TieredOptions { + tier_base_capacity: 2, + tier_growth_factor: 3, + max_tiers: 4, + }; + + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .tiered_compaction(tiered_options) + .immutable_chunk_num(1) + .immutable_chunk_max_num(1); + option.trigger_type = TriggerType::Length(5); + + let db: DB = DB::new(option, TokioExecutor::default(), TestSchema) + .await + .unwrap(); + + // Add files up to capacity + for batch in 0..2 { + for i in (batch * 5)..(batch + 1) * 5 { + db.insert(Test { + vstring: i.to_string(), + vu32: i, + vbool: Some(true), + }) + .await + .unwrap(); + } + db.flush().await.unwrap(); + } + + let version = db.ctx.manifest.current().await; + assert_eq!(version.level_slice[0].len(), 2); // At capacity + assert_eq!(version.level_slice[1].len(), 0); + + // Exceed capacity + for i in 10..15 { + db.insert(Test { + vstring: i.to_string(), + vu32: i, + vbool: Some(true), + }) + .await + .unwrap(); + } + db.flush().await.unwrap(); + + let version = db.ctx.manifest.current().await; + let total_files = version.level_slice[0].len() + version.level_slice[1].len(); + assert!(total_files >= 1); + // Should have triggered compaction to tier 1 + if version.level_slice[1].len() > 0 { + for scope in &version.level_slice[1] { + assert!(scope.min <= scope.max); + } + } + } + + // Test case 2: Capacity 4 - should NOT trigger compaction + { + let temp_dir = TempDir::new().unwrap(); + let tiered_options = TieredOptions { + tier_base_capacity: 4, // Higher capacity + tier_growth_factor: 2, + max_tiers: 4, + }; + + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .tiered_compaction(tiered_options) + .immutable_chunk_num(1) + .immutable_chunk_max_num(1); + option.trigger_type = TriggerType::Length(5); + + let db: DB = DB::new(option, TokioExecutor::default(), TestSchema) + .await + .unwrap(); + + // Add 3 files (under capacity of 4) + for batch in 0..3 { + for i in (batch * 5)..(batch + 1) * 5 { + db.insert(Test { + vstring: i.to_string(), + vu32: i, + vbool: Some(true), + }) + .await + .unwrap(); + } + db.flush().await.unwrap(); + } + + let version = db.ctx.manifest.current().await; + assert_eq!(version.level_slice[0].len(), 3); // Under capacity + assert_eq!(version.level_slice[1].len(), 0); // No compaction + } + + // Test case 3: Capacity 3 - precise threshold behavior + { + let temp_dir = TempDir::new().unwrap(); + let tiered_options = TieredOptions { + tier_base_capacity: 3, + tier_growth_factor: 2, + max_tiers: 3, + }; + + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .tiered_compaction(tiered_options) + .immutable_chunk_num(1) + .immutable_chunk_max_num(1); + option.trigger_type = TriggerType::Length(10); + + let db: DB = DB::new(option, TokioExecutor::default(), TestSchema) + .await + .unwrap(); + + // Add exactly at capacity + for batch in 0..3 { + for i in (batch * 10)..(batch + 1) * 10 { + db.insert(Test { + vstring: format!("{:03}", i), + vu32: i, + vbool: Some(i % 2 == 0), + }) + .await + .unwrap(); + } + db.flush().await.unwrap(); + } + + let version_before = db.ctx.manifest.current().await; + assert_eq!(version_before.level_slice[0].len(), 3); // At capacity + assert_eq!(version_before.level_slice[1].len(), 0); + + // Exceed capacity + for i in 30..40 { + db.insert(Test { + vstring: format!("{:03}", i), + vu32: i, + vbool: Some(i % 2 == 0), + }) + .await + .unwrap(); + } + db.flush().await.unwrap(); + + let version_after = db.ctx.manifest.current().await; + let tier0_files = version_after.level_slice[0].len(); + let tier1_files = version_after.level_slice[1].len(); + + // Should trigger compaction + assert!(tier0_files < 3 || tier1_files > 0); + let total_files = tier0_files + tier1_files; + assert!(total_files >= 1 && total_files <= 4); + } + } +} + +#[cfg(all(test, feature = "tokio"))] +pub(crate) mod tests_metric { + + use fusio::path::Path; + use tempfile::TempDir; + + use crate::{ + compaction::{ + tests_metric::{read_write_amplification_measurement, throughput}, + tiered::TieredOptions, + }, + inmem::immutable::tests::TestSchema, + trigger::TriggerType, + DbOption, + }; + + #[tokio::test(flavor = "multi_thread")] + #[ignore] + async fn read_write_amplification_measurement_tiered() { + let temp_dir = TempDir::new().unwrap(); + let tiered_options = TieredOptions { + tier_base_capacity: 3, + tier_growth_factor: 4, + ..Default::default() + }; + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .tiered_compaction(tiered_options) + .max_sst_file_size(1024); // Small file size to force multiple files + + read_write_amplification_measurement(option).await; + } + + #[tokio::test(flavor = "multi_thread")] + #[ignore] + async fn throughput_tiered() { + let temp_dir = TempDir::new().unwrap(); + let mut option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &TestSchema, + ) + .tiered_compaction(TieredOptions::default()); + option.trigger_type = TriggerType::SizeOfMem(1 * 1024 * 1024); + + throughput(option).await; + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index bcb8e38d..3235e7b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,7 +180,10 @@ pub use crate::record::{ArrowArrays, ArrowArraysBuilder}; #[doc(hidden)] pub use crate::version::timestamp::Ts; use crate::{ - compaction::{error::CompactionError, leveled::LeveledCompactor, CompactTask, Compactor}, + compaction::{ + error::CompactionError, leveled::LeveledCompactor, tiered::TieredCompactor, CompactTask, + Compactor, + }, executor::{Executor, RwLock as ExecutorRwLock}, fs::{manager::StoreManager, parse_file_id, FileType}, inmem::flush::minor_flush, @@ -302,6 +305,15 @@ where ); Self::finish_build(executor, mem_storage, ctx, compactor, cleaner, task_rx).await } + CompactionOption::Tiered(opt) => { + let compactor = TieredCompactor::::new( + opt.clone(), + record_schema.clone(), + option.clone(), + ctx.clone(), + ); + Self::finish_build(executor, mem_storage, ctx, compactor, cleaner, task_rx).await + } } } @@ -1398,6 +1410,7 @@ pub(crate) mod tests { use crate::{ compaction::{ leveled::{LeveledCompactor, LeveledOptions}, + tiered::TieredCompactor, CompactTask, }, context::Context, @@ -1537,7 +1550,7 @@ pub(crate) mod tests { pub(crate) async fn build_db( option: Arc, - compaction_rx: Receiver, + task_rx: Receiver, executor: E, mem_storage: crate::DbStorage, record_schema: Arc, @@ -1579,15 +1592,16 @@ pub(crate) mod tests { option.clone(), ctx.clone(), ); - DB::finish_build( - executor, - mem_storage, - ctx, - compactor, - cleaner, - compaction_rx, - ) - .await + DB::finish_build(executor, mem_storage, ctx, compactor, cleaner, task_rx).await + } + CompactionOption::Tiered(opt) => { + let compactor = TieredCompactor::::new( + opt.clone(), + record_schema.clone(), + option.clone(), + ctx.clone(), + ); + DB::finish_build(executor, mem_storage, ctx, compactor, cleaner, task_rx).await } } } diff --git a/src/option.rs b/src/option.rs index 207fb399..73663010 100644 --- a/src/option.rs +++ b/src/option.rs @@ -11,7 +11,7 @@ use parquet::{ use thiserror::Error; use crate::{ - compaction::leveled::LeveledOptions, + compaction::{leveled::LeveledOptions, tiered::TieredOptions}, fs::{FileId, FileType}, record::Schema, trigger::TriggerType, @@ -32,12 +32,14 @@ pub enum Order { pub enum CompactionOption { Leveled(LeveledOptions), + Tiered(TieredOptions), } impl std::fmt::Debug for CompactionOption { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { CompactionOption::Leveled(opts) => f.debug_tuple("Leveled").field(opts).finish(), + CompactionOption::Tiered(opts) => f.debug_tuple("Tiered").field(opts).finish(), } } } @@ -46,6 +48,7 @@ impl Clone for CompactionOption { fn clone(&self) -> Self { match self { CompactionOption::Leveled(opts) => CompactionOption::Leveled(opts.clone()), + CompactionOption::Tiered(opts) => CompactionOption::Tiered(opts.clone()), } } } @@ -144,6 +147,11 @@ impl DbOption { self } + pub fn tiered_compaction(mut self, options: TieredOptions) -> Self { + self.compaction_option = CompactionOption::Tiered(options); + self + } + /// Set maximum SST file size pub fn max_sst_file_size(mut self, value: usize) -> Self { self.max_sst_file_size = value; From 87f9869ecc7f69db879ea10e63eadf997528c6f2 Mon Sep 17 00:00:00 2001 From: 123789456ye <123789456ye@gmail.com> Date: Thu, 14 Aug 2025 18:18:27 +0800 Subject: [PATCH 2/6] chore: fmt --- src/compaction/leveled.rs | 2 +- src/compaction/tiered.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 279c2586..6b34015f 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -1313,4 +1313,4 @@ pub(crate) mod tests_metric { throughput(option).await; } -} \ No newline at end of file +} diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index f993284e..a1d5427a 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -1245,4 +1245,4 @@ pub(crate) mod tests_metric { throughput(option).await; } -} \ No newline at end of file +} From b2bd5ec11a08a0b4fae50da33694adb0c8a5eff1 Mon Sep 17 00:00:00 2001 From: 123789456ye <123789456ye@gmail.com> Date: Fri, 15 Aug 2025 13:21:28 +0800 Subject: [PATCH 3/6] chore: replace rand with fastrand --- Cargo.toml | 1 - src/compaction/mod.rs | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fefc9e13..d01be270 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,7 +128,6 @@ log = "0.4.22" redb = { version = "2", optional = true } rocksdb = { version = "0.23", optional = true } sled = { version = "0.34", optional = true } -rand = "0.9.2" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.3.1", features = ["wasm_js"] } diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index a8e60805..f67fbead 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -781,8 +781,8 @@ pub(crate) mod tests_metric { pub(crate) async fn throughput(option: DbOption) { use std::time::Instant; + use fastrand::{seed, shuffle}; use futures_util::StreamExt; - use rand::{seq::SliceRandom, SeedableRng}; // Create DB with EcoTune compactor using the standard open method let db: DB = @@ -851,8 +851,8 @@ pub(crate) mod tests_metric { } // Shuffle operations to create mixed workload - let mut rng = rand::rngs::StdRng::seed_from_u64(42); // Fixed seed for reproducibility - operations.shuffle(&mut rng); + seed(42); // Fixed seed for reproducibility + shuffle(&mut operations); // Execute mixed workload let mixed_start = Instant::now(); From 1309eae4e2976928957f1302b11b4fbeaa107d11 Mon Sep 17 00:00:00 2001 From: 123789456ye <123789456ye@gmail.com> Date: Fri, 15 Aug 2025 13:34:18 +0800 Subject: [PATCH 4/6] chore: change visibility --- src/compaction/tiered.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index a1d5427a..3b1be1da 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -19,19 +19,19 @@ use crate::{ CompactionExecutor, DbOption, }; -pub struct TieredTask { - pub input: Vec<(usize, Vec)>, - pub target_tier: usize, +struct TieredTask { + input: Vec<(usize, Vec)>, + target_tier: usize, } #[derive(Clone, Debug)] pub struct TieredOptions { /// Maximum number of tiers - pub max_tiers: usize, + max_tiers: usize, /// Base capacity for tier 0 - pub tier_base_capacity: usize, + tier_base_capacity: usize, /// Growth factor between tiers - pub tier_growth_factor: usize, + tier_growth_factor: usize, } impl Default for TieredOptions { @@ -183,7 +183,7 @@ where Ok(()) } - pub async fn should_major_compact(&self) -> Option { + async fn should_major_compact(&self) -> Option { let version_ref = self.ctx.manifest.current().await; for tier in 0..MAX_LEVEL - 1 { if Self::is_tier_full(&self.options, &version_ref, tier) { @@ -193,7 +193,7 @@ where None } - pub async fn plan_major(&self, tier: usize) -> Option { + async fn plan_major(&self, tier: usize) -> Option { let version_ref = self.ctx.manifest.current().await; let tier_files: Vec = version_ref.level_slice[tier] .iter() @@ -209,7 +209,7 @@ where None } - pub async fn execute_major(&self, task: TieredTask) -> Result<(), CompactionError> { + async fn execute_major(&self, task: TieredTask) -> Result<(), CompactionError> { let version_ref = self.ctx.manifest.current().await; let mut version_edits = vec![]; let mut delete_gens = vec![]; @@ -353,7 +353,7 @@ where Ok(()) } - pub(crate) fn is_tier_full(options: &TieredOptions, version: &Version, tier: usize) -> bool { + fn is_tier_full(options: &TieredOptions, version: &Version, tier: usize) -> bool { let max_tiers = options.max_tiers; if tier >= max_tiers || tier >= MAX_LEVEL { return false; From ef4eaa821efc039716ca58a6f889ecc8cbcf9187 Mon Sep 17 00:00:00 2001 From: 123789456ye <123789456ye@gmail.com> Date: Wed, 20 Aug 2025 15:48:20 +0800 Subject: [PATCH 5/6] chore: remove redundant parameters --- src/compaction/tiered.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 3b1be1da..14f4ebd0 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -185,7 +185,7 @@ where async fn should_major_compact(&self) -> Option { let version_ref = self.ctx.manifest.current().await; - for tier in 0..MAX_LEVEL - 1 { + for tier in 0..self.options.max_tiers - 1 { if Self::is_tier_full(&self.options, &version_ref, tier) { return Some(tier); } @@ -225,13 +225,9 @@ where if tier_scopes.is_empty() { continue; } - let min = tier_scopes.iter().map(|scope| &scope.min).min().unwrap(); - let max = tier_scopes.iter().map(|scope| &scope.max).max().unwrap(); Self::tier_compaction( &version_ref, &self.db_option, - min, - max, &mut version_edits, &mut delete_gens, &self.record_schema, @@ -261,8 +257,6 @@ where async fn tier_compaction( version: &Version, option: &DbOption, - _min: &::Key, - _max: &::Key, version_edits: &mut Vec::Key>>, delete_gens: &mut Vec, instance: &R::Schema, @@ -745,8 +739,6 @@ pub(crate) mod tests { }); // Test tier compaction - let min = "1".to_string(); - let max = "8".to_string(); let mut version_edits = Vec::new(); let mut delete_gens = Vec::new(); @@ -765,8 +757,6 @@ pub(crate) mod tests { TieredCompactor::::tier_compaction( &version, &option, - &min, - &max, &mut version_edits, &mut delete_gens, &TestSchema, From bea5ed5450bc9e128cb3ab7a402cc9d3f5e89594 Mon Sep 17 00:00:00 2001 From: 123789456ye <123789456ye@gmail.com> Date: Fri, 22 Aug 2025 15:13:05 +0800 Subject: [PATCH 6/6] refactor: change major_compaction in TieredCompactor to static method --- src/compaction/tiered.rs | 57 ++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 14f4ebd0..6ef131c9 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -132,7 +132,14 @@ where } // Perform major compaction - self.major_compaction(is_manual).await?; + Self::major_compaction( + &self.ctx, + &self.options, + &self.db_option, + &self.record_schema, + is_manual, + ) + .await?; Ok(()) } @@ -163,18 +170,23 @@ where R: Record, <::Schema as RecordSchema>::Columns: Send + Sync, { - async fn major_compaction(&self, is_manual: bool) -> Result<(), CompactionError> { - while let Some(tier) = self.should_major_compact().await { - if let Some(task) = self.plan_major(tier).await { - self.execute_major(task).await?; + pub async fn major_compaction( + ctx: &Context, + options: &TieredOptions, + db_option: &DbOption, + record_schema: &R::Schema, + is_manual: bool, + ) -> Result<(), CompactionError> { + while let Some(tier) = Self::should_major_compact(ctx, options).await { + if let Some(task) = Self::plan_major(ctx, tier).await { + Self::execute_major(ctx, db_option, record_schema, task).await?; } else { break; } } if is_manual { - self.ctx - .manifest + ctx.manifest .rewrite() .await .map_err(CompactionError::Manifest)?; @@ -183,18 +195,18 @@ where Ok(()) } - async fn should_major_compact(&self) -> Option { - let version_ref = self.ctx.manifest.current().await; - for tier in 0..self.options.max_tiers - 1 { - if Self::is_tier_full(&self.options, &version_ref, tier) { + async fn should_major_compact(ctx: &Context, options: &TieredOptions) -> Option { + let version_ref = ctx.manifest.current().await; + for tier in 0..options.max_tiers - 1 { + if Self::is_tier_full(options, &version_ref, tier) { return Some(tier); } } None } - async fn plan_major(&self, tier: usize) -> Option { - let version_ref = self.ctx.manifest.current().await; + async fn plan_major(ctx: &Context, tier: usize) -> Option { + let version_ref = ctx.manifest.current().await; let tier_files: Vec = version_ref.level_slice[tier] .iter() .map(|scope| scope.gen) @@ -209,8 +221,13 @@ where None } - async fn execute_major(&self, task: TieredTask) -> Result<(), CompactionError> { - let version_ref = self.ctx.manifest.current().await; + async fn execute_major( + ctx: &Context, + db_option: &DbOption, + record_schema: &R::Schema, + task: TieredTask, + ) -> Result<(), CompactionError> { + let version_ref = ctx.manifest.current().await; let mut version_edits = vec![]; let mut delete_gens = vec![]; for (source_tier, file_gens) in &task.input { @@ -227,11 +244,11 @@ where } Self::tier_compaction( &version_ref, - &self.db_option, + db_option, &mut version_edits, &mut delete_gens, - &self.record_schema, - &self.ctx, + record_schema, + ctx, *source_tier, task.target_tier, ) @@ -244,8 +261,7 @@ where ts: version_ref.increase_ts(), }); - self.ctx - .manifest + ctx.manifest .update(version_edits, Some(delete_gens)) .await?; } @@ -349,6 +365,7 @@ where fn is_tier_full(options: &TieredOptions, version: &Version, tier: usize) -> bool { let max_tiers = options.max_tiers; + // TODO: Move MAX_LEVEL out of Version if tier >= max_tiers || tier >= MAX_LEVEL { return false; }