Skip to content

Commit 6a4df0f

Browse files
authored
Bugfix/issue 370 fix backfill with virtual column (#372)
* fix backfill from historical store with virtual column * fix incorrectly caching shared records in tail cache
1 parent 0b7eb54 commit 6a4df0f

File tree

10 files changed

+86
-8
lines changed

10 files changed

+86
-8
lines changed

src/Core/Block.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -978,6 +978,30 @@ void Block::reorderColumnsInplace(const Block & header)
978978
// }
979979
}
980980

981+
void Block::reorderColumnsInplace(const Names & names)
982+
{
983+
assert(names.size() >= 1);
984+
985+
auto num_columns = columns();
986+
if (num_columns <= 1)
987+
return;
988+
989+
assert(num_columns >= names.size());
990+
991+
Block result;
992+
result.reserve(names.size());
993+
994+
for (const auto & name : names)
995+
{
996+
auto & target_col = getByName(name);
997+
result.insert(std::move(target_col));
998+
}
999+
1000+
/// Don't swap block.info
1001+
data.swap(result.data);
1002+
index_by_name.swap(result.index_by_name);
1003+
}
1004+
9811005
void Block::concat(const Block & other)
9821006
{
9831007
assert(blocksHaveEqualStructure(*this, other));

src/Core/Block.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ class Block
167167
/// in-place sort columns according to header
168168
void reorderColumnsInplace(const Block & header);
169169

170+
/// in-place sort columns according to names
171+
void reorderColumnsInplace(const Names & names);
172+
170173
void renameColumn(String new_name, size_t column_pos);
171174

172175
/// Deep clone, use cautiously. Most of time, we don't need deepClone

src/NativeLog/Cache/TailCache.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ void TailCache::put(const StreamShard & stream_shard, RecordPtr record)
3232
{
3333
assert(entry);
3434
std::unique_lock lock(entry->mutex);
35-
entry->shard_records.push_back(std::move(record));
35+
if (record.use_count() == 1)
36+
entry->shard_records.push_back(std::move(record));
37+
else
38+
/// Cache a cloned record to prevent other references from modifying it
39+
entry->shard_records.push_back(record->clone());
40+
3641
if (entry->shard_records.size() > static_cast<size_t>(config.max_cached_entries_per_shard))
3742
{
3843
size -= entry->shard_records.front()->totalSerializedBytes();

src/NativeLog/Record/Record.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ struct Record
5858
setCodec(DB::CompressionMethodByte::NONE);
5959
}
6060

61+
RecordPtr clone() const
62+
{
63+
return std::make_shared<Record>(*this);
64+
}
65+
6166
RecordPtr clone(DB::Block && block_) const
6267
{
6368
auto r{std::make_shared<Record>(sn, std::move(block_))};

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,15 @@ ReadFromMergeTree::ReadFromMergeTree(
9393
Poco::Logger * log_,
9494
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
9595
bool enable_parallel_reading,
96-
std::function<std::shared_ptr<ISource>(Int64 &)> create_streaming_source_)
96+
std::function<std::shared_ptr<ISource>(Int64 &)> create_streaming_source_,
97+
const Names & column_names_to_return)
9798
: ISourceStep(DataStream{
9899
.header = MergeTreeBaseSelectProcessor::transformHeader(
99100
storage_snapshot_->getSampleBlockForColumns(real_column_names_),
100101
getPrewhereInfo(query_info_),
101102
data_.getPartitionValueType(),
102-
virt_column_names_),
103+
virt_column_names_,
104+
column_names_to_return),
103105
.is_streaming = static_cast<bool>(create_streaming_source_)})
104106
, reader_settings(getMergeTreeReaderSettings(context_, query_info_))
105107
, prepared_parts(std::move(parts_))

src/Processors/QueryPlan/ReadFromMergeTree.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ class ReadFromMergeTree final : public ISourceStep
100100
Poco::Logger * log_,
101101
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
102102
bool enable_parallel_reading,
103-
std::function<std::shared_ptr<ISource>(Int64 &)> create_streaming_source_ = {});
103+
std::function<std::shared_ptr<ISource>(Int64 &)> create_streaming_source_ = {},
104+
const Names & column_names_to_return = {});
104105

