Skip to content

Commit ea30ae1

Browse files
authored
throw exception when using emit changelog without _tp_delta column (#466)
1 parent cd43c4d commit ea30ae1

File tree

6 files changed

+24
-17
lines changed

6 files changed

+24
-17
lines changed

src/Interpreters/Streaming/ChangelogQueryVisitor.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,14 @@ void ChangelogQueryVisitorMatcher::addDeltaColumn(ASTSelectQuery & select_query,
128128
}
129129
}
130130

131-
/// Need add delta if _tp_delta is not present and the @p select_query is a subquery or an `EMIT CHANGELOG` query
132-
if (!found_delta_col && (is_subquery || query_info.force_emit_changelog))
133-
select_expression_list->children.emplace_back(std::make_shared<ASTIdentifier>(ProtonConsts::RESERVED_DELTA_FLAG));
131+
if (!found_delta_col)
132+
{
133+
if (is_subquery)
134+
/// Need add delta if _tp_delta is not present and the @p select_query is a subquery
135+
select_expression_list->children.emplace_back(std::make_shared<ASTIdentifier>(ProtonConsts::RESERVED_DELTA_FLAG));
136+
else if (query_info.force_emit_changelog)
137+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The query with emit changelog explicitly requires a `_tp_delta` in select list");
138+
}
134139

135140
if (add_new_required_result_columns)
136141
new_required_result_column_names.push_back(ProtonConsts::RESERVED_DELTA_FLAG);

src/Storages/StorageView.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ void StorageView::read(
139139
Streaming::rewriteAsChangelogQuery(current_inner_query->as<ASTSelectWithUnionQuery &>());
140140
/// proton: ends.
141141

142-
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, false, query_info.settings_limit_offset_done);
142+
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done);
143143
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, options, column_names);
144144
interpreter.addStorageLimits(*query_info.storage_limits);
145145
interpreter.buildQueryPlan(query_plan);

tests/stream/test_stream_smoke/0013_changelog_stream11.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,8 @@ tests:
454454
create materialized view if not exists test14_view1_11 into test14_target_multishard_changelog_kv_11 as
455455
(select i,
456456
k1,
457-
k2
457+
k2,
458+
_tp_delta
458459
from
459460
(select i,
460461
k1,
@@ -1257,7 +1258,8 @@ tests:
12571258
create materialized view if not exists test14_view1_11 into test14_target_changelog_kv_multishard_11 as
12581259
(select i,
12591260
k1,
1260-
k2
1261+
k2,
1262+
_tp_delta
12611263
from
12621264
(select i,
12631265
k1,

tests/stream/test_stream_smoke/0013_changelog_stream13.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ tests:
4040
wait: 2
4141
depends_on_stream: v_12181627
4242
query_id: '12181627214'
43-
query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val)
43+
query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val), _tp_delta
4444
from v_12181627 emit changelog settings checkpoint_interval=1;
4545
- client: python
4646
query_type: table

tests/stream/test_stream_smoke/0013_changelog_stream8.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ tests:
282282
- client: python
283283
query_type: table
284284
wait: 2
285-
query: create materialized view if not exists test14_view2_8 into test14_target_changelog_kv_8 as (select i, k1, k2 from test14_view1_8 emit changelog);
285+
query: create materialized view if not exists test14_view2_8 into test14_target_changelog_kv_8 as (select i, k1, k2, _tp_delta from test14_view1_8 emit changelog);
286286

287287
- client: python
288288
query_id: '1450'

tests/stream/test_stream_smoke/0029_emit_changelog.yaml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ tests:
4545
depends_on_stream: test30_stream
4646
wait: 1
4747
query: |
48-
select count() from test30_stream emit changelog periodic 1s;
48+
select count(), _tp_delta from test30_stream emit changelog periodic 1s;
4949
5050
- client: python
5151
query_type: table
@@ -96,7 +96,7 @@ tests:
9696
depends_on_stream: test30_vk_stream
9797
wait: 1
9898
query: |
99-
select k, count() from test30_vk_stream group by k emit changelog periodic 1s;
99+
select k, count(), _tp_delta from test30_vk_stream group by k emit changelog periodic 1s;
100100
101101
- client: python
102102
query_type: table
@@ -148,7 +148,7 @@ tests:
148148
depends_on_stream: test30_changelog_kv_stream
149149
wait: 1
150150
query: |
151-
select k, count() from test30_changelog_kv_stream group by k emit changelog periodic 1s;
151+
select k, count(), _tp_delta from test30_changelog_kv_stream group by k emit changelog periodic 1s;
152152
153153
- client: python
154154
query_type: table
@@ -200,7 +200,7 @@ tests:
200200
depends_on_stream: test30_stream
201201
wait: 1
202202
query: |
203-
select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s;
203+
select k, count(), _tp_delta from changelog(test30_stream, i) group by k emit changelog periodic 1s;
204204
205205
- client: python
206206
query_type: table
@@ -259,14 +259,14 @@ tests:
259259
query_id: 3001
260260
depends_on_stream: test30_stream
261261
query: |
262-
select count() from (select k, count() from changelog(test30_stream, i) group by k emit periodic 1s) emit changelog periodic 1s;
262+
select count(), _tp_delta from (select k, count() from changelog(test30_stream, i) group by k emit periodic 1s) emit changelog periodic 1s;
263263
264264
- client: python
265265
query_type: stream
266266
query_id: 3002
267267
depends_on_stream: test30_stream
268268
query: |
269-
select count() from (select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s) emit changelog periodic 1s;
269+
select count(), _tp_delta from (select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s) emit changelog periodic 1s;
270270
271271
- client: python
272272
query_type: table
@@ -339,13 +339,13 @@ tests:
339339
depends_on_stream: test30_right_vk_stream
340340
wait: 1
341341
query: |
342-
select i, count(), max(test30_right_vk_stream.k) from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s;
342+
select i, count(), max(test30_right_vk_stream.k), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s;
343343
344344
- client: python
345345
query_type: stream
346346
query_id: 3001
347347
query: |
348-
select test30_right_vk_stream.k, count(), max(i) from test30_left_stream join test30_right_vk_stream using(i) group by test30_right_vk_stream.k emit changelog periodic 1s;
348+
select test30_right_vk_stream.k, count(), max(i), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by test30_right_vk_stream.k emit changelog periodic 1s;
349349
350350
- client: python
351351
query_type: table
@@ -428,7 +428,7 @@ tests:
428428
depends_on_stream: test30_right_vk_stream
429429
wait: 1
430430
query: |
431-
subscribe to select i, count(), max(test30_right_vk_stream.k) from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s settings checkpoint_interval=1;
431+
subscribe to select i, count(), max(test30_right_vk_stream.k), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s settings checkpoint_interval=1;
432432
433433
- client: python
434434
query_type: table

0 commit comments

Comments
 (0)