Skip to content

Commit 9e732d1

Browse files
committed
refactor: use parquet lru reader as trait object
1 parent 2e3b9ae commit 9e732d1

File tree

18 files changed

+214
-309
lines changed

18 files changed

+214
-309
lines changed

bindings/python/src/db.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::sync::Arc;
22

3-
use parquet_lru::NoopCache;
43
use pyo3::{
54
prelude::*,
65
pyclass, pymethods,
@@ -10,7 +9,6 @@ use pyo3::{
109
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
1110
use tonbo::{
1211
executor::tokio::TokioExecutor,
13-
fs::FileId,
1412
record::{ColumnDesc, DynRecord},
1513
DB,
1614
};
@@ -30,7 +28,7 @@ type PyExecutor = TokioExecutor;
3028
pub struct TonboDB {
3129
desc: Arc<Vec<Column>>,
3230
primary_key_index: usize,
33-
db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
31+
db: Arc<DB<DynRecord, PyExecutor>>,
3432
}
3533

3634
#[pymethods]

bindings/python/src/transaction.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use std::{mem::transmute, sync::Arc};
22

3-
use parquet_lru::NoopCache;
43
use pyo3::{
54
pyclass, pymethods,
65
types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
76
Bound, IntoPy, Py, PyAny, PyResult, Python,
87
};
98
use pyo3_asyncio::tokio::future_into_py;
10-
use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};
9+
use tonbo::{record::DynRecord, transaction, Projection};
1110

1211
use crate::{
1312
column::Column,
@@ -19,14 +18,14 @@ use crate::{
1918

2019
#[pyclass]
2120
pub struct Transaction {
22-
txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
21+
txn: Option<transaction::Transaction<'static, DynRecord>>,
2322
desc: Arc<Vec<Column>>,
2423
primary_key_index: usize,
2524
}
2625

2726
impl Transaction {
2827
pub(crate) fn new<'txn>(
29-
txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
28+
txn: transaction::Transaction<'txn, DynRecord>,
3029
desc: Arc<Vec<Column>>,
3130
) -> Self {
3231
let primary_key_index = desc
@@ -38,8 +37,8 @@ impl Transaction {
3837
Transaction {
3938
txn: Some(unsafe {
4039
transmute::<
41-
transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
42-
transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
40+
transaction::Transaction<'txn, DynRecord>,
41+
transaction::Transaction<'static, DynRecord>,
4342
>(txn)
4443
}),
4544
desc,
@@ -85,8 +84,8 @@ impl Transaction {
8584
let txn = self.txn.as_ref().unwrap();
8685
let txn = unsafe {
8786
transmute::<
88-
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
89-
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
87+
&transaction::Transaction<'_, DynRecord>,
88+
&'static transaction::Transaction<'_, DynRecord>,
9089
>(txn)
9190
};
9291

@@ -170,8 +169,8 @@ impl Transaction {
170169
let txn = self.txn.as_ref().unwrap();
171170
let txn = unsafe {
172171
transmute::<
173-
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
174-
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
172+
&transaction::Transaction<'_, DynRecord>,
173+
&'static transaction::Transaction<'_, DynRecord>,
175174
>(txn)
176175
};
177176
let col_desc = self.desc.get(self.primary_key_index).unwrap();

examples/datafusion.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion::{
2626
use fusio::path::Path;
2727
use futures_core::Stream;
2828
use futures_util::StreamExt;
29-
use parquet_lru::NoopCache;
29+
use parquet_lru::NoCache;
3030
use tokio::fs;
3131
use tonbo::{
3232
executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
@@ -43,12 +43,12 @@ pub struct Music {
4343
}
4444

4545
struct MusicProvider {
46-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
46+
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
4747
}
4848

4949
struct MusicExec {
5050
cache: PlanProperties,
51-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
51+
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
5252
projection: Option<Vec<usize>>,
5353
limit: Option<usize>,
5454
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -98,7 +98,7 @@ impl TableProvider for MusicProvider {
9898

9999
impl MusicExec {
100100
fn new(
101-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
101+
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
102102
projection: Option<&Vec<usize>>,
103103
) -> Self {
104104
let schema = Music::arrow_schema();

parquet-lru/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ name = "parquet-lru"
77
version = "0.1.0"
88

99
[features]
10+
default = ["foyer"]
1011
foyer = ["dep:foyer", "dep:serde"]
11-
full = ["foyer"]
1212

1313
[dependencies]
1414
bytes = { version = "1.8.0", features = ["serde"] }
@@ -17,4 +17,3 @@ futures-core = "0.3.31"
1717
futures-util = "0.3.31"
1818
parquet = { version = "53.2.0", features = ["async"] }
1919
serde = { version = "1.0.214", optional = true }
20-
thiserror = "2.0.3"

parquet-lru/src/foyer.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use parquet::{
1010
};
1111
use serde::{Deserialize, Serialize};
1212

13-
use crate::{Error, LruCache, Options};
13+
use crate::LruCache;
1414

1515
#[derive(Clone)]
1616
pub struct FoyerCache<K>
@@ -32,23 +32,14 @@ impl<K> LruCache<K> for FoyerCache<K>
3232
where
3333
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
3434
{
35-
type LruReader<R: AsyncFileReader + 'static> = FoyerReader<K, R>;
35+
type LruReader<R> = FoyerReader<K, R>
36+
where
37+
R: AsyncFileReader + 'static;
3638

37-
async fn new(options: Options) -> Result<Self, Error> {
38-
Ok(Self {
39-
inner: Arc::new(FoyerCacheInner {
40-
meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
41-
data: foyer::HybridCacheBuilder::new()
42-
.memory(options.data_capacity)
43-
.storage(foyer::Engine::Large)
44-
.build()
45-
.await
46-
.map_err(|e| Error::External(e.into()))?,
47-
}),
48-
})
49-
}
50-
51-
async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> FoyerReader<K, R> {
39+
async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
40+
where
41+
R: AsyncFileReader,
42+
{
5243
FoyerReader::new(self.clone(), key, reader)
5344
}
5445
}

parquet-lru/src/lib.rs

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,56 @@
1+
mod r#dyn;
12
#[cfg(feature = "foyer")]
23
pub mod foyer;
34

45
use std::{future::Future, marker::PhantomData};
56

6-
use parquet::{arrow::async_reader::AsyncFileReader, errors::Result};
7-
use thiserror::Error;
7+
use parquet::arrow::async_reader::AsyncFileReader;
88

9-
#[derive(Default)]
10-
pub struct Options {
11-
meta_capacity: usize,
12-
data_capacity: usize,
13-
}
14-
15-
impl Options {
16-
pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
17-
self.meta_capacity = meta_capacity;
18-
self
19-
}
20-
21-
pub fn data_capacity(mut self, data_capacity: usize) -> Self {
22-
self.data_capacity = data_capacity;
23-
self
24-
}
25-
}
9+
pub use crate::r#dyn::*;
2610

27-
pub trait LruCache<K>: Clone + Send + Sync + 'static {
28-
type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;
29-
30-
fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
11+
pub trait LruCache<K>
12+
where
13+
K: 'static,
14+
{
15+
type LruReader<R>: AsyncFileReader + 'static
16+
where
17+
R: AsyncFileReader + 'static;
3118

3219
fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
3320
where
3421
R: AsyncFileReader + 'static;
3522
}
3623

37-
#[derive(Clone, Default)]
38-
pub struct NoopCache<K> {
24+
#[derive(Default)]
25+
pub struct NoCache<K> {
3926
_phantom: PhantomData<K>,
4027
}
4128

42-
impl<K> LruCache<K> for NoopCache<K>
43-
where
44-
K: Send + Sync + Clone + 'static,
45-
{
46-
type LruReader<R: AsyncFileReader + 'static> = R;
47-
48-
async fn new(_options: Options) -> Result<Self, Error> {
49-
Ok(Self {
29+
impl<K> Clone for NoCache<K> {
30+
fn clone(&self) -> Self {
31+
Self {
5032
_phantom: PhantomData,
51-
})
52-
}
53-
54-
async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
55-
reader
33+
}
5634
}
5735
}
5836

59-
#[derive(Debug, Error)]
60-
pub enum Error {
61-
#[error("External lru implementation error: {0}")]
62-
External(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
37+
unsafe impl<K> Send for NoCache<K> {}
38+
39+
unsafe impl<K> Sync for NoCache<K> {}
40+
41+
impl<K> LruCache<K> for NoCache<K>
42+
where
43+
K: 'static,
44+
{
45+
type LruReader<R> = R
46+
where
47+
R: AsyncFileReader + 'static;
48+
49+
#[allow(clippy::manual_async_fn)]
50+
fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
51+
where
52+
R: AsyncFileReader,
53+
{
54+
async move { reader }
55+
}
6356
}

src/compaction/mod.rs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ use fusio::DynFs;
55
use fusio_parquet::writer::AsyncWriter;
66
use futures_util::StreamExt;
77
use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
8-
use parquet_lru::LruCache;
98
use thiserror::Error;
109
use tokio::sync::oneshot;
1110

11+
use crate::ParquetLru;
1212
use crate::{
1313
fs::{manager::StoreManager, FileId, FileType},
1414
inmem::{
@@ -60,13 +60,10 @@ where
6060
}
6161
}
6262

63-
pub(crate) async fn check_then_compaction<C>(
63+
pub(crate) async fn check_then_compaction(
6464
&mut self,
65-
parquet_lru_cache: C,
66-
) -> Result<(), CompactionError<R>>
67-
where
68-
C: LruCache<FileId> + Unpin,
69-
{
65+
parquet_lru_cache: ParquetLru,
66+
) -> Result<(), CompactionError<R>> {
7067
let mut guard = self.schema.write().await;
7168

7269
guard.trigger.reset();
@@ -114,7 +111,7 @@ where
114111
&mut delete_gens,
115112
&guard.record_instance,
116113
&self.manager,
117-
&parquet_lru_cache,
114+
parquet_lru_cache,
118115
)
119116
.await?;
120117
}
@@ -193,7 +190,7 @@ where
193190
}
194191

195192
#[allow(clippy::too_many_arguments)]
196-
pub(crate) async fn major_compaction<C>(
193+
pub(crate) async fn major_compaction(
197194
version: &Version<R>,
198195
option: &DbOption<R>,
199196
mut min: &R::Key,
@@ -202,11 +199,8 @@ where
202199
delete_gens: &mut Vec<(FileId, usize)>,
203200
instance: &RecordInstance,
204201
manager: &StoreManager,
205-
parquet_cache: &C,
206-
) -> Result<(), CompactionError<R>>
207-
where
208-
C: LruCache<FileId> + Unpin,
209-
{
202+
parquet_cache: ParquetLru,
203+
) -> Result<(), CompactionError<R>> {
210204
let mut level = 0;
211205

212206
while level < MAX_LEVEL - 2 {
@@ -391,18 +385,15 @@ where
391385
(meet_scopes_l, start_l, end_l - 1)
392386
}
393387

394-
async fn build_tables<'scan, C>(
388+
async fn build_tables<'scan>(
395389
option: &DbOption<R>,
396390
version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
397391
level: usize,
398-
streams: Vec<ScanStream<'scan, R, C>>,
392+
streams: Vec<ScanStream<'scan, R>>,
399393
instance: &RecordInstance,
400394
fs: &Arc<dyn DynFs>,
401-
) -> Result<(), CompactionError<R>>
402-
where
403-
C: LruCache<FileId> + Unpin,
404-
{
405-
let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;
395+
) -> Result<(), CompactionError<R>> {
396+
let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;
406397

407398
// Kould: is the capacity parameter necessary?
408399
let mut builder = R::Columns::builder(&instance.arrow_schema::<R>(), 8192);
@@ -529,7 +520,7 @@ pub(crate) mod tests {
529520
use fusio_dispatch::FsOptions;
530521
use fusio_parquet::writer::AsyncWriter;
531522
use parquet::arrow::AsyncArrowWriter;
532-
use parquet_lru::NoopCache;
523+
use parquet_lru::NoCache;
533524
use tempfile::TempDir;
534525

535526
use crate::{
@@ -827,7 +818,7 @@ pub(crate) mod tests {
827818
&mut vec![],
828819
&RecordInstance::Normal,
829820
&manager,
830-
&NoopCache::default(),
821+
Arc::new(NoCache::default()),
831822
)
832823
.await
833824
.unwrap();
@@ -1219,7 +1210,7 @@ pub(crate) mod tests {
12191210
&mut vec![],
12201211
&RecordInstance::Normal,
12211212
&manager,
1222-
&NoopCache::default(),
1213+
Arc::new(NoCache::default()),
12231214
)
12241215
.await
12251216
.unwrap();
@@ -1240,7 +1231,7 @@ pub(crate) mod tests {
12401231
option.major_default_oldest_table_num = 1;
12411232
option.trigger_type = TriggerType::Length(5);
12421233

1243-
let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
1234+
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
12441235

12451236
for i in 5..9 {
12461237
let item = Test {

0 commit comments

Comments
 (0)