Skip to content

Commit a7b3d5f

Browse files
authored
chore: supplement decimal’s memcomparable encode (#256)
* chore: supplement decimal’s `memcomparable` encode * chore: fix server.rs
1 parent 7ed2e66 commit a7b3d5f

File tree

17 files changed

+301
-109
lines changed

17 files changed

+301
-109
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ run `cargo run -p tpcc --release` to run tpcc
6969

7070
- i9-13900HX
7171
- 32.0 GB
72-
- YMTC PC411-1024GB-B
72+
- KIOXIA-EXCERIA PLUS G3 SSD
7373
- Tips: TPC-C currently only supports single thread
7474
```shell
7575
<90th Percentile RT (MaxRT)>

src/bin/server.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,19 +172,25 @@ impl SimpleQueryHandler for SessionBackend {
172172
_ => {
173173
let mut guard = self.tx.lock();
174174

175-
let iter = if let Some(transaction) = guard.as_mut() {
176-
unsafe { transaction.as_mut().run(query) }.map(Box::new)
177-
as Result<Box<dyn ResultIter>, _>
178-
} else {
179-
self.inner.run(query).map(Box::new)
180-
}
181-
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
182-
183175
let mut tuples = Vec::new();
184-
for tuple in iter {
185-
tuples.push(tuple.map_err(|e| PgWireError::ApiError(Box::new(e)))?);
186-
}
187-
Ok(vec![Response::Query(encode_tuples(iter.schema(), tuples)?)])
176+
let response = if let Some(transaction) = guard.as_mut() {
177+
let mut iter = unsafe { transaction.as_mut().run(query) }
178+
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
179+
for tuple in iter.by_ref() {
180+
tuples.push(tuple.map_err(|e| PgWireError::ApiError(Box::new(e)))?);
181+
}
182+
encode_tuples(iter.schema(), tuples)?
183+
} else {
184+
let mut iter = self
185+
.inner
186+
.run(query)
187+
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
188+
for tuple in iter.by_ref() {
189+
tuples.push(tuple.map_err(|e| PgWireError::ApiError(Box::new(e)))?);
190+
}
191+
encode_tuples(iter.schema(), tuples)?
192+
};
193+
Ok(vec![Response::Query(response)])
188194
}
189195
}
190196
}
@@ -235,7 +241,9 @@ fn encode_tuples<'a>(schema: &Schema, tuples: Vec<Tuple>) -> PgWireResult<QueryR
235241
LogicalType::Date => encoder.encode_field(&value.date()),
236242
LogicalType::DateTime => encoder.encode_field(&value.datetime()),
237243
LogicalType::Time => encoder.encode_field(&value.time()),
238-
LogicalType::Decimal(_, _) => todo!(),
244+
LogicalType::Decimal(_, _) => {
245+
encoder.encode_field(&value.decimal().map(|decimal| decimal.to_string()))
246+
}
239247
_ => unreachable!(),
240248
}?;
241249
}
@@ -260,7 +268,7 @@ fn into_pg_type(data_type: &LogicalType) -> PgWireResult<Type> {
260268
LogicalType::Date | LogicalType::DateTime => Type::DATE,
261269
LogicalType::Char(..) => Type::CHAR,
262270
LogicalType::Time => Type::TIME,
263-
LogicalType::Decimal(_, _) => todo!(),
271+
LogicalType::Decimal(_, _) => Type::FLOAT8,
264272
_ => {
265273
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
266274
"ERROR".to_owned(),

src/binder/alter_table.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
6464
Childrens::Only(plan),
6565
)
6666
}
67-
AlterTableOperation::DropPrimaryKey => todo!(),
68-
AlterTableOperation::RenameColumn {
69-
old_column_name: _,
70-
new_column_name: _,
71-
} => todo!(),
72-
AlterTableOperation::RenameTable { table_name: _ } => todo!(),
73-
AlterTableOperation::ChangeColumn {
74-
old_name: _,
75-
new_name: _,
76-
data_type: _,
77-
options: _,
78-
} => todo!(),
79-
AlterTableOperation::AlterColumn {
80-
column_name: _,
81-
op: _,
82-
} => todo!(),
83-
_ => todo!(),
67+
op => {
68+
return Err(DatabaseError::UnsupportedStmt(format!(
69+
"AlertOperation: {:?}",
70+
op
71+
)))
72+
}
8473
};
8574

8675
Ok(plan)

src/binder/copy.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
8888
let ext_source = ExtSource {
8989
path: match target {
9090
CopyTarget::File { filename } => filename.into(),
91-
t => todo!("unsupported copy target: {:?}", t),
91+
t => {
92+
return Err(DatabaseError::UnsupportedStmt(format!(
93+
"copy target: {:?}",
94+
t
95+
)))
96+
}
9297
},
9398
format: FileFormat::from_options(options),
9499
};

src/binder/create_table.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
use itertools::Itertools;
2-
use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, TableConstraint};
3-
use std::collections::HashSet;
4-
use std::sync::Arc;
5-
61
use super::{is_valid_identifier, Binder};
72
use crate::binder::lower_case_name;
83
use crate::catalog::{ColumnCatalog, ColumnDesc};
@@ -14,6 +9,10 @@ use crate::planner::{Childrens, LogicalPlan};
149
use crate::storage::Transaction;
1510
use crate::types::value::DataValue;
1611
use crate::types::LogicalType;
12+
use itertools::Itertools;
13+
use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, TableConstraint};
14+
use std::collections::HashSet;
15+
use std::sync::Arc;
1716

1817
impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A> {
1918
// TODO: TableConstraint
@@ -75,7 +74,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
7574
}
7675
}
7776
}
78-
_ => todo!(),
77+
constraint => {
78+
return Err(DatabaseError::UnsupportedStmt(format!(
79+
"`CreateTable` does not currently support this constraint: {:?}",
80+
constraint
81+
)))?
82+
}
7983
}
8084
}
8185

@@ -140,7 +144,12 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
140144
}
141145
column_desc.default = Some(expr);
142146
}
143-
_ => todo!(),
147+
option => {
148+
return Err(DatabaseError::UnsupportedStmt(format!(
149+
"`Column` does not currently support this option: {:?}",
150+
option
151+
)))
152+
}
144153
}
145154
}
146155

src/binder/expr.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
5656
.find_map(|(key, value)| (key == name).then(|| value.clone()))
5757
.ok_or_else(|| DatabaseError::ParametersNotFound(name.to_string()))?
5858
} else {
59-
v.into()
59+
v.try_into()?
6060
};
6161
Ok(ScalarExpression::Constant(value))
6262
}
@@ -473,7 +473,12 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
473473
match arg_expr {
474474
FunctionArgExpr::Expr(expr) => args.push(self.bind_expr(expr)?),
475475
FunctionArgExpr::Wildcard => args.push(Self::wildcard_expr()),
476-
_ => todo!(),
476+
expr => {
477+
return Err(DatabaseError::UnsupportedStmt(format!(
478+
"function arg: {:#?}",
479+
expr
480+
)))
481+
}
477482
}
478483
}
479484
let function_name = func.name.to_string().to_lowercase();

src/binder/mod.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -350,24 +350,37 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
350350
names,
351351
if_exists,
352352
..
353-
} => match object_type {
354-
// todo handle all names
355-
ObjectType::Table => self.bind_drop_table(&names[0], if_exists)?,
356-
// todo handle all names
357-
ObjectType::View => self.bind_drop_view(&names[0], if_exists)?,
358-
_ => todo!(),
359-
},
353+
} => {
354+
if names.len() > 1 {
355+
return Err(DatabaseError::UnsupportedStmt(
356+
"only Drop a single `Table` or `View` is allowed".to_string(),
357+
));
358+
}
359+
match object_type {
360+
ObjectType::Table => self.bind_drop_table(&names[0], if_exists)?,
361+
ObjectType::View => self.bind_drop_view(&names[0], if_exists)?,
362+
_ => {
363+
return Err(DatabaseError::UnsupportedStmt(
364+
"only `Table` and `View` are allowed to be Dropped".to_string(),
365+
))
366+
}
367+
}
368+
}
360369
Statement::Insert {
361370
table_name,
362371
columns,
363372
source,
364373
overwrite,
365374
..
366375
} => {
376+
// TODO: support body on Insert
367377
if let SetExpr::Values(values) = source.body.as_ref() {
368378
self.bind_insert(table_name, columns, &values.rows, *overwrite, false)?
369379
} else {
370-
todo!()
380+
return Err(DatabaseError::UnsupportedStmt(format!(
381+
"insert body: {:#?}",
382+
source.body
383+
)));
371384
}
372385
}
373386
Statement::Update {
@@ -442,7 +455,10 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
442455
left,
443456
right,
444457
} => self.bind_set_operation(op, set_quantifier, left, right),
445-
_ => todo!(),
458+
expr => Err(DatabaseError::UnsupportedStmt(format!(
459+
"set expression: {:?}",
460+
expr
461+
))),
446462
}
447463
}
448464

src/binder/select.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
5555
left,
5656
right,
5757
} => self.bind_set_operation(op, set_quantifier, left, right),
58-
_ => unimplemented!(),
58+
expr => {
59+
return Err(DatabaseError::UnsupportedStmt(format!(
60+
"query body: {:?}",
61+
expr
62+
)))
63+
}
5964
}?;
6065

6166
let limit = &query.limit;
@@ -136,16 +141,7 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
136141
plan = self.bind_project(plan, select_list)?;
137142
}
138143

139-
if let Some(SelectInto {
140-
name,
141-
unlogged,
142-
temporary,
143-
..
144-
}) = &select.into
145-
{
146-
if *unlogged || *temporary {
147-
todo!()
148-
}
144+
if let Some(SelectInto { name, .. }) = &select.into {
149145
plan = LogicalPlan::new(
150146
Operator::Insert(InsertOperator {
151147
table_name: Arc::new(lower_case_name(name)?),
@@ -234,18 +230,10 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
234230
distinct_exprs,
235231
))
236232
}
237-
(SetOperator::Intersect, true) => {
238-
todo!()
239-
}
240-
(SetOperator::Intersect, false) => {
241-
todo!()
242-
}
243-
(SetOperator::Except, true) => {
244-
todo!()
245-
}
246-
(SetOperator::Except, false) => {
247-
todo!()
248-
}
233+
(set_operator, _) => Err(DatabaseError::UnsupportedStmt(format!(
234+
"set operator: {:?}",
235+
set_operator
236+
))),
249237
}
250238
}
251239

@@ -287,7 +275,9 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
287275
}) = alias
288276
{
289277
if tables.len() > 1 {
290-
todo!("Implement virtual tables for multiple table aliases");
278+
return Err(DatabaseError::UnsupportedStmt(
279+
"Implement virtual tables for multiple table aliases".to_string(),
280+
));
291281
}
292282
let table_alias = Arc::new(name.value.to_lowercase());
293283

src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ impl<S: Storage> Database<S> {
328328
}
329329
}
330330

331-
pub trait ResultIter: Iterator<Item = Result<Tuple, DatabaseError>> + Sized {
331+
pub trait ResultIter: Iterator<Item = Result<Tuple, DatabaseError>> {
332332
fn schema(&self) -> &SchemaRef;
333333

334334
fn done(self) -> Result<(), DatabaseError>;

src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ pub enum DatabaseError {
7373
InvalidTable(String),
7474
#[error("invalid type")]
7575
InvalidType,
76+
#[error("invalid value: {0}")]
77+
InvalidValue(String),
7678
#[error("io: {0}")]
7779
IO(
7880
#[source]

src/planner/operator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ impl fmt::Display for Operator {
287287
Operator::DropView(op) => write!(f, "{}", op),
288288
Operator::Truncate(op) => write!(f, "{}", op),
289289
Operator::CopyFromFile(op) => write!(f, "{}", op),
290-
Operator::CopyToFile(_) => todo!(),
290+
Operator::CopyToFile(op) => write!(f, "{}", op),
291291
Operator::Union(op) => write!(f, "{}", op),
292292
}
293293
}

0 commit comments

Comments
 (0)