Skip to content

Commit 2e3b9ae

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/cache-reader
2 parents d78b778 + d10083e commit 2e3b9ae

File tree

29 files changed

+550
-119
lines changed

29 files changed

+550
-119
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ jobs:
8484
uses: actions-rs/cargo@v1
8585
with:
8686
command: run
87-
args: --example declare --all-features
87+
args: --example declare --features bytes,tokio
8888

8989
benchmark:
9090
name: Rust benchmark
@@ -109,7 +109,7 @@ jobs:
109109
uses: actions-rs/cargo@v1
110110
with:
111111
command: bench
112-
args: --all-features
112+
args: --features bench
113113

114114
- name: Comment on PR using GitHub CLI
115115
env:

.github/workflows/ci_wasm.yml

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
name: WASM CI
2+
3+
on:
4+
push:
5+
pull_request:
6+
workflow_dispatch:
7+
8+
env:
9+
CARGO_TERM_COLOR: always
10+
CARGO_REGISTRIES_MY_REGISTRY_INDEX: https://github.com/rust-lang/crates.io-index
11+
12+
13+
jobs:
14+
check:
15+
name: Rust project wasm check
16+
runs-on: ${{ matrix.os }}
17+
strategy:
18+
matrix:
19+
os:
20+
- ubuntu-latest
21+
steps:
22+
- uses: actions/checkout@v4
23+
- name: Install Rust toolchain
24+
uses: actions-rs/toolchain@v1
25+
with:
26+
toolchain: stable
27+
override: true
28+
components: rustfmt, clippy
29+
30+
- name: Run cargo clippy
31+
uses: actions-rs/cargo@v1
32+
with:
33+
command: check
34+
35+
- name: Setup for wasm32
36+
run: |
37+
rustup target add wasm32-unknown-unknown
38+
39+
- name: Run cargo build
40+
uses: actions-rs/cargo@v1
41+
with:
42+
command: build
43+
args: --target wasm32-unknown-unknown --no-default-features --features aws,bytes,opfs
44+
45+
- name: Install Chrome Environment
46+
run: |
47+
mkdir -p /tmp/chrome
48+
wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url')
49+
wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url')
50+
unzip chromedriver-linux64.zip
51+
unzip chrome-linux64.zip
52+
cp -r chrome-linux64/ /tmp/chrome/
53+
cp -r chromedriver-linux64 /tmp/chrome/chromedriver
54+
55+
- name: Setup wasm-pack
56+
run: |
57+
cargo install wasm-pack
58+
59+
- name: Run wasm-pack test
60+
run: |
61+
export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/
62+
wasm-pack test --chrome --headless --test wasm --no-default-features --features aws,bytes,opfs

Cargo.toml

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,31 @@ version = "0.2.0"
1414
msrv = "1.79.0"
1515

1616
[features]
17+
aws = ["fusio-dispatch/aws", "fusio/aws"]
1718
bench = ["redb", "rocksdb", "sled"]
1819
bytes = ["dep:bytes"]
1920
datafusion = ["dep:async-trait", "dep:datafusion"]
20-
default = ["bytes", "tokio"]
21+
default = ["aws", "bytes", "tokio", "tokio-http"]
2122
load_tbl = []
23+
object-store = ["fusio/object_store"]
24+
opfs = [
25+
"dep:wasm-bindgen-futures",
26+
"fusio-dispatch/opfs",
27+
"fusio-parquet/opfs",
28+
"fusio/opfs",
29+
]
2230
redb = ["dep:redb"]
2331
rocksdb = ["dep:rocksdb"]
2432
sled = ["dep:sled"]
25-
tokio = ["tokio/fs"]
33+
tokio = [
34+
"fusio-dispatch/tokio",
35+
"fusio-parquet/tokio",
36+
"fusio/tokio",
37+
"parquet/default",
38+
"tokio/fs",
39+
]
40+
tokio-http = ["fusio/tokio-http"]
41+
wasm = ["aws", "bytes", "opfs"]
2642

