Skip to content

Commit 78f3fda

Browse files
committed
refactor(cubesql): Extract CubeScanWrappedSqlNode from CubeScanWrapperNode
1 parent d5df5c0 commit 78f3fda

File tree

6 files changed

+191
-373
lines changed

6 files changed

+191
-373
lines changed

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::{
3030

3131
use crate::{
3232
compile::{
33-
engine::df::wrapper::{CubeScanWrapperNode, SqlQuery},
33+
engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery},
3434
rewrite::WrappedSelectType,
3535
test::find_cube_scans_deep_search,
3636
},
@@ -386,35 +386,32 @@ impl ExtensionPlanner for CubeScanExtensionPlanner {
386386
config_obj: self.config_obj.clone(),
387387
}))
388388
} else if let Some(wrapper_node) = node.as_any().downcast_ref::<CubeScanWrapperNode>() {
389+
return Err(DataFusionError::Internal(format!(
390+
"CubeScanWrapperNode is not executable, SQL should be generated first with QueryEngine::evaluate_wrapped_sql: {:?}",
391+
wrapper_node
392+
)));
393+
} else if let Some(wrapped_sql_node) =
394+
node.as_any().downcast_ref::<CubeScanWrappedSqlNode>()
395+
{
389396
// TODO
390397
// assert_eq!(logical_inputs.len(), 0, "Inconsistent number of inputs");
391398
// assert_eq!(physical_inputs.len(), 0, "Inconsistent number of inputs");
392399
let scan_node =
393-
find_cube_scans_deep_search(wrapper_node.wrapped_plan.clone(), false)
400+
find_cube_scans_deep_search(wrapped_sql_node.wrapped_plan.clone(), false)
394401
.into_iter()
395402
.next()
396403
.ok_or(DataFusionError::Internal(format!(
397404
"No cube scans found in wrapper node: {:?}",
398-
wrapper_node
405+
wrapped_sql_node
399406
)))?;
400407

401-
let schema = SchemaRef::new(wrapper_node.schema().as_ref().into());
408+
let schema = SchemaRef::new(wrapped_sql_node.schema().as_ref().into());
402409
Some(Arc::new(CubeScanExecutionPlan {
403410
schema,
404-
member_fields: wrapper_node.member_fields.as_ref().ok_or_else(|| {
405-
DataFusionError::Internal(format!(
406-
"Member fields are not set for wrapper node. Optimization wasn't performed: {:?}",
407-
wrapper_node
408-
))
409-
})?.clone(),
411+
member_fields: wrapped_sql_node.member_fields.clone(),
410412
transport: self.transport.clone(),
411-
request: wrapper_node.request.clone().unwrap_or(scan_node.request.clone()),
412-
wrapped_sql: Some(wrapper_node.wrapped_sql.as_ref().ok_or_else(|| {
413-
DataFusionError::Internal(format!(
414-
"Wrapped SQL is not set for wrapper node. Optimization wasn't performed: {:?}",
415-
wrapper_node
416-
))
417-
})?.clone()),
413+
request: wrapped_sql_node.request.clone(),
414+
wrapped_sql: Some(wrapped_sql_node.wrapped_sql.clone()),
418415
auth_context: scan_node.auth_context.clone(),
419416
options: scan_node.options.clone(),
420417
meta: self.meta.clone(),

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 71 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,75 @@ impl SqlQuery {
201201
}
202202
}
203203

204+
#[derive(Clone, Debug)]
205+
pub struct CubeScanWrappedSqlNode {
206+
// TODO maybe replace wrapped plan with schema + scan_node
207+
pub wrapped_plan: Arc<LogicalPlan>,
208+
pub wrapped_sql: SqlQuery,
209+
pub request: TransportLoadRequestQuery,
210+
pub member_fields: Vec<MemberField>,
211+
}
212+
213+
impl CubeScanWrappedSqlNode {
214+
pub fn new(
215+
wrapped_plan: Arc<LogicalPlan>,
216+
sql: SqlQuery,
217+
request: TransportLoadRequestQuery,
218+
member_fields: Vec<MemberField>,
219+
) -> Self {
220+
Self {
221+
wrapped_plan,
222+
wrapped_sql: sql,
223+
request,
224+
member_fields,
225+
}
226+
}
227+
}
228+
229+
impl UserDefinedLogicalNode for CubeScanWrappedSqlNode {
230+
fn as_any(&self) -> &dyn Any {
231+
self
232+
}
233+
234+
fn inputs(&self) -> Vec<&LogicalPlan> {
235+
vec![]
236+
}
237+
238+
fn schema(&self) -> &DFSchemaRef {
239+
self.wrapped_plan.schema()
240+
}
241+
242+
fn expressions(&self) -> Vec<Expr> {
243+
vec![]
244+
}
245+
246+
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
247+
// TODO figure out nice plan for wrapped plan
248+
write!(f, "CubeScanWrappedSql")
249+
}
250+
251+
fn from_template(
252+
&self,
253+
exprs: &[datafusion::logical_plan::Expr],
254+
inputs: &[datafusion::logical_plan::LogicalPlan],
255+
) -> std::sync::Arc<dyn UserDefinedLogicalNode + Send + Sync> {
256+
assert_eq!(inputs.len(), 0, "input size inconsistent");
257+
assert_eq!(exprs.len(), 0, "expression size inconsistent");
258+
259+
Arc::new(CubeScanWrappedSqlNode {
260+
wrapped_plan: self.wrapped_plan.clone(),
261+
wrapped_sql: self.wrapped_sql.clone(),
262+
request: self.request.clone(),
263+
member_fields: self.member_fields.clone(),
264+
})
265+
}
266+
}
267+
204268
#[derive(Debug, Clone)]
205269
pub struct CubeScanWrapperNode {
206270
pub wrapped_plan: Arc<LogicalPlan>,
207271
pub meta: Arc<MetaContext>,
208272
pub auth_context: AuthContextRef,
209-
pub wrapped_sql: Option<SqlQuery>,
210-
pub request: Option<TransportLoadRequestQuery>,
211-
pub member_fields: Option<Vec<MemberField>>,
212273
pub span_id: Option<Arc<SpanId>>,
213274
pub config_obj: Arc<dyn ConfigObj>,
214275
}
@@ -225,31 +286,10 @@ impl CubeScanWrapperNode {
225286
wrapped_plan,
226287
meta,
227288
auth_context,
228-
wrapped_sql: None,
229-
request: None,
230-
member_fields: None,
231289
span_id,
232290
config_obj,
233291
}
234292
}
235-
236-
pub fn with_sql_and_request(
237-
&self,
238-
sql: SqlQuery,
239-
request: TransportLoadRequestQuery,
240-
member_fields: Vec<MemberField>,
241-
) -> Self {
242-
Self {
243-
wrapped_plan: self.wrapped_plan.clone(),
244-
meta: self.meta.clone(),
245-
auth_context: self.auth_context.clone(),
246-
wrapped_sql: Some(sql),
247-
request: Some(request),
248-
member_fields: Some(member_fields),
249-
span_id: self.span_id.clone(),
250-
config_obj: self.config_obj.clone(),
251-
}
252-
}
253293
}
254294

