Skip to content

Commit 997c329

Browse files
committed
refactor: use parquet lru reader as trait object
1 parent 2e3b9ae commit 997c329

File tree

19 files changed

+263
-315
lines changed

19 files changed

+263
-315
lines changed

bindings/python/src/db.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::sync::Arc;
22

3-
use parquet_lru::NoopCache;
43
use pyo3::{
54
prelude::*,
65
pyclass, pymethods,
@@ -10,7 +9,6 @@ use pyo3::{
109
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
1110
use tonbo::{
1211
executor::tokio::TokioExecutor,
13-
fs::FileId,
1412
record::{ColumnDesc, DynRecord},
1513
DB,
1614
};
@@ -30,7 +28,7 @@ type PyExecutor = TokioExecutor;
3028
pub struct TonboDB {
3129
desc: Arc<Vec<Column>>,
3230
primary_key_index: usize,
33-
db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
31+
db: Arc<DB<DynRecord, PyExecutor>>,
3432
}
3533

3634
#[pymethods]

bindings/python/src/transaction.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
use std::{mem::transmute, sync::Arc};
22

3-
use parquet_lru::NoopCache;
43
use pyo3::{
54
pyclass, pymethods,
65
types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
76
Bound, IntoPy, Py, PyAny, PyResult, Python,
87
};
98
use pyo3_asyncio::tokio::future_into_py;
10-
use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};
9+
use tonbo::{record::DynRecord, transaction, Projection};
1110

1211
use crate::{
1312
column::Column,
@@ -19,14 +18,14 @@ use crate::{
1918

2019
#[pyclass]
2120
pub struct Transaction {
22-
txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
21+
txn: Option<transaction::Transaction<'static, DynRecord>>,
2322
desc: Arc<Vec<Column>>,
2423
primary_key_index: usize,
2524
}
2625

2726
impl Transaction {
2827
pub(crate) fn new<'txn>(
29-
txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
28+
txn: transaction::Transaction<'txn, DynRecord>,
3029
desc: Arc<Vec<Column>>,
3130
) -> Self {
3231
let primary_key_index = desc
@@ -38,8 +37,8 @@ impl Transaction {
3837
Transaction {
3938
txn: Some(unsafe {
4039
transmute::<
41-
transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
42-
transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
40+
transaction::Transaction<'txn, DynRecord>,
41+
transaction::Transaction<'static, DynRecord>,
4342
>(txn)
4443
}),
4544
desc,
@@ -85,8 +84,8 @@ impl Transaction {
8584
let txn = self.txn.as_ref().unwrap();
8685
let txn = unsafe {
8786
transmute::<
88-
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
89-
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
87+
&transaction::Transaction<'_, DynRecord>,
88+
&'static transaction::Transaction<'_, DynRecord>,
9089
>(txn)
9190
};
9291

@@ -170,8 +169,8 @@ impl Transaction {
170169
let txn = self.txn.as_ref().unwrap();
171170
let txn = unsafe {
172171
transmute::<
173-
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
174-
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
172+
&transaction::Transaction<'_, DynRecord>,
173+
&'static transaction::Transaction<'_, DynRecord>,
175174
>(txn)
176175
};
177176
let col_desc = self.desc.get(self.primary_key_index).unwrap();

examples/datafusion.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@ use datafusion::{
2626
use fusio::path::Path;
2727
use futures_core::Stream;
2828
use futures_util::StreamExt;
29-
use parquet_lru::NoopCache;
3029
use tokio::fs;
3130
use tonbo::{
32-
executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
33-
DbOption, DB,
31+
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
3432
};
3533
use tonbo_macros::Record;
3634

@@ -43,12 +41,12 @@ pub struct Music {
4341
}
4442

4543
struct MusicProvider {
46-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
44+
db: Arc<DB<Music, TokioExecutor>>,
4745
}
4846

4947
struct MusicExec {
5048
cache: PlanProperties,
51-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
49+
db: Arc<DB<Music, TokioExecutor>>,
5250
projection: Option<Vec<usize>>,
5351
limit: Option<usize>,
5452
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -97,10 +95,7 @@ impl TableProvider for MusicProvider {
9795
}
9896

9997
impl MusicExec {
100-
fn new(
101-
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
102-
projection: Option<&Vec<usize>>,
103-
) -> Self {
98+
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
10499
let schema = Music::arrow_schema();
105100
let schema = if let Some(projection) = &projection {
106101
Arc::new(schema.project(projection).unwrap())

parquet-lru/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ name = "parquet-lru"
77
version = "0.1.0"
88

99
[features]
10+
default = []
1011
foyer = ["dep:foyer", "dep:serde"]
11-
full = ["foyer"]
1212

1313
[dependencies]
1414
bytes = { version = "1.8.0", features = ["serde"] }
@@ -17,4 +17,3 @@ futures-core = "0.3.31"
1717
futures-util = "0.3.31"
1818
parquet = { version = "53.2.0", features = ["async"] }
1919
serde = { version = "1.0.214", optional = true }
20-
thiserror = "2.0.3"

parquet-lru/src/dyn.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::{ops::Range, sync::Arc};
2+
3+
use bytes::Bytes;
4+
use futures_core::future::BoxFuture;
5+
use parquet::{
6+
arrow::async_reader::AsyncFileReader, errors::Result, file::metadata::ParquetMetaData,
7+
};
8+
9+
use crate::LruCache;
10+
/// Type-erased Parquet reader: owns any [`AsyncFileReader`] behind a `Box<dyn AsyncFileReader>`.
11+
pub struct BoxedFileReader {
// The wrapped reader that the `AsyncFileReader` impl on this type forwards to.
12+
inner: Box<dyn AsyncFileReader>,
13+
}
14+
15+
impl BoxedFileReader {
/// Moves `inner` onto the heap and erases its concrete type.
///
/// `T` must be `'static` because it is stored as a `Box<dyn AsyncFileReader>`.
16+
pub fn new<T: AsyncFileReader + 'static>(inner: T) -> Self {
17+
Self {
18+
inner: Box::new(inner),
19+
}
20+
}
21+
}
22+
/// Pure delegation: each method forwards its arguments to the boxed reader and returns
/// that reader's future unchanged.
23+
impl AsyncFileReader for BoxedFileReader {
// Forwards the byte-range request to the wrapped reader.
24+
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
25+
self.inner.get_bytes(range)
26+
}
27+
// Forwards the metadata request to the wrapped reader.
28+
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
29+
self.inner.get_metadata()
30+
}
31+
// Forwarded explicitly so the wrapped reader's own `get_byte_ranges` is the one that runs.
32+
fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
33+
self.inner.get_byte_ranges(ranges)
34+
}
35+
}
36+
/// Boxed-types variant of `LruCache`: the single method is non-generic, taking and
/// returning a [`BoxedFileReader`] through a `BoxFuture`, which makes the trait usable as
/// a trait object (`dyn DynLruCache<K>`).
37+
pub trait DynLruCache<K> {
// Consumes `key` and `reader`; the returned future borrows `self` and resolves to the
// reader produced for that key.
38+
fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader>;
39+
}
40+
/// Blanket impl: every `Send + Sync` type implementing `LruCache<K>` also implements
/// `DynLruCache<K>`.
41+
impl<K, C> DynLruCache<K> for C
42+
where
43+
K: 'static + Send,
44+
C: LruCache<K> + Sized + Send + Sync,
45+
{
// Awaits the inner `get_reader` call and re-boxes the reader it yields, so the
// implementation-specific reader type never appears in the signature.
// NOTE(review): `self.get_reader` is expected to resolve to `LruCache::get_reader`
// (via the where-clause bound on `C`) rather than to this method, which would recurse
// forever. Confirm, or spell it as `LruCache::get_reader(self, key, reader)`.
46+
fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader> {
47+
Box::pin(async move { BoxedFileReader::new(self.get_reader(key, reader).await) })
48+
}
49+
}

parquet-lru/src/foyer.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use parquet::{
1010
};
1111
use serde::{Deserialize, Serialize};
1212

13-
use crate::{Error, LruCache, Options};
13+
use crate::LruCache;
1414

1515
#[derive(Clone)]
1616
pub struct FoyerCache<K>
@@ -32,23 +32,15 @@ impl<K> LruCache<K> for FoyerCache<K>
3232
where
3333
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
3434
{
35-
type LruReader<R: AsyncFileReader + 'static> = FoyerReader<K, R>;
35+
type LruReader<R>
36+
= FoyerReader<K, R>
37+
where
38+
R: AsyncFileReader + 'static;
3639

37-
async fn new(options: Options) -> Result<Self, Error> {
38-
Ok(Self {
39-
inner: Arc::new(FoyerCacheInner {
40-
meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
41-
data: foyer::HybridCacheBuilder::new()
42-
.memory(options.data_capacity)
43-
.storage(foyer::Engine::Large)
44-
.build()
45-
.await
46-
.map_err(|e| Error::External(e.into()))?,
47-
}),
48-
})
49-
}
50-
51-
async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> FoyerReader<K, R> {
40+
async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
41+
where
42+
R: AsyncFileReader,
43+
{
5244
FoyerReader::new(self.clone(), key, reader)
5345
}
5446
}

parquet-lru/src/lib.rs

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,57 @@
1+
mod r#dyn;
12
#[cfg(feature = "foyer")]
23
pub mod foyer;
34

45
use std::{future::Future, marker::PhantomData};
56

6-
use parquet::{arrow::async_reader::AsyncFileReader, errors::Result};
7-
use thiserror::Error;
7+
use parquet::arrow::async_reader::AsyncFileReader;
88

9-
#[derive(Default)]
10-
pub struct Options {
11-
meta_capacity: usize,
12-
data_capacity: usize,
13-
}
14-
15-
impl Options {
16-
pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
17-
self.meta_capacity = meta_capacity;
18-
self
19-
}
20-
21-
pub fn data_capacity(mut self, data_capacity: usize) -> Self {
22-
self.data_capacity = data_capacity;
23-
self
24-
}
25-
}
9+
pub use crate::r#dyn::*;
2610

27-
pub trait LruCache<K>: Clone + Send + Sync + 'static {
28-
type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;
29-
30-
fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
/// Maps a key of type `K` plus an `AsyncFileReader` to the implementation's own reader type.
11+
pub trait LruCache<K>
12+
where
13+
K: 'static,
14+
{
// Reader type returned by `get_reader` for an input reader of type `R`; it must itself
// be an `AsyncFileReader`.
15+
type LruReader<R>: AsyncFileReader + 'static
16+
where
17+
R: AsyncFileReader + 'static;
3118

// Consumes `key` and `reader`; the returned future must be `Send` and resolves to
// `Self::LruReader<R>`.
3219
fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
3320
where
3421
R: AsyncFileReader + 'static;
3522
}
3623

37-
#[derive(Clone, Default)]
38-
pub struct NoopCache<K> {
/// Marker type whose `LruCache` impl hands the reader back unchanged.
///
/// Only `Default` is derived. `Clone` is written by hand elsewhere in this file as
/// `impl<K> Clone for NoCache<K>`, i.e. without requiring `K: Clone`.
24+
#[derive(Default)]
25+
pub struct NoCache<K> {
// Ties the type to `K` without storing a value of type `K`.
3926
_phantom: PhantomData<K>,
4027
}
4128

42-
impl<K> LruCache<K> for NoopCache<K>
43-
where
44-
K: Send + Sync + Clone + 'static,
45-
{
46-
type LruReader<R: AsyncFileReader + 'static> = R;
47-
48-
async fn new(_options: Options) -> Result<Self, Error> {
49-
Ok(Self {
29+
impl<K> Clone for NoCache<K> {
30+
fn clone(&self) -> Self {
31+
Self {
5032
_phantom: PhantomData,
51-
})
52-
}
53-
54-
async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
55-
reader
33+
}
5634
}
5735
}
5836

59-
#[derive(Debug, Error)]
60-
pub enum Error {
61-
#[error("External lru implementation error: {0}")]
62-
External(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
// SAFETY: `NoCache<K>` has a single field, `PhantomData<K>`, so it never holds a value of
// type `K`; moving it to another thread moves no data of type `K`.
37+
unsafe impl<K> Send for NoCache<K> {}
38+
// SAFETY: same reasoning as for `Send` — the struct holds no `K` and no other state that
// a shared reference could expose.
39+
unsafe impl<K> Sync for NoCache<K> {}
40+
/// Pass-through `LruCache`: the "cached" reader type is the input reader type itself.
41+
impl<K> LruCache<K> for NoCache<K>
42+
where
43+
K: 'static,
44+
{
// The output reader is exactly the input reader type `R`.
45+
type LruReader<R>
46+
= R
47+
where
48+
R: AsyncFileReader + 'static;
49+
// Ignores the key and resolves immediately to the reader it was given.
// NOTE(review): the lint is presumably silenced so the signature can keep spelling out
// `impl Future<...> + Send` like the trait declaration does — confirm.
50+
#[allow(clippy::manual_async_fn)]
51+
fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
52+
where
53+
R: AsyncFileReader,
54+
{
55+
async move { reader }
56+
}
6357
}

0 commit comments

Comments
 (0)