Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 14 additions & 23 deletions rust/cubesql/cubesql/src/compile/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,9 @@
MetaOk(StatusFlags, CommandCompletion),
MetaTabular(StatusFlags, Box<dataframe::DataFrame>),
// Query will be executed via Data Fusion
DataFusionSelect(StatusFlags, LogicalPlan, DFSessionContext),
DataFusionSelect(LogicalPlan, DFSessionContext),
// Query will be executed via DataFusion and saved to session
CreateTempTable(
StatusFlags,
LogicalPlan,
DFSessionContext,
String,
Arc<TempTableManager>,
),
CreateTempTable(LogicalPlan, DFSessionContext, String, Arc<TempTableManager>),
}

impl fmt::Debug for QueryPlan {
Expand All @@ -70,16 +64,13 @@
flags
))
},
QueryPlan::DataFusionSelect(flags, _, _) => {
f.write_str(&format!(
"DataFusionSelect(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden)",
flags
))
QueryPlan::DataFusionSelect(_, _) => {
f.write_str(&"DataFusionSelect(LogicalPlan: hidden, DFSessionContext: hidden)")

Check warning on line 68 in rust/cubesql/cubesql/src/compile/plan.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/plan.rs#L68

Added line #L68 was not covered by tests
},
QueryPlan::CreateTempTable(flags, _, _, name, _) => {
QueryPlan::CreateTempTable(_, _, name, _) => {

Check warning on line 70 in rust/cubesql/cubesql/src/compile/plan.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/plan.rs#L70

Added line #L70 was not covered by tests
f.write_str(&format!(
"CreateTempTable(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden, Name: {:?}, SessionState: hidden",
flags, name
"CreateTempTable(LogicalPlan: hidden, DFSessionContext: hidden, Name: {}, SessionState: hidden",
name

Check warning on line 73 in rust/cubesql/cubesql/src/compile/plan.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/plan.rs#L72-L73

Added lines #L72 - L73 were not covered by tests
))
},
}
Expand All @@ -89,8 +80,9 @@
impl QueryPlan {
pub fn as_logical_plan(&self) -> LogicalPlan {
match self {
QueryPlan::DataFusionSelect(_, plan, _)
| QueryPlan::CreateTempTable(_, plan, _, _, _) => plan.clone(),
QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => {
plan.clone()
}
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => {
panic!("This query doesnt have a plan, because it already has values for response")
}
Expand All @@ -99,8 +91,8 @@

pub async fn as_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
match self {
QueryPlan::DataFusionSelect(_, plan, ctx)
| QueryPlan::CreateTempTable(_, plan, ctx, _, _) => {
QueryPlan::DataFusionSelect(plan, ctx)
| QueryPlan::CreateTempTable(plan, ctx, _, _) => {

Check warning on line 95 in rust/cubesql/cubesql/src/compile/plan.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/plan.rs#L95

Added line #L95 was not covered by tests
DataFrame::new(ctx.state.clone(), plan)
.create_physical_plan()
.await
Expand All @@ -114,8 +106,7 @@

pub fn print(&self, pretty: bool) -> Result<String, CubeError> {
match self {
QueryPlan::DataFusionSelect(_, plan, _)
| QueryPlan::CreateTempTable(_, plan, _, _, _) => {
QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => {
if pretty {
Ok(plan.display_indent().to_string())
} else {
Expand All @@ -134,7 +125,7 @@
plan: &QueryPlan,
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>, CubeError> {
match plan {
QueryPlan::DataFusionSelect(_, plan, ctx) => {
QueryPlan::DataFusionSelect(plan, ctx) => {

Check warning on line 128 in rust/cubesql/cubesql/src/compile/plan.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/plan.rs#L128

Added line #L128 was not covered by tests
let df = DataFrame::new(ctx.state.clone(), &plan);
let safe_stream = async move {
std::panic::AssertUnwindSafe(df.execute_stream())
Expand Down
26 changes: 14 additions & 12 deletions rust/cubesql/cubesql/src/compile/query_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
analysis::LogicalPlanAnalysis,
converter::{LogicalPlanToLanguageContext, LogicalPlanToLanguageConverter},
},
CompilationError, CompilationResult, DatabaseProtocol, QueryPlan, Rewriter, StatusFlags,
CompilationError, CompilationResult, DatabaseProtocol, QueryPlan, Rewriter,
},
config::ConfigObj,
sql::{
Expand Down Expand Up @@ -46,8 +46,12 @@ use datafusion::{

#[async_trait::async_trait]
pub trait QueryEngine {
/// Custom type for AST statement type, It allows to use any parsers for SQL
type AstStatementType: std::fmt::Display + Send;

/// Additional metadata for results of plan method instead of extending query plan
type PlanMetadataType: std::fmt::Debug + Send;

fn compiler_cache_ref(&self) -> &Arc<dyn CompilerCache>;

fn transport_ref(&self) -> &Arc<dyn TransportService>;
Expand All @@ -70,7 +74,7 @@ pub trait QueryEngine {
&self,
cube_ctx: &CubeContext,
stmt: &Self::AstStatementType,
) -> Result<LogicalPlan, DataFusionError>;
) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError>;

fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType;

Expand All @@ -81,11 +85,11 @@ pub trait QueryEngine {
span_id: Option<Arc<SpanId>>,
meta: Arc<MetaContext>,
state: Arc<SessionState>,
) -> CompilationResult<QueryPlan> {
) -> CompilationResult<(QueryPlan, Self::PlanMetadataType)> {
let ctx = self.create_session_ctx(state.clone())?;
let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;

let plan = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| {
let (plan, metadata) = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| {
let message = format!("Initial planning error: {}", err,);
let meta = Some(HashMap::from([
("query".to_string(), stmt.to_string()),
Expand Down Expand Up @@ -249,11 +253,7 @@ pub trait QueryEngine {
qtrace.set_best_plan_and_cube_scans(&rewrite_plan);
}

Ok(QueryPlan::DataFusionSelect(
StatusFlags::empty(),
rewrite_plan,
ctx,
))
Ok((QueryPlan::DataFusionSelect(rewrite_plan, ctx), metadata))
}

fn evaluate_wrapped_sql(
Expand Down Expand Up @@ -308,6 +308,8 @@ impl SqlQueryEngine {
impl QueryEngine for SqlQueryEngine {
type AstStatementType = sqlparser::ast::Statement;

type PlanMetadataType = ();

fn create_cube_ctx(
&self,
state: Arc<SessionState>,
Expand Down Expand Up @@ -470,12 +472,12 @@ impl QueryEngine for SqlQueryEngine {
&self,
cube_ctx: &CubeContext,
stmt: &Self::AstStatementType,
) -> Result<LogicalPlan, DataFusionError> {
) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError> {
let df_query_planner = SqlToRel::new_with_options(cube_ctx, true);
let plan =
df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())));
df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))?;

plan
Ok((plan, ()))
}

fn compiler_cache_ref(&self) -> &Arc<dyn CompilerCache> {
Expand Down
16 changes: 9 additions & 7 deletions rust/cubesql/cubesql/src/compile/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@
)])],
)),
)),
QueryPlan::DataFusionSelect(flags, plan, context)
| QueryPlan::CreateTempTable(flags, plan, context, _, _) => {
QueryPlan::DataFusionSelect(plan, context)
| QueryPlan::CreateTempTable(plan, context, _, _) => {

Check warning on line 360 in rust/cubesql/cubesql/src/compile/router.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/router.rs#L360

Added line #L360 was not covered by tests
// EXPLAIN over CREATE TABLE AS shows the SELECT query plan
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
Expand Down Expand Up @@ -385,7 +385,7 @@
})
};

Ok(QueryPlan::DataFusionSelect(flags, explain_plan, context))
Ok(QueryPlan::DataFusionSelect(explain_plan, context))
}
}
})
Expand Down Expand Up @@ -596,7 +596,7 @@
span_id: Option<Arc<SpanId>>,
) -> Result<QueryPlan, CompilationError> {
let plan = self.select_to_plan(stmt, qtrace, span_id).await?;
let QueryPlan::DataFusionSelect(flags, plan, ctx) = plan else {
let QueryPlan::DataFusionSelect(plan, ctx) = plan else {
return Err(CompilationError::internal(
"unable to build DataFusion plan from Query".to_string(),
));
Expand All @@ -608,8 +608,8 @@
"table name contains no ident parts".to_string(),
));
};

Ok(QueryPlan::CreateTempTable(
flags,
plan,
ctx,
table_name.value.to_string(),
Expand Down Expand Up @@ -708,9 +708,11 @@
}

let sql_query_engine = SqlQueryEngine::new(self.session_manager.clone());
sql_query_engine
let (plan, _) = sql_query_engine
.plan(stmt, qtrace, span_id, self.meta.clone(), self.state.clone())
.await
.await?;

Ok(plan)
}
}

Expand Down
8 changes: 5 additions & 3 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,19 +933,21 @@
.await
.map_err(|e| CubeError::internal(format!("Error during planning: {}", e)))?;
match query {
QueryPlan::DataFusionSelect(flags, plan, ctx) => {
QueryPlan::DataFusionSelect(plan, ctx) => {
let df = DFDataFrame::new(ctx.state, &plan);
let batches = df.collect().await?;
let frame = batches_to_dataframe(&df.schema().into(), batches)?;

output.push(frame.print());
output_flags = flags;
}
QueryPlan::MetaTabular(flags, frame) => {
output.push(frame.print());
output_flags = flags;
}
QueryPlan::MetaOk(flags, _) | QueryPlan::CreateTempTable(flags, _, _, _, _) => {
QueryPlan::CreateTempTable(_, _, _, _) => {
// nothing to do
}

Check warning on line 949 in rust/cubesql/cubesql/src/compile/test/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/test/mod.rs#L947-L949

Added lines #L947 - L949 were not covered by tests
QueryPlan::MetaOk(flags, _) => {
output_flags = flags;
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/cubesql/cubesql/src/sql/postgres/extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ impl Portal {

return;
}
QueryPlan::DataFusionSelect(_, plan, ctx) => {
QueryPlan::DataFusionSelect(plan, ctx) => {
let df = DFDataFrame::new(ctx.state.clone(), &plan);
let safe_stream = async move {
std::panic::AssertUnwindSafe(df.execute_stream())
Expand All @@ -511,7 +511,7 @@ impl Portal {
Err(err) => return yield Err(CubeError::panic(err).into()),
}
}
QueryPlan::CreateTempTable(_, plan, ctx, name, temp_tables) => {
QueryPlan::CreateTempTable(plan, ctx, name, temp_tables) => {
let df = DFDataFrame::new(ctx.state.clone(), &plan);
let record_batch = df.collect();
let row_count = match record_batch.await {
Expand Down
4 changes: 2 additions & 2 deletions rust/cubesql/cubesql/src/sql/postgres/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl QueryPlanExt for QueryPlan {
required_format: protocol::Format,
) -> Result<Option<protocol::RowDescription>, ConnectionError> {
match &self {
QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _, _) => Ok(None),
QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _) => Ok(None),
QueryPlan::MetaTabular(_, frame) => {
let mut result = vec![];

Expand All @@ -86,7 +86,7 @@ impl QueryPlanExt for QueryPlan {

Ok(Some(protocol::RowDescription::new(result)))
}
QueryPlan::DataFusionSelect(_, logical_plan, _) => {
QueryPlan::DataFusionSelect(logical_plan, _) => {
let mut result = vec![];

for field in logical_plan.schema().fields() {
Expand Down
Loading