Skip to content

Commit f779ec0

Browse files
committed
refactor: Added RwLock abstraction to the Executor trait
1 parent fcdfc80 commit f779ec0

File tree

29 files changed

+362
-270
lines changed

29 files changed

+362
-270
lines changed

CLAUDE.md

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ Tonbo is an embedded, persistent key-value database written in Rust that uses Lo
1313
```bash
1414
# Code quality checks
1515
cargo check # Check for compilation errors
16-
cargo clippy # Run linter for code improvements
16+
cargo clippy --workspace -- -D warnings # Run linter (fail on warnings)
1717
cargo +nightly fmt # Format code (must use nightly)
18+
cargo +nightly fmt -- --check # Check formatting without changing files
1819

1920
# Building
2021
cargo build # Standard debug build
@@ -54,11 +55,12 @@ cargo build --features datafusion
5455
cd bindings/python
5556
maturin develop # Build and install locally for development
5657
pytest tests/ # Run Python tests
58+
pytest tests/bench/ --benchmark-only # Run Python benchmarks
5759

5860
# JavaScript/WASM bindings (requires wasm-pack)
5961
cd bindings/js
6062
wasm-pack build # Build WASM module
61-
wasm-pack test --headless # Run WASM tests
63+
wasm-pack test --chrome --headless # Run WASM tests in Chrome
6264
```
6365

6466
### Code Coverage
@@ -71,14 +73,18 @@ cargo llvm-cov --workspace --lcov --output-path lcov.info
7173

7274
The codebase is organized into several key modules:
7375