2743
[[example]]
2844
name = "declare"
@@ -58,25 +74,25 @@ crc32fast = "1"
5874
crossbeam-skiplist = "0.1"
5975
datafusion = { version = "42", optional = true }
6076
flume = { version = "0.11", features = ["async"] }
61-
fusio = { package = "fusio", version = "0.3.3", features = [
62-
"aws",
77+
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.3", features = [
6378
"dyn",
6479
"fs",
65-
"object_store",
66-
"tokio",
67-
"tokio-http",
6880
] }
69-
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [
70-
"aws",
71-
"tokio",
72-
] }
73-
fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
81+
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.1" }
82+
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-parquet", version = "0.2.1" }
7483
futures-core = "0.3"
7584
futures-io = "0.3"
7685
futures-util = "0.3"
7786
lockable = "0.1.1"
7887
once_cell = "1"
79-
parquet = { version = "53", features = ["async"] }
88+
parquet = { version = "53", default-features = false, features = [
89+
"async",
90+
"base64",
91+
"brotli",
92+
"flate2",
93+
"lz4",
94+
"snap",
95+
] }
8096
parquet-lru = { version = "0.1.0", path = "parquet-lru" }
8197
pin-project-lite = "0.2"
8298
regex = "1"
@@ -93,18 +109,28 @@ redb = { version = "2", optional = true }
93109
rocksdb = { version = "0.22", optional = true }
94110
sled = { version = "0.34", optional = true }
95111

112+
[target.'cfg(target_arch = "wasm32")'.dependencies]
113+
wasm-bindgen = "0.2.95"
114+
wasm-bindgen-futures = { version = "0.4.45", optional = true }
115+
96116
[dev-dependencies]
97117
bincode = "1"
98-
comfy-table = "7"
99-
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
100118
fastrand = "2"
101119
futures = { version = "0.3" }
102-
mimalloc = "0.1"
103120
serde = "1"
104121
tempfile = "3"
105-
tokio = { version = "1", features = ["full"] }
106122
trybuild = "1.0"
107123

124+
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
125+
comfy-table = "7"
126+
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
127+
mimalloc = "0.1"
128+
tokio = { version = "1", features = ["full"] }
129+
130+
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
131+
wasm-bindgen = "0.2.95"
132+
wasm-bindgen-test = "0.3.9"
133+
108134
[target.'cfg(unix)'.dev-dependencies]
109135
pprof = { version = "0.13", features = ["criterion", "flamegraph"] }
110136

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,4 @@ async fn main() {
145145

146146
## Contributing to Tonbo
147147
Follow the Contributing Guide to [contribute](https://github.com/tonbo-io/tonbo/blob/main/CONTRIBUTING.md).
148-
Please feel free to ask any question or contact us on Github [Discussions](https://github.com/orgs/tonbo-io/discussions) or [issues](https://github.com/tonbo-io/tonbo/issues).
148+
Please feel free to ask any question or contact us on Github [Discussions](https://github.com/tonbo-io/tonbo/discussions) or [issues](https://github.com/tonbo-io/tonbo/issues).

bindings/python/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ crate-type = ["cdylib"]
99
[workspace]
1010

1111
[dependencies]
12-
fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
13-
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
12+
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.1", features = [
13+
"aws",
14+
"tokio",
15+
] }
16+
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.0", features = [
1417
"aws",
1518
"tokio",
1619
] }

src/compaction/mod.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
88
use parquet_lru::LruCache;
99
use thiserror::Error;
1010
use tokio::sync::oneshot;
11-
use ulid::Ulid;
1211

