Skip to content

refactor(query): use trait to refactor physical plan #18268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8359dd8
refactor(query): use trait to refactor physical plan
zhang2014 Jun 28, 2025
e23c2d7
refactor(query): use trait to refactor physical plan
zhang2014 Jun 28, 2025
ed320f8
refactor(query): use trait to refactor physical plan
zhang2014 Jun 30, 2025
5b53eb0
refactor(query): use trait to refactor physical plan
zhang2014 Jun 30, 2025
b269460
refactor(query): use trait to refactor physical plan
zhang2014 Jun 30, 2025
adbf021
refactor(query): use trait to refactor physical plan
zhang2014 Jul 3, 2025
397bca7
refactor(query): use trait to refactor physical plan
zhang2014 Jul 3, 2025
8eb1139
refactor(query): use trait to refactor physical plan
zhang2014 Jul 3, 2025
d4cf0c7
refactor(query): refactor physical plan
zhang2014 Jul 3, 2025
d2dcd20
refactor(query): use trait to refactor physical plan
zhang2014 Jul 13, 2025
416ebfe
refactor(query): use trait to refactor physical plan
zhang2014 Jul 14, 2025
c5a0a94
refactor(query): use trait to refactor physical plan
zhang2014 Jul 14, 2025
c4f22fd
refactor(query): use trait to refactor physical plan
zhang2014 Jul 14, 2025
0dd0362
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
81e1b8a
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
373c80d
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Jul 15, 2025
e8a174f
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
eb867b1
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
66045a3
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
9b19b1d
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
58186fe
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
5e6bedf
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
5bd2815
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
9f44159
refactor(query): use trait to refactor physical plan
zhang2014 Jul 16, 2025
f0b1997
refactor(query): use trait to refactor physical plan
zhang2014 Jul 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use databend_common_exception::Result;
use databend_common_expression::infer_table_schema;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
use databend_common_sql::executor::PhysicalPlan;
use databend_storages_common_stage::CopyIntoLocationInfo;
use log::debug;
use log::info;
Expand All @@ -29,6 +27,9 @@ use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::physical_plans::CopyIntoLocation;
use crate::physical_plans::PhysicalPlan;
use crate::physical_plans::PhysicalPlanMeta;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -89,14 +90,14 @@ impl CopyIntoLocationInterpreter {
let query_result_schema = query_interpreter.get_result_schema();
let table_schema = infer_table_schema(&query_result_schema)?;

let mut physical_plan = PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
plan_id: 0,
input: Box::new(query_physical_plan),
let mut physical_plan: PhysicalPlan = Box::new(CopyIntoLocation {
input: query_physical_plan,
project_columns: query_interpreter.get_result_columns(),
input_data_schema: query_result_schema,
input_table_schema: table_schema,
info: info.clone(),
}));
meta: PhysicalPlanMeta::new("CopyIntoLocation"),
});

let mut next_plan_id = 0;
physical_plan.adjust_plan_id(&mut next_plan_id);
Expand Down
31 changes: 16 additions & 15 deletions src/query/service/src/interpreters/interpreter_copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ use databend_common_expression::SendableDataBlockStream;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::CopyIntoTable;
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
use databend_common_sql::executor::physical_plans::Exchange;
use databend_common_sql::executor::physical_plans::FragmentKind;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_storage::StageFileInfo;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_stage::StageTable;
Expand All @@ -46,6 +41,12 @@ use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::physical_plans::CopyIntoTable;
use crate::physical_plans::CopyIntoTableSource;
use crate::physical_plans::Exchange;
use crate::physical_plans::PhysicalPlan;
use crate::physical_plans::PhysicalPlanMeta;
use crate::physical_plans::TableScan;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::PipelineBuilder;
use crate::schedulers::build_query_pipeline_without_render_result_set;
Expand Down Expand Up @@ -113,7 +114,7 @@ impl CopyIntoTableInterpreter {

let (query_interpreter, update_stream_meta) = self.build_query(&query).await?;
update_stream_meta_reqs = update_stream_meta;
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
let query_physical_plan = query_interpreter.build_physical_plan().await?;

let result_columns = query_interpreter.get_result_columns();
(
Expand All @@ -133,21 +134,20 @@ impl CopyIntoTableInterpreter {
}

(
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
plan_id: 0,
CopyIntoTableSource::Stage(Box::new(TableScan {
scan_id: 0,
name_mapping,
stat_info: None,
table_index: None,
internal_column: None,
source: Box::new(data_source_plan),
}))),
meta: PhysicalPlanMeta::new("TableScan"),
})),
None,
)
};

let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
plan_id: 0,
let mut root: PhysicalPlan = Box::new(CopyIntoTable {
required_values_schema: plan.required_values_schema.clone(),
values_consts: plan.values_consts.clone(),
required_source_schema: plan.required_source_schema.clone(),
Expand All @@ -159,16 +159,17 @@ impl CopyIntoTableInterpreter {
source,
is_transform: plan.is_transform,
table_meta_timestamps,
}));
meta: PhysicalPlanMeta::new("CopyIntoTable"),
});

if plan.enable_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
root = Box::new(Exchange {
input: root,
kind: FragmentKind::Merge,
keys: Vec::new(),
allow_adjust_parallelism: true,
ignore_exchange: false,
meta: PhysicalPlanMeta::new("Exchange"),
});
}

