Skip to content

Commit 4dcc968

Browse files
committed
refactor: remove schema trait
1 parent fbe90c0 commit 4dcc968

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+846
-1175
lines changed

bindings/js/src/db.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use std::{mem::transmute, sync::Arc};
33
use futures::TryStreamExt;
44
use js_sys::{Array, Function, JsString, Object, Reflect};
55
use tonbo::{
6+
arrow::datatypes::Field,
67
executor::opfs::OpfsExecutor,
7-
record::{DynRecord, DynSchema, ValueDesc},
8+
record::{DynRecord, Schema, ValueDesc},
89
DB,
910
};
1011
use wasm_bindgen::prelude::*;
@@ -78,9 +79,13 @@ impl TonboDB {
7879
#[wasm_bindgen(constructor)]
7980
pub async fn new(option: DbOption, schema: Object) -> Self {
8081
let (desc, primary_key_index) = Self::parse_schema(schema);
81-
let schema = DynSchema::new(desc.clone(), primary_key_index);
82+
let schema = Schema::new(
83+
desc.iter()
84+
.map(|v| Field::new(v.name.as_str(), (&v.datatype).into(), v.is_nullable)).collect(),
85+
primary_key_index,
86+
);
8287

83-
let db = DB::new(option.into_option(&schema), JsExecutor::new(), schema)
88+
let db = DB::new(option.into_option(), JsExecutor::new(), schema)
8489
.await
8590
.unwrap();
8691

@@ -297,7 +302,8 @@ mod tests {
297302
&JsValue::from_str(i.to_string().as_str()),
298303
)
299304
.unwrap();
300-
js_sys::Reflect::set(&item, &JsValue::from_str("price"), &JsValue::from(i as f64)).unwrap();
305+
js_sys::Reflect::set(&item, &JsValue::from_str("price"), &JsValue::from(i as f64))
306+
.unwrap();
301307

302308
items.push(item);
303309
}

bindings/js/src/options.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use tonbo::{option::Path, record::Schema};
1+
use tonbo::option::Path;
22
use wasm_bindgen::{prelude::wasm_bindgen, JsValue};
33

44
use crate::FsOptions;
@@ -64,8 +64,8 @@ impl DbOption {
6464
}
6565

6666
impl DbOption {
67-
pub(crate) fn into_option<S: Schema>(self, schema: &S) -> tonbo::DbOption {
68-
let mut opt = tonbo::DbOption::new(Path::from(self.path), schema)
67+
pub(crate) fn into_option(self) -> tonbo::DbOption {
68+
let mut opt = tonbo::DbOption::new(Path::from(self.path))
6969
.clean_channel_buffer(self.clean_channel_buffer)
7070
.immutable_chunk_num(self.immutable_chunk_num)
7171
.level_sst_magnification(self.level_sst_magnification)

bindings/python/src/column.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use std::{
55
};
66

77
use pyo3::{pyclass, pymethods};
8-
use tonbo::record::{DataType as TonboDataType, Value, ValueDesc};
8+
use tonbo::{
9+
arrow::datatypes::{DataType as ArrowDataType, Field},
10+
record::{DataType as TonboDataType, Value},
11+
};
912

1013
use crate::datatype::DataType;
1114

@@ -58,12 +61,13 @@ impl Display for Column {
5861
}
5962
}
6063

61-
impl From<Column> for ValueDesc {
64+
impl From<Column> for Field {
6265
fn from(col: Column) -> Self {
63-
let datatype = TonboDataType::from(col.datatype);
64-
ValueDesc::new(col.name, datatype, col.nullable)
66+
let datatype = ArrowDataType::from(col.datatype);
67+
Field::new(col.name, datatype, col.nullable)
6568
}
6669
}
70+
6771
impl From<Column> for Value {
6872
fn from(col: Column) -> Self {
6973
let datatype = TonboDataType::from(col.datatype);

bindings/python/src/datatype.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use std::{
55
};
66

77
use pyo3::pyclass;
8-
use tonbo::record::{DataType as TonboDataType, F64};
8+
use tonbo::{
9+
arrow::datatypes::DataType as ArrowDataType,
10+
record::{DataType as TonboDataType, F64},
11+
};
912

1013
#[pyclass]
1114
#[derive(PartialEq, Clone)]
@@ -81,6 +84,25 @@ impl From<DataType> for TonboDataType {
8184
}
8285
}
8386

87+
impl From<DataType> for ArrowDataType {
88+
fn from(datatype: DataType) -> Self {
89+
match datatype {
90+
DataType::UInt8 => ArrowDataType::UInt8,
91+
DataType::UInt16 => ArrowDataType::UInt16,
92+
DataType::UInt32 => ArrowDataType::UInt32,
93+
DataType::UInt64 => ArrowDataType::UInt64,
94+
DataType::Int8 => ArrowDataType::Int8,
95+
DataType::Int16 => ArrowDataType::Int16,
96+
DataType::Int32 => ArrowDataType::Int32,
97+
DataType::Int64 => ArrowDataType::Int64,
98+
DataType::String => ArrowDataType::Utf8,
99+
DataType::Boolean => ArrowDataType::Boolean,
100+
DataType::Bytes => ArrowDataType::Binary,
101+
DataType::Float => ArrowDataType::Float64,
102+
}
103+
}
104+
}
105+
84106
impl From<&DataType> for TonboDataType {
85107
fn from(datatype: &DataType) -> Self {
86108
match datatype {

bindings/python/src/db.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ use pyo3::{
88
};
99
use pyo3_async_runtimes::tokio::{future_into_py, get_runtime};
1010
use tonbo::{
11+
arrow::datatypes::Field,
1112
executor::tokio::TokioExecutor,
12-
record::{DynRecord, DynSchema, Value, ValueDesc},
13+
record::{DynRecord, Schema, Value},
1314
DB,
1415
};
1516

@@ -52,11 +53,11 @@ impl TonboDB {
5253
primary_key_index = Some(desc.len());
5354
}
5455
cols.push(col.clone());
55-
desc.push(ValueDesc::from(col));
56+
desc.push(Field::from(col));
5657
}
5758
}
58-
let schema = DynSchema::new(desc, primary_key_index.unwrap());
59-
let option = option.into_option(&schema);
59+
let schema = Schema::new(desc, primary_key_index.unwrap());
60+
let option = option.into_option();
6061
let db = get_runtime()
6162
.block_on(async { DB::new(option, TokioExecutor::current(), schema).await })
6263
.unwrap();

bindings/python/src/options.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use pyo3::{pyclass, pymethods, PyResult};
2-
use tonbo::{option::Path, record::Schema};
2+
use tonbo::option::Path;
33

44
use crate::{ExceedsMaxLevelError, FsOptions};
55

@@ -71,8 +71,8 @@ impl DbOption {
7171
}
7272

7373
impl DbOption {
74-
pub(crate) fn into_option<S: Schema>(self, schema: &S) -> tonbo::DbOption {
75-
let mut opt = tonbo::DbOption::new(Path::from(self.path), schema)
74+
pub(crate) fn into_option(self) -> tonbo::DbOption {
75+
let mut opt = tonbo::DbOption::new(Path::from(self.path))
7676
.clean_channel_buffer(self.clean_channel_buffer)
7777
.immutable_chunk_num(self.immutable_chunk_num)
7878
.level_sst_magnification(self.level_sst_magnification)

examples/datafusion.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use futures_core::Stream;
3030
use futures_util::StreamExt;
3131
use tokio::fs;
3232
use tonbo::{
33-
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Schema, DbOption, DB,
33+
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
3434
};
3535
use tonbo_macros::Record;
3636

@@ -51,10 +51,7 @@ struct MusicExec {
5151
db: Arc<DB<Music, TokioExecutor>>,
5252
projection: Option<Vec<usize>>,
5353
limit: Option<usize>,
54-
range: (
55-
Bound<<MusicSchema as Schema>::Key>,
56-
Bound<<MusicSchema as Schema>::Key>,
57-
),
54+
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
5855
}
5956

6057
struct MusicStream {
@@ -76,7 +73,7 @@ impl TableProvider for MusicProvider {
7673
}
7774

7875
fn schema(&self) -> SchemaRef {
79-
MusicSchema {}.arrow_schema().clone()
76+
Music::arrow_schema().clone()
8077
}
8178

8279
fn table_type(&self) -> TableType {
@@ -109,7 +106,7 @@ impl TableProvider for MusicProvider {
109106

110107
impl MusicExec {
111108
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
112-
let schema = MusicSchema {}.arrow_schema();
109+
let schema = Music::arrow_schema();
113110
let schema = if let Some(projection) = &projection {
114111
Arc::new(schema.project(projection).unwrap())
115112
} else {
@@ -141,7 +138,7 @@ impl Stream for MusicStream {
141138

142139
impl RecordBatchStream for MusicStream {
143140
fn schema(&self) -> SchemaRef {
144-
MusicSchema {}.arrow_schema().clone()
141+
Music::arrow_schema().clone()
145142
}
146143
}
147144

@@ -229,12 +226,9 @@ async fn main() -> Result<()> {
229226
// make sure the path exists
230227
let _ = fs::create_dir_all("./db_path/music").await;
231228

232-
let options = DbOption::new(
233-
Path::from_filesystem_path("./db_path/music").unwrap(),
234-
&MusicSchema,
235-
);
229+
let options = DbOption::new(Path::from_filesystem_path("./db_path/music").unwrap());
236230

237-
let db = DB::new(options, TokioExecutor::current(), MusicSchema)
231+
let db = DB::new(options, TokioExecutor::current(), Music::schema())
238232
.await
239233
.unwrap();
240234
for (id, name, like) in [

examples/declare.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ async fn main() {
2323
// make sure the path exists
2424
let _ = fs::create_dir_all("./db_path/users").await;
2525

26-
let options = DbOption::new(
27-
Path::from_filesystem_path("./db_path/users").unwrap(),
28-
&UserSchema,
29-
);
26+
let options = DbOption::new(Path::from_filesystem_path("./db_path/users").unwrap());
3027
// pluggable async runtime and I/O
31-
let db = DB::new(options, TokioExecutor::current(), UserSchema)
28+
let db = DB::new(options, TokioExecutor::current(), User::schema())
3229
.await
3330
.unwrap();
3431

examples/dynamic.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
11
use std::{fs, sync::Arc};
22

3+
use arrow::datatypes::{DataType as ArrowDataType, Field};
34
use fusio::path::Path;
45
use tonbo::{
5-
dyn_record, dyn_schema,
6+
dyn_record,
67
executor::tokio::TokioExecutor,
7-
record::{DataType, Value},
8+
record::{DataType, Schema, Value},
89
DbOption, DB,
910
};
1011

1112
#[tokio::main]
1213
async fn main() {
1314
fs::create_dir_all("./db_path/users").unwrap();
1415

15-
let schema = dyn_schema!(("foo", String, false), ("bar", Int32, true), 0);
16-
17-
let options = DbOption::new(
18-
Path::from_filesystem_path("./db_path/users").unwrap(),
19-
&schema,
16+
let schema = Schema::new(
17+
vec![
18+
Field::new("foo", ArrowDataType::Utf8, false),
19+
Field::new("bar", ArrowDataType::Int32, true),
20+
],
21+
0,
2022
);
23+
24+
let options = DbOption::new(Path::from_filesystem_path("./db_path/users").unwrap());
25+
2126
let db = DB::new(options, TokioExecutor::current(), schema)
2227
.await
2328
.unwrap();

0 commit comments

Comments
 (0)