
Commit b8b2440

refactor(query): refactor physical plan organization in pipeline builder for better readability (#17866)
refactor(query): reorganize physical plans in pipeline builder
1 parent 272e18a commit b8b2440

File tree

2 files changed: +155 -109 lines changed

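At a glance: the builder_join.rs change extracts the sub-pipeline-builder construction that was previously duplicated between the hash-join build side and the range-join right side into a single create_sub_pipeline_builder helper, renames the join builders for consistency (build_join to build_hash_join, expand_build_side_pipeline to build_hash_join_build_side, and so on), and regroups the methods. A minimal sketch of the extraction pattern, using hypothetical stand-in types rather than the real Databend PipelineBuilder API:

// A sketch of the helper-extraction pattern only; Builder and QueryCtx are
// hypothetical stand-ins, not Databend types.
use std::sync::Arc;

struct QueryCtx; // stand-in for QueryContext

struct Builder {
    ctx: Arc<QueryCtx>,
    hash_join_states: Vec<u32>, // stand-in for the shared join-state map
}

impl Builder {
    // Previously this construction was written out twice (hash-join build
    // side and range-join right side); now both call one helper.
    fn create_sub_pipeline_builder(&self) -> Builder {
        Builder {
            ctx: self.ctx.clone(),
            hash_join_states: self.hash_join_states.clone(),
        }
    }

    fn build_hash_join_build_side(&mut self) {
        let _build_side_builder = self.create_sub_pipeline_builder();
        // ... finalize the build-side plan with the sub-builder ...
    }

    fn build_range_join_right_side(&mut self) {
        let _right_side_builder = self.create_sub_pipeline_builder();
        // ... finalize the right-side plan with the sub-builder ...
    }
}

fn main() {
    let mut b = Builder {
        ctx: Arc::new(QueryCtx),
        hash_join_states: vec![],
    };
    b.build_hash_join_build_side();
    b.build_range_join_right_side();
}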

src/query/service/src/pipelines/builders/builder_join.rs

Lines changed: 83 additions & 74 deletions
@@ -35,82 +35,42 @@ use crate::pipelines::PipelineBuilder;
 use crate::sessions::QueryContext;
 
 impl PipelineBuilder {
-    pub(crate) fn build_range_join(&mut self, range_join: &RangeJoin) -> Result<()> {
-        let state = Arc::new(RangeJoinState::new(self.ctx.clone(), range_join));
-        self.expand_right_side_pipeline(range_join, state.clone())?;
-        self.build_left_side(range_join, state)?;
-        Ok(())
-    }
-
-    fn build_left_side(
-        &mut self,
-        range_join: &RangeJoin,
-        state: Arc<RangeJoinState>,
-    ) -> Result<()> {
-        self.build_pipeline(&range_join.left)?;
-        let max_threads = self.settings.get_max_threads()? as usize;
-        self.main_pipeline.try_resize(max_threads)?;
-        self.main_pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(TransformRangeJoinLeft::create(
-                input,
-                output,
-                state.clone(),
-            )))
-        })?;
-        Ok(())
-    }
-
-    fn expand_right_side_pipeline(
-        &mut self,
-        range_join: &RangeJoin,
-        state: Arc<RangeJoinState>,
-    ) -> Result<()> {
-        let right_side_context = QueryContext::create_from(self.ctx.as_ref());
-        let mut right_side_builder = PipelineBuilder::create(
+    // Create a new pipeline builder with the same context as the current builder
+    fn create_sub_pipeline_builder(&self) -> PipelineBuilder {
+        let sub_context = QueryContext::create_from(self.ctx.as_ref());
+        let mut sub_builder = PipelineBuilder::create(
             self.func_ctx.clone(),
             self.settings.clone(),
-            right_side_context,
+            sub_context,
             self.main_pipeline.get_scopes(),
         );
-        right_side_builder.hash_join_states = self.hash_join_states.clone();
-
-        let mut right_res = right_side_builder.finalize(&range_join.right)?;
-        right_res.main_pipeline.add_sink(|input| {
-            Ok(ProcessorPtr::create(
-                Sinker::<TransformRangeJoinRight>::create(
-                    input,
-                    TransformRangeJoinRight::create(state.clone()),
-                ),
-            ))
-        })?;
-        self.pipelines.push(right_res.main_pipeline.finalize());
-        self.pipelines.extend(right_res.sources_pipelines);
-        Ok(())
+        sub_builder.hash_join_states = self.hash_join_states.clone();
+        sub_builder
     }
 
-    pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> {
-        // for merge into target table as build side.
-        let (enable_merge_into_optimization, merge_into_is_distributed) =
-            self.merge_into_get_optimization_flag(join);
+    pub(crate) fn build_hash_join(&mut self, join: &HashJoin) -> Result<()> {
+        // Get optimization flags for merge-into operations
+        let (enable_optimization, is_distributed) = self.merge_into_get_optimization_flag(join);
 
-        let state = self.build_join_state(
-            join,
-            merge_into_is_distributed,
-            enable_merge_into_optimization,
-        )?;
+        // Create the join state with optimization flags
+        let state = self.build_hash_join_state(join, is_distributed, enable_optimization)?;
         if let Some((build_cache_index, _)) = join.build_side_cache_info {
             self.hash_join_states
                 .insert(build_cache_index, state.clone());
         }
-        self.expand_build_side_pipeline(&join.build, join, state.clone())?;
-        self.build_join_probe(join, state)?;
 
-        // In the case of spilling, we need to share state among multiple threads. Quickly fetch all data from this round to quickly start the next round.
+        // Build both phases of the Hash Join
+        self.build_hash_join_build_side(&join.build, join, state.clone())?;
+        self.build_hash_join_probe_side(join, state)?;
+
+        // In the case of spilling, we need to share state among multiple threads
+        // Quickly fetch all data from this round to quickly start the next round
         self.main_pipeline
             .resize(self.main_pipeline.output_len(), true)
     }
 
-    fn build_join_state(
+    // Create the Hash Join state
+    fn build_hash_join_state(
         &mut self,
         join: &HashJoin,
         merge_into_is_distributed: bool,
@@ -128,20 +88,14 @@ impl PipelineBuilder {
         )
     }
 
-    fn expand_build_side_pipeline(
+    // Build the build-side pipeline for Hash Join
+    fn build_hash_join_build_side(
         &mut self,
         build: &PhysicalPlan,
         hash_join_plan: &HashJoin,
         join_state: Arc<HashJoinState>,
     ) -> Result<()> {
-        let build_side_context = QueryContext::create_from(self.ctx.as_ref());
-        let mut build_side_builder = PipelineBuilder::create(
-            self.func_ctx.clone(),
-            self.settings.clone(),
-            build_side_context,
-            self.main_pipeline.get_scopes(),
-        );
-        build_side_builder.hash_join_states = self.hash_join_states.clone();
+        let build_side_builder = self.create_sub_pipeline_builder();
         let mut build_res = build_side_builder.finalize(build)?;
 
         assert!(build_res.main_pipeline.is_pulling_pipeline()?);
@@ -162,7 +116,7 @@ impl PipelineBuilder {
                 build_state.clone(),
             )?))
         };
-        // for distributed merge into when source as build side.
+        // For distributed merge-into when source as build side
         if hash_join_plan.need_hold_hash_table {
             self.join_state = Some(build_state.clone())
         }
@@ -173,7 +127,12 @@ impl PipelineBuilder {
         Ok(())
     }
 
-    fn build_join_probe(&mut self, join: &HashJoin, state: Arc<HashJoinState>) -> Result<()> {
+    // Build the probe-side pipeline for Hash Join
+    fn build_hash_join_probe_side(
+        &mut self,
+        join: &HashJoin,
+        state: Arc<HashJoinState>,
+    ) -> Result<()> {
         self.build_pipeline(&join.probe)?;
 
         let max_block_size = self.settings.get_max_block_size()? as usize;
@@ -203,16 +162,66 @@
             )?))
         })?;
 
+        // For merge-into operations that need to hold the hash table
         if join.need_hold_hash_table {
-            let mut projected_probe_fields = vec![];
+            // Extract projected fields from probe schema
+            let mut projected_fields = vec![];
             for (i, field) in probe_state.probe_schema.fields().iter().enumerate() {
                 if probe_state.probe_projections.contains(&i) {
-                    projected_probe_fields.push(field.clone());
+                    projected_fields.push(field.clone());
                 }
             }
-            self.merge_into_probe_data_fields = Some(projected_probe_fields);
+            self.merge_into_probe_data_fields = Some(projected_fields);
         }
 
         Ok(())
     }
+
+    pub(crate) fn build_range_join(&mut self, range_join: &RangeJoin) -> Result<()> {
+        let state = Arc::new(RangeJoinState::new(self.ctx.clone(), range_join));
+        self.build_range_join_right_side(range_join, state.clone())?;
+        self.build_range_join_left_side(range_join, state)?;
+        Ok(())
+    }
+
+    // Build the left-side pipeline for Range Join
+    fn build_range_join_left_side(
+        &mut self,
+        range_join: &RangeJoin,
+        state: Arc<RangeJoinState>,
+    ) -> Result<()> {
+        self.build_pipeline(&range_join.left)?;
+        let max_threads = self.settings.get_max_threads()? as usize;
+        self.main_pipeline.try_resize(max_threads)?;
+        self.main_pipeline.add_transform(|input, output| {
+            Ok(ProcessorPtr::create(TransformRangeJoinLeft::create(
+                input,
+                output,
+                state.clone(),
+            )))
+        })?;
+        Ok(())
+    }
+
+    // Build the right-side pipeline for Range Join
+    fn build_range_join_right_side(
+        &mut self,
+        range_join: &RangeJoin,
+        state: Arc<RangeJoinState>,
+    ) -> Result<()> {
+        let right_side_builder = self.create_sub_pipeline_builder();
+
+        let mut right_res = right_side_builder.finalize(&range_join.right)?;
+        right_res.main_pipeline.add_sink(|input| {
+            Ok(ProcessorPtr::create(
+                Sinker::<TransformRangeJoinRight>::create(
+                    input,
+                    TransformRangeJoinRight::create(state.clone()),
+                ),
+            ))
+        })?;
+        self.pipelines.push(right_res.main_pipeline.finalize());
+        self.pipelines.extend(right_res.sources_pipelines);
+        Ok(())
+    }
 }
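The range-join wiring above keeps a sink-then-probe shape: the right side runs in its own pipeline and drains into the shared RangeJoinState through a sink, while the left side stays on the main pipeline and joins each incoming block against that state in a transform. A toy sketch of that shape, with hypothetical types and a plain `<` predicate standing in for the real range condition:

// Hypothetical stand-ins; the real code uses Databend pipelines and processors.
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct RangeJoinState {
    // Rows collected from the right side, later probed by the left side.
    right_rows: Mutex<Vec<i64>>,
}

// "Sink" phase: the right-side pipeline pushes every block into shared state.
fn right_side_sink(state: &RangeJoinState, block: Vec<i64>) {
    state.right_rows.lock().unwrap().extend(block);
}

// "Transform" phase: the left-side pipeline joins each block against the
// collected right rows (l < r stands in for the real range predicate).
fn left_side_transform(state: &RangeJoinState, block: &[i64]) -> Vec<(i64, i64)> {
    let right = state.right_rows.lock().unwrap();
    let mut out = Vec::new();
    for &l in block {
        for &r in right.iter() {
            if l < r {
                out.push((l, r));
            }
        }
    }
    out
}

fn main() {
    let state = Arc::new(RangeJoinState::default());
    right_side_sink(&state, vec![10, 20]);
    let joined = left_side_transform(&state, &[5, 15]);
    assert_eq!(joined, vec![(5, 10), (5, 20), (15, 20)]);
}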

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 72 additions & 35 deletions
@@ -163,50 +163,79 @@ impl PipelineBuilder {
         self.is_exchange_neighbor |= self.is_exchange_neighbor(plan);
 
         match plan {
+            // ==============================
+            // 1. Data Source Plans
+            // ==============================
+            // Basic table scans - retrieve data from tables
             PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
             PhysicalPlan::ConstantTableScan(scan) => self.build_constant_table_scan(scan),
+            PhysicalPlan::CacheScan(cache_scan) => self.build_cache_scan(cache_scan),
+            PhysicalPlan::ExpressionScan(expression_scan) => {
+                self.build_expression_scan(expression_scan)
+            }
+            PhysicalPlan::RecursiveCteScan(scan) => self.build_recursive_cte_scan(scan),
+
+            // Special source operations
+            PhysicalPlan::MutationSource(mutation_source) => {
+                self.build_mutation_source(mutation_source)
+            }
+
+            // ==============================
+            // 2. Relational Operators
+            // ==============================
+            // Filtering and projection
             PhysicalPlan::Filter(filter) => self.build_filter(filter),
             PhysicalPlan::EvalScalar(eval_scalar) => self.build_eval_scalar(eval_scalar),
+            PhysicalPlan::ProjectSet(project_set) => self.build_project_set(project_set),
+
+            // Sorting and limiting
+            PhysicalPlan::Sort(sort) => self.build_sort(sort),
+            PhysicalPlan::Limit(limit) => self.build_limit(limit),
+            PhysicalPlan::RowFetch(row_fetch) => self.build_row_fetch(row_fetch),
+
+            // Join operations
+            PhysicalPlan::HashJoin(join) => self.build_hash_join(join),
+            PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
+
+            // Aggregation operations
             PhysicalPlan::AggregateExpand(aggregate) => self.build_aggregate_expand(aggregate),
             PhysicalPlan::AggregatePartial(aggregate) => self.build_aggregate_partial(aggregate),
             PhysicalPlan::AggregateFinal(aggregate) => self.build_aggregate_final(aggregate),
+
+            // Window functions
             PhysicalPlan::Window(window) => self.build_window(window),
             PhysicalPlan::WindowPartition(window_partition) => {
                 self.build_window_partition(window_partition)
             }
-            PhysicalPlan::Sort(sort) => self.build_sort(sort),
-            PhysicalPlan::Limit(limit) => self.build_limit(limit),
-            PhysicalPlan::RowFetch(row_fetch) => self.build_row_fetch(row_fetch),
-            PhysicalPlan::HashJoin(join) => self.build_join(join),
+
+            PhysicalPlan::UnionAll(union_all) => self.build_union_all(union_all),
+
+            // ==============================
+            // 3. Data Distribution
+            // ==============================
             PhysicalPlan::ExchangeSink(sink) => self.build_exchange_sink(sink),
             PhysicalPlan::ExchangeSource(source) => self.build_exchange_source(source),
-            PhysicalPlan::UnionAll(union_all) => self.build_union_all(union_all),
             PhysicalPlan::DistributedInsertSelect(insert_select) => {
                 self.build_distributed_insert_select(insert_select)
             }
-            PhysicalPlan::ProjectSet(project_set) => self.build_project_set(project_set),
-            PhysicalPlan::Udf(udf) => self.build_udf(udf),
-            PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal(
-                "Invalid physical plan with PhysicalPlan::Exchange",
-            )),
-            PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
-            PhysicalPlan::CacheScan(cache_scan) => self.build_cache_scan(cache_scan),
-            PhysicalPlan::ExpressionScan(expression_scan) => {
-                self.build_expression_scan(expression_scan)
-            }
+            PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle),
+            PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate),
 
-            // Copy into.
+            // ==============================
+            // 4. Data Modification Operations
+            // ==============================
+            // Copy operations
             PhysicalPlan::CopyIntoTable(copy) => self.build_copy_into_table(copy),
             PhysicalPlan::CopyIntoLocation(copy) => self.build_copy_into_location(copy),
 
-            // Replace.
+            // Replace operations
             PhysicalPlan::ReplaceAsyncSourcer(async_sourcer) => {
                 self.build_async_sourcer(async_sourcer)
             }
             PhysicalPlan::ReplaceDeduplicate(deduplicate) => self.build_deduplicate(deduplicate),
             PhysicalPlan::ReplaceInto(replace) => self.build_replace_into(replace),
 
-            // Mutation.
+            // Mutation operations (DELETE/UPDATE)
             PhysicalPlan::Mutation(mutation) => self.build_mutation(mutation),
             PhysicalPlan::MutationSplit(mutation_split) => {
                 self.build_mutation_split(mutation_split)
@@ -220,19 +249,14 @@
             PhysicalPlan::AddStreamColumn(add_stream_column) => {
                 self.build_add_stream_column(add_stream_column)
             }
+            PhysicalPlan::ColumnMutation(column_mutation) => {
+                self.build_column_mutation(column_mutation)
+            }
 
-            // Commit.
+            // Commit operations
             PhysicalPlan::CommitSink(plan) => self.build_commit_sink(plan),
 
-            // Compact.
-            PhysicalPlan::CompactSource(compact) => self.build_compact_source(compact),
-
-            // Recluster.
-            PhysicalPlan::Recluster(recluster) => self.build_recluster(recluster),
-            PhysicalPlan::HilbertPartition(partition) => self.build_hilbert_partition(partition),
-
-            PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate),
-            PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle),
+            // MERGE INTO chunk processing operations
             PhysicalPlan::ChunkFilter(chunk_filter) => self.build_chunk_filter(chunk_filter),
             PhysicalPlan::ChunkEvalScalar(chunk_project) => {
                 self.build_chunk_eval_scalar(chunk_project)
@@ -250,14 +274,27 @@
             PhysicalPlan::ChunkCommitInsert(chunk_commit_insert) => {
                 self.build_chunk_commit_insert(chunk_commit_insert)
             }
+
+            // ==============================
+            // 5. Data Maintenance Operations
+            // ==============================
+            PhysicalPlan::CompactSource(compact) => self.build_compact_source(compact),
+            PhysicalPlan::Recluster(recluster) => self.build_recluster(recluster),
+            PhysicalPlan::HilbertPartition(partition) => self.build_hilbert_partition(partition),
+
+            // ==============================
+            // 6. Special Processing Operations
+            // ==============================
+            // User-defined functions and async operations
+            PhysicalPlan::Udf(udf) => self.build_udf(udf),
             PhysicalPlan::AsyncFunction(async_func) => self.build_async_function(async_func),
-            PhysicalPlan::RecursiveCteScan(scan) => self.build_recursive_cte_scan(scan),
-            PhysicalPlan::MutationSource(mutation_source) => {
-                self.build_mutation_source(mutation_source)
-            }
-            PhysicalPlan::ColumnMutation(column_mutation) => {
-                self.build_column_mutation(column_mutation)
-            }
+
+            // ==============================
+            // 7. Invalid Plans
+            // ==============================
+            PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal(
+                "Invalid physical plan with PhysicalPlan::Exchange",
+            )),
         }?;
 
         self.is_exchange_neighbor = is_exchange_neighbor;
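A note on why the reordering in this match is safe: every arm binds a distinct PhysicalPlan variant, so the patterns are disjoint and arm order cannot change which arm fires, while the compiler's exhaustiveness check still forces every variant to be handled. A minimal sketch of that property, using a hypothetical enum rather than Databend's:

// Hypothetical three-variant enum standing in for PhysicalPlan; this
// illustrates the property only, it is not Databend code.
enum Plan {
    Scan,
    Filter,
    Join,
}

fn dispatch(plan: &Plan) -> &'static str {
    // Enum-variant patterns are disjoint, so the arms can be listed in any
    // order without changing dispatch; deleting one is a compile error
    // thanks to exhaustiveness checking.
    match plan {
        Plan::Join => "join",
        Plan::Scan => "scan",
        Plan::Filter => "filter",
    }
}

fn main() {
    assert_eq!(dispatch(&Plan::Scan), "scan");
    assert_eq!(dispatch(&Plan::Filter), "filter");
    assert_eq!(dispatch(&Plan::Join), "join");
}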
