From a5fa5f2c2f94e8a31bdfe5b6e1b5867e75b7e3ba Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 12 Nov 2024 16:26:58 +0800 Subject: [PATCH 1/6] make tonbo build under wasm32 arch --- .github/workflows/ci_wasm.yml | 62 ++++++++ Cargo.toml | 59 +++++-- src/compaction/mod.rs | 255 +++++++++++++++++++++-------- src/executor.rs | 34 ++++ src/fs/manager.rs | 3 + src/fs/mod.rs | 95 ++++++++++- src/inmem/immutable.rs | 2 +- src/inmem/mutable.rs | 31 +++- src/lib.rs | 22 +-- src/ondisk/scan.rs | 2 + src/ondisk/sstable.rs | 51 ++++-- src/record/runtime/record.rs | 2 + src/serdes/arc.rs | 2 +- src/serdes/mod.rs | 9 +- src/stream/level.rs | 6 +- src/stream/mem_projection.rs | 2 +- src/stream/merge.rs | 2 +- src/stream/mod.rs | 2 + src/stream/package.rs | 2 +- src/transaction.rs | 4 +- src/trigger.rs | 2 +- src/version/cleaner.rs | 46 ++++-- src/version/mod.rs | 5 +- src/version/set.rs | 27 +++- tests/data_integrity.rs | 2 +- tests/wasm.rs | 291 ++++++++++++++++++++++++++++++++++ 26 files changed, 877 insertions(+), 143 deletions(-) create mode 100644 .github/workflows/ci_wasm.yml create mode 100644 tests/wasm.rs diff --git a/.github/workflows/ci_wasm.yml b/.github/workflows/ci_wasm.yml new file mode 100644 index 00000000..65f11389 --- /dev/null +++ b/.github/workflows/ci_wasm.yml @@ -0,0 +1,62 @@ +name: WASM CI + +on: + push: + pull_request: + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + CARGO_REGISTRIES_MY_REGISTRY_INDEX: https://github.com/rust-lang/crates.io-index + + +jobs: + check: + name: Rust project wasm check + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt, clippy + + - name: Run cargo clippy + uses: actions-rs/cargo@v1 + with: + command: check + + - name: Setup for wasm32 + run: | + rustup target add wasm32-unknown-unknown + + - name: Run cargo build + uses: actions-rs/cargo@v1 + with: + command: build + args: --target wasm32-unknown-unknown --no-default-features --features aws,bytes,opfs + + - name: Install Chrome Environment + run: | + mkdir -p /tmp/chrome + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chrome | .[] | select(.platform == "linux64") | .url') + wget $(curl https://googlechromelabs.github.io/chrome-for-testing/known-good-versions-with-downloads.json | jq -r '.versions | sort_by(.version) | reverse | .[0] | .downloads.chromedriver | .[] | select(.platform == "linux64") | .url') + unzip chromedriver-linux64.zip + unzip chrome-linux64.zip + cp -r chrome-linux64/ /tmp/chrome/ + cp -r chromedriver-linux64 /tmp/chrome/chromedriver + + - name: Setup wasm-pack + run: | + cargo install wasm-pack + + - name: Run wasm-pack test + run: | + export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/ + wasm-pack test --chrome --headless --test wasm --no-default-features --features aws,bytes,opfs diff --git a/Cargo.toml b/Cargo.toml index c9dfa9dc..67e8e183 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,15 +14,30 @@ version = "0.2.0" msrv = "1.79.0" [features] +aws = ["fusio/aws", "fusio-dispatch/tokio"] bench = ["redb", "rocksdb", "sled"] bytes = ["dep:bytes"] datafusion = ["dep:async-trait", "dep:datafusion"] -default = ["bytes", "tokio"] +default = ["aws", "bytes", "tokio", "tokio-http"] load_tbl = [] +object-store = ["fusio/object_store"] +opfs = [ + "dep:wasm-bindgen-futures", + "fusio-dispatch/opfs", + "fusio/opfs", + "fusio-parquet/opfs", +] redb = ["dep:redb"] rocksdb = ["dep:rocksdb"] sled = ["dep:sled"] -tokio = ["tokio/fs"] +tokio = [ + "fusio-dispatch/tokio", + "fusio-parquet/tokio", + "fusio/tokio", + "parquet/default", + "tokio/fs", +] +tokio-http = ["fusio/tokio-http"] [[example]] name = "declare" @@ -58,25 +73,25 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { package = "fusio", version = "0.3.3", features = [ - "aws", +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.3", features = [ "dyn", "fs", - "object_store", - "tokio", - "tokio-http", ] } -fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [ - "aws", - "tokio", -] } -fusio-parquet = { package = "fusio-parquet", version = "0.2.1" } +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.1"} +fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-parquet", version = "0.2.1" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" lockable = "0.0.8" once_cell = "1" -parquet = { version = "53", features = ["async"] } +parquet = { version = "53", default-features = false, features = [ + "async", + "base64", + "brotli", + "flate2", + "lz4", + "snap", +] } pin-project-lite = "0.2" regex = "1" thiserror = "1" @@ -86,6 +101,10 @@ tonbo_macros = { version = "0.2.0", path = "tonbo_macros" } tracing = "0.1" ulid = "1" +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen = "0.2.95" +wasm-bindgen-futures = { version = "0.4.45", optional = true } + # Only used for benchmarks log = "0.4.22" redb = { version = "2", optional = true } @@ -94,16 +113,22 @@ sled = { version = "0.34", optional = true } [dev-dependencies] bincode = "1" -comfy-table = "7" -criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } fastrand = "2" futures = { version = "0.3" } -mimalloc = "0.1" serde = "1" tempfile = "3" -tokio = { version = "1", features = ["full"] } trybuild = "1.0" +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +comfy-table = "7" +criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } +mimalloc = "0.1" +tokio = { version = "1", features = ["full"] } + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen = "0.2.95" +wasm-bindgen-test = "0.3.9" + [target.'cfg(unix)'.dev-dependencies] pprof = { version = "0.13", features = ["criterion", "flamegraph"] } diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index eac33d95..6eac7d80 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -1,7 +1,6 @@ use std::{cmp, collections::Bound, mem, pin::Pin, sync::Arc}; use async_lock::{RwLock, RwLockUpgradableReadGuard}; -use fusio::DynFs; use fusio_parquet::writer::AsyncWriter; use futures_util::StreamExt; use parquet::arrow::{AsyncArrowWriter, ProjectionMask}; @@ -38,10 +37,12 @@ where { pub(crate) option: Arc>, pub(crate) schema: Arc>>, - pub(crate) version_set: VersionSet, + pub(crate) version_set: Arc>, pub(crate) manager: Arc, } +unsafe impl Send for Compactor {} + impl Compactor where R: Record, @@ -49,7 +50,7 @@ where pub(crate) fn new( schema: Arc>>, option: Arc>, - version_set: VersionSet, + version_set: Arc>, manager: Arc, ) -> Self { Compactor:: { @@ -70,10 +71,20 @@ where } let trigger_clone = guard.trigger.clone(); - let mutable = mem::replace( - &mut guard.mutable, - Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?, - ); + #[cfg(feature = "opfs")] + let new_mutable = { + let option = self.option.clone(); + let manager = self.manager.clone(); + let (sender, receiver) = oneshot::channel(); + wasm_bindgen_futures::spawn_local(async move { + let _ = sender.send(Mutable::new(&option, trigger_clone, manager.base_fs()).await); + }); + receiver.await.unwrap()? + }; + #[cfg(not(feature = "opfs"))] + let new_mutable = + { Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await? }; + let mutable = mem::replace(&mut guard.mutable, new_mutable); let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?; guard.immutables.push((file_id, immutable)); @@ -86,11 +97,11 @@ where let excess = &guard.immutables[0..chunk_num]; if let Some(scope) = Self::minor_compaction( - &self.option, + self.option.clone(), recover_wal_ids, excess, &guard.record_instance, - &self.manager, + self.manager.clone(), ) .await? { @@ -101,13 +112,13 @@ where if self.option.is_threshold_exceeded_major(&version_ref, 0) { Self::major_compaction( &version_ref, - &self.option, + self.option.clone(), &scope.min, &scope.max, &mut version_edits, &mut delete_gens, &guard.record_instance, - &self.manager, + self.manager.clone(), ) .await?; } @@ -116,6 +127,20 @@ where ts: version_ref.increase_ts(), }); + #[cfg(feature = "opfs")] + { + let (tx, rx) = oneshot::channel(); + let version_set = self.version_set.clone(); + wasm_bindgen_futures::spawn_local(async move { + let _ = tx.send( + version_set + .apply_edits(version_edits, Some(delete_gens), false) + .await, + ); + }); + rx.await.unwrap()?; + } + #[cfg(not(feature = "opfs"))] self.version_set .apply_edits(version_edits, Some(delete_gens), false) .await?; @@ -128,34 +153,69 @@ where } pub(crate) async fn minor_compaction( - option: &DbOption, + option: Arc>, recover_wal_ids: Option>, batches: &[(Option, Immutable)], instance: &RecordInstance, - manager: &StoreManager, + manager: Arc, ) -> Result>, CompactionError> { if !batches.is_empty() { - let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); - let level_0_fs = manager.get_fs(level_0_path); - let mut min = None; let mut max = None; let gen = FileId::new(); let mut wal_ids = Vec::with_capacity(batches.len()); + let schema = instance.arrow_schema::().clone(); - let mut writer = AsyncArrowWriter::try_new( - AsyncWriter::new( - level_0_fs + #[cfg(feature = "opfs")] + let mut writer = { + let (tx, rx) = oneshot::channel(); + wasm_bindgen_futures::spawn_local(async move { + let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); + let level_0_fs = manager.get_fs(level_0_path); + + match level_0_fs .open_options( &option.table_path(&gen, 0), FileType::Parquet.open_options(false), ) - .await?, - ), - instance.arrow_schema::().clone(), - Some(option.write_parquet_properties.clone()), - )?; + .await + { + Ok(file) => { + let _ = tx.send( + AsyncArrowWriter::try_new( + AsyncWriter::new(file), + schema, + Some(option.write_parquet_properties.clone()), + ) + .map_err(CompactionError::Parquet), + ); + } + Err(err) => { + let _ = tx.send(Err(CompactionError::Fusio(err))); + return; + } + }; + }); + rx.await.map_err(|_| CompactionError::ChannelClose)?? + }; + #[cfg(not(feature = "opfs"))] + let mut writer = { + let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); + let level_0_fs = manager.get_fs(level_0_path); + AsyncArrowWriter::try_new( + AsyncWriter::new( + level_0_fs + .open_options( + &option.table_path(&gen, 0), + FileType::Parquet.open_options(false), + ) + .await?, + ), + schema, + Some(option.write_parquet_properties.clone()), + )? + }; if let Some(mut recover_wal_ids) = recover_wal_ids { wal_ids.append(&mut recover_wal_ids); @@ -188,13 +248,13 @@ where #[allow(clippy::too_many_arguments)] pub(crate) async fn major_compaction( version: &Version, - option: &DbOption, + option: Arc>, mut min: &R::Key, mut max: &R::Key, version_edits: &mut Vec>, delete_gens: &mut Vec<(FileId, usize)>, instance: &RecordInstance, - manager: &StoreManager, + manager: Arc, ) -> Result<(), CompactionError> { let mut level = 0; @@ -212,12 +272,43 @@ where // This Level if level == 0 { for scope in meet_scopes_l.iter() { - let file = level_fs - .open_options( - &option.table_path(&scope.gen, level), - FileType::Parquet.open_options(true), - ) - .await?; + #[cfg(feature = "opfs")] + let file = { + let (tx, rx) = oneshot::channel(); + let manager = manager.clone(); + let option = option.clone(); + let gen = scope.gen; + wasm_bindgen_futures::spawn_local(async move { + let level_path = + option.level_fs_path(level).unwrap_or(&option.base_path); + let level_fs = manager.get_fs(level_path); + match level_fs + .open_options( + &option.table_path(&gen, level), + FileType::Parquet.open_options(true), + ) + .await + { + Ok(file) => { + let _ = tx.send(Ok(file.into())); + } + Err(err) => { + let _ = tx.send(Err(err)); + } + }; + }); + rx.await.unwrap()? + }; + #[cfg(not(feature = "opfs"))] + let file = { + level_fs + .open_options( + &option.table_path(&scope.gen, level), + FileType::Parquet.open_options(true), + ) + .await? + .into() + }; streams.push(ScanStream::SsTable { inner: SsTable::open(file) @@ -251,6 +342,8 @@ where }); } if !meet_scopes_ll.is_empty() { + let level_path = option.level_fs_path(level).unwrap_or(&option.base_path); + let level_fs = manager.get_fs(level_path); // Next Level let (lower, upper) = Self::full_scope(&meet_scopes_ll)?; let level_scan_ll = LevelStream::new( @@ -271,12 +364,12 @@ where }); } Self::build_tables( - option, + option.clone(), version_edits, level + 1, streams, instance, - level_fs, + manager.clone(), ) .await?; @@ -379,12 +472,12 @@ where } async fn build_tables<'scan>( - option: &DbOption, + option: Arc>, version_edits: &mut Vec::Key>>, level: usize, streams: Vec>, instance: &RecordInstance, - fs: &Arc, + manager: Arc, ) -> Result<(), CompactionError> { let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; @@ -405,28 +498,28 @@ where if builder.written_size() >= option.max_sst_file_size { Self::build_table( - option, + option.clone(), version_edits, level, &mut builder, &mut min, &mut max, instance, - fs, + manager.clone(), ) .await?; } } if builder.written_size() > 0 { Self::build_table( - option, + option.clone(), version_edits, level, &mut builder, &mut min, &mut max, instance, - fs, + manager.clone(), ) .await?; } @@ -443,31 +536,69 @@ where #[allow(clippy::too_many_arguments)] async fn build_table( - option: &DbOption, + option: Arc>, version_edits: &mut Vec>, level: usize, builder: &mut ::Builder, min: &mut Option, max: &mut Option, instance: &RecordInstance, - fs: &Arc, + manager: Arc, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); debug_assert!(max.is_some()); let gen = Ulid::new(); let columns = builder.finish(None); - let mut writer = AsyncArrowWriter::try_new( - AsyncWriter::new( - fs.open_options( - &option.table_path(&gen, level), - FileType::Parquet.open_options(false), - ) - .await?, - ), - instance.arrow_schema::().clone(), - Some(option.write_parquet_properties.clone()), - )?; + let schema = instance.arrow_schema::().clone(); + #[cfg(feature = "opfs")] + let mut writer = { + let (tx, rx) = oneshot::channel(); + wasm_bindgen_futures::spawn_local(async move { + let level_path = option.level_fs_path(level - 1).unwrap_or(&option.base_path); + let fs = manager.get_fs(level_path); + + match fs + .open_options( + &option.table_path(&gen, level - 1), + FileType::Parquet.open_options(false), + ) + .await + { + Ok(writer) => { + let _ = tx.send( + AsyncArrowWriter::try_new( + AsyncWriter::new(writer), + schema, + Some(option.write_parquet_properties.clone()), + ) + .map_err(CompactionError::Parquet), + ); + } + Err(err) => { + let _ = tx.send(Err(CompactionError::Fusio(err))); + } + } + }); + + rx.await.unwrap()? + }; + #[cfg(not(feature = "opfs"))] + let mut writer = { + let level_path = option.level_fs_path(level - 1).unwrap_or(&option.base_path); + let fs = manager.get_fs(level_path); + AsyncArrowWriter::try_new( + AsyncWriter::new( + fs.open_options( + &option.table_path(&gen, level), + FileType::Parquet.open_options(false), + ) + .await?, + ), + schema, + Some(option.write_parquet_properties.clone()), + )? + }; writer.write(columns.as_record_batch()).await?; writer.close().await?; version_edits.push(VersionEdit::Add { @@ -504,7 +635,7 @@ where EmptyLevel, } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { use std::sync::{atomic::AtomicU32, Arc}; @@ -673,14 +804,14 @@ pub(crate) mod tests { .unwrap(); let scope = Compactor::::minor_compaction( - &option, + Arc::new(option), None, &vec![ (Some(FileId::new()), batch_1), (Some(FileId::new()), batch_2), ], &RecordInstance::Normal, - &manager, + Arc::new(manager), ) .await .unwrap() @@ -737,14 +868,14 @@ pub(crate) mod tests { .unwrap(); let scope = Compactor::::minor_compaction( - &option, + Arc::new(option), None, &vec![ (Some(FileId::new()), batch_1), (Some(FileId::new()), batch_2), ], &instance, - &manager, + Arc::new(manager), ) .await .unwrap() @@ -803,13 +934,13 @@ pub(crate) mod tests { Compactor::::major_compaction( &version, - &option, + option.clone(), &min, &max, &mut version_edits, &mut vec![], &RecordInstance::Normal, - &manager, + Arc::new(manager), ) .await .unwrap(); @@ -1194,13 +1325,13 @@ pub(crate) mod tests { Compactor::::major_compaction( &version, - &option, + option.clone(), &min, &max, &mut version_edits, &mut vec![], &RecordInstance::Normal, - &manager, + Arc::new(manager), ) .await .unwrap(); diff --git a/src/executor.rs b/src/executor.rs index 79cc6daa..a83dff32 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -42,3 +42,37 @@ pub mod tokio { } } } + +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub mod opfs { + use std::future::Future; + + use wasm_bindgen::prelude::*; + + use super::Executor; + + #[derive(Debug)] + #[wasm_bindgen] + pub struct OpfsExecutor {} + + impl Default for OpfsExecutor { + fn default() -> Self { + Self {} + } + } + + impl OpfsExecutor { + pub fn new() -> Self { + Self {} + } + } + + impl Executor for OpfsExecutor { + fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { + wasm_bindgen_futures::spawn_local(future); + } + } +} diff --git a/src/fs/manager.rs b/src/fs/manager.rs index 6ae83720..7dca4b98 100644 --- a/src/fs/manager.rs +++ b/src/fs/manager.rs @@ -8,6 +8,9 @@ pub struct StoreManager { fs_map: HashMap>, } +unsafe impl Send for StoreManager {} +unsafe impl Sync for StoreManager {} + impl StoreManager { pub fn new( base_options: FsOptions, diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 1bffbb50..1ce3932c 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -5,7 +5,7 @@ use std::{ str::FromStr, }; -use fusio::{fs::OpenOptions, path::Path}; +use fusio::{dynamic::DynFile, fs::OpenOptions, path::Path, Read, Write}; use ulid::{DecodeError, Ulid}; pub(crate) type FileId = Ulid; @@ -51,3 +51,96 @@ pub(crate) fn parse_file_id(path: &Path, suffix: FileType) -> Result, +} + +unsafe impl Send for DynFileWrapper {} +unsafe impl Sync for DynFileWrapper {} + +pub struct DynWriteWrapper { + inner: Box, +} + +unsafe impl Send for DynWriteWrapper {} +unsafe impl Sync for DynWriteWrapper {} +impl DynWriteWrapper { + #[allow(unused)] + pub(crate) fn new(inner: Box) -> Self { + Self { inner } + } +} + +impl From> for DynWriteWrapper { + fn from(inner: Box) -> Self { + Self { inner } + } +} + +impl Write for DynWriteWrapper { + async fn write_all(&mut self, buf: B) -> (Result<(), fusio::Error>, B) { + self.inner.write_all(buf).await + } + + async fn flush(&mut self) -> Result<(), fusio::Error> { + self.inner.flush().await + } + + async fn close(&mut self) -> Result<(), fusio::Error> { + self.inner.close().await + } +} + +impl DynFileWrapper { + #[allow(unused)] + pub(crate) fn new(inner: Box) -> Self { + Self { inner } + } + + pub(crate) fn file(self) -> Box { + self.inner + } +} + +impl From> for DynFileWrapper { + fn from(inner: Box) -> Self { + Self { inner } + } +} + +impl Read for DynFileWrapper { + async fn read_exact_at( + &mut self, + buf: B, + pos: u64, + ) -> (Result<(), fusio::Error>, B) { + self.inner.read_exact_at(buf, pos).await + } + + async fn read_to_end_at( + &mut self, + buf: Vec, + pos: u64, + ) -> (Result<(), fusio::Error>, Vec) { + self.inner.read_to_end_at(buf, pos).await + } + + async fn size(&self) -> Result { + self.inner.size().await + } +} + +impl Write for DynFileWrapper { + async fn write_all(&mut self, buf: B) -> (Result<(), fusio::Error>, B) { + self.inner.write_all(buf).await + } + + async fn flush(&mut self) -> Result<(), fusio::Error> { + self.inner.flush().await + } + + async fn close(&mut self) -> Result<(), fusio::Error> { + self.inner.close().await + } +} diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index aad9f7ec..4077e282 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -212,7 +212,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { use std::{mem, sync::Arc}; diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 230baffa..ada1bc0a 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -9,7 +9,7 @@ use fusio::{buffered::BufWriter, DynFs, DynWrite}; use ulid::Ulid; use crate::{ - fs::{FileId, FileType}, + fs::{DynWriteWrapper, FileId, FileType}, inmem::immutable::Immutable, record::{Key, KeyRef, Record, RecordInstance}, timestamp::{ @@ -37,7 +37,7 @@ where R: Record, { pub(crate) data: SkipMap, Option>, - wal: Option, R>>>, + wal: Option>>, pub(crate) trigger: Arc + Send + Sync>>, } @@ -63,7 +63,10 @@ where option.wal_buffer_size, )) as Box; - wal = Some(Mutex::new(WalFile::new(file, file_id))); + wal = Some(Mutex::new(WalFile::new( + DynWriteWrapper::from(file), + file_id, + ))); }; Ok(Self { @@ -178,9 +181,23 @@ where let mut file_id = None; if let Some(wal) = self.wal { - let mut wal_guard = wal.lock().await; - wal_guard.flush().await?; - file_id = Some(wal_guard.file_id()); + #[cfg(feature = "opfs")] + { + let (tx, rx) = tokio::sync::oneshot::channel(); + wasm_bindgen_futures::spawn_local(async move { + let mut wal_guard = wal.lock().await; + wal_guard.flush().await.unwrap(); + + let _ = tx.send(Some(wal_guard.file_id())); + }); + file_id = rx.await.unwrap() + }; + #[cfg(not(feature = "opfs"))] + { + let mut wal_guard = wal.lock().await; + wal_guard.flush().await?; + file_id = Some(wal_guard.file_id()) + }; } Ok((file_id, Immutable::from((self.data, instance)))) @@ -205,7 +222,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{ops::Bound, sync::Arc}; diff --git a/src/lib.rs b/src/lib.rs index 09ac0fb7..2cb6b4b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,7 +176,7 @@ where E: Executor, { schema: Arc>>, - version_set: VersionSet, + version_set: Arc>, lock_map: LockMap, manager: Arc, _p: PhantomData, @@ -242,7 +242,8 @@ where let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); - let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?; + let version_set = + Arc::new(VersionSet::new(clean_sender, option.clone(), manager.clone()).await?); let schema = Arc::new(RwLock::new( Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, )); @@ -367,7 +368,7 @@ where ) -> impl Stream>> + 'scan { stream! { let schema = self.schema.read().await; - let manager = &self.manager; + let manager = self.manager.clone(); let current = self.version_set.current().await; let mut scan = Scan::new( &schema, @@ -617,7 +618,7 @@ where R: Record, { schema: &'scan Schema, - manager: &'scan StoreManager, + manager: Arc, lower: Bound<&'scan R::Key>, upper: Bound<&'scan R::Key>, ts: Timestamp, @@ -637,7 +638,7 @@ where { fn new( schema: &'scan Schema, - manager: &'scan StoreManager, + manager: Arc, (lower, upper): (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, version: &'scan Version, @@ -725,7 +726,7 @@ where } self.version .streams( - self.manager, + &self.manager, &mut streams, (self.lower, self.upper), self.ts, @@ -777,7 +778,7 @@ where } self.version .streams( - self.manager, + &self.manager, &mut streams, (self.lower, self.upper), self.ts, @@ -828,7 +829,7 @@ pub enum Projection { Parts(Vec), } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { use std::{ collections::{BTreeMap, Bound}, @@ -1254,8 +1255,9 @@ pub(crate) mod tests { let schema = Arc::new(RwLock::new(schema)); let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); - let version_set = - build_version_set(version, clean_sender, option.clone(), manager.clone()).await?; + let version_set = Arc::new( + build_version_set(version, clean_sender, option.clone(), manager.clone()).await?, + ); let mut compactor = Compactor::::new( schema.clone(), option.clone(), diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index 5a8ce66d..e499c3d3 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -28,6 +28,8 @@ pin_project! { } } +unsafe impl<'scan, R> Send for SsTableScan<'scan, R> {} + impl SsTableScan<'_, R> { pub fn new( stream: ParquetRecordBatchStream, diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 2304d6d3..28acddb2 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -1,6 +1,6 @@ use std::{marker::PhantomData, ops::Bound}; -use fusio::{dynamic::DynFile, DynRead}; +use fusio::DynRead; use fusio_parquet::reader::AsyncReader; use futures_util::StreamExt; use parquet::arrow::{ @@ -10,6 +10,7 @@ use parquet::arrow::{ use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ + fs::DynFileWrapper, record::Record, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, TimestampedRef}, @@ -27,13 +28,42 @@ impl SsTable where R: Record, { - pub(crate) async fn open(file: Box) -> Result { - let size = file.size().await?; - - Ok(SsTable { - reader: AsyncReader::new(file, size).await?, - _marker: PhantomData, - }) + pub(crate) async fn open(file: DynFileWrapper) -> Result { + #[cfg(feature = "opfs")] + { + let (tx, rx) = tokio::sync::oneshot::channel(); + wasm_bindgen_futures::spawn_local(async move { + let size = match file.size().await { + Ok(size) => size, + Err(err) => { + let _ = tx.send(Err(err)); + return; + } + }; + let file = file.file(); + match AsyncReader::new(file, size).await { + Ok(file) => { + let _ = tx.send(Ok(SsTable { + reader: file, + _marker: PhantomData, + })); + } + Err(err) => { + let _ = tx.send(Err(err)); + } + } + }); + rx.await.unwrap() + } + #[cfg(not(feature = "opfs"))] + { + let size = file.size().await?; + let file = file.file(); + Ok(SsTable { + reader: AsyncReader::new(file, size).await?, + _marker: PhantomData, + }) + } } async fn into_parquet_builder( @@ -97,7 +127,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { use std::{borrow::Borrow, fs::File, ops::Bound, sync::Arc}; @@ -160,7 +190,8 @@ pub(crate) mod tests { store .open_options(path, FileType::Parquet.open_options(true)) .await - .unwrap(), + .unwrap() + .into(), ) .await .unwrap() diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 7f47e462..24c0ca0a 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -284,6 +284,7 @@ pub(crate) mod test { use super::DynRecord; use crate::record::{Column, ColumnDesc, Datatype}; + #[allow(unused)] pub(crate) fn test_dyn_item_schema() -> (Vec, usize) { let descs = vec![ ColumnDesc::new("id".to_string(), Datatype::Int64, false), @@ -298,6 +299,7 @@ pub(crate) mod test { (descs, 0) } + #[allow(unused)] pub(crate) fn test_dyn_items() -> Vec { let mut items = vec![]; for i in 0..50 { diff --git a/src/serdes/arc.rs b/src/serdes/arc.rs index 15911663..f8104a28 100644 --- a/src/serdes/arc.rs +++ b/src/serdes/arc.rs @@ -26,7 +26,7 @@ where async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: Write + Send, + W: Write, { self.as_ref().encode(writer).await } diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs index b857f42a..6962ae54 100644 --- a/src/serdes/mod.rs +++ b/src/serdes/mod.rs @@ -9,12 +9,15 @@ mod string; use std::future::Future; -use fusio::{SeqRead, Write}; +use fusio::{MaybeSend, SeqRead, Write}; pub trait Encode { type Error: From + std::error::Error + Send + Sync + 'static; - fn encode(&self, writer: &mut W) -> impl Future> + Send + fn encode( + &self, + writer: &mut W, + ) -> impl Future> + MaybeSend where W: Write; @@ -62,7 +65,7 @@ mod tests { async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: Write + Send, + W: Write, { self.0.encode(writer).await?; diff --git a/src/stream/level.rs b/src/stream/level.rs index 2039d52c..75da1d7f 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -55,6 +55,8 @@ where path: Option, } +unsafe impl<'level, R: Record> Send for LevelStream<'level, R> {} + impl<'level, R> LevelStream<'level, R> where R: Record, @@ -165,7 +167,7 @@ where }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file))); + self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file.into()))); continue; } Poll::Ready(Err(err)) => { @@ -201,7 +203,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{collections::Bound, sync::Arc}; diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 6671e285..0397e022 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -54,7 +54,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{ops::Bound, sync::Arc}; diff --git a/src/stream/merge.rs b/src/stream/merge.rs index f58583c6..5750fb3a 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -154,7 +154,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{ops::Bound, sync::Arc}; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index fa0b5afe..4b2eddba 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -131,6 +131,8 @@ pin_project! { } } +unsafe impl<'scan, R: Record> Send for ScanStream<'scan, R> {} + impl<'scan, R> From> for ScanStream<'scan, R> where R: Record, diff --git a/src/stream/package.rs b/src/stream/package.rs index c8493e8f..7e23b1b0 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -77,7 +77,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{collections::Bound, sync::Arc}; diff --git a/src/transaction.rs b/src/transaction.rs index 3a9d38c1..bb4b743c 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -107,7 +107,7 @@ where ) -> Scan<'scan, R> { Scan::new( &self.share, - &self.manager, + self.manager.clone(), range, self.ts, &self.version, @@ -249,7 +249,7 @@ where ChannelClose, } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{collections::Bound, sync::Arc}; diff --git a/src/trigger.rs b/src/trigger.rs index 34d71de7..1fbf6163 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -86,7 +86,7 @@ impl TriggerFactory { } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use super::*; use crate::tests::Test; diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index bd29e7fc..1d5f33eb 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -33,6 +33,8 @@ where manager: Arc, } +unsafe impl Send for Cleaner {} + impl Cleaner where R: Record, @@ -70,31 +72,49 @@ where break; } for (gen, level) in gens { - let fs = self - .option - .level_fs_path(level) - .map(|path| self.manager.get_fs(path)) - .unwrap_or(self.manager.base_fs()); - fs.remove(&self.option.table_path(&gen, level)).await?; + self.remove(gen, level).await?; } } } CleanTag::RecoverClean { wal_id: gen, level } => { - let fs = self - .option - .level_fs_path(level) - .map(|path| self.manager.get_fs(path)) - .unwrap_or(self.manager.base_fs()); - fs.remove(&self.option.table_path(&gen, level)).await?; + self.remove(gen, level).await?; } } } Ok(()) } + + async fn remove(&self, gen: FileId, level: usize) -> Result<(), DbError> { + #[cfg(feature = "opfs")] + { + let (sender, receiver) = tokio::sync::oneshot::channel(); + let manager = self.manager.clone(); + let path = self.option.table_path(&gen, level); + let option = self.option.clone(); + wasm_bindgen_futures::spawn_local(async move { + let fs = option + .level_fs_path(level) + .map(|path| manager.get_fs(path)) + .unwrap_or(manager.base_fs()); + sender.send(fs.remove(&path).await).unwrap(); + }); + receiver.await.unwrap()?; + } + #[cfg(not(feature = "opfs"))] + { + let fs = self + .option + .level_fs_path(level) + .map(|path| self.manager.get_fs(path)) + .unwrap_or(self.manager.base_fs()); + fs.remove(&self.option.table_path(&gen, level)).await?; + } + Ok(()) + } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { use std::{sync::Arc, time::Duration}; diff --git a/src/version/mod.rs b/src/version/mod.rs index 37d15c65..f7db40df 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -56,6 +56,7 @@ where R: Record, { #[cfg(test)] + #[allow(unused)] pub(crate) fn new( option: Arc>, clean_sender: Sender, @@ -183,7 +184,7 @@ where ) .await .map_err(VersionError::Fusio)?; - SsTable::::open(file) + SsTable::::open(file.into()) .await? .get(key, projection_mask) .await @@ -225,7 +226,7 @@ where ) .await .map_err(VersionError::Fusio)?; - let table = SsTable::open(file).await?; + let table = SsTable::open(file.into()).await?; streams.push(ScanStream::SsTable { inner: table diff --git a/src/version/set.rs b/src/version/set.rs index 586ca89e..4660adbb 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -10,12 +10,12 @@ use std::{ use async_lock::RwLock; use flume::Sender; -use fusio::{dynamic::DynFile, fs::FileMeta}; +use fusio::{fs::FileMeta, Write}; use futures_util::StreamExt; use super::{TransactionTs, MAX_LEVEL}; use crate::{ - fs::{manager::StoreManager, parse_file_id, FileId, FileType}, + fs::{manager::StoreManager, parse_file_id, DynFileWrapper, FileId, FileType}, record::Record, serdes::Encode, timestamp::Timestamp, @@ -50,9 +50,11 @@ where R: Record, { current: VersionRef, - log_with_id: (Box, FileId), + log_with_id: (DynFileWrapper, FileId), } +unsafe impl Send for VersionSetInner {} + pub(crate) struct VersionSet where R: Record, @@ -158,7 +160,7 @@ where timestamp: timestamp.clone(), log_length: 0, }), - log_with_id: (log, log_id), + log_with_id: (log.into(), log_id), })), clean_sender, timestamp, @@ -171,6 +173,17 @@ where } pub(crate) async fn current(&self) -> VersionRef { + #[cfg(feature = "opfs")] + { + let (tx, rx) = tokio::sync::oneshot::channel(); + let inner = self.inner.clone(); + wasm_bindgen_futures::spawn_local(async move { + let version = inner.read().await.current.clone(); + let _ = tx.send(version); + }); + rx.await.unwrap() + } + #[cfg(not(feature = "opfs"))] self.inner.read().await.current.clone() } @@ -270,7 +283,7 @@ where FileType::Log.open_options(false), ) .await?; - let _old_log = mem::replace(log, new_log); + let _old_log = mem::replace(log, new_log.into()); new_version.log_length = 0; for new_edit in new_version.to_edits() { @@ -284,7 +297,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { use std::{io::Cursor, sync::Arc}; @@ -330,7 +343,7 @@ pub(crate) mod tests { Ok(VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { current: Arc::new(version), - log_with_id: (log, log_id), + log_with_id: (log.into(), log_id), })), clean_sender, timestamp, diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs index 239d0d83..11f558d1 100644 --- a/tests/data_integrity.rs +++ b/tests/data_integrity.rs @@ -1,4 +1,4 @@ -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{hash::Hasher, ops::Bound}; diff --git a/tests/wasm.rs b/tests/wasm.rs new file mode 100644 index 00000000..92334da9 --- /dev/null +++ b/tests/wasm.rs @@ -0,0 +1,291 @@ +#[cfg(all(test, feature = "opfs", target_arch = "wasm32"))] +mod tests { + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + use std::{collections::BTreeMap, ops::Bound, sync::Arc}; + + use fusio::{path::Path, DynFs}; + use futures::StreamExt; + use tonbo::{ + executor::opfs::OpfsExecutor, + record::{Column, ColumnDesc, Datatype, DynRecord, Record}, + DbOption, Projection, DB, + }; + use wasm_bindgen_test::wasm_bindgen_test; + + fn test_dyn_item_schema() -> (Vec, usize) { + let descs = vec![ + ColumnDesc::new("id".to_string(), Datatype::Int64, false), + ColumnDesc::new("age".to_string(), Datatype::Int8, true), + ColumnDesc::new("name".to_string(), Datatype::String, false), + ColumnDesc::new("email".to_string(), Datatype::String, true), + ColumnDesc::new("bytes".to_string(), Datatype::Bytes, true), + ]; + (descs, 0) + } + + fn test_dyn_items() -> Vec { + let mut items = vec![]; + for i in 0..50 { + let columns = vec![ + Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false), + Column::new( + Datatype::Int8, + "age".to_string(), + Arc::new(Some(i as i8)), + true, + ), + Column::new( + Datatype::String, + "name".to_string(), + Arc::new(i.to_string()), + false, + ), + Column::new( + Datatype::String, + "email".to_string(), + Arc::new(Some(format!("{}@tonbo.io", i))), + true, + ), + Column::new( + Datatype::Bytes, + "bytes".to_string(), + Arc::new(Some((i as i32).to_le_bytes().to_vec())), + true, + ), + ]; + + let record = DynRecord::new(columns, 0); + items.push(record); + } + items + } + + #[wasm_bindgen_test] + async fn test_wasm_read_write() { + let (cols_desc, primary_key_index) = test_dyn_item_schema(); + let path = Path::from_opfs_path("opfs_dir_rw").unwrap(); + let fs = fusio::disk::LocalFs {}; + fs.remove(&path).await.unwrap(); + fs.create_dir_all(&path).await.unwrap(); + + let option = DbOption::with_path( + Path::from_opfs_path("opfs_dir_rw").unwrap(), + "id".to_string(), + primary_key_index, + ); + + let db: DB = + DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index) + .await + .unwrap(); + + for item in test_dyn_items().into_iter() { + db.insert(item).await.unwrap(); + } + + // test get + { + let tx = db.transaction().await; + + for i in 0..50 { + let key = Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false); + let option1 = tx.get(&key, Projection::All).await.unwrap(); + let entry = option1.unwrap(); + let record_ref = entry.get(); + + assert_eq!( + *record_ref + .columns + .first() + .unwrap() + .value + .as_ref() + .downcast_ref::() + .unwrap(), + i as i64 + ); + assert_eq!( + *record_ref + .columns + .get(2) + .unwrap() + .value + .as_ref() + .downcast_ref::>() + .unwrap(), + Some(i.to_string()), + ); + assert_eq!( + *record_ref + .columns + .get(3) + .unwrap() + .value + .as_ref() + .downcast_ref::>() + .unwrap(), + Some(format!("{}@tonbo.io", i)), + ); + assert_eq!( + *record_ref + .columns + .get(4) + .unwrap() + .value + .as_ref() + .downcast_ref::>>() + .unwrap(), + Some((i as i32).to_le_bytes().to_vec()), + ); + } + tx.commit().await.unwrap(); + } + } + + #[wasm_bindgen_test] + async fn test_wasm_transaction() { + let (cols_desc, primary_key_index) = test_dyn_item_schema(); + + let fs = fusio::disk::LocalFs {}; + let path = Path::from_opfs_path("opfs_dir_txn").unwrap(); + fs.remove(&path).await.unwrap(); + fs.create_dir_all(&path).await.unwrap(); + + let option = DbOption::with_path( + Path::from_opfs_path("opfs_dir_txn").unwrap(), + "id".to_string(), + primary_key_index, + ); + + let db: DB = + DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index) + .await + .unwrap(); + + { + let mut txn = db.transaction().await; + for item in test_dyn_items().into_iter() { + txn.insert(item); + } + txn.commit().await.unwrap(); + } + + // test scan + { + let txn = db.transaction().await; + let lower = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(5_i64), false); + let upper = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(47_i64), false); + let mut scan = txn + .scan((Bound::Included(&lower), Bound::Included(&upper))) + .projection(vec![0, 2, 4]) + .take() + .await + .unwrap(); + + let mut i = 5_i64; + while let Some(entry) = scan.next().await.transpose().unwrap() { + let columns = entry.value().unwrap().columns; + + let primary_key_col = columns.first().unwrap(); + assert_eq!(primary_key_col.datatype, Datatype::Int64); + assert_eq!(primary_key_col.name, "id".to_string()); + assert_eq!( + *primary_key_col + .value + .as_ref() + .downcast_ref::() + .unwrap(), + i + ); + + let col = columns.get(1).unwrap(); + assert_eq!(col.datatype, Datatype::Int8); + assert_eq!(col.name, "age".to_string()); + let age = col.value.as_ref().downcast_ref::>(); + assert!(age.is_some()); + assert_eq!(age.unwrap(), &None); + + let col = columns.get(2).unwrap(); + assert_eq!(col.datatype, Datatype::String); + assert_eq!(col.name, "name".to_string()); + let name = col.value.as_ref().downcast_ref::>(); + assert!(name.is_some()); + assert_eq!(name.unwrap(), &Some(i.to_string())); + + let col = columns.get(4).unwrap(); + assert_eq!(col.datatype, Datatype::Bytes); + assert_eq!(col.name, "bytes".to_string()); + let bytes = col.value.as_ref().downcast_ref::>>(); + assert!(bytes.is_some()); + assert_eq!(bytes.unwrap(), &Some((i as i32).to_le_bytes().to_vec())); + i += 1 + } + assert_eq!(i, 48); + } + } + + #[wasm_bindgen_test] + async fn test_wasm_schema_recover() { + let (cols_desc, primary_key_index) = test_dyn_item_schema(); + let path = Path::from_opfs_path("opfs_dir").unwrap(); + let fs = fusio::disk::LocalFs {}; + fs.remove(&path).await.unwrap(); + fs.create_dir_all(&path).await.unwrap(); + + let option = DbOption::with_path( + Path::from_opfs_path("opfs_dir").unwrap(), + "id".to_string(), + primary_key_index, + ); + + { + let db: DB = + DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index) + .await + .unwrap(); + + for item in test_dyn_items().into_iter() { + db.insert(item).await.unwrap(); + } + + db.flush_wal().await.unwrap(); + } + + let (cols_desc, primary_key_index) = test_dyn_item_schema(); + let option = DbOption::with_path( + Path::from_opfs_path("opfs_dir").unwrap(), + "id".to_string(), + primary_key_index, + ); + let db: DB = + DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index) + .await + .unwrap(); + + let mut sort_items = BTreeMap::new(); + for item in test_dyn_items() { + sort_items.insert(item.key(), item); + } + + { + let tx = db.transaction().await; + let mut scan = tx + .scan((Bound::Unbounded, Bound::Unbounded)) + .projection(vec![0, 1, 2]) + .take() + .await + .unwrap(); + + while let Some(entry) = scan.next().await.transpose().unwrap() { + let columns1 = entry.value().unwrap().columns; + let (_, record) = sort_items.pop_first().unwrap(); + let columns2 = record.as_record_ref().columns; + + assert_eq!(columns1.len(), columns2.len()); + for i in 0..columns1.len() { + assert_eq!(columns1.get(i), columns2.get(i)); + } + } + } + } +} From 9bbc38b41af0f22006af6ff9c10dd05c434584bf Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 12 Nov 2024 17:09:04 +0800 Subject: [PATCH 2/6] fix build error --- .github/workflows/ci.yml | 4 ++-- Cargo.toml | 12 ++++++------ bindings/python/Cargo.toml | 7 +++++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f70840ce..e5573b06 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: run - args: --example declare --all-features + args: --example declare --features bytes,tokio benchmark: name: Rust benchmark @@ -109,7 +109,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: bench - args: --all-features + args: --features bench - name: Comment on PR using GitHub CLI env: diff --git a/Cargo.toml b/Cargo.toml index 67e8e183..104d84ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ version = "0.2.0" msrv = "1.79.0" [features] -aws = ["fusio/aws", "fusio-dispatch/tokio"] +aws = ["fusio/aws", "fusio-dispatch/aws"] bench = ["redb", "rocksdb", "sled"] bytes = ["dep:bytes"] datafusion = ["dep:async-trait", "dep:datafusion"] @@ -77,7 +77,7 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87 "dyn", "fs", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.1"} +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.1" } fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-parquet", version = "0.2.1" } futures-core = "0.3" futures-io = "0.3" @@ -101,16 +101,16 @@ tonbo_macros = { version = "0.2.0", path = "tonbo_macros" } tracing = "0.1" ulid = "1" -[target.'cfg(target_arch = "wasm32")'.dependencies] -wasm-bindgen = "0.2.95" -wasm-bindgen-futures = { version = "0.4.45", optional = true } - # Only used for benchmarks log = "0.4.22" redb = { version = "2", optional = true } rocksdb = { version = "0.22", optional = true } sled = { version = "0.34", optional = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen = "0.2.95" +wasm-bindgen-futures = { version = "0.4.45", optional = true } + [dev-dependencies] bincode = "1" fastrand = "2" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 21b170ca..f4da9790 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -9,8 +9,11 @@ crate-type = ["cdylib"] [workspace] [dependencies] -fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] } -fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio", version = "0.3.1", features = [ + "aws", + "tokio", +] } +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "8038993675591f87dd65b88ffdade31dc0a254b7", package = "fusio-dispatch", version = "0.2.0", features = [ "aws", "tokio", ] } From 4d9c8f3c081e4cd7951f13f70927365fd5778167 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Wed, 13 Nov 2024 21:25:10 +0800 Subject: [PATCH 3/6] set compilation condition --- src/compaction/mod.rs | 1 + src/fs/manager.rs | 2 ++ src/fs/mod.rs | 5 +++++ src/ondisk/scan.rs | 1 + src/stream/level.rs | 1 + src/stream/mod.rs | 1 + src/version/cleaner.rs | 1 + src/version/set.rs | 1 + 8 files changed, 13 insertions(+) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 6eac7d80..318c1cdd 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -41,6 +41,7 @@ where pub(crate) manager: Arc, } +#[cfg(target_arch = "wasm32")] unsafe impl Send for Compactor {} impl Compactor diff --git a/src/fs/manager.rs b/src/fs/manager.rs index 7dca4b98..77f76b5f 100644 --- a/src/fs/manager.rs +++ b/src/fs/manager.rs @@ -8,7 +8,9 @@ pub struct StoreManager { fs_map: HashMap>, } +#[cfg(target_arch = "wasm32")] unsafe impl Send for StoreManager {} +#[cfg(target_arch = "wasm32")] unsafe impl Sync for StoreManager {} impl StoreManager { diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 1ce3932c..f4c85b89 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -56,15 +56,20 @@ pub struct DynFileWrapper { inner: Box, } +#[cfg(target_arch = "wasm32")] unsafe impl Send for DynFileWrapper {} +#[cfg(target_arch = "wasm32")] unsafe impl Sync for DynFileWrapper {} pub struct DynWriteWrapper { inner: Box, } +#[cfg(target_arch = "wasm32")] unsafe impl Send for DynWriteWrapper {} +#[cfg(target_arch = "wasm32")] unsafe impl Sync for DynWriteWrapper {} + impl DynWriteWrapper { #[allow(unused)] pub(crate) fn new(inner: Box) -> Self { diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index e499c3d3..f6c7d2ed 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -28,6 +28,7 @@ pin_project! { } } +#[cfg(target_arch = "wasm32")] unsafe impl<'scan, R> Send for SsTableScan<'scan, R> {} impl SsTableScan<'_, R> { diff --git a/src/stream/level.rs b/src/stream/level.rs index 75da1d7f..b0cdc0d2 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -55,6 +55,7 @@ where path: Option, } +#[cfg(target_arch = "wasm32")] unsafe impl<'level, R: Record> Send for LevelStream<'level, R> {} impl<'level, R> LevelStream<'level, R> diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 4b2eddba..2222bda2 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -131,6 +131,7 @@ pin_project! { } } +#[cfg(target_arch = "wasm32")] unsafe impl<'scan, R: Record> Send for ScanStream<'scan, R> {} impl<'scan, R> From> for ScanStream<'scan, R> diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 1d5f33eb..1ac4c3f9 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -33,6 +33,7 @@ where manager: Arc, } +#[cfg(target_arch = "wasm32")] unsafe impl Send for Cleaner {} impl Cleaner diff --git a/src/version/set.rs b/src/version/set.rs index 4660adbb..4c49656b 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -53,6 +53,7 @@ where log_with_id: (DynFileWrapper, FileId), } +#[cfg(target_arch = "wasm32")] unsafe impl Send for VersionSetInner {} pub(crate) struct VersionSet From 4d34932f722925637656173743469eb48fe6c658 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Thu, 14 Nov 2024 14:13:31 +0800 Subject: [PATCH 4/6] change executor to MaybeSend --- src/compaction/mod.rs | 3 --- src/executor.rs | 12 ++++++++---- src/fs/mod.rs | 4 ---- src/ondisk/scan.rs | 3 --- src/stream/level.rs | 3 --- src/stream/mod.rs | 3 --- src/version/cleaner.rs | 3 --- src/version/set.rs | 3 --- 8 files changed, 8 insertions(+), 26 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 318c1cdd..b13a7d2d 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -41,9 +41,6 @@ where pub(crate) manager: Arc, } -#[cfg(target_arch = "wasm32")] -unsafe impl Send for Compactor {} - impl Compactor where R: Record, diff --git a/src/executor.rs b/src/executor.rs index a83dff32..91e39f43 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,15 +1,18 @@ use std::future::Future; +use fusio::MaybeSend; + pub trait Executor { fn spawn(&self, future: F) where - F: Future + Send + 'static; + F: Future + MaybeSend + 'static; } -#[cfg(any(test, feature = "tokio"))] +#[cfg(feature = "tokio")] pub mod tokio { use std::future::Future; + use fusio::MaybeSend; use tokio::runtime::Handle; use super::Executor; @@ -36,7 +39,7 @@ pub mod tokio { impl Executor for TokioExecutor { fn spawn(&self, future: F) where - F: Future + Send + 'static, + F: Future + MaybeSend + 'static, { self.handle.spawn(future); } @@ -47,6 +50,7 @@ pub mod tokio { pub mod opfs { use std::future::Future; + use fusio::MaybeSend; use wasm_bindgen::prelude::*; use super::Executor; @@ -70,7 +74,7 @@ pub mod opfs { impl Executor for OpfsExecutor { fn spawn(&self, future: F) where - F: Future + Send + 'static, + F: Future + MaybeSend + 'static, { wasm_bindgen_futures::spawn_local(future); } diff --git a/src/fs/mod.rs b/src/fs/mod.rs index f4c85b89..12d8782c 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -58,8 +58,6 @@ pub struct DynFileWrapper { #[cfg(target_arch = "wasm32")] unsafe impl Send for DynFileWrapper {} -#[cfg(target_arch = "wasm32")] -unsafe impl Sync for DynFileWrapper {} pub struct DynWriteWrapper { inner: Box, @@ -67,8 +65,6 @@ pub struct DynWriteWrapper { #[cfg(target_arch = "wasm32")] unsafe impl Send for DynWriteWrapper {} -#[cfg(target_arch = "wasm32")] -unsafe impl Sync for DynWriteWrapper {} impl DynWriteWrapper { #[allow(unused)] diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index f6c7d2ed..5a8ce66d 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -28,9 +28,6 @@ pin_project! { } } -#[cfg(target_arch = "wasm32")] -unsafe impl<'scan, R> Send for SsTableScan<'scan, R> {} - impl SsTableScan<'_, R> { pub fn new( stream: ParquetRecordBatchStream, diff --git a/src/stream/level.rs b/src/stream/level.rs index b0cdc0d2..b3bdcbbb 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -55,9 +55,6 @@ where path: Option, } -#[cfg(target_arch = "wasm32")] -unsafe impl<'level, R: Record> Send for LevelStream<'level, R> {} - impl<'level, R> LevelStream<'level, R> where R: Record, diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 2222bda2..fa0b5afe 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -131,9 +131,6 @@ pin_project! { } } -#[cfg(target_arch = "wasm32")] -unsafe impl<'scan, R: Record> Send for ScanStream<'scan, R> {} - impl<'scan, R> From> for ScanStream<'scan, R> where R: Record, diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 1ac4c3f9..6a90e65a 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -33,9 +33,6 @@ where manager: Arc, } -#[cfg(target_arch = "wasm32")] -unsafe impl Send for Cleaner {} - impl Cleaner where R: Record, diff --git a/src/version/set.rs b/src/version/set.rs index 4c49656b..21778c29 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -53,9 +53,6 @@ where log_with_id: (DynFileWrapper, FileId), } -#[cfg(target_arch = "wasm32")] -unsafe impl Send for VersionSetInner {} - pub(crate) struct VersionSet where R: Record, From 2a0fcf6758605edd4080705f82a9fd4063e2a677 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Thu, 14 Nov 2024 20:25:19 +0800 Subject: [PATCH 5/6] use reference of store manager instead --- Cargo.toml | 5 +++-- src/lib.rs | 7 +++---- src/snapshot.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 104d84ef..dbe5308d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ version = "0.2.0" msrv = "1.79.0" [features] -aws = ["fusio/aws", "fusio-dispatch/aws"] +aws = ["fusio-dispatch/aws", "fusio/aws"] bench = ["redb", "rocksdb", "sled"] bytes = ["dep:bytes"] datafusion = ["dep:async-trait", "dep:datafusion"] @@ -24,8 +24,8 @@ object-store = ["fusio/object_store"] opfs = [ "dep:wasm-bindgen-futures", "fusio-dispatch/opfs", - "fusio/opfs", "fusio-parquet/opfs", + "fusio/opfs", ] redb = ["dep:redb"] rocksdb = ["dep:rocksdb"] @@ -38,6 +38,7 @@ tokio = [ "tokio/fs", ] tokio-http = ["fusio/tokio-http"] +wasm = ["aws", "bytes", "opfs"] [[example]] name = "declare" diff --git a/src/lib.rs b/src/lib.rs index c737aae2..b1b9b2c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -373,11 +373,10 @@ where ) -> impl Stream>> + 'scan { stream! { let schema = self.schema.read().await; - let manager = self.manager.clone(); let current = self.version_set.current().await; let mut scan = Scan::new( &schema, - manager, + &self.manager, range, self.version_set.load_ts(), &*current, @@ -624,7 +623,7 @@ where 'range: 'scan, { schema: &'scan Schema, - manager: Arc, + manager: &'scan StoreManager, lower: Bound<&'range R::Key>, upper: Bound<&'range R::Key>, ts: Timestamp, @@ -644,7 +643,7 @@ where { fn new( schema: &'scan Schema, - manager: Arc, + manager: &'scan StoreManager, (lower, upper): (Bound<&'range R::Key>, Bound<&'range R::Key>), ts: Timestamp, version: &'scan Version, diff --git a/src/snapshot.rs b/src/snapshot.rs index e957a020..941fc9f3 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -45,7 +45,7 @@ impl<'s, R: Record> Snapshot<'s, R> { ) -> Scan<'scan, 'range, R> { Scan::new( &self.share, - self.manager.clone(), + &self.manager, range, self.ts, &self.version, @@ -87,7 +87,7 @@ impl<'s, R: Record> Snapshot<'s, R> { ) -> Scan<'scan, 'range, R> { Scan::new( &self.share, - self.manager.clone(), + &self.manager, range, self.ts, &self.version, From b2e6118d4214116b9817ff73fac7701d74401829 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Thu, 14 Nov 2024 21:07:50 +0800 Subject: [PATCH 6/6] remove Send requirement --- src/compaction/mod.rs | 251 ++++++++++------------------------------- src/executor.rs | 3 +- src/fs/manager.rs | 5 - src/fs/mod.rs | 96 +--------------- src/inmem/mutable.rs | 29 +---- src/lib.rs | 14 +-- src/ondisk/sstable.rs | 49 ++------ src/stream/level.rs | 4 +- src/transaction.rs | 8 +- src/version/cleaner.rs | 42 ++----- src/version/mod.rs | 4 +- src/version/set.rs | 23 +--- src/wal/mod.rs | 2 +- 13 files changed, 110 insertions(+), 420 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index b9a7a180..0abd0e5e 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -1,6 +1,7 @@ use std::{cmp, collections::Bound, mem, pin::Pin, sync::Arc}; use async_lock::{RwLock, RwLockUpgradableReadGuard}; +use fusio::DynFs; use fusio_parquet::writer::AsyncWriter; use futures_util::StreamExt; use parquet::arrow::{AsyncArrowWriter, ProjectionMask}; @@ -36,7 +37,7 @@ where { pub(crate) option: Arc>, pub(crate) schema: Arc>>, - pub(crate) version_set: Arc>, + pub(crate) version_set: VersionSet, pub(crate) manager: Arc, } @@ -47,7 +48,7 @@ where pub(crate) fn new( schema: Arc>>, option: Arc>, - version_set: Arc>, + version_set: VersionSet, manager: Arc, ) -> Self { Compactor:: { @@ -68,20 +69,10 @@ where } let trigger_clone = guard.trigger.clone(); - #[cfg(feature = "opfs")] - let new_mutable = { - let option = self.option.clone(); - let manager = self.manager.clone(); - let (sender, receiver) = oneshot::channel(); - wasm_bindgen_futures::spawn_local(async move { - let _ = sender.send(Mutable::new(&option, trigger_clone, manager.base_fs()).await); - }); - receiver.await.unwrap()? - }; - #[cfg(not(feature = "opfs"))] - let new_mutable = - { Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await? }; - let mutable = mem::replace(&mut guard.mutable, new_mutable); + let mutable = mem::replace( + &mut guard.mutable, + Mutable::new(&self.option, trigger_clone, self.manager.base_fs()).await?, + ); let (file_id, immutable) = mutable.into_immutable(&guard.record_instance).await?; guard.immutables.push((file_id, immutable)); @@ -94,11 +85,11 @@ where let excess = &guard.immutables[0..chunk_num]; if let Some(scope) = Self::minor_compaction( - self.option.clone(), + &self.option, recover_wal_ids, excess, &guard.record_instance, - self.manager.clone(), + &self.manager, ) .await? { @@ -109,13 +100,13 @@ where if self.option.is_threshold_exceeded_major(&version_ref, 0) { Self::major_compaction( &version_ref, - self.option.clone(), + &self.option, &scope.min, &scope.max, &mut version_edits, &mut delete_gens, &guard.record_instance, - self.manager.clone(), + &self.manager, ) .await?; } @@ -124,20 +115,6 @@ where ts: version_ref.increase_ts(), }); - #[cfg(feature = "opfs")] - { - let (tx, rx) = oneshot::channel(); - let version_set = self.version_set.clone(); - wasm_bindgen_futures::spawn_local(async move { - let _ = tx.send( - version_set - .apply_edits(version_edits, Some(delete_gens), false) - .await, - ); - }); - rx.await.unwrap()?; - } - #[cfg(not(feature = "opfs"))] self.version_set .apply_edits(version_edits, Some(delete_gens), false) .await?; @@ -150,69 +127,34 @@ where } pub(crate) async fn minor_compaction( - option: Arc>, + option: &DbOption, recover_wal_ids: Option>, batches: &[(Option, Immutable)], instance: &RecordInstance, - manager: Arc, + manager: &StoreManager, ) -> Result>, CompactionError> { if !batches.is_empty() { + let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); + let level_0_fs = manager.get_fs(level_0_path); + let mut min = None; let mut max = None; let gen = FileId::new(); let mut wal_ids = Vec::with_capacity(batches.len()); - let schema = instance.arrow_schema::().clone(); - - #[cfg(feature = "opfs")] - let mut writer = { - let (tx, rx) = oneshot::channel(); - wasm_bindgen_futures::spawn_local(async move { - let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); - let level_0_fs = manager.get_fs(level_0_path); - match level_0_fs + let mut writer = AsyncArrowWriter::try_new( + AsyncWriter::new( + level_0_fs .open_options( &option.table_path(&gen, 0), FileType::Parquet.open_options(false), ) - .await - { - Ok(file) => { - let _ = tx.send( - AsyncArrowWriter::try_new( - AsyncWriter::new(file), - schema, - Some(option.write_parquet_properties.clone()), - ) - .map_err(CompactionError::Parquet), - ); - } - Err(err) => { - let _ = tx.send(Err(CompactionError::Fusio(err))); - return; - } - }; - }); - rx.await.map_err(|_| CompactionError::ChannelClose)?? - }; - #[cfg(not(feature = "opfs"))] - let mut writer = { - let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); - let level_0_fs = manager.get_fs(level_0_path); - AsyncArrowWriter::try_new( - AsyncWriter::new( - level_0_fs - .open_options( - &option.table_path(&gen, 0), - FileType::Parquet.open_options(false), - ) - .await?, - ), - schema, - Some(option.write_parquet_properties.clone()), - )? - }; + .await?, + ), + instance.arrow_schema::().clone(), + Some(option.write_parquet_properties.clone()), + )?; if let Some(mut recover_wal_ids) = recover_wal_ids { wal_ids.append(&mut recover_wal_ids); @@ -245,13 +187,13 @@ where #[allow(clippy::too_many_arguments)] pub(crate) async fn major_compaction( version: &Version, - option: Arc>, + option: &DbOption, mut min: &R::Key, mut max: &R::Key, version_edits: &mut Vec>, delete_gens: &mut Vec<(FileId, usize)>, instance: &RecordInstance, - manager: Arc, + manager: &StoreManager, ) -> Result<(), CompactionError> { let mut level = 0; @@ -269,43 +211,12 @@ where // This Level if level == 0 { for scope in meet_scopes_l.iter() { - #[cfg(feature = "opfs")] - let file = { - let (tx, rx) = oneshot::channel(); - let manager = manager.clone(); - let option = option.clone(); - let gen = scope.gen; - wasm_bindgen_futures::spawn_local(async move { - let level_path = - option.level_fs_path(level).unwrap_or(&option.base_path); - let level_fs = manager.get_fs(level_path); - match level_fs - .open_options( - &option.table_path(&gen, level), - FileType::Parquet.open_options(true), - ) - .await - { - Ok(file) => { - let _ = tx.send(Ok(file.into())); - } - Err(err) => { - let _ = tx.send(Err(err)); - } - }; - }); - rx.await.unwrap()? - }; - #[cfg(not(feature = "opfs"))] - let file = { - level_fs - .open_options( - &option.table_path(&scope.gen, level), - FileType::Parquet.open_options(true), - ) - .await? - .into() - }; + let file = level_fs + .open_options( + &option.table_path(&scope.gen, level), + FileType::Parquet.open_options(true), + ) + .await?; streams.push(ScanStream::SsTable { inner: SsTable::open(file) @@ -339,8 +250,6 @@ where }); } if !meet_scopes_ll.is_empty() { - let level_path = option.level_fs_path(level).unwrap_or(&option.base_path); - let level_fs = manager.get_fs(level_path); // Next Level let (lower, upper) = Self::full_scope(&meet_scopes_ll)?; let level_scan_ll = LevelStream::new( @@ -361,12 +270,12 @@ where }); } Self::build_tables( - option.clone(), + option, version_edits, level + 1, streams, instance, - manager.clone(), + level_fs, ) .await?; @@ -469,12 +378,12 @@ where } async fn build_tables<'scan>( - option: Arc>, + option: &DbOption, version_edits: &mut Vec::Key>>, level: usize, streams: Vec>, instance: &RecordInstance, - manager: Arc, + fs: &Arc, ) -> Result<(), CompactionError> { let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; @@ -495,28 +404,28 @@ where if builder.written_size() >= option.max_sst_file_size { Self::build_table( - option.clone(), + option, version_edits, level, &mut builder, &mut min, &mut max, instance, - manager.clone(), + fs, ) .await?; } } if builder.written_size() > 0 { Self::build_table( - option.clone(), + option, version_edits, level, &mut builder, &mut min, &mut max, instance, - manager.clone(), + fs, ) .await?; } @@ -533,69 +442,31 @@ where #[allow(clippy::too_many_arguments)] async fn build_table( - option: Arc>, + option: &DbOption, version_edits: &mut Vec>, level: usize, builder: &mut ::Builder, min: &mut Option, max: &mut Option, instance: &RecordInstance, - manager: Arc, + fs: &Arc, ) -> Result<(), CompactionError> { debug_assert!(min.is_some()); debug_assert!(max.is_some()); let gen = FileId::new(); let columns = builder.finish(None); - let schema = instance.arrow_schema::().clone(); - #[cfg(feature = "opfs")] - let mut writer = { - let (tx, rx) = oneshot::channel(); - wasm_bindgen_futures::spawn_local(async move { - let level_path = option.level_fs_path(level - 1).unwrap_or(&option.base_path); - let fs = manager.get_fs(level_path); - - match fs - .open_options( - &option.table_path(&gen, level - 1), - FileType::Parquet.open_options(false), - ) - .await - { - Ok(writer) => { - let _ = tx.send( - AsyncArrowWriter::try_new( - AsyncWriter::new(writer), - schema, - Some(option.write_parquet_properties.clone()), - ) - .map_err(CompactionError::Parquet), - ); - } - Err(err) => { - let _ = tx.send(Err(CompactionError::Fusio(err))); - } - } - }); - - rx.await.unwrap()? - }; - #[cfg(not(feature = "opfs"))] - let mut writer = { - let level_path = option.level_fs_path(level - 1).unwrap_or(&option.base_path); - let fs = manager.get_fs(level_path); - AsyncArrowWriter::try_new( - AsyncWriter::new( - fs.open_options( - &option.table_path(&gen, level), - FileType::Parquet.open_options(false), - ) - .await?, - ), - schema, - Some(option.write_parquet_properties.clone()), - )? - }; + let mut writer = AsyncArrowWriter::try_new( + AsyncWriter::new( + fs.open_options( + &option.table_path(&gen, level), + FileType::Parquet.open_options(false), + ) + .await?, + ), + instance.arrow_schema::().clone(), + Some(option.write_parquet_properties.clone()), + )?; writer.write(columns.as_record_batch()).await?; writer.close().await?; version_edits.push(VersionEdit::Add { @@ -801,14 +672,14 @@ pub(crate) mod tests { .unwrap(); let scope = Compactor::::minor_compaction( - Arc::new(option), + &option, None, &vec![ (Some(FileId::new()), batch_1), (Some(FileId::new()), batch_2), ], &RecordInstance::Normal, - Arc::new(manager), + &manager, ) .await .unwrap() @@ -865,14 +736,14 @@ pub(crate) mod tests { .unwrap(); let scope = Compactor::::minor_compaction( - Arc::new(option), + &option, None, &vec![ (Some(FileId::new()), batch_1), (Some(FileId::new()), batch_2), ], &instance, - Arc::new(manager), + &manager, ) .await .unwrap() @@ -931,13 +802,13 @@ pub(crate) mod tests { Compactor::::major_compaction( &version, - option.clone(), + &option, &min, &max, &mut version_edits, &mut vec![], &RecordInstance::Normal, - Arc::new(manager), + &manager, ) .await .unwrap(); @@ -1322,13 +1193,13 @@ pub(crate) mod tests { Compactor::::major_compaction( &version, - option.clone(), + &option, &min, &max, &mut version_edits, &mut vec![], &RecordInstance::Normal, - Arc::new(manager), + &manager, ) .await .unwrap(); diff --git a/src/executor.rs b/src/executor.rs index 91e39f43..5521da25 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -55,9 +55,8 @@ pub mod opfs { use super::Executor; - #[derive(Debug)] #[wasm_bindgen] - pub struct OpfsExecutor {} + pub struct OpfsExecutor(); impl Default for OpfsExecutor { fn default() -> Self { diff --git a/src/fs/manager.rs b/src/fs/manager.rs index 77f76b5f..6ae83720 100644 --- a/src/fs/manager.rs +++ b/src/fs/manager.rs @@ -8,11 +8,6 @@ pub struct StoreManager { fs_map: HashMap>, } -#[cfg(target_arch = "wasm32")] -unsafe impl Send for StoreManager {} -#[cfg(target_arch = "wasm32")] -unsafe impl Sync for StoreManager {} - impl StoreManager { pub fn new( base_options: FsOptions, diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 12d8782c..1bffbb50 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -5,7 +5,7 @@ use std::{ str::FromStr, }; -use fusio::{dynamic::DynFile, fs::OpenOptions, path::Path, Read, Write}; +use fusio::{fs::OpenOptions, path::Path}; use ulid::{DecodeError, Ulid}; pub(crate) type FileId = Ulid; @@ -51,97 +51,3 @@ pub(crate) fn parse_file_id(path: &Path, suffix: FileType) -> Result, -} - -#[cfg(target_arch = "wasm32")] -unsafe impl Send for DynFileWrapper {} - -pub struct DynWriteWrapper { - inner: Box, -} - -#[cfg(target_arch = "wasm32")] -unsafe impl Send for DynWriteWrapper {} - -impl DynWriteWrapper { - #[allow(unused)] - pub(crate) fn new(inner: Box) -> Self { - Self { inner } - } -} - -impl From> for DynWriteWrapper { - fn from(inner: Box) -> Self { - Self { inner } - } -} - -impl Write for DynWriteWrapper { - async fn write_all(&mut self, buf: B) -> (Result<(), fusio::Error>, B) { - self.inner.write_all(buf).await - } - - async fn flush(&mut self) -> Result<(), fusio::Error> { - self.inner.flush().await - } - - async fn close(&mut self) -> Result<(), fusio::Error> { - self.inner.close().await - } -} - -impl DynFileWrapper { - #[allow(unused)] - pub(crate) fn new(inner: Box) -> Self { - Self { inner } - } - - pub(crate) fn file(self) -> Box { - self.inner - } -} - -impl From> for DynFileWrapper { - fn from(inner: Box) -> Self { - Self { inner } - } -} - -impl Read for DynFileWrapper { - async fn read_exact_at( - &mut self, - buf: B, - pos: u64, - ) -> (Result<(), fusio::Error>, B) { - self.inner.read_exact_at(buf, pos).await - } - - async fn read_to_end_at( - &mut self, - buf: Vec, - pos: u64, - ) -> (Result<(), fusio::Error>, Vec) { - self.inner.read_to_end_at(buf, pos).await - } - - async fn size(&self) -> Result { - self.inner.size().await - } -} - -impl Write for DynFileWrapper { - async fn write_all(&mut self, buf: B) -> (Result<(), fusio::Error>, B) { - self.inner.write_all(buf).await - } - - async fn flush(&mut self) -> Result<(), fusio::Error> { - self.inner.flush().await - } - - async fn close(&mut self) -> Result<(), fusio::Error> { - self.inner.close().await - } -} diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 987324de..3f084075 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -8,7 +8,7 @@ use crossbeam_skiplist::{ use fusio::{buffered::BufWriter, DynFs, DynWrite}; use crate::{ - fs::{DynWriteWrapper, FileId, FileType}, + fs::{FileId, FileType}, inmem::immutable::Immutable, record::{Key, KeyRef, Record, RecordInstance}, timestamp::{ @@ -36,7 +36,7 @@ where R: Record, { pub(crate) data: SkipMap, Option>, - wal: Option>>, + wal: Option, R>>>, pub(crate) trigger: Arc + Send + Sync>>, } @@ -62,10 +62,7 @@ where option.wal_buffer_size, )) as Box; - wal = Some(Mutex::new(WalFile::new( - DynWriteWrapper::from(file), - file_id, - ))); + wal = Some(Mutex::new(WalFile::new(file, file_id))); }; Ok(Self { @@ -180,23 +177,9 @@ where let mut file_id = None; if let Some(wal) = self.wal { - #[cfg(feature = "opfs")] - { - let (tx, rx) = tokio::sync::oneshot::channel(); - wasm_bindgen_futures::spawn_local(async move { - let mut wal_guard = wal.lock().await; - wal_guard.flush().await.unwrap(); - - let _ = tx.send(Some(wal_guard.file_id())); - }); - file_id = rx.await.unwrap() - }; - #[cfg(not(feature = "opfs"))] - { - let mut wal_guard = wal.lock().await; - wal_guard.flush().await?; - file_id = Some(wal_guard.file_id()) - }; + let mut wal_guard = wal.lock().await; + wal_guard.flush().await?; + file_id = Some(wal_guard.file_id()); } Ok((file_id, Immutable::from((self.data, instance)))) diff --git a/src/lib.rs b/src/lib.rs index b1b9b2c1..ad79f401 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,7 +178,7 @@ where E: Executor, { schema: Arc>>, - version_set: Arc>, + version_set: VersionSet, lock_map: LockMap, manager: Arc, _p: PhantomData, @@ -244,8 +244,7 @@ where let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); - let version_set = - Arc::new(VersionSet::new(clean_sender, option.clone(), manager.clone()).await?); + let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?; let schema = Arc::new(RwLock::new( Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, )); @@ -731,7 +730,7 @@ where } self.version .streams( - &self.manager, + self.manager, &mut streams, (self.lower, self.upper), self.ts, @@ -783,7 +782,7 @@ where } self.version .streams( - &self.manager, + self.manager, &mut streams, (self.lower, self.upper), self.ts, @@ -1260,9 +1259,8 @@ pub(crate) mod tests { let schema = Arc::new(RwLock::new(schema)); let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); - let version_set = Arc::new( - build_version_set(version, clean_sender, option.clone(), manager.clone()).await?, - ); + let version_set = + build_version_set(version, clean_sender, option.clone(), manager.clone()).await?; let mut compactor = Compactor::::new( schema.clone(), option.clone(), diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 28acddb2..859e6c56 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -1,6 +1,6 @@ use std::{marker::PhantomData, ops::Bound}; -use fusio::DynRead; +use fusio::{dynamic::DynFile, DynRead}; use fusio_parquet::reader::AsyncReader; use futures_util::StreamExt; use parquet::arrow::{ @@ -10,7 +10,6 @@ use parquet::arrow::{ use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ - fs::DynFileWrapper, record::Record, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, TimestampedRef}, @@ -28,42 +27,13 @@ impl SsTable where R: Record, { - pub(crate) async fn open(file: DynFileWrapper) -> Result { - #[cfg(feature = "opfs")] - { - let (tx, rx) = tokio::sync::oneshot::channel(); - wasm_bindgen_futures::spawn_local(async move { - let size = match file.size().await { - Ok(size) => size, - Err(err) => { - let _ = tx.send(Err(err)); - return; - } - }; - let file = file.file(); - match AsyncReader::new(file, size).await { - Ok(file) => { - let _ = tx.send(Ok(SsTable { - reader: file, - _marker: PhantomData, - })); - } - Err(err) => { - let _ = tx.send(Err(err)); - } - } - }); - rx.await.unwrap() - } - #[cfg(not(feature = "opfs"))] - { - let size = file.size().await?; - let file = file.file(); - Ok(SsTable { - reader: AsyncReader::new(file, size).await?, - _marker: PhantomData, - }) - } + pub(crate) async fn open(file: Box) -> Result { + let size = file.size().await?; + + Ok(SsTable { + reader: AsyncReader::new(file, size).await?, + _marker: PhantomData, + }) } async fn into_parquet_builder( @@ -190,8 +160,7 @@ pub(crate) mod tests { store .open_options(path, FileType::Parquet.open_options(true)) .await - .unwrap() - .into(), + .unwrap(), ) .await .unwrap() diff --git a/src/stream/level.rs b/src/stream/level.rs index b3bdcbbb..0d5704ae 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -32,7 +32,7 @@ where Init(FileId), Ready(SsTableScan<'level, R>), OpenFile(Pin, Error>> + 'level>>), - OpenSst(Pin, Error>> + Send + 'level>>), + OpenSst(Pin, Error>> + 'level>>), LoadStream( Pin, ParquetError>> + Send + 'level>>, ), @@ -165,7 +165,7 @@ where }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file.into()))); + self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file))); continue; } Poll::Ready(Err(err)) => { diff --git a/src/transaction.rs b/src/transaction.rs index 4a1bb053..88d8bdf3 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -85,14 +85,12 @@ where &'scan self, range: (Bound<&'range R::Key>, Bound<&'range R::Key>), ) -> Scan<'scan, 'range, R> { + let ts = self.snapshot.ts(); + let inner = self.local.range(range); self.snapshot._scan( range, Box::new(move |projection_mask: Option| { - let mut transaction_scan = TransactionScan { - inner: self.local.range(range), - ts: self.snapshot.ts(), - } - .into(); + let mut transaction_scan = TransactionScan { inner, ts }.into(); if let Some(mask) = projection_mask { transaction_scan = MemProjectionStream::new(transaction_scan, mask).into(); } diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 6a90e65a..03a9a946 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -70,46 +70,28 @@ where break; } for (gen, level) in gens { - self.remove(gen, level).await?; + let fs = self + .option + .level_fs_path(level) + .map(|path| self.manager.get_fs(path)) + .unwrap_or(self.manager.base_fs()); + fs.remove(&self.option.table_path(&gen, level)).await?; } } } CleanTag::RecoverClean { wal_id: gen, level } => { - self.remove(gen, level).await?; + let fs = self + .option + .level_fs_path(level) + .map(|path| self.manager.get_fs(path)) + .unwrap_or(self.manager.base_fs()); + fs.remove(&self.option.table_path(&gen, level)).await?; } } } Ok(()) } - - async fn remove(&self, gen: FileId, level: usize) -> Result<(), DbError> { - #[cfg(feature = "opfs")] - { - let (sender, receiver) = tokio::sync::oneshot::channel(); - let manager = self.manager.clone(); - let path = self.option.table_path(&gen, level); - let option = self.option.clone(); - wasm_bindgen_futures::spawn_local(async move { - let fs = option - .level_fs_path(level) - .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); - sender.send(fs.remove(&path).await).unwrap(); - }); - receiver.await.unwrap()?; - } - #[cfg(not(feature = "opfs"))] - { - let fs = self - .option - .level_fs_path(level) - .map(|path| self.manager.get_fs(path)) - .unwrap_or(self.manager.base_fs()); - fs.remove(&self.option.table_path(&gen, level)).await?; - } - Ok(()) - } } #[cfg(all(test, feature = "tokio"))] diff --git a/src/version/mod.rs b/src/version/mod.rs index f7db40df..72d057f8 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -184,7 +184,7 @@ where ) .await .map_err(VersionError::Fusio)?; - SsTable::::open(file.into()) + SsTable::::open(file) .await? .get(key, projection_mask) .await @@ -226,7 +226,7 @@ where ) .await .map_err(VersionError::Fusio)?; - let table = SsTable::open(file.into()).await?; + let table = SsTable::open(file).await?; streams.push(ScanStream::SsTable { inner: table diff --git a/src/version/set.rs b/src/version/set.rs index 21778c29..754a1984 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -10,12 +10,12 @@ use std::{ use async_lock::RwLock; use flume::Sender; -use fusio::{fs::FileMeta, Write}; +use fusio::{dynamic::DynFile, fs::FileMeta}; use futures_util::StreamExt; use super::{TransactionTs, MAX_LEVEL}; use crate::{ - fs::{manager::StoreManager, parse_file_id, DynFileWrapper, FileId, FileType}, + fs::{manager::StoreManager, parse_file_id, FileId, FileType}, record::Record, serdes::Encode, timestamp::Timestamp, @@ -50,7 +50,7 @@ where R: Record, { current: VersionRef, - log_with_id: (DynFileWrapper, FileId), + log_with_id: (Box, FileId), } pub(crate) struct VersionSet @@ -158,7 +158,7 @@ where timestamp: timestamp.clone(), log_length: 0, }), - log_with_id: (log.into(), log_id), + log_with_id: (log, log_id), })), clean_sender, timestamp, @@ -171,17 +171,6 @@ where } pub(crate) async fn current(&self) -> VersionRef { - #[cfg(feature = "opfs")] - { - let (tx, rx) = tokio::sync::oneshot::channel(); - let inner = self.inner.clone(); - wasm_bindgen_futures::spawn_local(async move { - let version = inner.read().await.current.clone(); - let _ = tx.send(version); - }); - rx.await.unwrap() - } - #[cfg(not(feature = "opfs"))] self.inner.read().await.current.clone() } @@ -281,7 +270,7 @@ where FileType::Log.open_options(false), ) .await?; - let _old_log = mem::replace(log, new_log.into()); + let _old_log = mem::replace(log, new_log); new_version.log_length = 0; for new_edit in new_version.to_edits() { @@ -341,7 +330,7 @@ pub(crate) mod tests { Ok(VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { current: Arc::new(version), - log_with_id: (log.into(), log_id), + log_with_id: (log, log_id), })), clean_sender, timestamp, diff --git a/src/wal/mod.rs b/src/wal/mod.rs index 6f8589d2..3fdb9d60 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -42,7 +42,7 @@ impl WalFile { impl WalFile where - F: Write + Send, + F: Write, R: Record, { pub(crate) async fn write<'r>(