105106
String getName() const override
106107
{

src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(
633633
}
634634

635635
Block MergeTreeBaseSelectProcessor::transformHeader(
636-
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
636+
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns, const Names & columns_to_return)
637637
{
638638
if (prewhere_info)
639639
{
@@ -676,6 +676,11 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
676676
}
677677

678678
injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns);
679+
680+
/// proton: starts. Only for read concat: we require the returned columns in order
681+
if (!columns_to_return.empty())
682+
block.reorderColumnsInplace(columns_to_return);
683+
/// proton: ends.
679684
return block;
680685
}
681686

src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class MergeTreeBaseSelectProcessor : public ISource
5050
~MergeTreeBaseSelectProcessor() override;
5151

5252
static Block transformHeader(
53-
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
53+
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns, const Names & columns_to_return = {});
5454

5555
static std::unique_ptr<MergeTreeBlockSizePredictor> getSizePredictor(
5656
const MergeTreeData::DataPartPtr & data_part,

src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,13 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
12891289

12901290
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
12911291

1292+
/// proton: starts. Add for read concat
1293+
/// Sometimes, the historical source header is not compatible with the streaming source header;
1294+
/// its format is always `<real columns> + <virtual columns>`, for example:
1295+
/// `select _tp_shard, i from t1`, historical header is `i, _tp_shard`, but streaming header is `_tp_shard, i`
1296+
bool required_return_columns_in_order = static_cast<bool>(create_streaming_source);
1297+
/// proton: ends.
1298+
12921299
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
12931300
std::move(parts),
12941301
real_column_names,
@@ -1304,7 +1311,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
13041311
log,
13051312
merge_tree_select_result_ptr,
13061313
enable_parallel_reading,
1307-
std::move(create_streaming_source)
1314+
std::move(create_streaming_source),
1315+
required_return_columns_in_order ? column_names_to_return : Names{}
13081316
);
13091317

13101318
QueryPlanPtr plan = std::make_unique<QueryPlan>();

tests/stream/test_stream_smoke/0099_fixed_issues.json

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,31 @@
572572
["True"]
573573
]}
574574
]
575-
}
575+
},
576+
{
577+
"id": 20,
578+
"tags": ["backfill", "virtual_column"],
579+
"name": "#370",
580+
"description": "Covering backfill from historical with virtual columns test.",
581+
"steps":[
582+
{
583+
"statements": [
584+
{"client":"python", "query_type": "table", "query": "drop stream if exists fixed_test_stream_99"},
585+
{"client":"python", "query_type": "table", "exist": "fixed_test_stream_99", "exist_wait":2, "wait":1, "query": "create stream if not exists fixed_test_stream_99(id int, value int) settings shards=3, sharding_expr='weak_hash32(id)';"},
586+
{"client":"python", "query_type": "table", "depends_on_stream": "fixed_test_stream_99", "wait":2, "query": "insert into fixed_test_stream_99(id, value) values(1, 2);"},
587+
{"client":"python", "query_type": "stream", "depends_on_stream":"fixed_test_stream_99", "query_id":"9920", "query":"select id, _tp_shard from fixed_test_stream_99 where _tp_time > earliest_ts() settings enable_backfill_from_historical_store=true;"},
588+
{"client":"python", "query_type": "table", "kill":9920, "kill_wait":3, "query": "insert into fixed_test_stream_99(id, value) values(2, 2)(3, 3);"},
589+
{"client":"python", "query_type": "table", "kill":9920, "kill_wait":3, "query": "insert into fixed_test_stream_99(id, value) values(2, 2)(3, 3);"}
590+
]
591+
}
592+
],
593+
"expected_results": [
594+
{"query_id":"9920", "expected_results":[
595+
[1, 0],
596+
[2, 0],
597+
[3, 2]
598+
]}
599+
]
600+
}
576601
]
577602
}

0 commit comments

Comments
 (0)