Commit 91abf6a

feat: support parquet lru reader
1 parent 25b0389 commit 91abf6a

19 files changed (+566, -142 lines)


Cargo.toml

Lines changed: 5 additions & 4 deletions

@@ -1,4 +1,4 @@
-workspace = { members = ["tonbo_macros"] }
+workspace = { members = ["parquet-lru", "tonbo_macros"] }
 
 [package]
 description = "An embedded persistent KV database in Rust."
@@ -74,17 +74,18 @@ fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
 futures-core = "0.3"
 futures-io = "0.3"
 futures-util = "0.3"
-lockable = "0.0.8"
+lockable = "0.1.1"
 once_cell = "1"
 parquet = { version = "53", features = ["async"] }
+parquet-lru = { version = "0.1.0", path = "parquet-lru" }
 pin-project-lite = "0.2"
 regex = "1"
-thiserror = "1"
+thiserror = "2.0.3"
 tokio = { version = "1", features = ["io-util"], default-features = false }
 tokio-util = { version = "0.7" }
 tonbo_macros = { version = "0.2.0", path = "tonbo_macros" }
 tracing = "0.1"
-ulid = "1"
+ulid = { version = "1", features = ["serde"] }
 
 # Only used for benchmarks
 log = "0.4.22"

bindings/python/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
     "tokio",
 ] }
 futures = { version = "0.3" }
+parquet-lru = { version = "0.1.0", path = "../../parquet-lru" }
 pyo3 = { version = "0.21.2", features = [
     "abi3",
     "abi3-py310",

bindings/python/src/db.rs

Lines changed: 3 additions & 1 deletion

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use parquet_lru::NoopCache;
 use pyo3::{
     prelude::*,
     pyclass, pymethods,
@@ -9,6 +10,7 @@ use pyo3::{
 use pyo3_asyncio::tokio::{future_into_py, get_runtime};
 use tonbo::{
     executor::tokio::TokioExecutor,
+    fs::FileId,
     record::{ColumnDesc, DynRecord},
     DB,
 };
@@ -28,7 +30,7 @@ type PyExecutor = TokioExecutor;
 pub struct TonboDB {
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
-    db: Arc<DB<DynRecord, PyExecutor>>,
+    db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
 }
 
 #[pymethods]
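
With this change the binding's DB handle carries the parquet reader cache as a third type parameter, and the Python layer pins it to NoopCache<FileId> to keep the previous uncached behavior. A minimal sketch of the resulting type, using only imports shown in this diff (the PyDb alias itself is hypothetical):

use std::sync::Arc;

use parquet_lru::NoopCache;
use tonbo::{executor::tokio::TokioExecutor, fs::FileId, record::DynRecord, DB};

// Hypothetical alias mirroring the db field above: the third type parameter
// selects the parquet LRU cache; NoopCache<FileId> leaves readers unwrapped,
// so the Python binding behaves as before.
type PyDb = Arc<DB<DynRecord, TokioExecutor, NoopCache<FileId>>>;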

bindings/python/src/transaction.rs

Lines changed: 10 additions & 9 deletions

@@ -1,12 +1,13 @@
 use std::{mem::transmute, sync::Arc};
 
+use parquet_lru::NoopCache;
 use pyo3::{
     pyclass, pymethods,
     types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
     Bound, IntoPy, Py, PyAny, PyResult, Python,
 };
 use pyo3_asyncio::tokio::future_into_py;
-use tonbo::{record::DynRecord, transaction, Projection};
+use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};
 
 use crate::{
     column::Column,
@@ -18,14 +19,14 @@ use crate::{
 
 #[pyclass]
 pub struct Transaction {
-    txn: Option<transaction::Transaction<'static, DynRecord>>,
+    txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
 }
 
 impl Transaction {
     pub(crate) fn new<'txn>(
-        txn: transaction::Transaction<'txn, DynRecord>,
+        txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
         desc: Arc<Vec<Column>>,
     ) -> Self {
         let primary_key_index = desc
@@ -37,8 +38,8 @@ impl Transaction {
         Transaction {
             txn: Some(unsafe {
                 transmute::<
-                    transaction::Transaction<'txn, DynRecord>,
-                    transaction::Transaction<'static, DynRecord>,
+                    transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
+                    transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
                 >(txn)
             }),
             desc,
@@ -84,8 +85,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
+                &'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
             >(txn)
         };
 
@@ -169,8 +170,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
+                &'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
             >(txn)
         };
         let col_desc = self.desc.get(self.primary_key_index).unwrap();

parquet-lru/Cargo.toml

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+[package]
+edition = "2021"
+name = "parquet-lru"
+version = "0.1.0"
+
+[dependencies]
+bytes = { version = "1.8.0", features = ["serde"] }
+foyer = "0.12.2"
+futures-core = "0.3.31"
+futures-util = "0.3.31"
+parquet = { version = "53.2.0", features = ["async"] }
+serde = "1.0.214"
+thiserror = "2.0.3"

parquet-lru/src/lib.rs

Lines changed: 208 additions & 0 deletions

@@ -0,0 +1,208 @@
+use std::{future::Future, hash::Hash, marker::PhantomData, ops::Range, sync::Arc};
+
+use bytes::Bytes;
+use futures_core::future::BoxFuture;
+use futures_util::future::FutureExt;
+use parquet::{
+    arrow::async_reader::AsyncFileReader,
+    errors::{ParquetError, Result},
+    file::metadata::ParquetMetaData,
+};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+
+#[derive(Default)]
+pub struct Options {
+    meta_capacity: usize,
+    data_capacity: usize,
+}
+
+impl Options {
+    pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
+        self.meta_capacity = meta_capacity;
+        self
+    }
+
+    pub fn data_capacity(mut self, data_capacity: usize) -> Self {
+        self.data_capacity = data_capacity;
+        self
+    }
+}
+
+pub trait LruCache<K>: Clone + Send + Sync + 'static {
+    type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;
+
+    fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
+
+    fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
+    where
+        R: AsyncFileReader + 'static;
+}
+
+#[derive(Clone)]
+pub struct FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    inner: Arc<FoyerCacheInner<K>>,
+}
+
+pub struct FoyerCacheInner<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    meta: foyer::Cache<K, Arc<ParquetMetaData>>,
+    data: foyer::HybridCache<(K, Range<usize>), Bytes>,
+}
+
+impl<K> LruCache<K> for FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+{
+    type LruReader<R: AsyncFileReader + 'static> = ParquetLru<K, R>;
+
+    async fn new(options: Options) -> Result<Self, Error> {
+        Ok(Self {
+            inner: Arc::new(FoyerCacheInner {
+                meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
+                data: foyer::HybridCacheBuilder::new()
+                    .memory(options.data_capacity)
+                    .storage(foyer::Engine::Large)
+                    .build()
+                    .await
+                    .map_err(|e| Error::Foyer(e.into()))?,
+            }),
+        })
+    }
+
+    async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> ParquetLru<K, R> {
+        ParquetLru::new(self.clone(), key, reader)
+    }
+}
+
+pub struct ParquetLru<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    cache: FoyerCache<K>,
+    key: K,
+    reader: R,
+}
+
+impl<K, R> ParquetLru<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+    R: AsyncFileReader,
+{
+    fn new(cache: FoyerCache<K>, key: K, reader: R) -> Self {
+        Self { cache, key, reader }
+    }
+}
+
+impl<K, R> AsyncFileReader for ParquetLru<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+    R: AsyncFileReader,
+{
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            if let Some(data) = self
+                .cache
+                .inner
+                .data
+                .get(&(self.key.clone(), range.clone()))
+                .await
+                .map_err(|e| ParquetError::External(e.into()))?
+            {
+                Ok(data.value().clone())
+            } else {
+                let data = self.reader.get_bytes(range.clone()).await?;
+                self.cache
+                    .inner
+                    .data
+                    .insert((self.key.clone(), range), data.clone());
+                Ok(data)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        async move {
+            if let Some(meta) = self.cache.inner.meta.get(&self.key) {
+                Ok(meta.value().clone())
+            } else {
+                let meta = self.reader.get_metadata().await?;
+                self.cache.inner.meta.insert(self.key.clone(), meta.clone());
+                Ok(meta)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+        async move {
+            let mut missed = Vec::with_capacity(ranges.len());
+            let mut results = Vec::with_capacity(ranges.len());
+            for (id, range) in ranges.iter().enumerate() {
+                if let Some(data) = self
+                    .cache
+                    .inner
+                    .data
+                    .get(&(self.key.clone(), range.clone()))
+                    .await
+                    .map_err(|e| ParquetError::External(e.into()))?
+                {
+                    results.push((id, data.value().clone()));
+                } else {
+                    missed.push((id, range));
+                }
+            }
+            if !missed.is_empty() {
+                let data = self
+                    .reader
+                    .get_byte_ranges(missed.iter().map(|&(_, r)| r.clone()).collect())
+                    .await?;
+                for (i, (id, range)) in missed.into_iter().enumerate() {
+                    let data = data[i].clone();
+                    self.cache
+                        .inner
+                        .data
+                        .insert((self.key.clone(), range.clone()), data.clone());
+                    results.push((id, data));
+                }
+            }
+            results.sort_by_key(|(id, _)| *id);
+            Ok(results.into_iter().map(|(_, data)| data).collect())
+        }
+        .boxed()
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct NoopCache<K> {
+    _phantom: PhantomData<K>,
+}
+
+impl<K> LruCache<K> for NoopCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+{
+    type LruReader<R: AsyncFileReader + 'static> = R;
+
+    async fn new(_options: Options) -> Result<Self, Error> {
+        Ok(Self {
+            _phantom: PhantomData,
+        })
+    }
+
+    async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
+        reader
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Foyer error: {0}")]
+    Foyer(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
+}
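
Taken together, parquet-lru exposes the LruCache trait with two implementations: FoyerCache, which fronts any AsyncFileReader with an in-memory metadata cache and a hybrid data cache from foyer, and NoopCache, which hands the reader back untouched. A minimal usage sketch against the API above; the wrap_reader function, the u64 key type, and the capacity values are illustrative and not part of the commit:

use parquet::arrow::async_reader::AsyncFileReader;
use parquet_lru::{Error, FoyerCache, LruCache, Options};

// Build a FoyerCache keyed by a caller-chosen file id and wrap an existing
// AsyncFileReader, so repeated metadata and byte-range reads for the same key
// can be answered from the cache instead of the underlying reader.
async fn wrap_reader<R>(file_id: u64, base: R) -> Result<impl AsyncFileReader, Error>
where
    R: AsyncFileReader + 'static,
{
    // Capacities are illustrative; Options::default() starts both at zero.
    let options = Options::default().meta_capacity(32).data_capacity(1024);
    let cache = FoyerCache::new(options).await?;
    // get_reader yields a ParquetLru<u64, R>, which itself implements
    // AsyncFileReader; NoopCache::get_reader would return `base` as-is.
    Ok(cache.get_reader(file_id, base).await)
}

This trait is the hook that tonbo's DB and Transaction types pick up through their new cache type parameter, as the Python bindings above show with NoopCache<FileId>.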
