Skip to content

Commit 4c5d905

Browse files
committed
refactor: use parquet lru reader as trait object
1 parent 2e3b9ae commit 4c5d905

File tree

17 files changed

+206
-291
lines changed

17 files changed

+206
-291
lines changed

bindings/python/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type PyExecutor = TokioExecutor;
3030
pub struct TonboDB {
3131
desc: Arc<Vec<Column>>,
3232
primary_key_index: usize,
33-
db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
33+
db: Arc<DB<DynRecord, PyExecutor>>,
3434
}
3535

3636
#[pymethods]

examples/datafusion.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion::{
2626
use fusio::path::Path;
2727
use futures_core::Stream;
2828
use futures_util::StreamExt;
29-
use parquet_lru::NoopCache;
29+
use parquet_lru::NoCache;
3030
use tokio::fs;
3131
use tonbo::{
3232
executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
@@ -43,12 +43,12 @@ pub struct Music {
4343
}
4444

4545
struct MusicProvider {
46-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
46+
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
4747
}
4848

4949
struct MusicExec {
5050
cache: PlanProperties,
51-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
51+
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
5252
projection: Option<Vec<usize>>,
5353
limit: Option<usize>,
5454
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -98,7 +98,7 @@ impl TableProvider for MusicProvider {
9898

9999
impl MusicExec {
100100
fn new(
101-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
101+
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
102102
projection: Option<&Vec<usize>>,
103103
) -> Self {
104104
let schema = Music::arrow_schema();

parquet-lru/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ name = "parquet-lru"
77
version = "0.1.0"
88

99
[features]
10+
default = ["foyer"]
1011
foyer = ["dep:foyer", "dep:serde"]
11-
full = ["foyer"]
1212

1313
[dependencies]
1414
bytes = { version = "1.8.0", features = ["serde"] }
@@ -17,4 +17,3 @@ futures-core = "0.3.31"
1717
futures-util = "0.3.31"
1818
parquet = { version = "53.2.0", features = ["async"] }
1919
serde = { version = "1.0.214", optional = true }
20-
thiserror = "2.0.3"

parquet-lru/src/foyer.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use parquet::{
1010
};
1111
use serde::{Deserialize, Serialize};
1212

13-
use crate::{Error, LruCache, Options};
13+
use crate::LruCache;
1414

1515
#[derive(Clone)]
1616
pub struct FoyerCache<K>
@@ -32,23 +32,14 @@ impl<K> LruCache<K> for FoyerCache<K>
3232
where
3333
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
3434
{
35-
type LruReader<R: AsyncFileReader + 'static> = FoyerReader<K, R>;
35+
type LruReader<R> = FoyerReader<K, R>
36+
where
37+
R: AsyncFileReader + 'static;
3638

37-
async fn new(options: Options) -> Result<Self, Error> {
38-
Ok(Self {
39-
inner: Arc::new(FoyerCacheInner {
40-
meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
41-
data: foyer::HybridCacheBuilder::new()
42-
.memory(options.data_capacity)
43-
.storage(foyer::Engine::Large)
44-
.build()
45-
.await
46-
.map_err(|e| Error::External(e.into()))?,
47-
}),
48-
})
49-
}
50-
51-
async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> FoyerReader<K, R> {
39+
async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
40+
where
41+
R: AsyncFileReader,
42+
{
5243
FoyerReader::new(self.clone(), key, reader)
5344
}
5445
}

parquet-lru/src/lib.rs

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,56 @@
1+
mod r#dyn;
12
#[cfg(feature = "foyer")]
23
pub mod foyer;
34

45
use std::{future::Future, marker::PhantomData};
56

6-
use parquet::{arrow::async_reader::AsyncFileReader, errors::Result};
7-
use thiserror::Error;
7+
use parquet::arrow::async_reader::AsyncFileReader;
88

9-
#[derive(Default)]
10-
pub struct Options {
11-
meta_capacity: usize,
12-
data_capacity: usize,
13-
}
14-
15-
impl Options {
16-
pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
17-
self.meta_capacity = meta_capacity;
18-
self
19-
}
20-
21-
pub fn data_capacity(mut self, data_capacity: usize) -> Self {
22-
self.data_capacity = data_capacity;
23-
self
24-
}
25-
}
9+
pub use crate::r#dyn::*;
2610

27-
pub trait LruCache<K>: Clone + Send + Sync + 'static {
28-
type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;
29-
30-
fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
11+
pub trait LruCache<K>
12+
where
13+
K: 'static,
14+
{
15+
type LruReader<R>: AsyncFileReader + 'static
16+
where
17+
R: AsyncFileReader + 'static;
3118

3219
fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
3320
where
3421
R: AsyncFileReader + 'static;
3522
}
3623

37-
#[derive(Clone, Default)]
38-
pub struct NoopCache<K> {
24+
#[derive(Default)]
25+
pub struct NoCache<K> {
3926
_phantom: PhantomData<K>,
4027
}
4128

42-
impl<K> LruCache<K> for NoopCache<K>
43-
where
44-
K: Send + Sync + Clone + 'static,
45-
{
46-
type LruReader<R: AsyncFileReader + 'static> = R;
47-
48-
async fn new(_options: Options) -> Result<Self, Error> {
49-
Ok(Self {
29+
impl<K> Clone for NoCache<K> {
30+
fn clone(&self) -> Self {
31+
Self {
5032
_phantom: PhantomData,
51-
})
52-
}
53-
54-
async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
55-
reader
33+
}
5634
}
5735
}
5836

59-
#[derive(Debug, Error)]
60-
pub enum Error {
61-
#[error("External lru implementation error: {0}")]
62-
External(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
37+
// SAFETY: `NoCache<K>` holds only a zero-sized `PhantomData<K>` marker and never
// stores or touches a value of type `K`, so sending a `NoCache<K>` to another
// thread cannot move any `K` across threads, even when `K` itself is not `Send`.
unsafe impl<K> Send for NoCache<K> {}
38+
39+
// SAFETY: `NoCache<K>` holds only a zero-sized `PhantomData<K>` marker and has no
// interior state, so sharing `&NoCache<K>` between threads cannot expose any `K`
// or cause a data race, even when `K` itself is not `Sync`.
unsafe impl<K> Sync for NoCache<K> {}
40+
41+
impl<K> LruCache<K> for NoCache<K>
42+
where
43+
K: 'static,
44+
{
45+
type LruReader<R> = R
46+
where
47+
R: AsyncFileReader + 'static;
48+
49+
/// Pass-through implementation: ignores `_key` and resolves to the very same
/// `reader` that was handed in, so no caching layer is inserted.
// NOTE(review): the lint is presumably silenced so this signature can stay in the
// desugared `impl Future<...> + Send` form used by the `LruCache::get_reader`
// trait declaration rather than being rewritten as `async fn` — confirm.
#[allow(clippy::manual_async_fn)]
50+
fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
51+
where
52+
R: AsyncFileReader,
53+
{
54+
// The future owns `reader` and yields it back untouched.
async move { reader }
55+
}
6356
}

src/compaction/mod.rs

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ use fusio::DynFs;
55
use fusio_parquet::writer::AsyncWriter;
66
use futures_util::StreamExt;
77
use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
8-
use parquet_lru::LruCache;
98
use thiserror::Error;
109
use tokio::sync::oneshot;
1110

11+
use crate::ParquetLru;
1212
use crate::{
1313
fs::{manager::StoreManager, FileId, FileType},
1414
inmem::{
@@ -60,13 +60,10 @@ where
6060
}
6161
}
6262

63-
pub(crate) async fn check_then_compaction<C>(
63+
pub(crate) async fn check_then_compaction(
6464
&mut self,
65-
parquet_lru_cache: C,
66-
) -> Result<(), CompactionError<R>>
67-
where
68-
C: LruCache<FileId> + Unpin,
69-
{
65+
parquet_lru_cache: ParquetLru,
66+
) -> Result<(), CompactionError<R>> {
7067
let mut guard = self.schema.write().await;
7168

7269
guard.trigger.reset();
@@ -114,7 +111,7 @@ where
114111
&mut delete_gens,
115112
&guard.record_instance,
116113
&self.manager,
117-
&parquet_lru_cache,
114+
parquet_lru_cache,
118115
)
119116
.await?;
120117
}
@@ -193,7 +190,7 @@ where
193190
}
194191

195192
#[allow(clippy::too_many_arguments)]
196-
pub(crate) async fn major_compaction<C>(
193+
pub(crate) async fn major_compaction(
197194
version: &Version<R>,
198195
option: &DbOption<R>,
199196
mut min: &R::Key,
@@ -202,11 +199,8 @@ where
202199
delete_gens: &mut Vec<(FileId, usize)>,
203200
instance: &RecordInstance,
204201
manager: &StoreManager,
205-
parquet_cache: &C,
206-
) -> Result<(), CompactionError<R>>
207-
where
208-
C: LruCache<FileId> + Unpin,
209-
{
202+
parquet_cache: ParquetLru,
203+
) -> Result<(), CompactionError<R>> {
210204
let mut level = 0;
211205

212206
while level < MAX_LEVEL - 2 {
@@ -391,18 +385,15 @@ where
391385
(meet_scopes_l, start_l, end_l - 1)
392386
}
393387

394-
async fn build_tables<'scan, C>(
388+
async fn build_tables<'scan>(
395389
option: &DbOption<R>,
396390
version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
397391
level: usize,
398-
streams: Vec<ScanStream<'scan, R, C>>,
392+
streams: Vec<ScanStream<'scan, R>>,
399393
instance: &RecordInstance,
400394
fs: &Arc<dyn DynFs>,
401-
) -> Result<(), CompactionError<R>>
402-
where
403-
C: LruCache<FileId> + Unpin,
404-
{
405-
let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;
395+
) -> Result<(), CompactionError<R>> {
396+
let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;
406397

407398
// Kould: is the capacity parameter necessary?
408399
let mut builder = R::Columns::builder(&instance.arrow_schema::<R>(), 8192);
@@ -529,8 +520,9 @@ pub(crate) mod tests {
529520
use fusio_dispatch::FsOptions;
530521
use fusio_parquet::writer::AsyncWriter;
531522
use parquet::arrow::AsyncArrowWriter;
532-
use parquet_lru::NoopCache;
523+
use parquet_lru::{DynLruCache, NoCache};
533524
use tempfile::TempDir;
525+
use ulid::Ulid;
534526

535527
use crate::{
536528
compaction::Compactor,
@@ -827,7 +819,7 @@ pub(crate) mod tests {
827819
&mut vec![],
828820
&RecordInstance::Normal,
829821
&manager,
830-
&NoopCache::default(),
822+
Arc::new(NoCache::default()),
831823
)
832824
.await
833825
.unwrap();
@@ -1219,7 +1211,7 @@ pub(crate) mod tests {
12191211
&mut vec![],
12201212
&RecordInstance::Normal,
12211213
&manager,
1222-
&NoopCache::default(),
1214+
Arc::new(NoCache::default()),
12231215
)
12241216
.await
12251217
.unwrap();
@@ -1240,7 +1232,7 @@ pub(crate) mod tests {
12401232
option.major_default_oldest_table_num = 1;
12411233
option.trigger_type = TriggerType::Length(5);
12421234

1243-
let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
1235+
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
12441236

12451237
for i in 5..9 {
12461238
let item = Test {

0 commit comments

Comments
 (0)