Skip to content

Commit c2e462e

Browse files
authored
chore(query): fix hash join hang (#17389)
* chore(query): fix hash join hang * chore(test): add sqllogictest
1 parent 1c57f68 commit c2e462e

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs

+7-8
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ impl TransformHashJoinBuild {
168168
}
169169

170170
fn collect(&mut self) -> Result<Event> {
171+
if self.input_port.has_data() {
172+
self.add_data_block(self.input_port.pull_data().unwrap()?);
173+
if self.need_spill() {
174+
return self.next_step(Step::Async(AsyncStep::Spill));
175+
}
176+
}
177+
171178
if self.input_port.is_finished() {
172179
if self.need_check_spill_happen() {
173180
self.next_step(Step::Async(AsyncStep::CheckSpillHappen))
@@ -176,14 +183,6 @@ impl TransformHashJoinBuild {
176183
} else {
177184
self.next_step(Step::Async(AsyncStep::WaitCollect))
178185
}
179-
} else if self.input_port.has_data() {
180-
self.add_data_block(self.input_port.pull_data().unwrap()?);
181-
if self.need_spill() {
182-
self.next_step(Step::Async(AsyncStep::Spill))
183-
} else {
184-
self.input_port.set_need_data();
185-
Ok(Event::NeedData)
186-
}
187186
} else {
188187
self.input_port.set_need_data();
189188
Ok(Event::NeedData)

tests/sqllogictests/suites/query/join/join.test

+3
Original file line numberDiff line numberDiff line change
@@ -282,3 +282,6 @@ select * from (select number from numbers(5)) t2 full outer join (select a, 'A'
282282

283283
statement ok
284284
drop table if exists t;
285+
286+
statement ok
287+
with v2 as (SELECT 'xx' || cast(number as string) AS invoice_nr FROM numbers(135) group by invoice_nr order by invoice_nr) select v2.invoice_nr from v2 where EXISTS (SELECT cast(number as string) AS invoice_nr FROM numbers(800) where v2.invoice_nr = cast(number as string)) ignore_result;

0 commit comments

Comments
 (0)