74-
- **src/compaction/**: LSM tree compaction strategies and implementation
76+
- **src/compaction/**: LSM tree compaction strategies (leveled compaction implementation)
7577
- **src/fs/**: File system abstractions supporting multiple backends (local, S3, OPFS)
7678
- **src/inmem/**: In-memory data structures including mutable and immutable memtables
7779
- **src/ondisk/**: On-disk storage using SSTable format with Arrow/Parquet
7880
- **src/record/**: Record types, schemas, and the `#[derive(Record)]` macro system
79-
- **src/stream/**: Streaming operations for efficient data processing
81+
- **dynamic/**: Runtime-defined record schemas with Value enum for flexible typing
82+
- **key/**: Type implementations for primary keys (strings, numbers, timestamps, lists)
83+
- **src/stream/**: Streaming operations for efficient data processing and merging
8084
- **src/version/**: Version management and multi-version concurrency control
8185
- **src/wal/**: Write-ahead logging for durability
86+
- **src/transaction.rs**: Transaction support with optimistic concurrency control
87+
- **src/manifest.rs**: Database manifest management
8288

8389
The project uses procedural macros (in `tonbo_macros/`) to provide a type-safe API where users define their key-value schema using Rust structs.
8490

@@ -89,38 +95,35 @@ The project uses procedural macros (in `tonbo_macros/`) to provide a type-safe A
8995
2. **Feature Flags**: Important features include:
9096
- `tokio`: Async filesystem operations (default)
9197
- `wasm`: WebAssembly support with OPFS backend
92-
- `datafusion`: SQL query support
93-
- `aws`: S3 storage backend
94-
- `bench`: Enables comparison benchmarks
98+
- `datafusion`: SQL query support via Apache DataFusion
99+
- `aws`: S3 storage backend support
100+
- `bench`: Enables comparison benchmarks against RocksDB, Sled, and Redb
101+
- `bytes`: Bytes type support (default)
95102

96-
3. **Type System**: Records must derive from `tonbo::Record` trait. The macro generates necessary serialization and Arrow schema implementations.
103+
3. **Type System**: Records must derive from `tonbo::Record` trait. The macro generates necessary serialization and Arrow schema implementations. Dynamic records are also supported for runtime-defined schemas.
97104

98-
4. **Testing**: Tests should cover both sync and async APIs when applicable. Integration tests go in `tests/` directory.
105+
4. **Testing**:
106+
- Unit tests use `#[tokio::test]` for async code
107+
- Integration tests in `tests/` directory
108+
- Macro correctness tests using trybuild in `tests/success/` and `tests/fail/`
109+
- WASM-specific tests in `tests/wasm.rs`
99110

100-
5. **Error Handling**: Uses custom error types defined in `src/error.rs`. Always propagate errors appropriately using `?`.
111+
5. **Error Handling**: Uses custom error types. Always propagate errors appropriately using `?`.
101112

102113
## Working with the Codebase
103114

104115
1. **Adding New Features**: Check feature flags in `Cargo.toml` and ensure proper conditional compilation with `#[cfg(feature = "...")]`.
105116

106-
2. **Modifying Storage Layer**: Changes to on-disk format should maintain backward compatibility or increment version numbers appropriately.
117+
2. **Modifying Storage Layer**: Changes to on-disk format should maintain backward compatibility or increment version numbers appropriately. The magic number in `src/magic.rs` helps identify file format versions.
107118

108-
3. **Performance**: Run benchmarks before and after changes that might impact performance. Compare against baseline using `cargo bench`.
119+
3. **Performance**: Run benchmarks before and after changes that might impact performance. Use `cargo bench --features bench` to compare against other embedded databases.
109120

110-
4. **Documentation**: Update both inline documentation and the guide (in `guide/` directory) for user-facing changes.
111-
112-
5. **Cross-Platform**: Ensure changes work across all supported platforms (Linux, macOS, Windows) and targets (native, WASM).
113-
114-
## Current Development Focus
115-
116-
Based on recent commits, the project is actively developing:
117-
- Date/time type support (date32/date64/time32/time64)
118-
- Dynamic record implementation
119-
- Enhanced type system flexibility
121+
4. **Cross-Platform**: Ensure changes work across all supported platforms (Linux, macOS, Windows) and targets (native, WASM).
120122

121123
## Important Notes
122124

123125
- Rust toolchain version is pinned to 1.85 in `rust-toolchain.toml`
124-
- The project uses specific git revisions for fusio-* dependencies
126+
- The project uses fusio crates (v0.4.0) for pluggable I/O and storage backends
125127
- Formatting is strict - always run `cargo +nightly fmt` before committing
126-
- The project follows semantic versioning (currently at 0.3.2)
128+
- The project follows semantic versioning (currently at 0.3.2)
129+
- Default features include: aws, bytes, tokio, tokio-http, async-trait

Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ tokio = [
4040
"parquet/default",
4141
"tokio/fs",
4242
"tokio/rt-multi-thread",
43+
"dep:tokio",
4344
]
4445
tokio-http = ["fusio/tokio-http", "fusio-log/tokio-http"]
4546
wasm = ["aws", "bytes", "opfs", "wasm-http", "dep:async-trait"]
@@ -93,6 +94,7 @@ fusio-log = { version = "0.4.0", default-features = false, features = [
9394
"bytes",
9495
] }
9596
fusio-parquet = { version = "0.4.0" }
97+
futures = "0.3"
9698
futures-core = "0.3"
9799
futures-util = "0.3"
98100
itertools = { version = "0.14.0" }
@@ -108,8 +110,10 @@ parquet = { version = "55", default-features = false, features = [
108110
] }
109111
parquet-lru = { version = "0.3.0", path = "parquet-lru" }
110112
pin-project-lite = "0.2"
111-
thiserror = "2.0.3"
112-
tokio = { version = "1", features = ["io-util"], default-features = false }
113+
thiserror = "2.0.12"
114+
tokio = { version = "1.47.1", features = [
115+
"io-util",
116+
], default-features = false, optional = true }
113117
tonbo_macros = { version = "0.3.1", path = "tonbo_macros" }
114118
tracing = "0.1"
115119
ulid = { version = "1", features = ["serde"] }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ async fn main() {
6363
&UserSchema,
6464
);
6565

66-
let db = DB::new(options, TokioExecutor::current(), UserSchema)
66+
let db = DB::new(options, TokioExecutor::default(), UserSchema)
6767
.await
6868
.unwrap();
6969

benches/common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ impl BenchDatabase for TonboS3BenchDataBase {
286286
.disable_wal();
287287

288288
TonboS3BenchDataBase::new(
289-
tonbo::DB::new(option, TokioExecutor::current(), CustomerSchema)
289+
tonbo::DB::new(option, TokioExecutor::default(), CustomerSchema)
290290
.await
291291
.unwrap(),
292292
)
@@ -339,7 +339,7 @@ impl BenchDatabase for TonboBenchDataBase {
339339
)
340340
.disable_wal();
341341

342-
let db = tonbo::DB::new(option, TokioExecutor::current(), CustomerSchema)
342+
let db = tonbo::DB::new(option, TokioExecutor::default(), CustomerSchema)
343343
.await
344344
.unwrap();
345345
TonboBenchDataBase::new(db)

benches/criterion/writes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ fn single_write(c: &mut Criterion) {
6464
)
6565
.disable_wal();
6666
let db = runtime
67-
.block_on(async { DB::new(option, TokioExecutor::current(), KVSchema).await })
67+
.block_on(async { DB::new(option, TokioExecutor::default(), KVSchema).await })
6868
.unwrap();
6969

7070
group.bench_with_input(BenchmarkId::new("Tonbo", batch), &batch, |b, batch| {

bindings/js/src/transaction.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{mem::transmute, sync::Arc};
33
use futures::StreamExt;
44
use js_sys::Object;
55
use tonbo::{
6+
executor::opfs::OpfsExecutor,
67
record::{DynRecord, DynamicField},
78
transaction, Projection,
89
};
@@ -15,22 +16,22 @@ use crate::{
1516

1617
#[wasm_bindgen]
1718
pub struct Transaction {
18-
txn: Option<transaction::Transaction<'static, DynRecord>>,
19+
txn: Option<transaction::Transaction<'static, DynRecord, OpfsExecutor>>,
1920
desc: Arc<Vec<DynamicField>>,
2021
primary_key_index: usize,
2122
}
2223

2324
impl Transaction {
2425
pub(crate) fn new<'txn>(
25-
txn: transaction::Transaction<'txn, DynRecord>,
26+
txn: transaction::Transaction<'txn, DynRecord, OpfsExecutor>,
2627
desc: Arc<Vec<DynamicField>>,
2728
primary_key_index: usize,
2829
) -> Self {
2930
Transaction {
3031
txn: Some(unsafe {
3132
transmute::<
32-
transaction::Transaction<'txn, DynRecord>,
33-
transaction::Transaction<'static, DynRecord>,
33+
transaction::Transaction<'txn, DynRecord, OpfsExecutor>,
34+
transaction::Transaction<'static, DynRecord, OpfsExecutor>,
3435
>(txn)
3536
}),
3637
desc,
@@ -123,8 +124,8 @@ impl Transaction {
123124
let txn = self.txn.as_ref().unwrap();
124125
let txn = unsafe {
125126
transmute::<
126-
&transaction::Transaction<'_, DynRecord>,
127-
&'static transaction::Transaction<'_, DynRecord>,
127+
&transaction::Transaction<'_, DynRecord, OpfsExecutor>,
128+
&'static transaction::Transaction<'_, DynRecord, OpfsExecutor>,
128129
>(txn)
129130
};
130131
let mut scan = txn

bindings/js/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub(crate) fn to_record(schema: &[DynamicField], cols: &[Value]) -> JsValue {
7171
Value::Float64(v) => (*v).into(),
7272
Value::String(v) => v.to_owned().into(),
7373
Value::Binary(v) => v.to_vec().into(),
74-
Value::FixedSizeBinary(v, w) => v.to_vec().into(),
74+
Value::FixedSizeBinary(v, _w) => v.to_vec().into(),
7575
Value::Date32(_)
7676
| Value::Date64(_)
7777
| Value::List(_, _)

bindings/python/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl TonboDB {
5858
let schema = DynSchema::new(desc, primary_key_index.unwrap());
5959
let option = option.into_option(&schema);
6060
let db = get_runtime()
61-
.block_on(async { DB::new(option, TokioExecutor::current(), schema).await })
61+
.block_on(async { DB::new(option, TokioExecutor::default(), schema).await })
6262
.unwrap();
6363
Ok(Self {
6464
db: Arc::new(db),

bindings/python/src/transaction.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use pyo3::{
77
};
88
use pyo3_async_runtimes::tokio::future_into_py;
99
use tonbo::{
10+
executor::tokio::TokioExecutor,
1011
record::{DynRecord, Value},
1112
transaction, Projection,
1213
};
@@ -21,14 +22,14 @@ use crate::{
2122

2223
#[pyclass]
2324
pub struct Transaction {
24-
txn: Option<transaction::Transaction<'static, DynRecord>>,
25+
txn: Option<transaction::Transaction<'static, DynRecord, TokioExecutor>>,
2526
desc: Arc<Vec<Column>>,
2627
primary_key_index: usize,
2728
}
2829

2930
impl Transaction {
3031
pub(crate) fn new<'txn>(
31-
txn: transaction::Transaction<'txn, DynRecord>,
32+
txn: transaction::Transaction<'txn, DynRecord, TokioExecutor>,
3233
desc: Arc<Vec<Column>>,
3334
) -> Self {
3435
let primary_key_index = desc
@@ -40,8 +41,8 @@ impl Transaction {
4041
Transaction {
4142
txn: Some(unsafe {
4243
transmute::<
43-
transaction::Transaction<'txn, DynRecord>,
44-
transaction::Transaction<'static, DynRecord>,
44+
transaction::Transaction<'txn, DynRecord, TokioExecutor>,
45+
transaction::Transaction<'static, DynRecord, TokioExecutor>,
4546
>(txn)
4647
}),
4748
desc,
@@ -87,8 +88,8 @@ impl Transaction {
8788
let txn = self.txn.as_ref().unwrap();
8889
let txn = unsafe {
8990
transmute::<
90-
&transaction::Transaction<'_, DynRecord>,
91-
&'static transaction::Transaction<'_, DynRecord>,
91+
&transaction::Transaction<'_, DynRecord, TokioExecutor>,
92+
&'static transaction::Transaction<'_, DynRecord, TokioExecutor>,
9293
>(txn)
9394
};
9495

@@ -176,8 +177,8 @@ impl Transaction {
176177
let txn = self.txn.as_ref().unwrap();
177178
let txn = unsafe {
178179
transmute::<
179-
&transaction::Transaction<'_, DynRecord>,
180-
&'static transaction::Transaction<'_, DynRecord>,
180+
&transaction::Transaction<'_, DynRecord, TokioExecutor>,
181+
&'static transaction::Transaction<'_, DynRecord, TokioExecutor>,
181182
>(txn)
182183
};
183184
let col_desc = self.desc.get(self.primary_key_index).unwrap();

examples/datafusion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ async fn main() -> Result<()> {
238238
&MusicSchema,
239239
);
240240

241-
let db = DB::new(options, TokioExecutor::current(), MusicSchema)
241+
let db = DB::new(options, TokioExecutor::default(), MusicSchema)
242242
.await
243243
.unwrap();
244244
for (id, name, like) in [

0 commit comments

Comments
 (0)