diff --git a/src/Storages/Streaming/ProxyStream.cpp b/src/Storages/Streaming/ProxyStream.cpp
index b335b2f3965..eae9b9d302b 100644
--- a/src/Storages/Streaming/ProxyStream.cpp
+++ b/src/Storages/Streaming/ProxyStream.cpp
@@ -107,11 +107,12 @@ ProxyStream::ProxyStream(
 QueryProcessingStage::Enum ProxyStream::getQueryProcessingStage(
     ContextPtr context_,
     QueryProcessingStage::Enum to_stage,
-    const StorageSnapshotPtr & storage_snapshot,
+    const StorageSnapshotPtr & /*storage_snapshot*/,
     SelectQueryInfo & query_info) const
 {
     if (storage)
-        return storage->getQueryProcessingStage(context_, to_stage, storage_snapshot, query_info);
+        return storage->getQueryProcessingStage(
+            context_, to_stage, storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_), query_info);
     else
         /// When it is created by subquery not a table
         return QueryProcessingStage::FetchColumns;
@@ -174,7 +175,7 @@ void ProxyStream::read(
 void ProxyStream::doRead(
     QueryPlan & query_plan,
     const Names & column_names,
-    const StorageSnapshotPtr & storage_snapshot,
+    const StorageSnapshotPtr & /*storage_snapshot*/,
     SelectQueryInfo & query_info,
     ContextPtr context_,
     QueryProcessingStage::Enum processed_stage,
@@ -206,32 +207,35 @@ void ProxyStream::doRead(
         return;
     }
 
+    assert(storage);
+    auto proxy_storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_);
     if (auto * view = storage->as<StorageView>())
     {
         auto view_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
-        view->read(query_plan, column_names, storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
+        view->read(
+            query_plan, column_names, proxy_storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
         query_plan.addInterpreterContext(view_context);
         return;
     }
     else if (auto * materialized_view = storage->as<StorageMaterializedView>())
         return materialized_view->read(
-            query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+            query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
     else if (auto * external_stream = storage->as<StorageExternalStream>())
         return external_stream->read(
-            query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+            query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
     else if (auto * random_stream = storage->as<StorageRandom>())
         return random_stream->read(
-            query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+            query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
    else if (auto * file_stream = storage->as<StorageFile>())
        return file_stream->read(
-            query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+            query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
     else if (nested_proxy_storage)
         return nested_proxy_storage->read(
-            query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+            query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
 
     auto * distributed = storage->as<StorageStream>();
     assert(distributed);
-    distributed->read(query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+    distributed->read(query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
 }
 
 Names ProxyStream::getRequiredColumnsForProxyStorage(const Names & column_names) const
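Note: every `doRead()` branch above follows one pattern — the snapshot handed to `ProxyStream` describes the proxy, so before delegating, the patch rebuilds a snapshot from the *proxied* storage's own metadata. A pseudocode-level sketch of the idea, using only the two storage calls that already appear in the patch (`getInMemoryMetadataPtr()`, `getStorageSnapshot()`); the helper name is mine, not from the repository:

```cpp
/// Illustration only: rebuild the snapshot from the proxied storage itself,
/// so column metadata always matches the storage that actually executes the read.
StorageSnapshotPtr makeProxySnapshot(const StoragePtr & proxied, ContextPtr context)
{
    return proxied->getStorageSnapshot(proxied->getInMemoryMetadataPtr(), context);
}
```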
diff --git a/src/Storages/Streaming/SourceColumnsDescription.cpp b/src/Storages/Streaming/SourceColumnsDescription.cpp
index c76b7eca409..e127f7bb6ec 100644
--- a/src/Storages/Streaming/SourceColumnsDescription.cpp
+++ b/src/Storages/Streaming/SourceColumnsDescription.cpp
@@ -1,4 +1,4 @@
-#include "SourceColumnsDescription.h"
+#include <Storages/Streaming/SourceColumnsDescription.h>
 
 #include
 #include
@@ -6,6 +6,8 @@
 #include
 #include
 
+#include <numeric>
+
 namespace DB
 {
 SourceColumnsDescription::PhysicalColumnPositions &
@@ -30,21 +32,39 @@ void SourceColumnsDescription::PhysicalColumnPositions::clear()
     subcolumns.clear();
 }
 
-SourceColumnsDescription::SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot)
+SourceColumnsDescription::SourceColumnsDescription(
+    const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read)
     : SourceColumnsDescription(
-        storage_snapshot->getColumnsByNames(GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
+        storage_snapshot->getColumnsByNames(
+            GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
         storage_snapshot->getMetadataForQuery()->getSampleBlock(),
-        storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()))
+        storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()),
+        enable_partial_read)
 {
 }
 
-SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns)
+SourceColumnsDescription::SourceColumnsDescription(
+    const NamesAndTypesList & columns_to_read,
+    const Block & schema,
+    const NamesAndTypesList & all_extended_columns,
+    bool enable_partial_read)
 {
     /// FIXME, when we have multi-version of schema, the header and the schema may be mismatched
     auto column_size = columns_to_read.size();
 
+    if (enable_partial_read)
+    {
+        /// Just read the required partial physical columns
+        physical_column_positions_to_read.positions.reserve(column_size);
+    }
+    else
+    {
+        /// Read full physical columns
+        physical_column_positions_to_read.positions.resize(schema.columns());
+        std::iota(physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), 0);
+    }
+
     positions.reserve(column_size);
-    physical_column_positions_to_read.positions.reserve(column_size);
     subcolumns_to_read.reserve(column_size);
 
     std::vector<size_t> read_all_subcolumns_positions;
@@ -112,45 +132,48 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
             auto pos_in_schema = schema.getPositionByName(name_in_storage);
             const auto & column_in_storage = schema.getByName(name_in_storage);
 
-            /// Calculate main column pos
-            size_t physical_pos_in_schema_to_read = 0;
-            /// We don't need to read duplicate physical columns from schema
-            auto physical_pos_iter = std::find(
-                physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
-            if (physical_pos_iter == physical_column_positions_to_read.positions.end())
+            size_t physical_pos_in_schema_to_read = pos_in_schema;
+            /// Specifically, re-calculate the pos in the partially-read schema
+            if (enable_partial_read)
             {
-                physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
-                physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
+                /// We don't need to read duplicate physical columns from schema
+                auto physical_pos_iter = std::find(
+                    physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
+                if (physical_pos_iter == physical_column_positions_to_read.positions.end())
+                {
+                    physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
+                    physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
+                }
+                else
+                    physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
+            }
 
-                /// json, array(json), tuple(..., json, ...)
-                if (column_in_storage.type->hasDynamicSubcolumns())
+            /// json, array(json), tuple(..., json, ...)
+            if (column_in_storage.type->hasDynamicSubcolumns())
+            {
+                /// We like to read parent json column once if multiple subcolumns of the same json are required
+                /// like `select json.a, json.b from stream`
+                auto find_iter = std::find_if(
+                    physical_object_columns_to_read.begin(),
+                    physical_object_columns_to_read.end(),
+                    [&name_in_storage](const auto & col_name_type) { return col_name_type.name == name_in_storage; });
+
+                if (find_iter == physical_object_columns_to_read.end())
                 {
-                    /// We like to read parent json column once if multiple subcolumns of the same json are required
-                    /// like `select json.a, json.b from stream`
-                    auto find_iter = std::find_if(
-                        physical_object_columns_to_read.begin(),
-                        physical_object_columns_to_read.end(),
-                        [&column](const auto & col_name_type) { return col_name_type.name == column.name; });
-
-                    if (find_iter == physical_object_columns_to_read.end())
+                    if (column.isSubcolumn())
                     {
-                        if (column.isSubcolumn())
-                        {
-                            /// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
-                            auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
-                            assert(name_and_type);
-                            physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
-                        }
-                        else
-                        {
-                            /// This column is parent json column, like `select json from stream`, use the name and type directly
-                            physical_object_columns_to_read.emplace_back(column);
-                        }
+                        /// When reading a subcolumn of a json like `select json.a from stream`, we will need to read the parent `json` column
+                        auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
+                        assert(name_and_type);
+                        physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
+                    }
+                    else
+                    {
+                        /// This column is parent json column, like `select json from stream`, use the name and type directly
+                        physical_object_columns_to_read.emplace_back(column);
                     }
                 }
             }
-            else
-                physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
 
             /// For subcolumn, which dependents on the main column
             if (column.isSubcolumn())
@@ -181,7 +204,7 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
             physical_column_positions_to_read.subcolumns.erase(pos);
 
     /// Clients like to read virtual columns only, add `_tp_time`, then we know how many rows
-    if (physical_column_positions_to_read.positions.empty())
+    if (enable_partial_read && physical_column_positions_to_read.positions.empty())
        physical_column_positions_to_read.positions.emplace_back(schema.getPositionByName(ProtonConsts::RESERVED_EVENT_TIME));
 }
 }
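With `enable_partial_read`, the constructor above switches between two position-mapping strategies: a full identity mapping over the whole schema (`std::iota`) or a de-duplicated list of only the required physical positions. A self-contained toy model of just that mapping logic (function and variable names are mine, not from the patch):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

/// Toy model of the two modes in SourceColumnsDescription: full read keeps an
/// identity mapping over the whole schema; partial read keeps only the required
/// positions, de-duplicated in first-use order.
std::vector<size_t> positionsToRead(const std::vector<size_t> & required, size_t schema_size, bool enable_partial_read)
{
    std::vector<size_t> positions;
    if (!enable_partial_read)
    {
        positions.resize(schema_size);
        std::iota(positions.begin(), positions.end(), 0); /// 0, 1, ..., schema_size - 1
        return positions;
    }

    for (auto pos : required)
        if (std::find(positions.begin(), positions.end(), pos) == positions.end())
            positions.emplace_back(pos);
    return positions;
}

int main()
{
    for (auto pos : positionsToRead({5, 1, 5, 3}, 8, true))
        std::cout << pos << ' '; /// prints: 5 1 3
    std::cout << '\n';
    for (auto pos : positionsToRead({5, 1, 5, 3}, 8, false))
        std::cout << pos << ' '; /// prints: 0 1 2 3 4 5 6 7
}
```

The full (non-partial) mapping is what lets the multiplexed channels below assert that a fanned-out block always carries every physical column.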
diff --git a/src/Storages/Streaming/SourceColumnsDescription.h b/src/Storages/Streaming/SourceColumnsDescription.h
index d6b3afc27b7..17bfadf2456 100644
--- a/src/Storages/Streaming/SourceColumnsDescription.h
+++ b/src/Storages/Streaming/SourceColumnsDescription.h
@@ -21,8 +21,12 @@ using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
 struct SourceColumnsDescription
 {
     SourceColumnsDescription() = default;
-    SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot);
-    SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns);
+    SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read = true);
+    SourceColumnsDescription(
+        const NamesAndTypesList & columns_to_read,
+        const Block & schema,
+        const NamesAndTypesList & all_extended_columns,
+        bool enable_partial_read = true);
 
     enum class ReadColumnType : uint8_t
     {
diff --git a/src/Storages/Streaming/StorageStream.cpp b/src/Storages/Streaming/StorageStream.cpp
index 9183f50adc2..532cc085b8f 100644
--- a/src/Storages/Streaming/StorageStream.cpp
+++ b/src/Storages/Streaming/StorageStream.cpp
@@ -486,15 +486,19 @@ void StorageStream::readConcat(
     for (auto & stream_shard : shards_to_read)
     {
         auto create_streaming_source = [this, header, storage_snapshot, stream_shard, seek_to_info = query_info.seek_to_info, context_](
-                                           Int64 & max_sn_in_parts) {
+                                           Int64 & max_sn_in_parts) -> SourcePtr {
             if (max_sn_in_parts < 0)
             {
                 /// Fallback to seek streaming store
                 auto offsets = stream_shard->getOffsets(seek_to_info);
                 LOG_INFO(log, "Fused read fallbacks to seek stream for shard={} since there are no historical data", stream_shard->shard);
 
-                return std::make_shared<StreamingStoreSource>(
-                    stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
+                if (context_->getSettingsRef().query_resource_group.value == "shared")
+                    return source_multiplexers->createChannel(
+                        stream_shard, header.getNames(), storage_snapshot, context_, offsets[stream_shard->shard]);
+                else
+                    return std::make_shared<StreamingStoreSource>(
+                        stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
             }
 
             auto committed = stream_shard->storage->inMemoryCommittedSN();
@@ -526,7 +530,12 @@ void StorageStream::readConcat(
                     max_sn_in_parts,
                     committed);
 
-                return std::make_shared<StreamingStoreSource>(stream_shard, header, storage_snapshot, context_, max_sn_in_parts + 1, log);
+                if (context_->getSettingsRef().query_resource_group.value == "shared")
+                    return source_multiplexers->createChannel(
+                        stream_shard, header.getNames(), storage_snapshot, context_, max_sn_in_parts + 1);
+                else
+                    return std::make_shared<StreamingStoreSource>(
+                        stream_shard, header, storage_snapshot, context_, max_sn_in_parts + 1, log);
             }
             else
             {
@@ -542,8 +551,12 @@ void StorageStream::readConcat(
                 /// We need reset max_sn_in_parts to tell caller that we are seeking streaming store directly
                 max_sn_in_parts = -1;
 
-                return std::make_shared<StreamingStoreSource>(
-                    stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
+                if (context_->getSettingsRef().query_resource_group.value == "shared")
+                    return source_multiplexers->createChannel(
+                        stream_shard, header.getNames(), storage_snapshot, context_, offsets[stream_shard->shard]);
+                else
+                    return std::make_shared<StreamingStoreSource>(
+                        stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
             }
         };
 
@@ -601,22 +614,18 @@ void StorageStream::readStreaming(
     assert(query_info.seek_to_info);
     const auto & settings_ref = context_->getSettingsRef();
 
-    /// 1) Checkpointed queries shall not be multiplexed
-    /// 2) Queries which seek to a specific timestamp shall not be multiplexed
-    auto share_resource_group = (settings_ref.query_resource_group.value == "shared")
-        && (query_info.seek_to_info->getSeekTo().empty() || query_info.seek_to_info->getSeekTo() == "latest")
-        && (settings_ref.exec_mode == ExecuteMode::NORMAL);
-
-    if (share_resource_group)
+    if (settings_ref.query_resource_group.value == "shared")
     {
+        auto offsets = stream_shards.back()->getOffsets(query_info.seek_to_info);
         for (auto stream_shard : shards_to_read)
         {
+            const auto & offset = offsets[stream_shard->shard];
             if (!column_names.empty())
                 pipes.emplace_back(
-                    stream_shard->source_multiplexers->createChannel(stream_shard->shard, column_names, storage_snapshot, context_));
+                    source_multiplexers->createChannel(std::move(stream_shard), column_names, storage_snapshot, context_, offset));
             else
-                pipes.emplace_back(stream_shard->source_multiplexers->createChannel(
-                    stream_shard->shard, {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_));
+                pipes.emplace_back(source_multiplexers->createChannel(
+                    std::move(stream_shard), {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_, offset));
         }
 
         LOG_INFO(log, "Starting reading {} streams in shared resource group", pipes.size());
@@ -945,6 +954,8 @@ void StorageStream::startup()
         assert(native_log->enabled());
     }
 
+    source_multiplexers.reset(new StreamingStoreSourceMultiplexers(getContext(), log));
+
     log_initialized.test_and_set();
 
     LOG_INFO(log, "Started");
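Taken together with the multiplexer changes below, the read paths now choose among three kinds of sources. A runnable toy model of the selection rule (enum, function, and the `-1 == latest` convention are mine; the real code uses `query_resource_group`, `exec_mode` and `nlog::LATEST_SN`):

```cpp
#include <iostream>
#include <string>

enum class SourceKind { SharedChannel, IndependentChannel, DedicatedSource };

/// Toy model of the patched selection logic: the resource group decides shared
/// vs dedicated, and recovery or a concrete start sequence number (anything but
/// "latest", modeled here as -1) forces an independent channel that later
/// re-attaches to the shared group once it catches up.
SourceKind pickSource(const std::string & resource_group, long long start_sn, bool recovering)
{
    if (resource_group != "shared")
        return SourceKind::DedicatedSource;
    if (recovering || start_sn != -1)
        return SourceKind::IndependentChannel;
    return SourceKind::SharedChannel;
}

int main()
{
    std::cout << (pickSource("shared", -1, false) == SourceKind::SharedChannel) << '\n';      /// 1
    std::cout << (pickSource("shared", 42, false) == SourceKind::IndependentChannel) << '\n'; /// 1
    std::cout << (pickSource("default", -1, false) == SourceKind::DedicatedSource) << '\n';   /// 1
}
```

Note the behavioral change versus the removed `share_resource_group` check: checkpointed and seeking queries are no longer excluded from multiplexing; they start on an independent channel instead.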
diff --git a/src/Storages/Streaming/StorageStream.h b/src/Storages/Streaming/StorageStream.h
index 15e0f901763..2afc88c8250 100644
--- a/src/Storages/Streaming/StorageStream.h
+++ b/src/Storages/Streaming/StorageStream.h
@@ -10,6 +10,7 @@
 #include
 #include
+#include <Storages/Streaming/StreamingStoreSourceMultiplexer.h>
 
 namespace nlog
 {
@@ -298,8 +299,7 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
         UInt64 base_block_id,
         UInt64 sub_block_id);
 
-    void
-    appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
+    void appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
 
     void appendToKafka(
         nlog::RecordPtr & record,
@@ -354,5 +354,8 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
 
     std::atomic_flag inited;
     std::atomic_flag stopped;
+
+    /// Multiplex latest records of each shard.
+    std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
 };
 }
diff --git a/src/Storages/Streaming/StreamShard.cpp b/src/Storages/Streaming/StreamShard.cpp
index cf90cf7def1..ebe9c1e003b 100644
--- a/src/Storages/Streaming/StreamShard.cpp
+++ b/src/Storages/Streaming/StreamShard.cpp
@@ -124,8 +124,6 @@ StreamShard::~StreamShard()
 
 void StreamShard::startup()
 {
-    source_multiplexers.reset(new StreamingStoreSourceMultiplexers(shared_from_this(), storage_stream->getContext(), log));
-
     initLog();
 
     /// for virtual tables or in-memory storage type, there is no storage object
diff --git a/src/Storages/Streaming/StreamShard.h b/src/Storages/Streaming/StreamShard.h
index 781001cce44..1b47c486f69 100644
--- a/src/Storages/Streaming/StreamShard.h
+++ b/src/Storages/Streaming/StreamShard.h
@@ -148,8 +148,6 @@ class StreamShard final : public std::enable_shared_from_this<StreamShard>
 
     std::unique_ptr callback_data;
 
-    std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
-
     // For random shard index generation
     mutable std::mutex rng_mutex;
     pcg64 rng;
diff --git a/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp b/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp
index 64b0f613441..69b18b598ac 100644
--- a/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp
+++ b/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp
@@ -148,9 +148,8 @@ nlog::RecordPtrs StreamingBlockReaderNativeLog::processCached(nlog::RecordPtrs r
         {
             /// In general, an object has a large number of subcolumns,
             /// so when a few subcolumns required for the object, we only copy partials to improve performance
-            if (isObject(col_with_type->type) && !schema_ctx.column_positions.positions.empty())
+            if (isObject(col_with_type->type) && !schema_ctx.column_positions.subcolumns.empty())
             {
-                assert(column_names.size() == schema_ctx.column_positions.positions.size());
                 auto iter = schema_ctx.column_positions.subcolumns.find(i);
                 if (iter != schema_ctx.column_positions.subcolumns.end())
                 {
diff --git a/src/Storages/Streaming/StreamingStoreSource.cpp b/src/Storages/Streaming/StreamingStoreSource.cpp
index 5bf9dfb7ea2..11e4a01964c 100644
--- a/src/Storages/Streaming/StreamingStoreSource.cpp
+++ b/src/Storages/Streaming/StreamingStoreSource.cpp
@@ -1,7 +1,7 @@
-#include "StreamingStoreSource.h"
-#include "StreamShard.h"
-#include "StreamingBlockReaderKafka.h"
-#include "StreamingBlockReaderNativeLog.h"
+#include <Storages/Streaming/StreamingStoreSource.h>
+#include <Storages/Streaming/StreamShard.h>
+#include <Storages/Streaming/StreamingBlockReaderKafka.h>
+#include <Storages/Streaming/StreamingBlockReaderNativeLog.h>
 
 #include
 #include
@@ -16,7 +16,8 @@ StreamingStoreSource::StreamingStoreSource(
     ContextPtr context_,
     Int64 sn,
     Poco::Logger * log_)
-    : StreamingStoreSourceBase(header, storage_snapshot_, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
+    : StreamingStoreSourceBase(
+        header, storage_snapshot_, /*enable_partial_read*/ true, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
 {
     const auto & settings = query_context->getSettingsRef();
     if (settings.record_consume_batch_count.value != 0)
@@ -98,7 +99,8 @@ void StreamingStoreSource::readAndProcess()
                     /// NOTE: The `FilterTransform` will try optimizing filter ConstColumn to always_false or always_true,
                     /// for exmaple: `_tp_sn < 1`, if filter first data _tp_sn is 0, it will be optimized always_true.
                     /// So we can not create a constant column, since the virtual column data isn't constants value in fact.
-                    auto virtual_column = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
+                    auto virtual_column
+                        = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
                     columns.push_back(std::move(virtual_column));
                     break;
                 }
diff --git a/src/Storages/Streaming/StreamingStoreSource.h b/src/Storages/Streaming/StreamingStoreSource.h
index a28acb88511..72aa93e2645 100644
--- a/src/Storages/Streaming/StreamingStoreSource.h
+++ b/src/Storages/Streaming/StreamingStoreSource.h
@@ -1,6 +1,6 @@
 #pragma once
 
-#include "StreamingStoreSourceBase.h"
+#include <Storages/Streaming/StreamingStoreSourceBase.h>
 
 namespace DB
 {
diff --git a/src/Storages/Streaming/StreamingStoreSourceBase.cpp b/src/Storages/Streaming/StreamingStoreSourceBase.cpp
index 61ec43ae665..9b47c4385ee 100644
--- a/src/Storages/Streaming/StreamingStoreSourceBase.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceBase.cpp
@@ -1,4 +1,4 @@
-#include "StreamingStoreSourceBase.h"
+#include <Storages/Streaming/StreamingStoreSourceBase.h>
 
 #include
 #include
@@ -16,14 +16,19 @@ extern const int RECOVER_CHECKPOINT_FAILED;
 }
 
 StreamingStoreSourceBase::StreamingStoreSourceBase(
-    const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr query_context_, Poco::Logger * log_, ProcessorID pid_)
+    const Block & header,
+    const StorageSnapshotPtr & storage_snapshot_,
+    bool enable_partial_read,
+    ContextPtr query_context_,
+    Poco::Logger * log_,
+    ProcessorID pid_)
     : ISource(header, true, pid_)
     , storage_snapshot(
          std::make_shared<StorageSnapshot>(*storage_snapshot_)) /// We like to make a copy of it since we will mutate the snapshot
     , query_context(std::move(query_context_))
     , log(log_)
     , header_chunk(header.getColumns(), 0)
-    , columns_desc(header.getNames(), storage_snapshot)
+    , columns_desc(header.getNames(), storage_snapshot, enable_partial_read)
 {
     is_streaming = true;
 
@@ -52,7 +57,8 @@ StreamingStoreSourceBase::getSubcolumnFromBlock(const Block & block, size_t pare
 
     /// Convert subcolumn if the subcolumn type of dynamic object may be dismatched with header.
     /// FIXME: Cache the ExpressionAction
-    Block subcolumn_block({ColumnWithTypeAndName{std::move(subcolumn), std::move(subcolumn_type), subcolumn_pair.name}}); /// NOLINT(performance-move-const-arg)
+    Block subcolumn_block({ColumnWithTypeAndName{
+        std::move(subcolumn), std::move(subcolumn_type), subcolumn_pair.name}}); /// NOLINT(performance-move-const-arg)
     ExpressionActions convert_act(ActionsDAG::makeConvertingActions(
         subcolumn_block.getColumnsWithTypeAndName(),
         {ColumnWithTypeAndName{subcolumn_pair.type->createColumn(), subcolumn_pair.type, subcolumn_pair.name}},
diff --git a/src/Storages/Streaming/StreamingStoreSourceBase.h b/src/Storages/Streaming/StreamingStoreSourceBase.h
index 69194cf49e8..2573c68a0ab 100644
--- a/src/Storages/Streaming/StreamingStoreSourceBase.h
+++ b/src/Storages/Streaming/StreamingStoreSourceBase.h
@@ -1,6 +1,6 @@
 #pragma once
 
-#include "SourceColumnsDescription.h"
+#include <Storages/Streaming/SourceColumnsDescription.h>
 
 #include
 #include
@@ -16,7 +16,12 @@ class StreamingStoreSourceBase : public ISource
 {
 public:
     StreamingStoreSourceBase(
-        const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr context_, Poco::Logger * log_, ProcessorID pid_);
+        const Block & header,
+        const StorageSnapshotPtr & storage_snapshot_,
+        bool enable_partial_read,
+        ContextPtr context_,
+        Poco::Logger * log_,
+        ProcessorID pid_);
 
     Chunk generate() override;
 
diff --git a/src/Storages/Streaming/StreamingStoreSourceChannel.cpp b/src/Storages/Streaming/StreamingStoreSourceChannel.cpp
index 1f51808fa44..81c41eb92b7 100644
--- a/src/Storages/Streaming/StreamingStoreSourceChannel.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceChannel.cpp
@@ -1,8 +1,9 @@
-#include "StreamingStoreSourceChannel.h"
-#include "StreamingStoreSourceMultiplexer.h"
+#include <Storages/Streaming/StreamingStoreSourceChannel.h>
 
 #include
+#include <Storages/Streaming/StreamingStoreSourceMultiplexer.h>
 #include
+#include
 
 namespace DB
 {
@@ -12,24 +13,34 @@ StreamingStoreSourceChannel::StreamingStoreSourceChannel(
     StorageSnapshotPtr storage_snapshot_,
     ContextPtr query_context_,
     Poco::Logger * log_)
-    : StreamingStoreSourceBase(header, storage_snapshot_, std::move(query_context_), log_, ProcessorID::StreamingStoreSourceChannelID) /// NOLINT(performance-move-const-arg)
+    : StreamingStoreSourceBase(
+        header,
+        storage_snapshot_,
+        /*enable_partial_read*/ false,
+        std::move(query_context_),
+        log_,
+        ProcessorID::StreamingStoreSourceChannelID) /// NOLINT(performance-move-const-arg)
     , id(sequence_id++)
     , multiplexer(std::move(multiplexer_))
     , records_queue(1000)
 {
+    const auto & settings = query_context->getSettingsRef();
+    if (settings.record_consume_timeout_ms.value != 0)
+        record_consume_timeout_ms = static_cast<Int32>(settings.record_consume_timeout_ms.value);
 }
 
 std::atomic<UInt32> StreamingStoreSourceChannel::sequence_id = 0;
 
 StreamingStoreSourceChannel::~StreamingStoreSourceChannel()
 {
+    std::lock_guard lock(multiplexer_mutex);
     multiplexer->removeChannel(id);
 }
 
 void StreamingStoreSourceChannel::readAndProcess()
 {
     nlog::RecordPtrs records;
-    auto got_records = records_queue.tryPop(records, 100);
+    auto got_records = records_queue.tryPop(records, record_consume_timeout_ms);
     if (!got_records)
         return;
 
@@ -48,13 +59,19 @@ void StreamingStoreSourceChannel::readAndProcess()
         if (record->empty())
             continue;
 
+        /// Ignore duplicate records; they are possible when re-attaching from an independent multiplexer to the shared group
+        if (record->getSN() <= last_sn)
+            continue;
+
+        last_sn = record->getSN();
+
         Columns columns;
         columns.reserve(header_chunk.getNumColumns());
 
         Block & block = record->getBlock();
         auto rows = block.rows();
         /// Block in channel shall always contain full columns
-        assert(block.columns() == columns_desc.positions.size());
+        assert(block.columns() == columns_desc.physical_column_positions_to_read.positions.size());
 
         fillAndUpdateObjectsIfNecessary(block);
 
@@ -76,8 +93,12 @@ void StreamingStoreSourceChannel::readAndProcess()
                 /// The current column to return is a virtual column which needs be calculated lively
                 assert(columns_desc.virtual_col_calcs[pos.virtualPosition()]);
                 auto ts = columns_desc.virtual_col_calcs[pos.virtualPosition()](record);
-                auto time_column = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts);
-                columns.push_back(std::move(time_column));
+                /// NOTE: The `FilterTransform` will try optimizing filter ConstColumn to always_false or always_true,
+                /// for example: `_tp_sn < 1`, if the first record's _tp_sn is 0, it will be optimized to always_true.
+                /// So we can not create a constant column, since the virtual column data isn't constant in fact.
+                auto virtual_column
+                    = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
+                columns.push_back(std::move(virtual_column));
                 break;
             }
             case SourceColumnsDescription::ReadColumnType::SUB:
@@ -109,6 +130,27 @@ void StreamingStoreSourceChannel::add(nlog::RecordPtrs records)
 
 std::pair<String, Int32> StreamingStoreSourceChannel::getStreamShard() const
 {
+    std::lock_guard lock(multiplexer_mutex);
     return multiplexer->getStreamShard();
 }
+
+void StreamingStoreSourceChannel::attachTo(std::shared_ptr<StreamingStoreSourceMultiplexer> new_multiplexer)
+{
+    std::lock_guard lock(multiplexer_mutex);
+    multiplexer = std::move(new_multiplexer);
+}
+
+void StreamingStoreSourceChannel::recover(CheckpointContextPtr ckpt_ctx_)
+{
+    StreamingStoreSourceBase::recover(std::move(ckpt_ctx_));
+
+    if (last_sn >= 0)
+    {
+        std::lock_guard lock(multiplexer_mutex);
+        assert(multiplexer->totalChannels() == 1);
+        multiplexer->resetSequenceNumber(last_sn + 1);
+        multiplexer->startup();
+    }
+}
+
 }
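The new `last_sn` guard is what makes the independent-to-shared hand-off safe: around re-attachment, both multiplexers may deliver an overlapping range of records, and the channel simply drops anything it has already seen. A runnable toy model (types reduced to plain sequence numbers):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

/// Toy model of the channel-side dedup guard: records at or below last_sn were
/// already consumed (e.g. re-delivered by the shared multiplexer right after
/// the channel attached to it) and are silently skipped.
struct Channel
{
    int64_t last_sn = -1;
    std::vector<int64_t> consumed;

    void add(const std::vector<int64_t> & record_sns)
    {
        for (auto sn : record_sns)
        {
            if (sn <= last_sn)
                continue; /// duplicate from the overlapping window
            last_sn = sn;
            consumed.push_back(sn);
        }
    }
};

int main()
{
    Channel ch;
    ch.add({0, 1, 2, 3}); /// from the independent multiplexer
    ch.add({2, 3, 4, 5}); /// first fan-out after attaching to the shared group
    for (auto sn : ch.consumed)
        std::cout << sn << ' '; /// prints: 0 1 2 3 4 5
}
```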
diff --git a/src/Storages/Streaming/StreamingStoreSourceChannel.h b/src/Storages/Streaming/StreamingStoreSourceChannel.h
index 9d8264abb02..e7c68a5857b 100644
--- a/src/Storages/Streaming/StreamingStoreSourceChannel.h
+++ b/src/Storages/Streaming/StreamingStoreSourceChannel.h
@@ -1,8 +1,7 @@
 #pragma once
 
-#include "StreamingStoreSourceBase.h"
-
 #include
+#include <Storages/Streaming/StreamingStoreSourceBase.h>
 #include
 
 namespace DB
 {
@@ -21,6 +20,10 @@ class StreamingStoreSourceChannel final : public StreamingStoreSourceBase
 
     ~StreamingStoreSourceChannel() override;
 
+    void attachTo(std::shared_ptr<StreamingStoreSourceMultiplexer> new_multiplexer);
+
+    void recover(CheckpointContextPtr ckpt_ctx_) override;
+
     String getName() const override { return "StreamingStoreSourceChannel"; }
 
     UInt32 getID() const { return id; }
@@ -35,8 +38,12 @@ class StreamingStoreSourceChannel final : public StreamingStoreSourceBase
     static std::atomic<UInt32> sequence_id;
 
     UInt32 id;
+
+    mutable std::mutex multiplexer_mutex;
     std::shared_ptr<StreamingStoreSourceMultiplexer> multiplexer;
 
+    Int32 record_consume_timeout_ms = 100;
+
     /// FIXME, use another lock-free one?
     ConcurrentBoundedQueue<nlog::RecordPtrs> records_queue;
 };
diff --git a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
index 73895db03f5..f15399a6c13 100644
--- a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
@@ -1,7 +1,9 @@
-#include "StreamingStoreSourceMultiplexer.h"
-#include "StreamShard.h"
+#include <Storages/Streaming/StreamingStoreSourceMultiplexer.h>
 
 #include
+#include <Storages/Streaming/StreamShard.h>
+#include
+#include
 #include
 
 namespace DB
 {
@@ -13,18 +15,51 @@ extern const int DWAL_FATAL_ERROR;
 }
 
 StreamingStoreSourceMultiplexer::StreamingStoreSourceMultiplexer(
-    UInt32 id_, std::shared_ptr<StreamShard> stream_shard_, ContextPtr global_context, Poco::Logger * log_)
+    UInt32 id_,
+    std::shared_ptr<StreamShard> stream_shard_,
+    ContextPtr query_context,
+    Poco::Logger * log_,
+    StreamingStoreSourceMultiplexer::AttachToSharedGroupFunc attach_to_shared_group_func_)
     : id(id_)
     , stream_shard(std::move(stream_shard_))
+    , attach_to_shared_group_func(attach_to_shared_group_func_)
     , poller(std::make_unique<ThreadPool>(1))
     , last_metrics_log_time(MonotonicMilliseconds::now())
     , log(log_)
 {
-    auto consumer = klog::KafkaWALPool::instance(global_context).getOrCreateStreaming(stream_shard->logStoreClusterId());
-    reader = std::make_shared<StreamingBlockReaderKafka>(
-        stream_shard, -1 /*latest*/, SourceColumnsDescription::PhysicalColumnPositions{}, std::move(consumer), log);
+    const auto & settings = query_context->getSettingsRef();
+    if (settings.record_consume_batch_count.value != 0)
+        record_consume_batch_count = static_cast<UInt32>(settings.record_consume_batch_count.value);
 
-    poller->scheduleOrThrowOnError([this] { backgroundPoll(); });
+    if (settings.record_consume_timeout_ms.value != 0)
+        record_consume_timeout_ms = static_cast<Int32>(settings.record_consume_timeout_ms.value);
+
+    if (stream_shard->isLogStoreKafka())
+    {
+        auto consumer = klog::KafkaWALPool::instance(query_context).getOrCreateStreaming(stream_shard->logStoreClusterId());
+        assert(consumer);
+        kafka_reader = std::make_unique<StreamingBlockReaderKafka>(
+            stream_shard, nlog::LATEST_SN, SourceColumnsDescription::PhysicalColumnPositions{}, std::move(consumer), log);
+    }
+    else
+    {
+        auto fetch_buffer_size = query_context->getSettingsRef().fetch_buffer_size;
+        fetch_buffer_size = std::min(64 * 1024 * 1024, fetch_buffer_size);
+        nativelog_reader = std::make_unique<StreamingBlockReaderNativeLog>(
+            stream_shard,
+            nlog::LATEST_SN,
+            record_consume_timeout_ms,
+            fetch_buffer_size,
+            /*schema_provider*/ nullptr,
+            /*schema_version*/ 0,
+            SourceColumnsDescription::PhysicalColumnPositions{},
+            log);
+    }
+
+    /// So far, `attach_to_shared_group_func` is only set for an independent multiplexer, which requires a lazy start-up
+    bool need_lazy_startup = static_cast<bool>(attach_to_shared_group_func);
+    if (!need_lazy_startup)
+        startup();
 }
 
 StreamingStoreSourceMultiplexer::~StreamingStoreSourceMultiplexer()
@@ -45,13 +80,38 @@ StreamingStoreSourceMultiplexer::~StreamingStoreSourceMultiplexer()
         metrics.total_time / (metrics.total_count == 0 ? 1 : metrics.total_count));
 }
 
+void StreamingStoreSourceMultiplexer::startup()
+{
+    if (started.test_and_set())
+        return;
+
+    poller->scheduleOrThrowOnError([this] { backgroundPoll(); });
+}
+
+void StreamingStoreSourceMultiplexer::resetSequenceNumber(Int64 start_sn)
+{
+    assert(!started.test());
+    if (nativelog_reader)
+        nativelog_reader->resetSequenceNumber(start_sn);
+    else
+        kafka_reader->resetOffset(start_sn);
+}
+
+nlog::RecordPtrs StreamingStoreSourceMultiplexer::read()
+{
+    if (nativelog_reader)
+        return nativelog_reader->read();
+    else
+        return kafka_reader->read(record_consume_batch_count, record_consume_timeout_ms);
+}
+
 void StreamingStoreSourceMultiplexer::backgroundPoll()
 {
     while (!shutdown)
     {
         try
         {
-            auto records = reader->read(1000, 100);
+            auto records = read();
             auto start = MonotonicNanoseconds::now();
 
             if (!records.empty())
@@ -61,8 +121,17 @@ void StreamingStoreSourceMultiplexer::backgroundPoll()
                 metrics.total_time = MonotonicNanoseconds::now() - start;
                 ++metrics.total_count;
             }
+            else if (attach_to_shared_group_func)
+            {
+                /// Assume the latest record has been read, so we can try to attach to the shared group;
+                /// once attached, all of its channels will consume from the shared group
+                attach_to_shared_group_func(shared_from_this());
+                attach_to_shared_group_func = {};
+
+                LOG_INFO(log, "StreamingStoreSourceMultiplexer id={} for shard {} attached to shared group.", id, stream_shard->getShard());
+            }
 
-            if (start - last_metrics_log_time >= 5000000000)
+            if (start - last_metrics_log_time >= 30000000000)
             {
                 LOG_INFO(
                     log,
@@ -126,6 +195,11 @@ void StreamingStoreSourceMultiplexer::fanOut(nlog::RecordPtrs records)
             if (channel)
                 fanout_channels.push_back(std::move(channel));
         }
+
+        /// Update fanout max sn
+        auto iter = std::find_if(records.rbegin(), records.rend(), [](const auto & record) { return !record->empty(); });
+        if (iter != records.rend())
+            fanout_sn = (*iter)->getSN();
     }
 
     for (auto & channel : fanout_channels)
@@ -147,15 +221,41 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexer::createChannel(
     return channel;
 }
 
+bool StreamingStoreSourceMultiplexer::tryDetachChannelsInto(std::shared_ptr<StreamingStoreSourceMultiplexer> new_multiplexer)
+{
+    /// NOTE: `fanout_sn` is only updated while holding channels_mutex in `fanOut()`
+    {
+        std::lock_guard lock1{channels_mutex};
+        {
+            std::lock_guard lock2{new_multiplexer->channels_mutex};
+            if (fanout_sn < new_multiplexer->fanout_sn)
+                return false;
+
+            for (auto & shard_channel : channels)
+            {
+                auto channel = shard_channel.second.lock();
+                if (channel)
+                {
+                    channel->attachTo(new_multiplexer);
+                    [[maybe_unused]] auto [_, inserted] = new_multiplexer->channels.emplace(shard_channel.first, std::move(channel));
+                    assert(inserted);
+                }
+            }
+        }
+        channels.clear();
+    }
+
+    doShutdown();
+    return true;
+}
+
 void StreamingStoreSourceMultiplexer::removeChannel(UInt32 channel_id)
 {
     bool need_shutdown = false;
     LOG_INFO(log, "Removing streaming store channel id={}", channel_id);
     {
         std::lock_guard lock{channels_mutex};
-        auto erased = channels.erase(channel_id);
+        [[maybe_unused]] auto erased = channels.erase(channel_id);
         assert(erased == 1);
-        (void)erased;
 
         if (channels.empty())
             need_shutdown = true;
@@ -180,23 +280,40 @@ std::pair<String, Int32> StreamingStoreSourceMultiplexer::getStreamShard() const
     return stream_shard->getStreamShard();
 }
 
-StreamingStoreSourceMultiplexers::StreamingStoreSourceMultiplexers(
-    std::shared_ptr<StreamShard> stream_shard_, ContextPtr global_context_, Poco::Logger * log_)
-    : stream_shard(std::move(stream_shard_)), global_context(std::move(global_context_)), log(log_)
+std::atomic<uint32_t> StreamingStoreSourceMultiplexers::multiplexer_id = 0;
+
+StreamingStoreSourceMultiplexers::StreamingStoreSourceMultiplexers(ContextPtr global_context_, Poco::Logger * log_)
+    : global_context(std::move(global_context_)), log(log_)
 {
 }
 
 StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
-    Int32 shard, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context)
+    std::shared_ptr<StreamShard> stream_shard,
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    ContextPtr query_context,
+    Int64 start_sn)
 {
+    /// In the following scenarios, we need an independent channel to read some past data:
+    /// 1) Recovering checkpointed queries
+    /// 2) Queries which seek to a specific timestamp or to earliest
+    if (query_context->getSettingsRef().exec_mode == ExecuteMode::RECOVER)
+        return createIndependentChannelForRecover(stream_shard, column_names, storage_snapshot, query_context);
+    else if (start_sn != nlog::LATEST_SN)
+        return createIndependentChannelWithSeekTo(stream_shard, column_names, storage_snapshot, query_context, start_sn);
+
+    assert(start_sn == nlog::LATEST_SN);
+
     std::lock_guard lock{multiplexers_mutex};
+    auto shard = stream_shard->getShard();
     auto iter = multiplexers.find(shard);
     if (iter == multiplexers.end())
     {
         multiplexers.emplace(
             shard,
-            StreamingStoreSourceMultiplexerPtrs{std::make_shared<StreamingStoreSourceMultiplexer>(0, stream_shard, global_context, log)});
+            StreamingStoreSourceMultiplexerPtrs{
+                std::make_shared<StreamingStoreSourceMultiplexer>(getMultiplexerID(), stream_shard, global_context, log)});
         iter = multiplexers.find(shard);
     }
 
@@ -225,11 +342,10 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
     if (best_multiplexer)
     {
         /// Found one
-        /// If min channels is greater than > 20, create another multiplexer for this shard
-        /// FIXME, make this configurable
+        /// If min channels is greater than 20 (the default value), create another multiplexer for this shard
         if (min_channels > global_context->getSettingsRef().max_channels_per_resource_group.value)
         {
-            best_multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(iter->second.size(), stream_shard, global_context, log);
+            best_multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(getMultiplexerID(), stream_shard, global_context, log);
             iter->second.push_back(best_multiplexer);
         }
 
@@ -238,9 +354,74 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
     else
     {
         /// All multiplexers are shutdown
-        auto multiplexer{std::make_shared<StreamingStoreSourceMultiplexer>(iter->second.size(), stream_shard, global_context, log)};
+        auto multiplexer{std::make_shared<StreamingStoreSourceMultiplexer>(getMultiplexerID(), stream_shard, global_context, log)};
         iter->second.push_back(multiplexer);
         return multiplexer->createChannel(column_names, storage_snapshot, query_context);
     }
 }
+
+StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndependentChannelForRecover(
+    std::shared_ptr<StreamShard> stream_shard,
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    ContextPtr query_context)
+{
+    /// It will start up after `StreamingStoreSourceChannel::recover()` has reset the recovered sn.
+    /// The `multiplexer` is cached in the created StreamingStoreSourceChannel, so we can release this one
+    auto multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(
+        getMultiplexerID(), std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
+    return multiplexer->createChannel(column_names, storage_snapshot, query_context);
+}
+
+StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndependentChannelWithSeekTo(
+    std::shared_ptr<StreamShard> stream_shard,
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    ContextPtr query_context,
+    Int64 start_sn)
+{
+    /// The `multiplexer` is cached in the created StreamingStoreSourceChannel, so we can release this one
+    auto multiplexer = std::make_shared<StreamingStoreSourceMultiplexer>(
+        getMultiplexerID(), std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
+    auto channel = multiplexer->createChannel(column_names, storage_snapshot, query_context);
+    multiplexer->resetSequenceNumber(start_sn);
+    multiplexer->startup();
+    return channel;
+}
+
+void StreamingStoreSourceMultiplexers::attachToSharedGroup(StreamingStoreSourceMultiplexerPtr multiplexer)
+{
+    std::lock_guard lock{multiplexers_mutex};
+
+    /// First, release the previously detached multiplexers
+    detached_multiplexers.clear();
+
+    auto & multiplexer_list = multiplexers[multiplexer->stream_shard->getShard()];
+    for (auto it = multiplexer_list.begin(); it != multiplexer_list.end();)
+    {
+        if ((*it)->isShutdown())
+        {
+            it = multiplexer_list.erase(it);
+            continue;
+        }
+
+        auto & shared_multiplexer = *it;
+        ++it;
+
+        /// Skip multiplexers that already have too many channels
+        if (shared_multiplexer->totalChannels() > global_context->getSettingsRef().max_channels_per_resource_group.value)
+            continue;
+
+        if (multiplexer->tryDetachChannelsInto(shared_multiplexer))
+        {
+            /// Keep the detached multiplexer around for a while, since it cannot release itself from its own background
+            /// polling thread; it will be released on the next call (from another background polling thread)
+            detached_multiplexers.emplace_back(std::move(multiplexer));
+            return;
+        }
+    }
+
+    /// The channels were not detached into any existing shared multiplexer, so reuse this one and let it join the shared group
+    multiplexer_list.emplace_back(std::move(multiplexer));
+}
 }
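The hand-off in `tryDetachChannelsInto()` hinges on comparing fan-out positions: moving channels to a multiplexer that is ahead would silently skip records, while moving to one that is behind only produces duplicates, which the channel-side `last_sn` guard already drops. A minimal sketch of that invariant (struct and function names are mine):

```cpp
#include <cassert>
#include <cstdint>

/// Toy model of the safety check: a multiplexer may only hand its channels to
/// a target whose fan-out position is not ahead of its own. If the target were
/// ahead, records in (from.fanout_sn, into.fanout_sn] would never reach the
/// moved channels; if the target is behind, the overlap is deduplicated.
struct Multiplexer
{
    int64_t fanout_sn = -1;
};

bool canDetachInto(const Multiplexer & from, const Multiplexer & into)
{
    return from.fanout_sn >= into.fanout_sn;
}

int main()
{
    assert(canDetachInto({.fanout_sn = 10}, {.fanout_sn = 7}));  /// duplicates only: safe
    assert(!canDetachInto({.fanout_sn = 7}, {.fanout_sn = 10})); /// would skip records: refuse
}
```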
diff --git a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
index aaa8c26785b..21b41f7c34d 100644
--- a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
+++ b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
@@ -1,13 +1,14 @@
 #pragma once
 
-#include "StreamingBlockReaderKafka.h"
-#include "StreamingStoreSourceChannel.h"
-
-#include
+#include <Storages/Streaming/StreamingBlockReaderKafka.h>
+#include <Storages/Streaming/StreamingBlockReaderNativeLog.h>
+#include <Storages/Streaming/StreamingStoreSourceChannel.h>
 
 namespace DB
 {
 class StreamShard;
+struct StorageSnapshot;
+using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
 
 /// The multiplexer fans out one streaming store reader to different streaming queries. This has
 /// efficiency of disk read / TFF deserialization, memory allocation etc. But we may introduce
@@ -23,13 +24,20 @@ class StreamShard;
 class StreamingStoreSourceMultiplexer final : public std::enable_shared_from_this<StreamingStoreSourceMultiplexer>
 {
 public:
+    using AttachToSharedGroupFunc = std::function<void(std::shared_ptr<StreamingStoreSourceMultiplexer>)>;
+
     StreamingStoreSourceMultiplexer(
-        UInt32 id_, std::shared_ptr<StreamShard> storage_, ContextPtr global_context, Poco::Logger * log_);
+        UInt32 id_,
+        std::shared_ptr<StreamShard> storage_,
+        ContextPtr global_context,
+        Poco::Logger * log_,
+        AttachToSharedGroupFunc attach_to_shared_group_func = {});
 
     ~StreamingStoreSourceMultiplexer();
 
     StreamingStoreSourceChannelPtr
     createChannel(const Names & column_names, const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context);
 
+    bool tryDetachChannelsInto(std::shared_ptr<StreamingStoreSourceMultiplexer> new_multiplexer);
+
     void removeChannel(UInt32 channel_id);
 
     size_t totalChannels() const;
@@ -38,15 +46,31 @@ class StreamingStoreSourceMultiplexer final : public std::enable_shared_from_this<StreamingStoreSourceMultiplexer>
 
     std::pair<String, Int32> getStreamShard() const;
 
+    /// NOTE: Reset the sequence number only before startup()
+    void resetSequenceNumber(Int64 start_sn);
+    void startup();
+
 private:
+    inline nlog::RecordPtrs read();
     void backgroundPoll();
     void fanOut(nlog::RecordPtrs records);
     void doShutdown();
 
+    friend class StreamingStoreSourceMultiplexers;
+
 private:
     UInt32 id;
     std::shared_ptr<StreamShard> stream_shard;
-    std::shared_ptr<StreamingBlockReaderKafka> reader;
+    std::unique_ptr<StreamingBlockReaderKafka> kafka_reader;
+    std::unique_ptr<StreamingBlockReaderNativeLog> nativelog_reader;
+
+    std::atomic_flag started;
+
+    AttachToSharedGroupFunc attach_to_shared_group_func;
+    Int64 fanout_sn = -1;
+
+    UInt32 record_consume_batch_count = 1000;
+    Int32 record_consume_timeout_ms = 100;
 
     std::unique_ptr<ThreadPool> poller;
     std::atomic<bool> shutdown = false;
@@ -73,17 +97,41 @@ using StreamingStoreSourceMultiplexerPtrs = std::list<StreamingStoreSourceMultiplexerPtr>;
 class StreamingStoreSourceMultiplexers
 {
 public:
-    StreamingStoreSourceMultiplexers(std::shared_ptr<StreamShard> stream_shard_, ContextPtr global_context_, Poco::Logger * log_);
+    StreamingStoreSourceMultiplexers(ContextPtr global_context_, Poco::Logger * log_);
 
-    StreamingStoreSourceChannelPtr
-    createChannel(Int32 shard, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context);
+    StreamingStoreSourceChannelPtr createChannel(
+        std::shared_ptr<StreamShard> stream_shard,
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        ContextPtr query_context,
+        Int64 start_sn);
+
+private:
+    StreamingStoreSourceChannelPtr createIndependentChannelForRecover(
+        std::shared_ptr<StreamShard> stream_shard,
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        ContextPtr query_context);
+
+    StreamingStoreSourceChannelPtr createIndependentChannelWithSeekTo(
+        std::shared_ptr<StreamShard> stream_shard,
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        ContextPtr query_context,
+        Int64 start_sn);
+
+    void attachToSharedGroup(StreamingStoreSourceMultiplexerPtr multiplexer);
+
+    uint32_t getMultiplexerID() { return multiplexer_id.fetch_add(1); }
 
 private:
-    std::shared_ptr<StreamShard> stream_shard;
     ContextPtr global_context;
     Poco::Logger * log;
 
+    static std::atomic<uint32_t> multiplexer_id;
+
     std::mutex multiplexers_mutex;
     std::unordered_map<Int32, StreamingStoreSourceMultiplexerPtrs> multiplexers;
+    StreamingStoreSourceMultiplexerPtrs detached_multiplexers;
 };
 }
diff --git a/src/Storages/Streaming/tests/gtest_source_columns_description.cpp b/src/Storages/Streaming/tests/gtest_source_columns_description.cpp
index d4d2d99d3c4..c833c65d19f 100644
--- a/src/Storages/Streaming/tests/gtest_source_columns_description.cpp
+++ b/src/Storages/Streaming/tests/gtest_source_columns_description.cpp
@@ -666,3 +666,149 @@ TEST(SourceColumnsDescription, PhysicalAndVirtualAndSubcolumn)
         EXPECT_EQ(columns_desc.physical_object_columns_to_read.rbegin()->name, "col4");
     }
 }
+
+TEST(SourceColumnsDescription, PhysicalAndVirtualAndSubcolumnWithoutPartialRead)
+{
+    auto schema = generateCommonSchema();
+
+    { /// physical + virtual + subcolumn
+        DB::NamesAndTypesList columns_to_read{
+            {"col1", getType("string")},
+            {DB::ProtonConsts::RESERVED_APPEND_TIME, getType("int64")},
+            {"col3", "y", getType("tuple(x int, y string)"), getType("string")},
+            {"col5", "abc", getType("tuple(abc int, xyz string)"), getType("int")}};
+        DB::SourceColumnsDescription columns_desc(columns_to_read, schema, all_extended_columns, /*enable_partial_read*/ false);
+        /// Pos to read
+        ASSERT_EQ(columns_desc.positions.size(), 4);
+        ASSERT_EQ(columns_desc.positions[0].type(), Physical);
+        EXPECT_EQ(columns_desc.positions[0].physicalPosition(), 1);
+        ASSERT_EQ(columns_desc.positions[1].type(), Virtual);
+        EXPECT_EQ(columns_desc.positions[1].virtualPosition(), 0);
+        ASSERT_EQ(columns_desc.positions[2].type(), Sub);
+        EXPECT_EQ(columns_desc.positions[2].parentPosition(), 3);
+        EXPECT_EQ(columns_desc.positions[2].subPosition(), 0);
+        ASSERT_EQ(columns_desc.positions[3].type(), Sub);
+        EXPECT_EQ(columns_desc.positions[3].parentPosition(), 5);
+        EXPECT_EQ(columns_desc.positions[3].subPosition(), 1);
+
+        /// Physical columns description
+        ASSERT_EQ(columns_desc.physical_column_positions_to_read.positions.size(), 8);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[0], 0);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[1], 1);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[2], 2);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[3], 3);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[4], 4);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[5], 5);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[6], 6);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[7], 7);
+        ASSERT_EQ(columns_desc.physical_column_positions_to_read.subcolumns.size(), 2);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.subcolumns[3], std::vector<String>({"y"}));
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.subcolumns[5], std::vector<String>({"abc"}));
+
+        /// Virtual columns description
+        ASSERT_EQ(columns_desc.virtual_col_calcs.size(), 1);
+        ASSERT_EQ(columns_desc.virtual_col_calcs.size(), columns_desc.virtual_col_types.size());
+        ASSERT_TRUE(columns_desc.virtual_col_types[0]->equals(*getType("int64")));
+
+        /// Sub-columns description
+        ASSERT_EQ(columns_desc.subcolumns_to_read.size(), 2);
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[0].isSubcolumn());
+        EXPECT_EQ(columns_desc.subcolumns_to_read[0].getNameInStorage(), "col3");
+        EXPECT_EQ(columns_desc.subcolumns_to_read[0].getSubcolumnName(), "y");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[0].getTypeInStorage()->equals(*getType("tuple(x int, y string)")));
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[0].type->equals(*getType("string")));
+        EXPECT_EQ(columns_desc.subcolumns_to_read[0].name, "col3.y");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[1].isSubcolumn());
+        EXPECT_EQ(columns_desc.subcolumns_to_read[1].getNameInStorage(), "col5");
+        EXPECT_EQ(columns_desc.subcolumns_to_read[1].getSubcolumnName(), "abc");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[1].getTypeInStorage()->equals(*getType("json")));
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[1].type->equals(*getType("int")));
+        EXPECT_EQ(columns_desc.subcolumns_to_read[1].name, "col5.abc");
+
+        /// Json description
+        ASSERT_EQ(columns_desc.physical_object_columns_to_read.size(), 1);
+        EXPECT_EQ(columns_desc.physical_object_columns_to_read.begin()->name, "col5");
+    }
+
+    { /// (complex) physical + virtual + subcolumn
+        DB::NamesAndTypesList columns_to_read{
+            {"col3", "y", getType("tuple(x int, y string)"), getType("string")},
+            {"col1", getType("string")},
+            {"col5", getType("json")},
+            {"col5", "xyz", getType("tuple(abc int, xyz string)"), getType("string")},
+            {"col4", getType("json")},
+            {"col2", getType("tuple(a int, b int)")},
+            {DB::ProtonConsts::RESERVED_APPEND_TIME, getType("int64")},
+            {"col5", "abc", getType("tuple(abc int, xyz string)"), getType("int")},
+            {DB::ProtonConsts::RESERVED_PROCESS_TIME, getType("int64")}};
+        DB::SourceColumnsDescription columns_desc(columns_to_read, schema, all_extended_columns, /*enable_partial_read*/ false);
+        /// Pos to read
+        ASSERT_EQ(columns_desc.positions.size(), 9);
+        ASSERT_EQ(columns_desc.positions[0].type(), Sub);
+        EXPECT_EQ(columns_desc.positions[0].parentPosition(), 3);
+        EXPECT_EQ(columns_desc.positions[0].subPosition(), 0);
+        ASSERT_EQ(columns_desc.positions[1].type(), Physical);
+        EXPECT_EQ(columns_desc.positions[1].physicalPosition(), 1);
+        ASSERT_EQ(columns_desc.positions[2].type(), Physical);
+        EXPECT_EQ(columns_desc.positions[2].physicalPosition(), 5);
+        ASSERT_EQ(columns_desc.positions[3].type(), Sub);
+        EXPECT_EQ(columns_desc.positions[3].parentPosition(), 5);
+        EXPECT_EQ(columns_desc.positions[3].subPosition(), 1);
+        ASSERT_EQ(columns_desc.positions[4].type(), Physical);
+        EXPECT_EQ(columns_desc.positions[4].physicalPosition(), 4);
+        ASSERT_EQ(columns_desc.positions[5].type(), Physical);
+        EXPECT_EQ(columns_desc.positions[5].physicalPosition(), 2);
+        ASSERT_EQ(columns_desc.positions[6].type(), Virtual);
+        EXPECT_EQ(columns_desc.positions[6].virtualPosition(), 0);
+        ASSERT_EQ(columns_desc.positions[7].type(), Sub);
+        EXPECT_EQ(columns_desc.positions[7].parentPosition(), 5);
+        EXPECT_EQ(columns_desc.positions[7].subPosition(), 2);
+        ASSERT_EQ(columns_desc.positions[8].type(), Virtual);
+        EXPECT_EQ(columns_desc.positions[8].virtualPosition(), 1);
+
+        /// Physical columns description
+        ASSERT_EQ(columns_desc.physical_column_positions_to_read.positions.size(), 8);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[0], 0);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[1], 1);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[2], 2);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[3], 3);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[4], 4);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[5], 5);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[6], 6);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[7], 7);
+        ASSERT_EQ(columns_desc.physical_column_positions_to_read.subcolumns.size(), 1);
+        EXPECT_EQ(columns_desc.physical_column_positions_to_read.subcolumns[3], std::vector<String>({"y"}));
+
+        /// Virtual columns description
+        ASSERT_EQ(columns_desc.virtual_col_calcs.size(), 2);
+        ASSERT_EQ(columns_desc.virtual_col_calcs.size(), columns_desc.virtual_col_types.size());
+        ASSERT_TRUE(columns_desc.virtual_col_types[0]->equals(*getType("int64")));
+        ASSERT_TRUE(columns_desc.virtual_col_types[1]->equals(*getType("int64")));
+
+        /// Sub-columns description
+        ASSERT_EQ(columns_desc.subcolumns_to_read.size(), 3);
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[0].isSubcolumn());
+        EXPECT_EQ(columns_desc.subcolumns_to_read[0].getNameInStorage(), "col3");
+        EXPECT_EQ(columns_desc.subcolumns_to_read[0].getSubcolumnName(), "y");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[0].getTypeInStorage()->equals(*getType("tuple(x int, y string)")));
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[0].type->equals(*getType("string")));
+        EXPECT_EQ(columns_desc.subcolumns_to_read[0].name, "col3.y");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[1].isSubcolumn());
+        EXPECT_EQ(columns_desc.subcolumns_to_read[1].getNameInStorage(), "col5");
+        EXPECT_EQ(columns_desc.subcolumns_to_read[1].getSubcolumnName(), "xyz");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[1].getTypeInStorage()->equals(*getType("json")));
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[1].type->equals(*getType("string")));
+        EXPECT_EQ(columns_desc.subcolumns_to_read[1].name, "col5.xyz");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[2].isSubcolumn());
+        EXPECT_EQ(columns_desc.subcolumns_to_read[2].getNameInStorage(), "col5");
+        EXPECT_EQ(columns_desc.subcolumns_to_read[2].getSubcolumnName(), "abc");
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[2].getTypeInStorage()->equals(*getType("json")));
+        EXPECT_TRUE(columns_desc.subcolumns_to_read[2].type->equals(*getType("int")));
+        EXPECT_EQ(columns_desc.subcolumns_to_read[2].name, "col5.abc");
+
+        /// Json description
+        ASSERT_EQ(columns_desc.physical_object_columns_to_read.size(), 2);
+        EXPECT_EQ(columns_desc.physical_object_columns_to_read.begin()->name, "col5");
+        EXPECT_EQ(columns_desc.physical_object_columns_to_read.rbegin()->name, "col4");
+    }
+}
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml
index e4028e9cb9e..1e21757902a 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml
@@ -46,12 +46,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
          query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -118,12 +116,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -190,17 +186,14 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -255,17 +248,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -289,7 +279,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4
 
         - client: python
@@ -332,7 +321,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -356,17 +344,14 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -421,17 +406,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -474,12 +456,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -527,12 +507,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -575,12 +553,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -628,12 +604,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -676,7 +650,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -722,7 +695,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -765,17 +737,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -830,17 +799,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -924,12 +890,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -987,7 +951,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -1073,12 +1036,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -1136,7 +1097,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
@@ -1223,17 +1183,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
         - client: python
@@ -1298,12 +1255,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_4;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_4;
 
       expected_results:
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml
index 0975aef35e2..8b8b8ee83f9 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml
@@ -106,7 +106,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -230,17 +229,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -305,12 +301,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -336,12 +330,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -404,7 +396,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -428,12 +419,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -496,7 +485,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -527,17 +515,14 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -607,12 +592,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -643,7 +626,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -737,17 +719,14 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -807,12 +786,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -835,12 +812,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -898,7 +873,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -923,12 +897,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -986,7 +958,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -1011,17 +982,14 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -1083,12 +1051,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
@@ -1113,7 +1079,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -1186,17 +1151,14 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view2_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
         - client: python
@@ -1261,12 +1223,10 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view1_5;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_5;
 
       expected_results:
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml
index 1f37e04d993..0467e7daf59 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml
@@ -47,12 +47,10 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop view if exists test14_view_6;
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_6;
 
         - client: python
@@ -110,7 +108,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_subquery_6;
 
   - id: 25
@@ -125,7 +122,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -237,7 +233,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -328,7 +323,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -420,7 +414,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -567,7 +560,6 @@ tests:
 
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -641,7 +633,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -731,7 +722,6 @@ tests:
      - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -823,7 +813,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -898,7 +887,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
           query: drop stream if exists test14_substream_6;
 
         - client: python
@@ -974,7 +962,6 @@ tests:
       - statements:
         - client: python
           query_type: table
-          wait: 0
query: drop stream if exists test14_substream_6; - client: python @@ -1061,7 +1048,6 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop stream if exists test14_substream_6; - client: python diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml index 81eddcebc88..2e3a3bc829b 100644 --- a/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml +++ b/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml @@ -107,7 +107,6 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_substream_7; - client: python @@ -181,7 +180,6 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop stream if exists test14_substream_7; - client: python @@ -332,7 +330,6 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_substream_7; - client: python @@ -409,7 +406,6 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop stream if exists test14_stream; - client: python @@ -495,7 +491,6 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop stream if exists test14_stream; - client: python @@ -601,12 +596,10 @@ tests: - client: python query_type: table - wait: 0 query: drop view if exists test14_view_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python @@ -675,7 +668,6 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; expected_results: @@ -720,12 +712,10 @@ tests: - client: python query_type: table - wait: 0 query: drop view if exists test14_view_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python @@ -798,7 +788,6 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; expected_results: @@ -843,17 +832,14 @@ tests: - client: python query_type: table - wait: 0 query: drop view if exists test14_view2_7; - client: python query_type: table - wait: 0 query: drop view if exists test14_view1_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python @@ -929,12 +915,10 @@ tests: - client: python query_type: table - wait: 0 query: drop view if exists test14_view1_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; expected_results: @@ -979,7 +963,6 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python @@ -1085,22 +1068,18 @@ tests: - client: python query_type: table - wait: 0 query: drop view if exists test14_view2_7; - client: python query_type: table - wait: 0 query: drop view if exists test14_view1_7; - client: python query_type: table - wait: 0 query: drop view if exists test14_view_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python @@ -1176,12 +1155,10 @@ tests: - client: python query_type: table - wait: 0 query: drop view if exists test14_view1_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; expected_results: @@ -1207,12 +1184,10 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop view if exists test14_view_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python @@ -1256,7 +1231,6 @@ 
tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; expected_results: @@ -1283,17 +1257,14 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop view if exists test14_view_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_target_changelog_kv_7; - client: python @@ -1303,7 +1274,6 @@ tests: - client: python query_type: table - wait: 0 query: create stream if not exists test14_target_changelog_kv_7(i float, k1 int, k2 string) primary key k2 settings mode='changelog_kv'; - client: python @@ -1346,12 +1316,10 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_7; - client: python query_type: table - wait: 0 query: drop stream if exists test14_target_changelog_kv_7; expected_results: diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml index 80683c514e8..1d2f03b7050 100644 --- a/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml +++ b/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml @@ -28,17 +28,14 @@ tests: - statements: - client: python query_type: table - wait: 0 query: drop view if exists test14_view2_8; - client: python query_type: table - wait: 0 query: drop view if exists test14_view1_8; - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_8; - client: python @@ -321,12 +318,10 @@ tests: - client: python query_type: table - wait: 0 query: drop stream if exists test14_subquery_8; - client: python query_type: table - wait: 0 query: drop stream if exists test14_target_changelog_kv_8; expected_results: @@ -360,8 +355,8 @@ tests: type: javascript name: test_14_8_add_five arguments: - - name: value - - type: float32 + - name: value + type: float32 return_type: float32 source: | function test_14_8_add_five(value) { @@ -524,7 +519,6 @@ tests: - statements: - client: python query_type: table - wait: 2 query: drop stream if exists test14_substream_8; - client: python @@ -535,6 +529,7 @@ tests: - client: python query_id: '1453' depends_on_stream: test14_substream_8 + wait: 1 query_type: stream query: select max(val), min(val), avg(val), id, name from test14_substream_8 partition by id group by name emit periodic 1s; @@ -618,8 +613,8 @@ tests: type: javascript name: test_14_8_add_five arguments: - - name: value - - type: float32 + - name: value + type: float32 return_type: float32 source: | function test_14_8_add_five(value) { diff --git a/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json b/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json index 2e905aa075f..bf73291f91c 100644 --- a/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json +++ b/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json @@ -47,10 +47,10 @@ "steps":[ { "statements": [ - {"client":"python","query_id":"1601", "depends_on_stream":"changelog_kv_stream_aggr","query_end_timer":6, "query_type": "stream", "query":"select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_kv_stream_aggr emit periodic 1s"}, + {"client":"python","query_id":"1601", "depends_on_stream":"changelog_kv_stream_aggr", "query_type": "stream", "query":"select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_kv_stream_aggr emit periodic 1s"}, {"client":"python", 
"query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "query": "insert into changelog_kv_stream_aggr(i, s) values (1, 's1'), (2, 's2'), (3, 's3')"}, {"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "wait":1, "query": "insert into changelog_kv_stream_aggr(i, s, _tp_delta) values (1, 's1', -1)"}, - {"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "wait":1, "query": "insert into changelog_kv_stream_aggr(i, s, _tp_delta) values (3, 's3', -1)"} + {"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "wait":1, "kill":1601, "kill_wait":3, "query": "insert into changelog_kv_stream_aggr(i, s, _tp_delta) values (3, 's3', -1)"} ] } ], diff --git a/tests/stream/test_stream_smoke/0099_fixed_issues.json b/tests/stream/test_stream_smoke/0099_fixed_issues.json index 0d637eeb66c..6525cae41f8 100644 --- a/tests/stream/test_stream_smoke/0099_fixed_issues.json +++ b/tests/stream/test_stream_smoke/0099_fixed_issues.json @@ -15,14 +15,14 @@ "steps":[ { "statements": [ - {"client":"python", "query_type": "table", "wait":2, "query":"drop view if exists mv_2"}, - {"client":"python", "query_type": "table", "wait":2, "query":"drop view if exists mv_1"}, + {"client":"python", "query_type": "table", "query":"drop view if exists mv_2"}, + {"client":"python", "query_type": "table", "query":"drop view if exists mv_1"}, {"client":"python", "query_type": "table", "wait":2, "query":"drop view if exists mv_truck_track"}, {"client":"python", "query_type": "table", "wait":2, "query":"drop stream if exists ttp_truck_track"}, {"client":"python", "query_type": "table", "wait":2, "query_id":"9900", "query": "create stream ttp_truck_track(`lpn` string, `vno` string, `drc` string, `drcCode` int32, `wgs84Lat` float32, `wgs84Lon` float32, `gcj02Lat` float32, `gcj02Lon` float32, `province` nullable(string), `city` nullable(string), `country` nullable(string), `spd` float32, `mil` float32, `time` string, `adr` string)"}, - {"client":"python", "query_type": "table", "wait":5, "query_id":"9901","depends_on_stream":"ttp_truck_track", "query": "create materialized view mv_truck_track as (select * from ttp_truck_track where date_diff('second', _tp_time, now()) < 30)"}, - {"client":"python", "query_type": "table", "wait":5, "query_id":"9902", "depends_on_stream":"mv_truck_track", "query": "create materialized view if not exists mv_1 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"}, - {"client":"python", "query_type": "table", "wait":5, "query_id":"9903", "depends_on_stream":"mv_truck_track","drop_view":"mv_2,mv_1,mv_truck_track", "drop_view_wait":2, "query": "create materialized view if not exists mv_2 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"} + {"client":"python", "query_type": "table", "wait":2, "query_id":"9901","depends_on_stream":"ttp_truck_track", "query": "create materialized view mv_truck_track as (select * from ttp_truck_track where date_diff('second', _tp_time, now()) < 30)"}, + {"client":"python", "query_type": "table", "wait":2, "query_id":"9902", "depends_on_stream":"mv_truck_track", "query": "create materialized view if not exists mv_1 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"}, + {"client":"python", "query_type": "table", "wait":2, 
"query_id":"9903", "depends_on_stream":"mv_truck_track","drop_view":"mv_2,mv_1,mv_truck_track", "drop_view_wait":2, "query": "create materialized view if not exists mv_2 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"} ] } ], @@ -142,9 +142,9 @@ "steps":[ { "statements": [ - {"client":"python", "query_type": "table", "wait":2, "query": "drop view if exists test_mv_1934"}, + {"client":"python", "query_type": "table", "query": "drop view if exists test_mv_1934"}, {"client":"python", "query_type": "table", "wait":2, "query": "drop stream if exists test_stream_1934"}, - {"client":"python", "query_type": "table", "wait":2, "query": "create stream test_stream_1934(value int)"}, + {"client":"python", "query_type": "table", "exist":"test_stream_1934", "exist_wait":2, "wait":1, "query": "create stream test_stream_1934(value int)"}, {"client":"python", "query_type": "table", "depends_on_stream":"test_stream_1934", "query": "create materialized view test_mv_1934 as select count() from test_stream_1934"}, {"client":"python", "query_type": "table", "depends_on_stream":"test_mv_1934", "wait":2, "query": "insert into test_stream_1934(value) values(1)"}, {"client":"python", "query_type": "table", "wait":3, "query": "insert into test_stream_1934(value) values(2)"},