Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
workspace = { members = ["parquet-lru", "tonbo_macros"] }

[workspace]
members = ["parquet-lru", "tonbo_macros", "common"]
[workspace.dependencies]
arrow = "55"
fusio = { git = "https://github.com/tonbo-io/fusio", rev = "278eb79091b24df29eb9f3ac78ae6c3305ea3ee6", version = "0.3.8", package = "fusio", features = [
"dyn",
"fs",
] }
fusio-log = { git = "https://github.com/tonbo-io/fusio", rev = "278eb79091b24df29eb9f3ac78ae6c3305ea3ee6", version = "0.3.8", package = "fusio-log", default-features = false, features = [
"bytes",
] }

[package]
description = "An embedded persistent KV database in Rust."
Expand All @@ -14,14 +25,15 @@ version = "0.3.2"
msrv = "1.79.0"

[features]
aws = ["fusio-dispatch/aws", "fusio-log/aws", "fusio/aws"]
aws = ["common/aws", "fusio-dispatch/aws", "fusio-log/aws", "fusio/aws"]
bench = ["redb", "rocksdb", "sled"]
bytes = ["dep:bytes"]
datafusion = ["dep:async-trait", "dep:datafusion"]
default = ["aws", "bytes", "tokio", "tokio-http"]
load_tbl = []
object-store = ["fusio/object_store"]
opfs = [
"common/web",
"dep:wasm-bindgen-futures",
"fusio-dispatch/opfs",
"fusio-log/web",
Expand All @@ -33,6 +45,7 @@ rocksdb = ["dep:rocksdb"]
sled = ["dep:sled"]
sync = ["fusio/sync"]
tokio = [
"common/tokio",
"fusio-dispatch/tokio",
"fusio-log/tokio",
"fusio-parquet/tokio",
Expand Down Expand Up @@ -75,12 +88,13 @@ path = "benches/criterion/writes.rs"
required-features = ["sled"]

[dependencies]
arrow = "55"
arrow = { workspace = true }
async-lock = "3"
async-stream = "0.3"
async-trait = { version = "0.1", optional = true }
bytes = { version = "1.7", optional = true }
chrono = { version = "0.4", default-features = false, features = ["wasmbind"] }
common = { version = "0.3.2", path = "common" }
crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "47", optional = true }
Expand All @@ -90,9 +104,7 @@ fusio = { git = "https://github.com/tonbo-io/fusio", rev = "278eb79091b24df29eb9
"fs",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "278eb79091b24df29eb9f3ac78ae6c3305ea3ee6", version = "0.3.8", package = "fusio-dispatch" }
fusio-log = { git = "https://github.com/tonbo-io/fusio", rev = "278eb79091b24df29eb9f3ac78ae6c3305ea3ee6", version = "0.3.8", package = "fusio-log", default-features = false, features = [
"bytes",
] }
fusio-log = { workspace = true }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio", rev = "278eb79091b24df29eb9f3ac78ae6c3305ea3ee6", version = "0.3.8", package = "fusio-parquet" }
futures-core = "0.3"
futures-util = "0.3"
Expand Down
52 changes: 26 additions & 26 deletions bindings/js/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,40 @@ pub enum DataType {
Float64 = 12,
}

impl From<DataType> for tonbo::record::DataType {
impl From<DataType> for tonbo::datatype::DataType {
fn from(datatype: DataType) -> Self {
match datatype {
DataType::UInt8 => tonbo::record::DataType::UInt8,
DataType::UInt16 => tonbo::record::DataType::UInt16,
DataType::UInt32 => tonbo::record::DataType::UInt32,
DataType::UInt64 => tonbo::record::DataType::UInt64,
DataType::Int8 => tonbo::record::DataType::Int8,
DataType::Int16 => tonbo::record::DataType::Int16,
DataType::Int32 => tonbo::record::DataType::Int32,
DataType::Int64 => tonbo::record::DataType::Int64,
DataType::String => tonbo::record::DataType::String,
DataType::Boolean => tonbo::record::DataType::Boolean,
DataType::Float32 => tonbo::record::DataType::Float32,
DataType::Float64 => tonbo::record::DataType::Float64,
DataType::UInt8 => tonbo::datatype::DataType::UInt8,
DataType::UInt16 => tonbo::datatype::DataType::UInt16,
DataType::UInt32 => tonbo::datatype::DataType::UInt32,
DataType::UInt64 => tonbo::datatype::DataType::UInt64,
DataType::Int8 => tonbo::datatype::DataType::Int8,
DataType::Int16 => tonbo::datatype::DataType::Int16,
DataType::Int32 => tonbo::datatype::DataType::Int32,
DataType::Int64 => tonbo::datatype::DataType::Int64,
DataType::String => tonbo::datatype::DataType::String,
DataType::Boolean => tonbo::datatype::DataType::Boolean,
DataType::Float32 => tonbo::datatype::DataType::Float32,
DataType::Float64 => tonbo::datatype::DataType::Float64,
_ => todo!(),
}
}
}

pub(crate) fn to_datatype(datatype: &str) -> tonbo::record::DataType {
pub(crate) fn to_datatype(datatype: &str) -> tonbo::datatype::DataType {
match datatype {
"UInt8" => tonbo::record::DataType::UInt8,
"UInt16" => tonbo::record::DataType::UInt16,
"UInt32" => tonbo::record::DataType::UInt32,
"UInt64" => tonbo::record::DataType::UInt64,
"Int8" => tonbo::record::DataType::Int8,
"Int16" => tonbo::record::DataType::Int16,
"Int32" => tonbo::record::DataType::Int32,
"Int64" => tonbo::record::DataType::Int64,
"String" => tonbo::record::DataType::String,
"Boolean" => tonbo::record::DataType::Boolean,
"Float32" => tonbo::record::DataType::Float32,
"Float64" => tonbo::record::DataType::Float64,
"UInt8" => tonbo::datatype::DataType::UInt8,
"UInt16" => tonbo::datatype::DataType::UInt16,
"UInt32" => tonbo::datatype::DataType::UInt32,
"UInt64" => tonbo::datatype::DataType::UInt64,
"Int8" => tonbo::datatype::DataType::Int8,
"Int16" => tonbo::datatype::DataType::Int16,
"Int32" => tonbo::datatype::DataType::Int32,
"Int64" => tonbo::datatype::DataType::Int64,
"String" => tonbo::datatype::DataType::String,
"Boolean" => tonbo::datatype::DataType::Boolean,
"Float32" => tonbo::datatype::DataType::Float32,
"Float64" => tonbo::datatype::DataType::Float64,
_ => todo!(),
}
}
9 changes: 4 additions & 5 deletions bindings/js/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{any::Any, sync::Arc};
use js_sys::{Object, Reflect, Uint8Array};
use tonbo::{
cast_arc_value,
record::{DataType, DynRecord, Value, ValueDesc, F32, F64},
datatype::DataType,
record::{DynRecord, Value, ValueDesc},
F32, F64,
};
use wasm_bindgen::{JsCast, JsValue};

Expand Down Expand Up @@ -188,10 +190,7 @@ pub(crate) fn to_record(cols: &Vec<Value>, primary_key_index: usize) -> JsValue
#[cfg(test)]
mod tests {

use tonbo::{
cast_arc_value,
record::{DataType, ValueDesc, F64},
};
use tonbo::{cast_arc_value, datatype::DataType, record::ValueDesc, F64};
use wasm_bindgen::JsValue;
use wasm_bindgen_test::wasm_bindgen_test;

Expand Down
5 changes: 4 additions & 1 deletion bindings/python/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{
};

use pyo3::{pyclass, pymethods};
use tonbo::record::{DataType as TonboDataType, Value, ValueDesc};
use tonbo::{
datatype::DataType as TonboDataType,
record::{Value, ValueDesc},
};

use crate::datatype::DataType;

Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use pyo3::pyclass;
use tonbo::record::{DataType as TonboDataType, F64};
use tonbo::{datatype::DataType as TonboDataType, F64};

#[pyclass]
#[derive(PartialEq, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pyo3::{
types::{PyDict, PyMapping, PyString, PyTuple},
Bound,
};
use tonbo::record::F64;
use tonbo::F64;

use crate::{column::Column, datatype::DataType};

Expand Down
5 changes: 1 addition & 4 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use pyo3::{
types::{PyBytes, PyDict, PyDictMethods},
Bound, Py, PyAny, Python,
};
use tonbo::{
cast_arc_value,
record::{DataType as TonboDataType, Value, F64},
};
use tonbo::{cast_arc_value, datatype::DataType as TonboDataType, record::Value, F64};

use crate::{column::Column, datatype::DataType, range};

Expand Down
25 changes: 25 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
edition = "2021"
license = "Apache-2.0"
name = "common"
version = "0.3.2"

[features]
aws = ["fusio-log/aws"]
default = []
tokio = [
"fusio-log/tokio",
]
web = ["fusio-log/web"]


[dependencies]
arrow = { workspace = true }
bytes = { version = "1.8.0", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["wasmbind"] }
fusio = { workspace = true }
fusio-log = { workspace = true }
ulid = { version = "1", features = ["serde"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1", features = ["full"] }
153 changes: 153 additions & 0 deletions common/src/datatype.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use arrow::datatypes::DataType as ArrowDataType;
use fusio_log::{Decode, Encode};

use crate::key::TimeUnit;

/// Logical data type of a column value.
///
/// Each variant has a counterpart in [`arrow::datatypes::DataType`]; see the
/// `From<&ArrowDataType>` implementation in this module for the exact mapping.
///
/// Note that the derived `PartialOrd`/`Ord` follow the declaration order below, while
/// the one-byte tags written by the `Encode` implementation are assigned explicitly
/// and do not follow that order.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum DataType {
/// Unsigned 8-bit integer.
UInt8,
/// Unsigned 16-bit integer.
UInt16,
/// Unsigned 32-bit integer.
UInt32,
/// Unsigned 64-bit integer.
UInt64,
/// Signed 8-bit integer.
Int8,
/// Signed 16-bit integer.
Int16,
/// Signed 32-bit integer.
Int32,
/// Signed 64-bit integer.
Int64,
/// String; converted from Arrow's `Utf8`.
String,
/// String; converted from Arrow's `LargeUtf8`.
LargeString,
/// Boolean value.
Boolean,
/// Binary data; converted from Arrow's `Binary`.
Bytes,
/// Binary data; converted from Arrow's `LargeBinary`.
LargeBinary,
/// 32-bit floating point number.
Float32,
/// 64-bit floating point number.
Float64,
/// A timestamp in the unit of `TimeUnit`.
///
/// The conversion from Arrow only debug-asserts that the Arrow timestamp carries no
/// timezone; the timezone itself is not stored.
Timestamp(TimeUnit),
/// A 32-bit time representing the elapsed time since midnight in the unit of
/// `TimeUnit`. Must be either seconds or milliseconds.
///
/// See [`arrow::datatypes::DataType::Time32`] for more details.
Time32(TimeUnit),
/// A 64-bit time representing the elapsed time since midnight in the unit of
/// `TimeUnit`. Must be either microseconds or nanoseconds.
///
/// See more details in [`arrow::datatypes::DataType::Time64`].
Time64(TimeUnit),
/// A signed 32-bit date representing the elapsed time since UNIX epoch (1970-01-01)
/// in days.
///
/// See [`arrow::datatypes::DataType::Date32`] for more details.
Date32,
/// A signed 64-bit date representing the elapsed time since UNIX epoch (1970-01-01)
/// in milliseconds.
///
/// See [`arrow::datatypes::DataType::Date64`] for more details.
Date64,
}

impl From<&ArrowDataType> for DataType {
    /// Maps an Arrow data type onto the matching [`DataType`] variant.
    ///
    /// # Panics
    ///
    /// Reaches `todo!()` for any Arrow type that has no arm below. In debug builds it
    /// additionally asserts that an Arrow timestamp carries no timezone.
    fn from(datatype: &ArrowDataType) -> Self {
        match datatype {
            // Boolean.
            ArrowDataType::Boolean => Self::Boolean,

            // Unsigned integers.
            ArrowDataType::UInt8 => Self::UInt8,
            ArrowDataType::UInt16 => Self::UInt16,
            ArrowDataType::UInt32 => Self::UInt32,
            ArrowDataType::UInt64 => Self::UInt64,

            // Signed integers.
            ArrowDataType::Int8 => Self::Int8,
            ArrowDataType::Int16 => Self::Int16,
            ArrowDataType::Int32 => Self::Int32,
            ArrowDataType::Int64 => Self::Int64,

            // Floating point numbers.
            ArrowDataType::Float32 => Self::Float32,
            ArrowDataType::Float64 => Self::Float64,

            // Strings and binary data.
            ArrowDataType::Utf8 => Self::String,
            ArrowDataType::LargeUtf8 => Self::LargeString,
            ArrowDataType::Binary => Self::Bytes,
            ArrowDataType::LargeBinary => Self::LargeBinary,

            // Temporal types. The timezone of a timestamp is not carried over, so it is
            // only checked (in debug builds) to be absent.
            ArrowDataType::Timestamp(unit, tz) => {
                debug_assert!(tz.is_none(), "expected timezone is none, get {:?}", tz);
                Self::Timestamp(unit.into())
            }
            ArrowDataType::Time32(unit) => Self::Time32(unit.into()),
            ArrowDataType::Time64(unit) => Self::Time64(unit.into()),
            ArrowDataType::Date32 => Self::Date32,
            ArrowDataType::Date64 => Self::Date64,

            // Every other Arrow type is not supported yet.
            _ => todo!(),
        }
    }
}

impl Encode for DataType {
    type Error = fusio::Error;

    /// Writes the data type to `writer` as a single `u8` tag.
    ///
    /// The tag values are fixed and must stay in sync with the `Decode`
    /// implementation for this type.
    ///
    /// # Panics
    ///
    /// Reaches `unreachable!()` for a `Time32` whose unit is not second/millisecond
    /// and for a `Time64` whose unit is not microsecond/nanosecond.
    async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
    where
        W: fusio::Write,
    {
        // Resolve the tag first; the write itself is the same for every variant.
        let tag: u8 = match self {
            DataType::UInt8 => 0,
            DataType::UInt16 => 1,
            DataType::UInt32 => 2,
            DataType::UInt64 => 3,
            DataType::Int8 => 4,
            DataType::Int16 => 5,
            DataType::Int32 => 6,
            DataType::Int64 => 7,
            DataType::String => 8,
            DataType::Boolean => 9,
            DataType::Bytes => 10,
            DataType::Float32 => 11,
            DataType::Float64 => 12,
            DataType::Timestamp(TimeUnit::Second) => 13,
            DataType::Timestamp(TimeUnit::Millisecond) => 14,
            DataType::Timestamp(TimeUnit::Microsecond) => 15,
            DataType::Timestamp(TimeUnit::Nanosecond) => 16,
            DataType::Time32(TimeUnit::Second) => 17,
            DataType::Time32(TimeUnit::Millisecond) => 18,
            DataType::Time64(TimeUnit::Microsecond) => 19,
            DataType::Time64(TimeUnit::Nanosecond) => 20,
            DataType::Date32 => 21,
            DataType::Date64 => 22,
            DataType::LargeBinary => 23,
            DataType::LargeString => 24,
            // Remaining unit combinations have no tag assigned.
            DataType::Time32(_) | DataType::Time64(_) => unreachable!(),
        };
        tag.encode(writer).await
    }

    /// Encoded size in bytes: always the one-byte tag.
    fn size(&self) -> usize {
        1
    }
}

impl Decode for DataType {
    type Error = fusio::Error;

    /// Reads a single `u8` tag from `reader` and maps it back to a [`DataType`].
    ///
    /// The tag table is the inverse of the one used by the `Encode` implementation
    /// for this type, so every encoded value decodes to itself.
    ///
    /// # Errors
    ///
    /// Returns the error produced while reading the tag byte.
    ///
    /// # Panics
    ///
    /// Panics with "invalid datatype tag" if the tag is not in `0..=24`.
    async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
    where
        R: fusio::SeqRead,
    {
        let tag = u8::decode(reader).await?;
        let data_type = match tag {
            0 => DataType::UInt8,
            1 => DataType::UInt16,
            2 => DataType::UInt32,
            3 => DataType::UInt64,
            4 => DataType::Int8,
            5 => DataType::Int16,
            6 => DataType::Int32,
            7 => DataType::Int64,
            8 => DataType::String,
            9 => DataType::Boolean,
            10 => DataType::Bytes,
            11 => DataType::Float32,
            12 => DataType::Float64,
            13 => DataType::Timestamp(TimeUnit::Second),
            14 => DataType::Timestamp(TimeUnit::Millisecond),
            // Tag 15 is written for microsecond timestamps; decoding it as
            // milliseconds would silently change the unit on a round trip.
            15 => DataType::Timestamp(TimeUnit::Microsecond),
            16 => DataType::Timestamp(TimeUnit::Nanosecond),
            17 => DataType::Time32(TimeUnit::Second),
            18 => DataType::Time32(TimeUnit::Millisecond),
            19 => DataType::Time64(TimeUnit::Microsecond),
            20 => DataType::Time64(TimeUnit::Nanosecond),
            21 => DataType::Date32,
            22 => DataType::Date64,
            23 => DataType::LargeBinary,
            24 => DataType::LargeString,
            _ => panic!("invalid datatype tag"),
        };
        Ok(data_type)
    }
}
2 changes: 1 addition & 1 deletion src/record/key/datetime.rs → common/src/key/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow::array::{
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use fusio_log::{Decode, Encode};

use crate::record::{Key, KeyRef, TimeUnit};
use crate::key::{Key, KeyRef, TimeUnit};

/// Number of seconds in a day
pub const SECONDS_IN_DAY: i64 = 86_400;
Expand Down
File renamed without changes.
File renamed without changes.
Loading
Loading