255295
fn expr_name(e: &Expr, schema: &Arc<DFSchema>) -> Result<String> {
@@ -317,7 +357,7 @@ impl CubeScanWrapperNode {
317357
&self,
318358
transport: Arc<dyn TransportService>,
319359
load_request_meta: Arc<LoadRequestMeta>,
320-
) -> result::Result<Self, CubeError> {
360+
) -> result::Result<CubeScanWrappedSqlNode, CubeError> {
321361
let schema = self.schema();
322362
let wrapped_plan = self.wrapped_plan.clone();
323363
let (sql, request, member_fields) = Self::generate_sql_for_node(
@@ -361,7 +401,12 @@ impl CubeScanWrapperNode {
361401
sql.finalize_query(sql_templates).map_err(|e| CubeError::internal(e.to_string()))?;
362402
Ok((sql, request, member_fields))
363403
})?;
364-
Ok(self.with_sql_and_request(sql, request, member_fields))
404+
Ok(CubeScanWrappedSqlNode::new(
405+
self.wrapped_plan.clone(),
406+
sql,
407+
request,
408+
member_fields,
409+
))
365410
}
366411

367412
pub fn set_max_limit_for_node(self, node: Arc<LogicalPlan>) -> Arc<LogicalPlan> {
@@ -2226,9 +2271,6 @@ impl UserDefinedLogicalNode for CubeScanWrapperNode {
22262271
wrapped_plan: self.wrapped_plan.clone(),
22272272
meta: self.meta.clone(),
22282273
auth_context: self.auth_context.clone(),
2229-
wrapped_sql: self.wrapped_sql.clone(),
2230-
request: self.request.clone(),
2231-
member_fields: self.member_fields.clone(),
22322274
span_id: self.span_id.clone(),
22332275
config_obj: self.config_obj.clone(),
22342276
})

0 commit comments

Comments
 (0)