diff --git a/Cargo.toml b/Cargo.toml
index dbe5308d..7ef406f4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,4 @@
-workspace = { members = ["tonbo_macros"] }
+workspace = { members = ["parquet-lru", "tonbo_macros"] }
 
 [package]
 description = "An embedded persistent KV database in Rust."
@@ -83,7 +83,7 @@ fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "80389936
 futures-core = "0.3"
 futures-io = "0.3"
 futures-util = "0.3"
-lockable = "0.0.8"
+lockable = "0.1.1"
 once_cell = "1"
 parquet = { version = "53", default-features = false, features = [
     "async",
@@ -93,14 +93,15 @@ parquet = { version = "53", default-features = false, features = [
     "lz4",
     "snap",
 ] }
+parquet-lru = { version = "0.2.0", path = "parquet-lru" }
 pin-project-lite = "0.2"
 regex = "1"
-thiserror = "1"
+thiserror = "2.0.3"
 tokio = { version = "1", features = ["io-util"], default-features = false }
 tokio-util = { version = "0.7" }
 tonbo_macros = { version = "0.2.0", path = "tonbo_macros" }
 tracing = "0.1"
-ulid = "1"
+ulid = { version = "1", features = ["serde"] }
 
 # Only used for benchmarks
 log = "0.4.22"
diff --git a/parquet-lru/Cargo.toml b/parquet-lru/Cargo.toml
new file mode 100644
index 00000000..26456580
--- /dev/null
+++ b/parquet-lru/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+description = "Implement LRU cache reader for parquet::arrow::async_reader::AsyncFileReader."
+documentation = "https://docs.rs/parquet-lru"
+edition = "2021"
+license = "Apache-2.0"
+name = "parquet-lru"
+version = "0.2.0"
+
+[package.metadata.docs.rs]
+all-features = true
+
+[features]
+default = []
+foyer = ["dep:foyer", "dep:serde"]
+
+[dependencies]
+bytes = { version = "1.8.0", features = ["serde"] }
+foyer = { version = "0.12.2", optional = true }
+futures-core = "0.3.31"
+futures-util = "0.3.31"
+parquet = { version = "53.2.0", features = ["async"] }
+serde = { version = "1.0.214", optional = true }
diff --git a/parquet-lru/src/dyn.rs b/parquet-lru/src/dyn.rs
new file mode 100644
index 00000000..c63258ed
--- /dev/null
+++ b/parquet-lru/src/dyn.rs
@@ -0,0 +1,49 @@
+use std::{ops::Range, sync::Arc};
+
+use bytes::Bytes;
+use futures_core::future::BoxFuture;
+use parquet::{
+    arrow::async_reader::AsyncFileReader, errors::Result, file::metadata::ParquetMetaData,
+};
+
+use crate::LruCache;
+
+pub struct BoxedFileReader {
+    inner: Box<dyn AsyncFileReader>,
+}
+
+impl BoxedFileReader {
+    pub fn new<T: AsyncFileReader + 'static>(inner: T) -> Self {
+        Self {
+            inner: Box::new(inner),
+        }
+    }
+}
+
+impl AsyncFileReader for BoxedFileReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.inner.get_bytes(range)
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        self.inner.get_metadata()
+    }
+
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+        self.inner.get_byte_ranges(ranges)
+    }
+}
+
+pub trait DynLruCache<K> {
+    fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader>;
+}
+
+impl<K, C> DynLruCache<K> for C
+where
+    K: 'static + Send,
+    C: LruCache<K> + Sized + Send + Sync,
+{
+    fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader> {
+        Box::pin(async move { BoxedFileReader::new(self.get_reader(key, reader).await) })
+    }
+}
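The `dyn` module is the object-safety shim: `LruCache` itself is not object safe (generic method, GAT return type), so `DynLruCache` plus `BoxedFileReader` give callers a single pointer-sized handle. A minimal sketch of how the blanket impl is intended to be used; the `u64` key type and both helper names are illustrative, not part of this patch:

```rust
use std::sync::Arc;

use parquet::arrow::async_reader::AsyncFileReader;
use parquet_lru::{BoxedFileReader, DynLruCache, NoCache};

// Any `LruCache` implementation coerces to the object-safe trait through the
// blanket impl above; no manual glue is needed.
fn erased_no_cache() -> Arc<dyn DynLruCache<u64> + Send + Sync> {
    Arc::new(NoCache::<u64>::default())
}

// Behind the trait object, readers go in and come out type-erased.
async fn cached_reader(
    cache: &Arc<dyn DynLruCache<u64> + Send + Sync>,
    key: u64,
    reader: impl AsyncFileReader + 'static,
) -> BoxedFileReader {
    cache.get_reader(key, BoxedFileReader::new(reader)).await
}
```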
diff --git a/parquet-lru/src/foyer.rs b/parquet-lru/src/foyer.rs
new file mode 100644
index 00000000..ad6f672c
--- /dev/null
+++ b/parquet-lru/src/foyer.rs
@@ -0,0 +1,145 @@
+use std::{hash::Hash, ops::Range, sync::Arc};
+
+use bytes::Bytes;
+use futures_core::future::BoxFuture;
+use futures_util::FutureExt;
+use parquet::{
+    arrow::async_reader::AsyncFileReader,
+    errors::{ParquetError, Result},
+    file::metadata::ParquetMetaData,
+};
+use serde::{Deserialize, Serialize};
+
+use crate::LruCache;
+
+#[derive(Clone)]
+pub struct FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    inner: Arc<FoyerCacheInner<K>>,
+}
+
+pub struct FoyerCacheInner<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    meta: foyer::Cache<K, Arc<ParquetMetaData>>,
+    data: foyer::HybridCache<(K, Range<usize>), Bytes>,
+}
+
+impl<K> LruCache<K> for FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+{
+    type LruReader<R>
+        = FoyerReader<K, R>
+    where
+        R: AsyncFileReader + 'static;
+
+    async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
+    where
+        R: AsyncFileReader,
+    {
+        FoyerReader::new(self.clone(), key, reader)
+    }
+}
+
+pub struct FoyerReader<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    cache: FoyerCache<K>,
+    key: K,
+    reader: R,
+}
+
+impl<K, R> FoyerReader<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+    R: AsyncFileReader,
+{
+    fn new(cache: FoyerCache<K>, key: K, reader: R) -> Self {
+        Self { cache, key, reader }
+    }
+}
+
+impl<K, R> AsyncFileReader for FoyerReader<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+    R: AsyncFileReader,
+{
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            if let Some(data) = self
+                .cache
+                .inner
+                .data
+                .get(&(self.key.clone(), range.clone()))
+                .await
+                .map_err(|e| ParquetError::External(e.into()))?
+            {
+                Ok(data.value().clone())
+            } else {
+                let data = self.reader.get_bytes(range.clone()).await?;
+                self.cache
+                    .inner
+                    .data
+                    .insert((self.key.clone(), range), data.clone());
+                Ok(data)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        async move {
+            if let Some(meta) = self.cache.inner.meta.get(&self.key) {
+                Ok(meta.value().clone())
+            } else {
+                let meta = self.reader.get_metadata().await?;
+                self.cache.inner.meta.insert(self.key.clone(), meta.clone());
+                Ok(meta)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+        async move {
+            let mut missed = Vec::with_capacity(ranges.len());
+            let mut results = Vec::with_capacity(ranges.len());
+            for (id, range) in ranges.iter().enumerate() {
+                if let Some(data) = self
+                    .cache
+                    .inner
+                    .data
+                    .get(&(self.key.clone(), range.clone()))
+                    .await
+                    .map_err(|e| ParquetError::External(e.into()))?
+                {
+                    results.push((id, data.value().clone()));
+                } else {
+                    missed.push((id, range));
+                }
+            }
+            if !missed.is_empty() {
+                let data = self
+                    .reader
+                    .get_byte_ranges(missed.iter().map(|&(_, r)| r.clone()).collect())
+                    .await?;
+                // `data` is parallel to `missed`, so index by position, not by the original range id.
+                for (pos, (id, range)) in missed.into_iter().enumerate() {
+                    let data = data[pos].clone();
+                    self.cache
+                        .inner
+                        .data
+                        .insert((self.key.clone(), range.clone()), data.clone());
+                    results.push((id, data));
+                }
+            }
+            results.sort_by_key(|(id, _)| *id);
+            Ok(results.into_iter().map(|(_, data)| data).collect())
+        }
+        .boxed()
+    }
+}
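In `get_byte_ranges` above, cache hits and misses must be merged back into the caller's order: each entry carries its original index, the batch fetch returns misses in `missed` order, and a final sort restores request order. That is why the fetched bytes are indexed by position rather than by original id. The same logic in miniature, a self-contained toy model with plain bytes standing in for `Bytes` values:

```rust
// Toy model of the hit/miss stitching in `FoyerReader::get_byte_ranges`.
// `cached` holds (original_index, value) hits; `missed` holds the original
// indices of the misses; `fetched` is parallel to `missed`.
fn stitch(cached: Vec<(usize, u8)>, missed: Vec<usize>, fetched: Vec<u8>) -> Vec<u8> {
    let mut results = cached;
    for (pos, id) in missed.into_iter().enumerate() {
        // Index the fetch result by position, not by the original id.
        results.push((id, fetched[pos]));
    }
    results.sort_by_key(|(id, _)| *id);
    results.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    // Requests 0 and 2 were cache hits; request 1 came from the underlying reader.
    assert_eq!(stitch(vec![(0, b'a'), (2, b'c')], vec![1], vec![b'b']), b"abc");
}
```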
diff --git a/parquet-lru/src/lib.rs b/parquet-lru/src/lib.rs
new file mode 100644
index 00000000..4e9520e6
--- /dev/null
+++ b/parquet-lru/src/lib.rs
@@ -0,0 +1,57 @@
+mod r#dyn;
+#[cfg(feature = "foyer")]
+pub mod foyer;
+
+use std::{future::Future, marker::PhantomData};
+
+use parquet::arrow::async_reader::AsyncFileReader;
+
+pub use crate::r#dyn::*;
+
+pub trait LruCache<K>
+where
+    K: 'static,
+{
+    type LruReader<R>: AsyncFileReader + 'static
+    where
+        R: AsyncFileReader + 'static;
+
+    fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
+    where
+        R: AsyncFileReader + 'static;
+}
+
+#[derive(Default)]
+pub struct NoCache<K> {
+    _phantom: PhantomData<K>,
+}
+
+impl<K> Clone for NoCache<K> {
+    fn clone(&self) -> Self {
+        Self {
+            _phantom: PhantomData,
+        }
+    }
+}
+
+unsafe impl<K> Send for NoCache<K> {}
+
+unsafe impl<K> Sync for NoCache<K> {}
+
+impl<K> LruCache<K> for NoCache<K>
+where
+    K: 'static,
+{
+    type LruReader<R>
+        = R
+    where
+        R: AsyncFileReader + 'static;
+
+    #[allow(clippy::manual_async_fn)]
+    fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
+    where
+        R: AsyncFileReader,
+    {
+        async move { reader }
+    }
+}
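`LruReader<R>` being a generic associated type keeps the fast path statically dispatched: code that knows its concrete cache never boxes, and only the `DynLruCache` boundary pays for type erasure. A hedged sketch of generic plumbing over this trait; both function names are illustrative, not part of the patch:

```rust
use parquet::arrow::async_reader::AsyncFileReader;
use parquet_lru::{LruCache, NoCache};

// Monomorphic path: the concrete cache chooses `C::LruReader<R>`.
async fn open_with_cache<K, C, R>(cache: &C, key: K, reader: R) -> C::LruReader<R>
where
    K: 'static,
    C: LruCache<K>,
    R: AsyncFileReader + 'static,
{
    cache.get_reader(key, reader).await
}

// For `NoCache`, `LruReader<R> = R`, so the reader passes through unchanged.
async fn passthrough<R: AsyncFileReader + 'static>(reader: R) -> R {
    open_with_cache(&NoCache::<u64>::default(), 7u64, reader).await
}
```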
diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs
index 72d212e6..a358f606 100644
--- a/src/compaction/mod.rs
+++ b/src/compaction/mod.rs
@@ -22,7 +22,7 @@ use crate::{
     version::{
         edit::VersionEdit, set::VersionSet, TransactionTs, Version, VersionError, MAX_LEVEL,
     },
-    DbOption, Schema,
+    DbOption, ParquetLru, Schema,
 };
 
 #[derive(Debug)]
@@ -59,7 +59,10 @@ where
         }
     }
 
-    pub(crate) async fn check_then_compaction(&mut self) -> Result<(), CompactionError<R>> {
+    pub(crate) async fn check_then_compaction(
+        &mut self,
+        parquet_lru: ParquetLru,
+    ) -> Result<(), CompactionError<R>> {
         let mut guard = self.schema.write().await;
 
         guard.trigger.reset();
@@ -107,6 +110,7 @@ where
                     &mut delete_gens,
                     &guard.record_instance,
                     &self.manager,
+                    parquet_lru,
                 )
                 .await?;
             }
@@ -194,6 +198,7 @@ where
        delete_gens: &mut Vec<(FileId, usize)>,
         instance: &RecordInstance,
         manager: &StoreManager,
+        parquet_lru: ParquetLru,
     ) -> Result<(), CompactionError<R>> {
         let mut level = 0;
 
@@ -219,7 +224,7 @@ where
                     .await?;
 
                 streams.push(ScanStream::SsTable {
-                    inner: SsTable::open(file)
+                    inner: SsTable::open(parquet_lru.clone(), scope.gen, file)
                         .await?
                         .scan(
                             (Bound::Unbounded, Bound::Unbounded),
@@ -242,6 +247,7 @@ where
                 None,
                 ProjectionMask::all(),
                 level_fs.clone(),
+                parquet_lru.clone(),
             )
             .ok_or(CompactionError::EmptyLevel)?;
 
@@ -262,6 +268,7 @@ where
                 None,
                 ProjectionMask::all(),
                 level_fs.clone(),
+                parquet_lru.clone(),
             )
             .ok_or(CompactionError::EmptyLevel)?;
 
@@ -512,6 +519,7 @@ pub(crate) mod tests {
     use fusio_dispatch::FsOptions;
     use fusio_parquet::writer::AsyncWriter;
     use parquet::arrow::AsyncArrowWriter;
+    use parquet_lru::NoCache;
     use tempfile::TempDir;
 
     use crate::{
@@ -809,6 +817,7 @@ pub(crate) mod tests {
             &mut vec![],
             &RecordInstance::Normal,
             &manager,
+            Arc::new(NoCache::default()),
         )
         .await
         .unwrap();
@@ -1200,6 +1209,7 @@ pub(crate) mod tests {
             &mut vec![],
             &RecordInstance::Normal,
             &manager,
+            Arc::new(NoCache::default()),
         )
         .await
         .unwrap();
diff --git a/src/fs/mod.rs b/src/fs/mod.rs
index 1bffbb50..0f97191b 100644
--- a/src/fs/mod.rs
+++ b/src/fs/mod.rs
@@ -8,7 +8,7 @@ use std::{
 use fusio::{fs::OpenOptions, path::Path};
 use ulid::{DecodeError, Ulid};
 
-pub(crate) type FileId = Ulid;
+pub type FileId = Ulid;
 
 pub enum FileType {
     Wal,
diff --git a/src/lib.rs b/src/lib.rs
index ad79f401..c5983607 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -137,6 +137,7 @@ pub use arrow;
 use async_lock::RwLock;
 use async_stream::stream;
 use flume::{bounded, Sender};
+use fs::FileId;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use inmem::{immutable::Immutable, mutable::Mutable};
@@ -147,6 +148,7 @@ use parquet::{
     arrow::{arrow_to_parquet_schema, ProjectionMask},
     errors::ParquetError,
 };
+use parquet_lru::{DynLruCache, NoCache};
 use record::{ColumnDesc, DynRecord, Record, RecordInstance};
 use thiserror::Error;
 use timestamp::{Timestamp, TimestampedRef};
@@ -159,7 +161,7 @@ pub use crate::option::*;
 use crate::{
     compaction::{CompactTask, CompactionError, Compactor},
     executor::Executor,
-    fs::{manager::StoreManager, parse_file_id, FileId, FileType},
+    fs::{manager::StoreManager, parse_file_id, FileType},
     serdes::Decode,
     snapshot::Snapshot,
     stream::{
@@ -181,10 +183,11 @@ where
     version_set: VersionSet<R>,
     lock_map: LockMap<R::Key>,
     manager: Arc<StoreManager>,
+    parquet_lru: ParquetLru,
     _p: PhantomData<E>,
 }
 
-impl<E: Executor> DB<DynRecord, E>
+impl<E> DB<DynRecord, E>
 where
     E: Executor + Send + Sync + 'static,
 {
@@ -200,7 +203,7 @@ where
         let instance =
             RecordInstance::Runtime(DynRecord::empty_record(column_descs, primary_index));
 
-        Self::build(option, executor, instance).await
+        Self::build(option, executor, instance, Arc::new(NoCache::default())).await
     }
 }
 
@@ -216,13 +219,27 @@ where
     ///
     /// For more configurable options, please refer to [`DbOption`].
     pub async fn new(option: DbOption<R>, executor: E) -> Result<Self, DbError<R>> {
-        Self::build(Arc::new(option), executor, RecordInstance::Normal).await
+        Self::build(
+            Arc::new(option),
+            executor,
+            RecordInstance::Normal,
+            Arc::new(NoCache::default()),
+        )
+        .await
     }
+}
 
+impl<R, E> DB<R, E>
+where
+    R: Record + Send + Sync,
+    R::Columns: Send + Sync,
+    E: Executor + Send + Sync + 'static,
+{
     async fn build(
         option: Arc<DbOption<R>>,
         executor: E,
         instance: RecordInstance,
+        lru_cache: ParquetLru,
     ) -> Result<Self, DbError<R>> {
         let manager = Arc::new(StoreManager::new(
             option.base_fs.clone(),
@@ -260,12 +277,20 @@ where
                 error!("[Cleaner Error]: {}", err)
             }
         });
+
+        let compact_task_cache = lru_cache.clone();
         executor.spawn(async move {
             while let Ok(task) = task_rx.recv_async().await {
                 if let Err(err) = match task {
-                    CompactTask::Freeze => compactor.check_then_compaction().await,
+                    CompactTask::Freeze => {
+                        compactor
+                            .check_then_compaction(compact_task_cache.clone())
+                            .await
+                    }
                     CompactTask::Flush(option_tx) => {
-                        let mut result = compactor.check_then_compaction().await;
+                        let mut result = compactor
+                            .check_then_compaction(compact_task_cache.clone())
+                            .await;
                         if let Some(tx) = option_tx {
                             if result.is_ok() {
                                 result = tx.send(()).map_err(|_| CompactionError::ChannelClose);
@@ -278,11 +303,13 @@ where
                 }
             }
         });
+
         Ok(Self {
             schema,
             version_set,
             lock_map: Arc::new(Default::default()),
             manager,
+            parquet_lru: lru_cache,
             _p: Default::default(),
         })
     }
@@ -297,6 +324,7 @@ where
             self.schema.read().await,
             self.version_set.current().await,
             self.manager.clone(),
+            self.parquet_lru.clone(),
         )
     }
 
@@ -353,6 +381,7 @@ where
                 key,
                 self.version_set.load_ts(),
                 Projection::All,
+                self.parquet_lru.clone(),
             )
             .await?
             .and_then(|entry| {
@@ -380,6 +409,7 @@ where
             self.version_set.load_ts(),
             &*current,
             Box::new(|_| None),
+            self.parquet_lru.clone(),
         ).take().await?;
 
         while let Some(record) = scan.next().await {
@@ -566,6 +596,7 @@ where
         key: &'get R::Key,
         ts: Timestamp,
         projection: Projection,
+        parquet_lru: ParquetLru,
     ) -> Result<Option<Entry<'get, R>>, DbError<R>> {
         if let Some(entry) = self.mutable.get(key, ts) {
             return Ok(Some(Entry::Mutable(entry)));
@@ -595,7 +626,12 @@ where
         }
 
         Ok(version
-            .query(manager, TimestampedRef::new(key, ts), projection)
+            .query(
+                manager,
+                TimestampedRef::new(key, ts),
+                projection,
+                parquet_lru,
+            )
             .await?
             .map(|entry| Entry::RecordBatch(entry)))
     }
@@ -634,6 +670,8 @@ where
     limit: Option<usize>,
     projection_indices: Option<Vec<usize>>,
     projection: ProjectionMask,
+
+    parquet_lru: ParquetLru,
 }
 
 impl<'scan, 'range, R> Scan<'scan, 'range, R>
@@ -649,6 +687,7 @@ where
         fn_pre_stream: Box<
             dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
         >,
+        parquet_lru: ParquetLru,
     ) -> Self {
         Self {
             schema,
@@ -661,6 +700,7 @@ where
             limit: None,
             projection_indices: None,
             projection: ProjectionMask::all(),
+            parquet_lru,
         }
     }
 
@@ -736,6 +776,7 @@ where
                 self.ts,
                 self.limit,
                 self.projection,
+                self.parquet_lru,
             )
             .await?;
 
@@ -788,6 +829,7 @@ where
                 self.ts,
                 self.limit,
                 self.projection,
+                self.parquet_lru,
             )
             .await?;
         let merge_stream = MergeStream::from_vec(streams, self.ts).await?;
@@ -833,6 +875,8 @@ pub enum Projection {
     Parts(Vec<usize>),
 }
 
+pub type ParquetLru = Arc<dyn DynLruCache<FileId> + Send + Sync>;
+
 #[cfg(all(test, feature = "tokio"))]
 pub(crate) mod tests {
     use std::{
@@ -852,6 +896,7 @@ pub(crate) mod tests {
     use futures::StreamExt;
     use once_cell::sync::Lazy;
     use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
+    use parquet_lru::NoCache;
     use tempfile::TempDir;
     use tracing::error;
 
@@ -1276,9 +1321,15 @@ pub(crate) mod tests {
         executor.spawn(async move {
             while let Ok(task) = compaction_rx.recv_async().await {
                 if let Err(err) = match task {
-                    CompactTask::Freeze => compactor.check_then_compaction().await,
+                    CompactTask::Freeze => {
+                        compactor
+                            .check_then_compaction(Arc::new(NoCache::default()))
+                            .await
+                    }
                     CompactTask::Flush(option_tx) => {
-                        let mut result = compactor.check_then_compaction().await;
+                        let mut result = compactor
+                            .check_then_compaction(Arc::new(NoCache::default()))
+                            .await;
                         if let Some(tx) = option_tx {
                             let channel_result =
                                 tx.send(()).map_err(|_| CompactionError::ChannelClose);
@@ -1299,6 +1350,7 @@ pub(crate) mod tests {
             version_set,
             lock_map: Arc::new(Default::default()),
             manager,
+            parquet_lru: Arc::new(NoCache::default()),
             _p: Default::default(),
         })
     }
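`ParquetLru` is the erased handle that the rest of this patch threads through compaction, version queries, and scans; `DB::new` and the tests fall back to `NoCache`, so read behaviour is unchanged by default. A sketch of constructing that default by hand, assuming the alias stays exported at the crate root as above (the helper name is illustrative):

```rust
use std::sync::Arc;

use parquet_lru::NoCache;
use tonbo::ParquetLru;
use ulid::Ulid;

// `FileId` is an alias for `Ulid`, so the key type can be written either way.
fn default_parquet_lru() -> ParquetLru {
    Arc::new(NoCache::<Ulid>::default())
}
```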
diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs
index 5a8ce66d..4957142e 100644
--- a/src/ondisk/scan.rs
+++ b/src/ondisk/scan.rs
@@ -6,9 +6,11 @@ use std::{
 };
 
 use arrow::datatypes::Schema;
-use fusio_parquet::reader::AsyncReader;
 use futures_core::{ready, Stream};
-use parquet::arrow::{async_reader::ParquetRecordBatchStream, ProjectionMask};
+use parquet::arrow::{
+    async_reader::{AsyncFileReader, ParquetRecordBatchStream},
+    ProjectionMask,
+};
 use pin_project_lite::pin_project;
 
 use crate::{
@@ -18,9 +20,9 @@ use crate::{
 
 pin_project! {
     #[derive(Debug)]
-    pub struct SsTableScan<'scan, R>{
+    pub struct SsTableScan<'scan, R> {
         #[pin]
-        stream: ParquetRecordBatchStream<AsyncReader>,
+        stream: ParquetRecordBatchStream<Box<dyn AsyncFileReader + 'static>>,
         iter: Option<RecordBatchIterator<R>>,
         projection_mask: ProjectionMask,
         full_schema: Arc<Schema>,
@@ -30,7 +32,7 @@ pin_project! {
 
 impl<R> SsTableScan<'_, R> {
     pub fn new(
-        stream: ParquetRecordBatchStream<AsyncReader>,
+        stream: ParquetRecordBatchStream<Box<dyn AsyncFileReader + 'static>>,
         projection_mask: ProjectionMask,
         full_schema: Arc<Schema>,
     ) -> Self {
diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs
index 859e6c56..60e05af3 100644
--- a/src/ondisk/sstable.rs
+++ b/src/ondisk/sstable.rs
@@ -1,12 +1,18 @@
-use std::{marker::PhantomData, ops::Bound};
+use std::{marker::PhantomData, ops::Bound, sync::Arc};
 
 use fusio::{dynamic::DynFile, DynRead};
 use fusio_parquet::reader::AsyncReader;
 use futures_util::StreamExt;
-use parquet::arrow::{
-    arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions},
-    ParquetRecordBatchStreamBuilder, ProjectionMask,
+use parquet::{
+    arrow::{
+        arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions},
+        async_reader::{AsyncFileReader, AsyncReader as ParquetAsyncReader},
+        ParquetRecordBatchStreamBuilder, ProjectionMask,
+    },
+    errors::Result as ParquetResult,
 };
+use parquet_lru::{BoxedFileReader, DynLruCache};
+use ulid::Ulid;
 
 use super::{arrows::get_range_filter, scan::SsTableScan};
 use crate::{
@@ -19,7 +25,7 @@ pub(crate) struct SsTable<R>
 where
     R: Record,
 {
-    reader: AsyncReader,
+    reader: BoxedFileReader,
     _marker: PhantomData<R>,
 }
 
@@ -27,11 +33,20 @@ impl<R> SsTable<R>
 where
     R: Record,
 {
-    pub(crate) async fn open(file: Box<dyn DynFile>) -> Result<Self, fusio::Error> {
+    pub(crate) async fn open(
+        lru_cache: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
+        id: Ulid,
+        file: Box<dyn DynFile>,
+    ) -> Result<Self, fusio::Error> {
         let size = file.size().await?;
 
         Ok(SsTable {
-            reader: AsyncReader::new(file, size).await?,
+            reader: lru_cache
+                .get_reader(
+                    id,
+                    BoxedFileReader::new(AsyncReader::new(file, size).await?),
+                )
+                .await,
             _marker: PhantomData,
         })
     }
@@ -40,11 +55,10 @@ where
         self,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
-    ) -> parquet::errors::Result<
-        ArrowReaderBuilder<parquet::arrow::async_reader::AsyncReader<AsyncReader>>,
-    > {
+    ) -> ParquetResult<ArrowReaderBuilder<ParquetAsyncReader<Box<dyn AsyncFileReader + 'static>>>>
+    {
         let mut builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            self.reader,
+            Box::new(self.reader) as Box<dyn AsyncFileReader>,
             ArrowReaderOptions::default().with_page_index(true),
         )
         .await?;
@@ -58,7 +72,7 @@ where
         self,
         key: &TimestampedRef<R::Key>,
         projection_mask: ProjectionMask,
-    ) -> parquet::errors::Result<Option<RecordBatchEntry<R>>> {
+    ) -> ParquetResult<Option<RecordBatchEntry<R>>> {
         self.scan(
             (Bound::Included(key.value()), Bound::Included(key.value())),
             key.ts(),
@@ -114,6 +128,7 @@ pub(crate) mod tests {
         basic::{Compression, ZstdLevel},
         file::properties::WriterProperties,
     };
+    use parquet_lru::NoCache;
     use tempfile::TempDir;
 
     use super::SsTable;
     use crate::{
@@ -157,6 +172,8 @@ pub(crate) mod tests {
         R: Record,
     {
         SsTable::open(
+            Arc::new(NoCache::default()),
+            Default::default(),
             store
                 .open_options(path, FileType::Parquet.open_options(true))
                 .await
diff --git a/src/snapshot.rs b/src/snapshot.rs
index 941fc9f3..c548d010 100644
--- a/src/snapshot.rs
+++ b/src/snapshot.rs
@@ -10,17 +10,24 @@ use crate::{
     stream::ScanStream,
     timestamp::Timestamp,
     version::{TransactionTs, VersionRef},
-    DbError, Projection, Scan, Schema,
+    DbError, ParquetLru, Projection, Scan, Schema,
 };
 
-pub struct Snapshot<'s, R: Record> {
+pub struct Snapshot<'s, R>
+where
+    R: Record,
+{
     ts: Timestamp,
     share: RwLockReadGuard<'s, Schema<R>>,
     version: VersionRef<R>,
     manager: Arc<StoreManager>,
+    parquet_lru: ParquetLru,
 }
 
-impl<'s, R: Record> Snapshot<'s, R> {
+impl<'s, R> Snapshot<'s, R>
+where
+    R: Record,
+{
     pub async fn get<'get>(
         &'get self,
         key: &'get R::Key,
@@ -28,7 +35,14 @@ impl<'s, R: Record> Snapshot<'s, R> {
     ) -> Result<Option<Entry<'get, R>>, DbError<R>> {
         Ok(self
             .share
-            .get(&self.version, &self.manager, key, self.ts, projection)
+            .get(
+                &self.version,
+                &self.manager,
+                key,
+                self.ts,
+                projection,
+                self.parquet_lru.clone(),
+            )
             .await?
             .and_then(|entry| {
                 if entry.value().is_none() {
@@ -50,6 +64,7 @@ impl<'s, R: Record> Snapshot<'s, R> {
             self.ts,
             &self.version,
             Box::new(move |_: Option<ProjectionMask>| None),
+            self.parquet_lru.clone(),
         )
     }
 
@@ -57,12 +72,14 @@ impl<'s, R: Record> Snapshot<'s, R> {
         share: RwLockReadGuard<'s, Schema<R>>,
         version: VersionRef<R>,
         manager: Arc<StoreManager>,
+        parquet_lru: ParquetLru,
     ) -> Self {
         Self {
             ts: version.load_ts(),
             share,
             version,
             manager,
+            parquet_lru,
         }
     }
 
@@ -92,6 +109,7 @@ impl<'s, R: Record> Snapshot<'s, R> {
             self.ts,
             &self.version,
             fn_pre_stream,
+            self.parquet_lru.clone(),
         )
     }
 }
diff --git a/src/stream/level.rs b/src/stream/level.rs
index 90a87966..81c42b2f 100644
--- a/src/stream/level.rs
+++ b/src/stream/level.rs
@@ -13,6 +13,8 @@ use fusio::{
 };
 use futures_core::Stream;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
+use parquet_lru::DynLruCache;
+use ulid::Ulid;
 
 use crate::{
     fs::{FileId, FileType},
@@ -31,7 +33,10 @@ where
 {
     Init(FileId),
     Ready(SsTableScan<'level, R>),
-    OpenFile(Pin<Box<dyn Future<Output = Result<Box<dyn DynFile>, Error>> + 'level>>),
+    OpenFile(
+        Ulid,
+        Pin<Box<dyn Future<Output = Result<Box<dyn DynFile>, Error>> + 'level>>,
+    ),
     OpenSst(Pin<Box<dyn Future<Output = Result<SsTable<R>, Error>> + 'level>>),
     LoadStream(
         Pin<Box<dyn Future<Output = Result<SsTableScan<'level, R>, ParquetError>> + Send + 'level>>,
     ),
@@ -53,6 +58,7 @@ where
     status: FutureStatus<'level, R>,
     fs: Arc<dyn DynFs>,
     path: Option<Path>,
+    parquet_lru: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
 }
 
 impl<'level, R> LevelStream<'level, R>
@@ -71,6 +77,7 @@ where
         limit: Option<usize>,
         projection_mask: ProjectionMask,
         fs: Arc<dyn DynFs>,
+        parquet_lru: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
     ) -> Option<Self> {
         let (lower, upper) = range;
         let mut gens: VecDeque<FileId> = version.level_slice[level][start..end + 1]
@@ -92,6 +99,7 @@ where
             status,
             fs,
             path: None,
+            parquet_lru,
         })
     }
 }
@@ -125,7 +133,7 @@ where
                         >,
                     >(reader)
                 };
-                self.status = FutureStatus::OpenFile(reader);
+                self.status = FutureStatus::OpenFile(gen, reader);
                 continue;
             }
             FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) {
@@ -151,7 +159,7 @@ where
                                 >,
                             >(reader)
                         };
-                        self.status = FutureStatus::OpenFile(reader);
+                        self.status = FutureStatus::OpenFile(gen, reader);
                         continue;
                     }
                 },
@@ -163,9 +171,14 @@ where
                 }
                 Poll::Pending => Poll::Pending,
             },
-            FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) {
+            FutureStatus::OpenFile(id, file_future) => match Pin::new(file_future).poll(cx) {
                 Poll::Ready(Ok(file)) => {
-                    self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file)));
+                    let id = *id;
+                    self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(
+                        self.parquet_lru.clone(),
+                        id,
+                        file,
+                    )));
                     continue;
                 }
                 Poll::Ready(Err(err)) => {
@@ -209,6 +222,7 @@ mod tests {
     use fusio_dispatch::FsOptions;
     use futures_util::StreamExt;
     use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
+    use parquet_lru::NoCache;
     use tempfile::TempDir;
 
     use crate::{
@@ -251,6 +265,7 @@ mod tests {
                 [0, 1, 2, 3],
             ),
             manager.base_fs().clone(),
+            Arc::new(NoCache::default()),
         )
         .unwrap();
 
@@ -287,6 +302,7 @@ mod tests {
                 [0, 1, 2, 4],
             ),
             manager.base_fs().clone(),
+            Arc::new(NoCache::default()),
         )
         .unwrap();
 
@@ -323,6 +339,7 @@ mod tests {
                 [0, 1, 2],
             ),
             manager.base_fs().clone(),
+            Arc::new(NoCache::default()),
         )
         .unwrap();
diff --git a/src/version/mod.rs b/src/version/mod.rs
index 6cf707c5..d1dda197 100644
--- a/src/version/mod.rs
+++ b/src/version/mod.rs
@@ -25,7 +25,7 @@ use crate::{
     stream::{level::LevelStream, record_batch::RecordBatchEntry, ScanStream},
     timestamp::{Timestamp, TimestampedRef},
     version::{cleaner::CleanTag, edit::VersionEdit},
-    DbOption,
+    DbOption, ParquetLru,
 };
 
 pub(crate) const MAX_LEVEL: usize = 7;
@@ -121,6 +121,7 @@ where
         manager: &StoreManager,
         key: &TimestampedRef<R::Key>,
         projection_mask: ProjectionMask,
+        parquet_lru: ParquetLru,
     ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
         let level_0_path = self
             .option
@@ -132,7 +133,14 @@ where
                 continue;
             }
             if let Some(entry) = self
-                .table_query(level_0_fs, key, 0, scope.gen, projection_mask.clone())
+                .table_query(
+                    level_0_fs,
+                    key,
+                    0,
+                    scope.gen,
+                    projection_mask.clone(),
+                    parquet_lru.clone(),
+                )
                 .await?
             {
                 return Ok(Some(entry));
@@ -159,6 +167,7 @@ where
                     leve,
                     sort_runs[index].gen,
                     projection_mask.clone(),
+                    parquet_lru.clone(),
                 )
                 .await?
             {
@@ -176,6 +185,7 @@ where
         level: usize,
         gen: FileId,
         projection_mask: ProjectionMask,
+        parquet_lru: ParquetLru,
     ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
         let file = store
             .open_options(
@@ -184,7 +194,7 @@ where
             )
             .await
             .map_err(VersionError::Fusio)?;
-        SsTable::<R>::open(file)
+        SsTable::<R>::open(parquet_lru, gen, file)
             .await?
             .get(key, projection_mask)
             .await
@@ -201,6 +211,7 @@ where
         self.level_slice[level].len()
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn streams<'streams>(
         &self,
         manager: &StoreManager,
@@ -209,6 +220,7 @@ where
         ts: Timestamp,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
+        parquet_lru: ParquetLru,
     ) -> Result<(), VersionError<R>> {
         let level_0_path = self
             .option
@@ -226,7 +238,7 @@ where
                 )
                 .await
                 .map_err(VersionError::Fusio)?;
-            let table = SsTable::open(file).await?;
+            let table = SsTable::open(parquet_lru.clone(), scope.gen, file).await?;
 
             streams.push(ScanStream::SsTable {
                 inner: table
@@ -271,6 +283,7 @@ where
                     limit,
                     projection_mask.clone(),
                     level_fs.clone(),
+                    parquet_lru.clone(),
                 )
                 .unwrap(),
             });
diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs
index 11f558d1..b4a1b13a 100644
--- a/tests/data_integrity.rs
+++ b/tests/data_integrity.rs
@@ -112,5 +112,3 @@ mod tests {
         assert_eq!(write_hasher.finish(), read_hasher.finish());
     }
 }
-
-fn main() {}