Expand Down
65 changes: 23 additions & 42 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ use databend_common_pipeline_core::processors::PlanProfile;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::executor::format_partial_tree;
use databend_common_sql::executor::MutationBuildInfo;
use databend_common_sql::plans::Mutation;
use databend_common_sql::BindContext;
use databend_common_sql::ColumnSet;
Expand All @@ -55,6 +53,10 @@ use crate::interpreters::interpreter::on_execution_finished;
use crate::interpreters::interpreter_mutation::build_mutation_info;
use crate::interpreters::interpreter_mutation::MutationInterpreter;
use crate::interpreters::Interpreter;
use crate::physical_plans::MutationBuildInfo;
use crate::physical_plans::PhysicalPlan;
use crate::physical_plans::PhysicalPlanBuilder;
use crate::physical_plans::PhysicalPlanDynExt;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
Expand All @@ -64,16 +66,13 @@ use crate::schedulers::build_query_pipeline;
use crate::schedulers::Fragmenter;
use crate::schedulers::QueryFragmentsActions;
use crate::sessions::QueryContext;
use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalPlanBuilder;
use crate::sql::optimizer::ir::SExpr;
use crate::sql::plans::Plan;

pub struct ExplainInterpreter {
ctx: Arc<QueryContext>,
config: ExplainConfig,
kind: ExplainKind,
partial: bool,
graphical: bool,
plan: Plan,
}
Expand Down Expand Up @@ -164,22 +163,11 @@ impl Interpreter for ExplainInterpreter {
_ => self.explain_plan(&self.plan)?,
},

ExplainKind::Join => match &self.plan {
Plan::Query {
s_expr,
metadata,
bind_context,
..
} => {
let ctx = self.ctx.clone();
let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx, true);
let plan = builder.build(s_expr, bind_context.column_set()).await?;
self.explain_join_order(&plan, metadata)?
}
_ => Err(ErrorCode::Unimplemented(
ExplainKind::Join => {
return Err(ErrorCode::Unimplemented(
"Unsupported EXPLAIN JOIN statement",
))?,
},
));
}

ExplainKind::AnalyzePlan | ExplainKind::Graphical => match &self.plan {
Plan::Query {
Expand Down Expand Up @@ -299,15 +287,13 @@ impl ExplainInterpreter {
plan: Plan,
kind: ExplainKind,
config: ExplainConfig,
partial: bool,
graphical: bool,
) -> Result<Self> {
Ok(ExplainInterpreter {
ctx,
plan,
kind,
config,
partial,
graphical,
})
}
Expand Down Expand Up @@ -360,25 +346,15 @@ impl ExplainInterpreter {
}
}

let metadata = metadata.read();
let result = plan
.format(metadata.clone(), Default::default())?
.format(&metadata, Default::default())?
.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

pub fn explain_join_order(
&self,
plan: &PhysicalPlan,
metadata: &MetadataRef,
) -> Result<Vec<DataBlock>> {
let result = plan.format_join(metadata)?.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

fn format_pipeline(build_res: &PipelineBuildResult) -> Vec<DataBlock> {
let mut blocks = Vec::with_capacity(1 + build_res.sources_pipelines.len());
// Format root pipeline
Expand Down Expand Up @@ -412,10 +388,13 @@ impl ExplainInterpreter {
.build(&s_expr, required)
.await?;

let root_fragment = Fragmenter::try_create(ctx.clone())?.build_fragment(&plan)?;
let fragments = Fragmenter::try_create(ctx.clone())?.build_fragment(&plan)?;

let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
root_fragment.get_actions(ctx, &mut fragments_actions)?;

for fragment in fragments {
fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
}

let display_string = fragments_actions.display_indent(&metadata).to_string();
let line_split_result = display_string.lines().collect::<Vec<_>>();
Expand Down Expand Up @@ -479,10 +458,9 @@ impl ExplainInterpreter {
if !pruned_partitions_stats.is_empty() {
plan.set_pruning_stats(&mut pruned_partitions_stats);
}
let result = if self.partial {
format_partial_tree(&plan, metadata, &query_profiles)?.format_pretty()?
} else {
plan.format(metadata.clone(), query_profiles.clone())?
let result = {
let metadata = metadata.read();
plan.format(&metadata, query_profiles.clone())?
.format_pretty()?
};
let line_split_result: Vec<&str> = result.lines().collect();
Expand Down Expand Up @@ -570,10 +548,13 @@ impl ExplainInterpreter {
mutation.metadata.clone(),
)?;
let plan = interpreter.build_physical_plan(&mutation, true).await?;
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
let fragments = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;

let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone());
root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;

for fragment in fragments {
fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
}

let display_string = fragments_actions
.display_indent(&mutation.metadata)
Expand Down
8 changes: 1 addition & 7 deletions src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,34 +227,28 @@ impl InterpreterFactory {
kind.clone(),
config.clone(),
false,
false,
)?)),
Plan::ExplainAst { formatted_string } => Ok(Arc::new(ExplainInterpreter::try_create(
ctx,
plan.clone(),
ExplainKind::Ast(formatted_string.clone()),
ExplainConfig::default(),
false,
false,
)?)),
Plan::ExplainSyntax { formatted_sql } => Ok(Arc::new(ExplainInterpreter::try_create(
ctx,
plan.clone(),
ExplainKind::Syntax(formatted_sql.clone()),
ExplainConfig::default(),
false,
false,
)?)),
Plan::ExplainAnalyze {
graphical,
partial,
plan,
graphical, plan, ..
} => Ok(Arc::new(ExplainInterpreter::try_create(
ctx,
*plan.clone(),
ExplainKind::AnalyzePlan,
ExplainConfig::default(),
*partial,
*graphical,
)?)),
Plan::ExplainPerf { sql } => Ok(Arc::new(ExplainPerfInterpreter::try_create(
Expand Down
Loading
Loading