1312
use crate::{
1413
fs::{manager::StoreManager, FileId, FileType},
@@ -66,7 +65,7 @@ where
6665
parquet_lru_cache: C,
6766
) -> Result<(), CompactionError<R>>
6867
where
69-
C: LruCache<Ulid> + Unpin,
68+
C: LruCache<FileId> + Unpin,
7069
{
7170
let mut guard = self.schema.write().await;
7271

@@ -156,7 +155,7 @@ where
156155
AsyncWriter::new(
157156
level_0_fs
158157
.open_options(
159-
&option.table_path(&gen, 0),
158+
&option.table_path(gen, 0),
160159
FileType::Parquet.open_options(false),
161160
)
162161
.await?,
@@ -206,7 +205,7 @@ where
206205
parquet_cache: &C,
207206
) -> Result<(), CompactionError<R>>
208207
where
209-
C: LruCache<Ulid> + Unpin,
208+
C: LruCache<FileId> + Unpin,
210209
{
211210
let mut level = 0;
212211

@@ -226,7 +225,7 @@ where
226225
for scope in meet_scopes_l.iter() {
227226
let file = level_fs
228227
.open_options(
229-
&option.table_path(&scope.gen, level),
228+
&option.table_path(scope.gen, level),
230229
FileType::Parquet.open_options(true),
231230
)
232231
.await?;
@@ -401,7 +400,7 @@ where
401400
fs: &Arc<dyn DynFs>,
402401
) -> Result<(), CompactionError<R>>
403402
where
404-
C: LruCache<Ulid> + Unpin,
403+
C: LruCache<FileId> + Unpin,
405404
{
406405
let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;
407406

@@ -472,12 +471,12 @@ where
472471
debug_assert!(min.is_some());
473472
debug_assert!(max.is_some());
474473

475-
let gen = Ulid::new();
474+
let gen = FileId::new();
476475
let columns = builder.finish(None);
477476
let mut writer = AsyncArrowWriter::try_new(
478477
AsyncWriter::new(
479478
fs.open_options(
480-
&option.table_path(&gen, level),
479+
&option.table_path(gen, level),
481480
FileType::Parquet.open_options(false),
482481
)
483482
.await?,
@@ -521,7 +520,7 @@ where
521520
EmptyLevel,
522521
}
523522

524-
#[cfg(test)]
523+
#[cfg(all(test, feature = "tokio"))]
525524
pub(crate) mod tests {
526525
use std::sync::{atomic::AtomicU32, Arc};
527526

@@ -582,7 +581,7 @@ pub(crate) mod tests {
582581
let mut writer = AsyncArrowWriter::try_new(
583582
AsyncWriter::new(
584583
fs.open_options(
585-
&option.table_path(&gen, level),
584+
&option.table_path(gen, level),
586585
FileType::Parquet.open_options(false),
587586
)
588587
.await?,

src/executor.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
use std::future::Future;
22

3+
use fusio::MaybeSend;
4+
35
pub trait Executor {
46
fn spawn<F>(&self, future: F)
57
where
6-
F: Future<Output = ()> + Send + 'static;
8+
F: Future<Output = ()> + MaybeSend + 'static;
79
}
810

9-
#[cfg(any(test, feature = "tokio"))]
11+
#[cfg(feature = "tokio")]
1012
pub mod tokio {
1113
use std::future::Future;
1214

15+
use fusio::MaybeSend;
1316
use tokio::runtime::Handle;
1417

1518
use super::Executor;
@@ -36,9 +39,43 @@ pub mod tokio {
3639
impl Executor for TokioExecutor {
3740
fn spawn<F>(&self, future: F)
3841
where
39-
F: Future<Output = ()> + Send + 'static,
42+
F: Future<Output = ()> + MaybeSend + 'static,
4043
{
4144
self.handle.spawn(future);
4245
}
4346
}
4447
}
48+
49+
#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
50+
pub mod opfs {
51+
use std::future::Future;
52+
53+
use fusio::MaybeSend;
54+
use wasm_bindgen::prelude::*;
55+
56+
use super::Executor;
57+
58+
#[wasm_bindgen]
59+
pub struct OpfsExecutor();
60+
61+
impl Default for OpfsExecutor {
62+
fn default() -> Self {
63+
Self {}
64+
}
65+
}
66+
67+
impl OpfsExecutor {
68+
pub fn new() -> Self {
69+
Self {}
70+
}
71+
}
72+
73+
impl Executor for OpfsExecutor {
74+
fn spawn<F>(&self, future: F)
75+
where
76+
F: Future<Output = ()> + MaybeSend + 'static,
77+
{
78+
wasm_bindgen_futures::spawn_local(future);
79+
}
80+
}
81+
}

src/inmem/immutable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ where
212212
}
213213
}
214214

215-
#[cfg(test)]
215+
#[cfg(all(test, feature = "tokio"))]
216216
pub(crate) mod tests {
217217
use std::{mem, sync::Arc};
218218

0 commit comments

Comments
 (0)