Skip to content

Commit 78e8aba

Browse files
authored
fix table function dedup (#883)
1 parent 8b185a3 commit 78e8aba

File tree

4 files changed

+24
-2
lines changed

4 files changed

+24
-2
lines changed

src/Processors/QueryPlan/Streaming/DedupTransformStep.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ DedupTransformStep::DedupTransformStep(
3535

3636
void DedupTransformStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* settings */)
3737
{
38+
/// Use one dedup transform for all input streams
39+
pipeline.resize(1);
3840
pipeline.addSimpleTransform([&](const Block & header) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA
3941
return std::make_shared<DedupTransform>(header, getOutputStream().header, dedup_func_desc);
4042
});

src/TableFunctions/TableFunctionProxyBase.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ StoragePtr TableFunctionProxyBase::calculateColumnDescriptions(ContextPtr contex
7676
}
7777
else if (subquery)
7878
{
79-
auto interpreter_subquery
80-
= std::make_unique<InterpreterSelectWithUnionQuery>(subquery->children[0], context, SelectQueryOptions().subquery().analyze());
79+
auto interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
80+
subquery->children[0], context, SelectQueryOptions().subquery().analyze().removeDuplicates());
8181

8282
auto source_header = interpreter_subquery->getSampleBlock();
8383
columns = ColumnsDescription(source_header.getNamesAndTypesList());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
3
2+
2024-01-01 00:00:00.100Z 1
3+
2024-01-01 00:00:00.200Z 2
4+
2024-01-01 00:00:00.300Z 3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
DROP STREAM IF EXISTS 99010_test_stream;
2+
CREATE STREAM 99010_test_stream(id int);
3+
4+
SELECT sleep(2) FORMAT Null;
5+
6+
INSERT INTO 99010_test_stream(id, _tp_time) VALUES (1, '2024-01-01 00:00:00.1');
7+
INSERT INTO 99010_test_stream(id, _tp_time) VALUES (2, '2024-01-01 00:00:00.2');
8+
INSERT INTO 99010_test_stream(id, _tp_time) VALUES (3, '2024-01-01 00:00:00.3');
9+
INSERT INTO 99010_test_stream(id, _tp_time) VALUES (1, '2024-01-01 00:00:00.4');
10+
11+
SELECT sleep(3) FORMAT Null;
12+
13+
with cte as (select _tp_time, * from table(99010_test_stream)) select count() from dedup(cte, id);
14+
with cte as (select _tp_time, * from table(99010_test_stream) order by _tp_time asc) select * from dedup(cte, id);
15+
16+
DROP STREAM 99010_test_stream;

0 commit comments

Comments
 (0)