Skip to content

Commit ec96712

Browse files
committed
enable fan out for the same source
1 parent 9a852d7 commit ec96712

24 files changed

+581
-268
lines changed

src/Storages/Streaming/ProxyStream.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,12 @@ ProxyStream::ProxyStream(
107107
QueryProcessingStage::Enum ProxyStream::getQueryProcessingStage(
108108
ContextPtr context_,
109109
QueryProcessingStage::Enum to_stage,
110-
const StorageSnapshotPtr & storage_snapshot,
110+
const StorageSnapshotPtr & /*storage_snapshot*/,
111111
SelectQueryInfo & query_info) const
112112
{
113113
if (storage)
114-
return storage->getQueryProcessingStage(context_, to_stage, storage_snapshot, query_info);
114+
return storage->getQueryProcessingStage(
115+
context_, to_stage, storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_), query_info);
115116
else
116117
/// When it is created by subquery not a table
117118
return QueryProcessingStage::FetchColumns;
@@ -174,7 +175,7 @@ void ProxyStream::read(
174175
void ProxyStream::doRead(
175176
QueryPlan & query_plan,
176177
const Names & column_names,
177-
const StorageSnapshotPtr & storage_snapshot,
178+
const StorageSnapshotPtr & /*storage_snapshot*/,
178179
SelectQueryInfo & query_info,
179180
ContextPtr context_,
180181
QueryProcessingStage::Enum processed_stage,
@@ -206,32 +207,35 @@ void ProxyStream::doRead(
206207
return;
207208
}
208209

210+
assert(storage);
211+
auto proxy_storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_);
209212
if (auto * view = storage->as<StorageView>())
210213
{
211214
auto view_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
212-
view->read(query_plan, column_names, storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
215+
view->read(
216+
query_plan, column_names, proxy_storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
213217
query_plan.addInterpreterContext(view_context);
214218
return;
215219
}
216220
else if (auto * materialized_view = storage->as<StorageMaterializedView>())
217221
return materialized_view->read(
218-
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
222+
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
219223
else if (auto * external_stream = storage->as<StorageExternalStream>())
220224
return external_stream->read(
221-
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
225+
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
222226
else if (auto * random_stream = storage->as<StorageRandom>())
223227
return random_stream->read(
224-
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
228+
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
225229
else if (auto * file_stream = storage->as<StorageFile>())
226230
return file_stream->read(
227-
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
231+
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
228232
else if (nested_proxy_storage)
229233
return nested_proxy_storage->read(
230-
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
234+
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
231235

232236
auto * distributed = storage->as<StorageStream>();
233237
assert(distributed);
234-
distributed->read(query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
238+
distributed->read(query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
235239
}
236240

237241
Names ProxyStream::getRequiredColumnsForProxyStorage(const Names & column_names) const

src/Storages/Streaming/SourceColumnsDescription.cpp

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
#include "SourceColumnsDescription.h"
1+
#include <Storages/Streaming/SourceColumnsDescription.h>
22

33
#include <Core/Block.h>
44
#include <NativeLog/Record/Record.h>
55
#include <Storages/StorageSnapshot.h>
66
#include <base/ClockUtils.h>
77
#include <Common/ProtonCommon.h>
88

9+
#include <numeric>
10+
911
namespace DB
1012
{
1113
SourceColumnsDescription::PhysicalColumnPositions &
@@ -30,21 +32,39 @@ void SourceColumnsDescription::PhysicalColumnPositions::clear()
3032
subcolumns.clear();
3133
}
3234

33-
SourceColumnsDescription::SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot)
35+
SourceColumnsDescription::SourceColumnsDescription(
36+
const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read)
3437
: SourceColumnsDescription(
35-
storage_snapshot->getColumnsByNames(GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
38+
storage_snapshot->getColumnsByNames(
39+
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
3640
storage_snapshot->getMetadataForQuery()->getSampleBlock(),
37-
storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()))
41+
storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()),
42+
enable_partial_read)
3843
{
3944
}
4045

41-
SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns)
46+
SourceColumnsDescription::SourceColumnsDescription(
47+
const NamesAndTypesList & columns_to_read,
48+
const Block & schema,
49+
const NamesAndTypesList & all_extended_columns,
50+
bool enable_partial_read)
4251
{
4352
/// FIXME, when we have multi-version of schema, the header and the schema may be mismatched
4453
auto column_size = columns_to_read.size();
4554

55+
if (enable_partial_read)
56+
{
57+
/// Just read required partial physical columns
58+
physical_column_positions_to_read.positions.reserve(column_size);
59+
}
60+
else
61+
{
62+
/// Read full physical columns
63+
physical_column_positions_to_read.positions.resize(schema.columns());
64+
std::iota(physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), 0);
65+
}
66+
4667
positions.reserve(column_size);
47-
physical_column_positions_to_read.positions.reserve(column_size);
4868
subcolumns_to_read.reserve(column_size);
4969

5070
std::vector<uint16_t> read_all_subcolumns_positions;
@@ -112,45 +132,48 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
112132
auto pos_in_schema = schema.getPositionByName(name_in_storage);
113133
const auto & column_in_storage = schema.getByName(name_in_storage);
114134

115-
/// Calculate main column pos
116-
size_t physical_pos_in_schema_to_read = 0;
117-
/// We don't need to read duplicate physical columns from schema
118-
auto physical_pos_iter = std::find(
119-
physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
120-
if (physical_pos_iter == physical_column_positions_to_read.positions.end())
135+
size_t physical_pos_in_schema_to_read = pos_in_schema;
136+
/// Specially, re-calculate pos in partially read schema
137+
if (enable_partial_read)
121138
{
122-
physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
123-
physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
139+
/// We don't need to read duplicate physical columns from schema
140+
auto physical_pos_iter = std::find(
141+
physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
142+
if (physical_pos_iter == physical_column_positions_to_read.positions.end())
143+
{
144+
physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
145+
physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
146+
}
147+
else
148+
physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
149+
}
124150

125-
/// json, array(json), tuple(..., json, ...)
126-
if (column_in_storage.type->hasDynamicSubcolumns())
151+
/// json, array(json), tuple(..., json, ...)
152+
if (column_in_storage.type->hasDynamicSubcolumns())
153+
{
154+
/// We like to read parent json column once if multiple subcolumns of the same json are required
155+
/// like `select json.a, json.b from stream`
156+
auto find_iter = std::find_if(
157+
physical_object_columns_to_read.begin(),
158+
physical_object_columns_to_read.end(),
159+
[&name_in_storage](const auto & col_name_type) { return col_name_type.name == name_in_storage; });
160+
161+
if (find_iter == physical_object_columns_to_read.end())
127162
{
128-
/// We like to read parent json column once if multiple subcolumns of the same json are required
129-
/// like `select json.a, json.b from stream`
130-
auto find_iter = std::find_if(
131-
physical_object_columns_to_read.begin(),
132-
physical_object_columns_to_read.end(),
133-
[&column](const auto & col_name_type) { return col_name_type.name == column.name; });
134-
135-
if (find_iter == physical_object_columns_to_read.end())
163+
if (column.isSubcolumn())
136164
{
137-
if (column.isSubcolumn())
138-
{
139-
/// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
140-
auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
141-
assert(name_and_type);
142-
physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
143-
}
144-
else
145-
{
146-
/// This column is parent json column, like `select json from stream`, use the name and type directly
147-
physical_object_columns_to_read.emplace_back(column);
148-
}
165+
/// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
166+
auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
167+
assert(name_and_type);
168+
physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
169+
}
170+
else
171+
{
172+
/// This column is parent json column, like `select json from stream`, use the name and type directly
173+
physical_object_columns_to_read.emplace_back(column);
149174
}
150175
}
151176
}
152-
else
153-
physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
154177

155178
/// For a subcolumn, which depends on the main column
156179
if (column.isSubcolumn())
@@ -181,7 +204,7 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
181204
physical_column_positions_to_read.subcolumns.erase(pos);
182205

183206
/// Clients like to read virtual columns only, add `_tp_time`, then we know how many rows
184-
if (physical_column_positions_to_read.positions.empty())
207+
if (enable_partial_read && physical_column_positions_to_read.positions.empty())
185208
physical_column_positions_to_read.positions.emplace_back(schema.getPositionByName(ProtonConsts::RESERVED_EVENT_TIME));
186209
}
187210
}

src/Storages/Streaming/SourceColumnsDescription.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
2121
struct SourceColumnsDescription
2222
{
2323
SourceColumnsDescription() = default;
24-
SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot);
25-
SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns);
24+
SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read = true);
25+
SourceColumnsDescription(
26+
const NamesAndTypesList & columns_to_read,
27+
const Block & schema,
28+
const NamesAndTypesList & all_extended_columns,
29+
bool enable_partial_read = true);
2630

2731
enum class ReadColumnType : uint8_t
2832
{

src/Storages/Streaming/StorageStream.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -601,22 +601,18 @@ void StorageStream::readStreaming(
601601

602602
assert(query_info.seek_to_info);
603603
const auto & settings_ref = context_->getSettingsRef();
604-
/// 1) Checkpointed queries shall not be multiplexed
605-
/// 2) Queries which seek to a specific timestamp shall not be multiplexed
606-
auto share_resource_group = (settings_ref.query_resource_group.value == "shared")
607-
&& (query_info.seek_to_info->getSeekTo().empty() || query_info.seek_to_info->getSeekTo() == "latest")
608-
&& (settings_ref.exec_mode == ExecuteMode::NORMAL);
609-
610-
if (share_resource_group)
604+
if (settings_ref.query_resource_group.value == "shared")
611605
{
606+
auto offsets = stream_shards.back()->getOffsets(query_info.seek_to_info);
612607
for (auto stream_shard : shards_to_read)
613608
{
609+
const auto & offset = offsets[stream_shard->shard];
614610
if (!column_names.empty())
615611
pipes.emplace_back(
616-
stream_shard->source_multiplexers->createChannel(stream_shard->shard, column_names, storage_snapshot, context_));
612+
source_multiplexers->createChannel(std::move(stream_shard), column_names, storage_snapshot, context_, offset));
617613
else
618-
pipes.emplace_back(stream_shard->source_multiplexers->createChannel(
619-
stream_shard->shard, {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_));
614+
pipes.emplace_back(source_multiplexers->createChannel(
615+
std::move(stream_shard), {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_, offset));
620616
}
621617

622618
LOG_INFO(log, "Starting reading {} streams in shared resource group", pipes.size());
@@ -945,6 +941,8 @@ void StorageStream::startup()
945941
assert(native_log->enabled());
946942
}
947943

944+
source_multiplexers.reset(new StreamingStoreSourceMultiplexers(getContext(), log));
945+
948946
log_initialized.test_and_set();
949947

950948
LOG_INFO(log, "Started");

src/Storages/Streaming/StorageStream.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <Storages/MergeTree/MergeTreeData.h>
1212
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
13+
#include <Storages/Streaming/StreamingStoreSourceMultiplexer.h>
1314

1415
namespace nlog
1516
{
@@ -298,8 +299,7 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
298299
UInt64 base_block_id,
299300
UInt64 sub_block_id);
300301

301-
void
302-
appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
302+
void appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
303303

304304
void appendToKafka(
305305
nlog::RecordPtr & record,
@@ -354,5 +354,8 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
354354

355355
std::atomic_flag inited;
356356
std::atomic_flag stopped;
357+
358+
/// Multiplex latest records of each shard.
359+
std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
357360
};
358361
}

src/Storages/Streaming/StreamShard.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ StreamShard::~StreamShard()
124124

125125
void StreamShard::startup()
126126
{
127-
source_multiplexers.reset(new StreamingStoreSourceMultiplexers(shared_from_this(), storage_stream->getContext(), log));
128-
129127
initLog();
130128

131129
/// for virtual tables or in-memory storage type, there is no storage object

src/Storages/Streaming/StreamShard.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ class StreamShard final : public std::enable_shared_from_this<StreamShard>
148148

149149
std::unique_ptr<StreamCallbackData> callback_data;
150150

151-
std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
152-
153151
// For random shard index generation
154152
mutable std::mutex rng_mutex;
155153
pcg64 rng;

src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,8 @@ nlog::RecordPtrs StreamingBlockReaderNativeLog::processCached(nlog::RecordPtrs r
148148
{
149149
/// In general, an object has a large number of subcolumns,
150150
/// so when a few subcolumns required for the object, we only copy partials to improve performance
151-
if (isObject(col_with_type->type) && !schema_ctx.column_positions.positions.empty())
151+
if (isObject(col_with_type->type) && !schema_ctx.column_positions.subcolumns.empty())
152152
{
153-
assert(column_names.size() == schema_ctx.column_positions.positions.size());
154153
auto iter = schema_ctx.column_positions.subcolumns.find(i);
155154
if (iter != schema_ctx.column_positions.subcolumns.end())
156155
{

src/Storages/Streaming/StreamingStoreSource.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#include "StreamingStoreSource.h"
2-
#include "StreamShard.h"
3-
#include "StreamingBlockReaderKafka.h"
4-
#include "StreamingBlockReaderNativeLog.h"
1+
#include <Storages/Streaming/StreamShard.h>
2+
#include <Storages/Streaming/StreamingBlockReaderKafka.h>
3+
#include <Storages/Streaming/StreamingBlockReaderNativeLog.h>
4+
#include <Storages/Streaming/StreamingStoreSource.h>
55

66
#include <Interpreters/inplaceBlockConversions.h>
77
#include <KafkaLog/KafkaWALPool.h>
@@ -16,7 +16,8 @@ StreamingStoreSource::StreamingStoreSource(
1616
ContextPtr context_,
1717
Int64 sn,
1818
Poco::Logger * log_)
19-
: StreamingStoreSourceBase(header, storage_snapshot_, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
19+
: StreamingStoreSourceBase(
20+
header, storage_snapshot_, /*enable_partial_read*/ true, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
2021
{
2122
const auto & settings = query_context->getSettingsRef();
2223
if (settings.record_consume_batch_count.value != 0)
@@ -98,7 +99,8 @@ void StreamingStoreSource::readAndProcess()
9899
/// NOTE: The `FilterTransform` will try optimizing filter ConstColumn to always_false or always_true,
99100
/// for example: `_tp_sn < 1`, if the first filtered data has `_tp_sn` 0, it will be optimized to always_true.
100101
/// So we cannot create a constant column, since the virtual column data is not in fact a constant value.
101-
auto virtual_column = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
102+
auto virtual_column
103+
= columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
102104
columns.push_back(std::move(virtual_column));
103105
break;
104106
}

src/Storages/Streaming/StreamingStoreSource.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "StreamingStoreSourceBase.h"
3+
#include <Storages/Streaming/StreamingStoreSourceBase.h>
44

55
namespace DB
66
{

0 commit comments

Comments
 (0)