Commit 1820be6
feat: support parquet lru reader
1 parent 25b0389 · commit 1820be6

21 files changed: +587 additions, -146 deletions

Cargo.toml

Lines changed: 5 additions & 4 deletions
@@ -1,4 +1,4 @@
-workspace = { members = ["tonbo_macros"] }
+workspace = { members = ["parquet-lru", "tonbo_macros"] }
 
 [package]
 description = "An embedded persistent KV database in Rust."
@@ -74,17 +74,18 @@ fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
 futures-core = "0.3"
 futures-io = "0.3"
 futures-util = "0.3"
-lockable = "0.0.8"
+lockable = "0.1.1"
 once_cell = "1"
 parquet = { version = "53", features = ["async"] }
+parquet-lru = { version = "0.1.0", path = "parquet-lru" }
 pin-project-lite = "0.2"
 regex = "1"
-thiserror = "1"
+thiserror = "2.0.3"
 tokio = { version = "1", features = ["io-util"], default-features = false }
 tokio-util = { version = "0.7" }
 tonbo_macros = { version = "0.2.0", path = "tonbo_macros" }
 tracing = "0.1"
-ulid = "1"
+ulid = { version = "1", features = ["serde"] }
 
 # Only used for benchmarks
 log = "0.4.22"

bindings/python/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
     "tokio",
 ] }
 futures = { version = "0.3" }
+parquet-lru = { version = "0.1.0", path = "../../parquet-lru" }
 pyo3 = { version = "0.21.2", features = [
     "abi3",
     "abi3-py310",

bindings/python/src/db.rs

Lines changed: 3 additions & 1 deletion
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use parquet_lru::NoopCache;
 use pyo3::{
     prelude::*,
     pyclass, pymethods,
@@ -9,6 +10,7 @@ use pyo3::{
 use pyo3_asyncio::tokio::{future_into_py, get_runtime};
 use tonbo::{
     executor::tokio::TokioExecutor,
+    fs::FileId,
     record::{ColumnDesc, DynRecord},
     DB,
 };
@@ -28,7 +30,7 @@ type PyExecutor = TokioExecutor;
 pub struct TonboDB {
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
-    db: Arc<DB<DynRecord, PyExecutor>>,
+    db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
 }
 
 #[pymethods]
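
The change above threads the new third type parameter (the parquet cache) through the Python bindings and pins it to NoopCache<FileId>, presumably keeping the bindings on uncached reads. A minimal sketch of the resulting type, using only names that appear in this diff; the PyDb alias itself is a hypothetical helper, not part of the commit:

use std::sync::Arc;

use parquet_lru::NoopCache;
use tonbo::{executor::tokio::TokioExecutor, fs::FileId, record::DynRecord, DB};

// Hypothetical alias: the bindings fix the cache parameter to the no-op cache
// instead of exposing it to Python callers.
type PyDb = Arc<DB<DynRecord, TokioExecutor, NoopCache<FileId>>>;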

bindings/python/src/transaction.rs

Lines changed: 10 additions & 9 deletions
@@ -1,12 +1,13 @@
 use std::{mem::transmute, sync::Arc};
 
+use parquet_lru::NoopCache;
 use pyo3::{
     pyclass, pymethods,
     types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
     Bound, IntoPy, Py, PyAny, PyResult, Python,
 };
 use pyo3_asyncio::tokio::future_into_py;
-use tonbo::{record::DynRecord, transaction, Projection};
+use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};
 
 use crate::{
     column::Column,
@@ -18,14 +19,14 @@ use crate::{
 
 #[pyclass]
 pub struct Transaction {
-    txn: Option<transaction::Transaction<'static, DynRecord>>,
+    txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
 }
 
 impl Transaction {
     pub(crate) fn new<'txn>(
-        txn: transaction::Transaction<'txn, DynRecord>,
+        txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
         desc: Arc<Vec<Column>>,
     ) -> Self {
         let primary_key_index = desc
@@ -37,8 +38,8 @@ impl Transaction {
         Transaction {
             txn: Some(unsafe {
                 transmute::<
-                    transaction::Transaction<'txn, DynRecord>,
-                    transaction::Transaction<'static, DynRecord>,
+                    transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
+                    transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
                 >(txn)
             }),
             desc,
@@ -84,8 +85,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
+                &'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
             >(txn)
         };
 
@@ -169,8 +170,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
+                &'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
             >(txn)
         };
         let col_desc = self.desc.get(self.primary_key_index).unwrap();

examples/datafusion.rs

Lines changed: 9 additions & 4 deletions
@@ -26,9 +26,11 @@ use datafusion::{
 use fusio::path::Path;
 use futures_core::Stream;
 use futures_util::StreamExt;
+use parquet_lru::NoopCache;
 use tokio::fs;
 use tonbo::{
-    executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
+    executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
+    DbOption, DB,
 };
 use tonbo_macros::Record;
 
@@ -41,12 +43,12 @@ pub struct Music {
 }
 
 struct MusicProvider {
-    db: Arc<DB<Music, TokioExecutor>>,
+    db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
 }
 
 struct MusicExec {
     cache: PlanProperties,
-    db: Arc<DB<Music, TokioExecutor>>,
+    db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
     projection: Option<Vec<usize>>,
     limit: Option<usize>,
     range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -95,7 +97,10 @@ impl TableProvider for MusicProvider {
 }
 
 impl MusicExec {
-    fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
+    fn new(
+        db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
+        projection: Option<&Vec<usize>>,
+    ) -> Self {
         let schema = Music::arrow_schema();
         let schema = if let Some(projection) = &projection {
             Arc::new(schema.project(projection).unwrap())

parquet-lru/Cargo.toml

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+[package]
+edition = "2021"
+name = "parquet-lru"
+version = "0.1.0"
+
+[features]
+foyer = ["dep:foyer", "dep:serde"]
+full = ["foyer"]
+
+[dependencies]
+bytes = { version = "1.8.0", features = ["serde"] }
+foyer = { version = "0.12.2", optional = true }
+futures-core = "0.3.31"
+futures-util = "0.3.31"
+parquet = { version = "53.2.0", features = ["async"] }
+serde = { version = "1.0.214", optional = true }
+thiserror = "2.0.3"

parquet-lru/src/foyer.rs

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+use std::{hash::Hash, ops::Range, sync::Arc};
+
+use bytes::Bytes;
+use futures_core::future::BoxFuture;
+use futures_util::FutureExt;
+use parquet::{
+    arrow::async_reader::AsyncFileReader,
+    errors::{ParquetError, Result},
+    file::metadata::ParquetMetaData,
+};
+use serde::{Deserialize, Serialize};
+
+use crate::{Error, LruCache, Options};
+
+#[derive(Clone)]
+pub struct FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    inner: Arc<FoyerCacheInner<K>>,
+}
+
+pub struct FoyerCacheInner<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    meta: foyer::Cache<K, Arc<ParquetMetaData>>,
+    data: foyer::HybridCache<(K, Range<usize>), Bytes>,
+}
+
+impl<K> LruCache<K> for FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+{
+    type LruReader<R: AsyncFileReader + 'static> = FoyerReader<K, R>;
+
+    async fn new(options: Options) -> Result<Self, Error> {
+        Ok(Self {
+            inner: Arc::new(FoyerCacheInner {
+                meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
+                data: foyer::HybridCacheBuilder::new()
+                    .memory(options.data_capacity)
+                    .storage(foyer::Engine::Large)
+                    .build()
+                    .await
+                    .map_err(|e| Error::External(e.into()))?,
+            }),
+        })
+    }
+
+    async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> FoyerReader<K, R> {
+        FoyerReader::new(self.clone(), key, reader)
+    }
+}
+
+pub struct FoyerReader<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    cache: FoyerCache<K>,
+    key: K,
+    reader: R,
+}
+
+impl<K, R> FoyerReader<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+    R: AsyncFileReader,
+{
+    fn new(cache: FoyerCache<K>, key: K, reader: R) -> Self {
+        Self { cache, key, reader }
+    }
+}
+
+impl<K, R> AsyncFileReader for FoyerReader<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+    R: AsyncFileReader,
+{
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            if let Some(data) = self
+                .cache
+                .inner
+                .data
+                .get(&(self.key.clone(), range.clone()))
+                .await
+                .map_err(|e| ParquetError::External(e.into()))?
+            {
+                Ok(data.value().clone())
+            } else {
+                let data = self.reader.get_bytes(range.clone()).await?;
+                self.cache
+                    .inner
+                    .data
+                    .insert((self.key.clone(), range), data.clone());
+                Ok(data)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        async move {
+            if let Some(meta) = self.cache.inner.meta.get(&self.key) {
+                Ok(meta.value().clone())
+            } else {
+                let meta = self.reader.get_metadata().await?;
+                self.cache.inner.meta.insert(self.key.clone(), meta.clone());
+                Ok(meta)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+        async move {
+            let mut missed = Vec::with_capacity(ranges.len());
+            let mut results = Vec::with_capacity(ranges.len());
+            for (id, range) in ranges.iter().enumerate() {
+                if let Some(data) = self
+                    .cache
+                    .inner
+                    .data
+                    .get(&(self.key.clone(), range.clone()))
+                    .await
+                    .map_err(|e| ParquetError::External(e.into()))?
+                {
+                    results.push((id, data.value().clone()));
+                } else {
+                    missed.push((id, range));
+                }
+            }
+            if !missed.is_empty() {
+                let data = self
+                    .reader
+                    .get_byte_ranges(missed.iter().map(|&(_, r)| r.clone()).collect())
+                    .await?;
+                for (id, range) in missed {
+                    let data = data[id].clone();
+                    self.cache
+                        .inner
+                        .data
+                        .insert((self.key.clone(), range.clone()), data.clone());
+                    results.push((id, data));
+                }
+            }
+            results.sort_by_key(|(id, _)| *id);
+            Ok(results.into_iter().map(|(_, data)| data).collect())
+        }
+        .boxed()
+    }
+}
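
A minimal usage sketch for the cache above, not part of the commit: it assumes FoyerCache is exposed as parquet_lru::foyer::FoyerCache behind the foyer feature, and that Options is a plain struct whose only relevant fields are the meta_capacity and data_capacity read in FoyerCache::new; the key type and capacities are placeholders.

use parquet::arrow::async_reader::AsyncFileReader;
use parquet_lru::{foyer::FoyerCache, LruCache, Options};

// Wrap any parquet AsyncFileReader so that metadata and byte ranges are served
// from the in-memory and hybrid foyer caches before falling back to the raw reader.
async fn wrap_reader<R>(file_id: u64, raw: R) -> impl AsyncFileReader
where
    R: AsyncFileReader + 'static,
{
    // NOTE: assumed field set for Options; see the parquet-lru crate for the
    // real definition. Capacities here are arbitrary.
    let cache = FoyerCache::<u64>::new(Options {
        meta_capacity: 1024,
        data_capacity: 256 * 1024 * 1024,
    })
    .await
    .expect("failed to build foyer cache");

    // Subsequent reads keyed by `file_id` hit the caches populated by FoyerReader.
    cache.get_reader(file_id, raw).await
}

Because LruCache::get_reader takes both the key and the underlying reader, one cache instance can front many files, and NoopCache can presumably be swapped in where caching is not wanted, as the Python bindings and the DataFusion example above do.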
