From a1432ffd924530d657971caee89c73711320738c Mon Sep 17 00:00:00 2001 From: yoko Date: Sun, 21 Jan 2024 22:41:31 -0800 Subject: [PATCH 01/11] Add query level settings: parts_to_delay_insert and parts_to_throw_insert porting https://github.com/ClickHouse/ClickHouse/pull/36371 on Apr 28, 2022 --- src/Core/Settings.h | 2 ++ src/Storages/MergeTree/MergeTreeData.cpp | 14 +++++++++----- src/Storages/MergeTree/MergeTreeData.h | 2 +- src/Storages/MergeTree/MergeTreeSink.cpp | 2 +- .../02280_add_query_level_settings.reference | 0 .../02280_add_query_level_settings.sql | 0 6 files changed, 13 insertions(+), 7 deletions(-) rename tests/{queries_not_supported/0_stateless/parts_to_throw_insert => queries_ported/0_stateless}/02280_add_query_level_settings.reference (100%) rename tests/{queries_not_supported/0_stateless/parts_to_throw_insert => queries_ported/0_stateless}/02280_add_query_level_settings.sql (100%) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2307b4c21f4..920ed64c1d8 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -271,6 +271,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified value (not inclusive). Zero means do not take delay into account.", 0) \ M(Bool, fallback_to_stale_replicas_for_distributed_queries, true, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.", 0) \ M(UInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size.", 0) \ + M(UInt64, parts_to_delay_insert, 150, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \ + M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \ \ M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \ M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 61bd95d814f..7d7f3085195 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3492,9 +3492,10 @@ std::optional MergeTreeData::getMinPartDataVersion() const } -void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const +void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const { const auto settings = getSettings(); + const auto & query_settings = query_context->getSettingsRef(); const size_t parts_count_in_total = getPartsCount(); if (parts_count_in_total >= settings->max_parts_in_total) { @@ -3517,8 +3518,11 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const } k_inactive = ssize_t(inactive_parts_count_in_partition) - ssize_t(settings->inactive_parts_to_delay_insert); } + + auto parts_to_delay_insert = query_settings.parts_to_delay_insert.changed ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; + auto parts_to_throw_insert = query_settings.parts_to_throw_insert.changed ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; - if (parts_count_in_partition >= settings->parts_to_throw_insert) + if (parts_count_in_partition >= parts_to_throw_insert) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( @@ -3527,15 +3531,15 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const parts_count_in_partition); } - if (k_inactive < 0 && parts_count_in_partition < settings->parts_to_delay_insert) + if (k_inactive < 0 && parts_count_in_partition < parts_to_delay_insert) return; - const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(settings->parts_to_delay_insert); + const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert); size_t max_k; size_t k; if (k_active > k_inactive) { - max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert; + max_k = parts_to_throw_insert - parts_to_delay_insert; k = k_active + 1; } else diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 69152c3ff1a..1ec92d3ec08 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -528,7 +528,7 @@ class MergeTreeData : public IStorage, public WithMutableContext /// If the table contains too many active parts, sleep for a while to give them time to merge. /// If until is non-null, wake up from the sleep earlier if the event happened. - void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const; + void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const; /// Renames temporary part to a permanent part and adds it to the parts set. /// It is assumed that the part does not intersect with existing parts. diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 9ee287ea268..e81dfe57f6c 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -51,7 +51,7 @@ void MergeTreeSink::onStart() { /// Only check "too many parts" before write, /// because interrupting long-running INSERT query in the middle is not convenient for users. - storage.delayInsertOrThrowIfNeeded(); + storage.delayInsertOrThrowIfNeeded(nullptr, context); } void MergeTreeSink::onFinish() diff --git a/tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.reference b/tests/queries_ported/0_stateless/02280_add_query_level_settings.reference similarity index 100% rename from tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.reference rename to tests/queries_ported/0_stateless/02280_add_query_level_settings.reference diff --git a/tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.sql b/tests/queries_ported/0_stateless/02280_add_query_level_settings.sql similarity index 100% rename from tests/queries_not_supported/0_stateless/parts_to_throw_insert/02280_add_query_level_settings.sql rename to tests/queries_ported/0_stateless/02280_add_query_level_settings.sql From dbf179cbb4daa0e0c06b3c7bab5a335f96603809 Mon Sep 17 00:00:00 2001 From: yoko Date: Sun, 21 Jan 2024 23:08:05 -0800 Subject: [PATCH 02/11] More sane behavior of part number thresholds override in query level settings porting clickhouse/clickhouse#42001 on Oct 3, 2022 --- src/Core/Settings.h | 5 ++--- src/Storages/MergeTree/MergeTreeData.cpp | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 920ed64c1d8..e0f9902379c 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -271,9 +271,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified value (not inclusive). Zero means do not take delay into account.", 0) \ M(Bool, fallback_to_stale_replicas_for_distributed_queries, true, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.", 0) \ M(UInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size.", 0) \ - M(UInt64, parts_to_delay_insert, 150, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \ - M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \ - \ + M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \ + M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \ M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \ M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \ M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 7d7f3085195..93396abb7e3 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3519,8 +3519,8 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q k_inactive = ssize_t(inactive_parts_count_in_partition) - ssize_t(settings->inactive_parts_to_delay_insert); } - auto parts_to_delay_insert = query_settings.parts_to_delay_insert.changed ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; - auto parts_to_throw_insert = query_settings.parts_to_throw_insert.changed ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; + auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; + auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; if (parts_count_in_partition >= parts_to_throw_insert) { From a0e88c1a110f0e685612a778a1ef9e2d37ae301d Mon Sep 17 00:00:00 2001 From: yoko Date: Sun, 21 Jan 2024 23:45:42 -0800 Subject: [PATCH 03/11] Relax "too many parts" threshold porting clickhouse/clickhouse#42002 on Oct 9, 2022 --- src/Interpreters/AsynchronousMetrics.cpp | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 53 ++++++++++++------- src/Storages/MergeTree/MergeTreeData.h | 8 ++- src/Storages/MergeTree/MergeTreeSettings.h | 1 + .../02458_relax_too_many_parts.reference | 0 .../02458_relax_too_many_parts.sql | 14 ++--- 6 files changed, 48 insertions(+), 30 deletions(-) rename tests/{queries_not_supported/0_stateless/tuple => queries_ported/0_stateless}/02458_relax_too_many_parts.reference (100%) rename tests/{queries_not_supported/0_stateless/tuple => queries_ported/0_stateless}/02458_relax_too_many_parts.sql (73%) diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index ccc7fc92305..298225c4f00 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -1429,7 +1429,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti const auto & settings = getContext()->getSettingsRef(); - calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition()); + calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountAndSizeForPartition().first); total_number_of_bytes += table_merge_tree->totalBytes(settings).value(); total_number_of_rows += table_merge_tree->totalRows(settings).value(); total_number_of_parts += table_merge_tree->getPartsCount(); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 93396abb7e3..01ce790e98c 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3438,42 +3438,49 @@ size_t MergeTreeData::getPartsCount() const } -size_t MergeTreeData::getMaxPartsCountForPartitionWithState(DataPartState state) const +std::pair MergeTreeData::getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const { auto lock = lockParts(); - size_t res = 0; - size_t cur_count = 0; + size_t cur_parts_count = 0; + size_t cur_parts_size = 0; + size_t max_parts_count = 0; + size_t argmax_parts_size = 0; + const String * cur_partition_id = nullptr; for (const auto & part : getDataPartsStateRange(state)) { - if (cur_partition_id && part->info.partition_id == *cur_partition_id) - { - ++cur_count; - } - else + if (!cur_partition_id || part->info.partition_id != *cur_partition_id) { cur_partition_id = &part->info.partition_id; - cur_count = 1; + cur_parts_count = 0; + cur_parts_size = 0; } - res = std::max(res, cur_count); + ++cur_parts_count; + cur_parts_size += part->getBytesOnDisk(); + + if (cur_parts_count > max_parts_count) + { + max_parts_count = cur_parts_count; + argmax_parts_size = cur_parts_size; + } } - return res; + return {max_parts_count, argmax_parts_size}; } -size_t MergeTreeData::getMaxPartsCountForPartition() const +std::pair MergeTreeData::getMaxPartsCountAndSizeForPartition() const { - return getMaxPartsCountForPartitionWithState(DataPartState::Active); + return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Active); } size_t MergeTreeData::getMaxInactivePartsCountForPartition() const { - return getMaxPartsCountForPartitionWithState(DataPartState::Outdated); + return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first; } @@ -3503,7 +3510,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in element in config.xml or with per-stream setting.", ErrorCodes::TOO_MANY_PARTS); } - size_t parts_count_in_partition = getMaxPartsCountForPartition(); + auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition(); ssize_t k_inactive = -1; if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0) { @@ -3522,13 +3529,17 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; - if (parts_count_in_partition >= parts_to_throw_insert) + size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0; + bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts + && average_part_size > settings->max_avg_part_size_for_too_many_parts; + + if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( ErrorCodes::TOO_MANY_PARTS, - "Too many parts ({}). Merges are processing significantly slower than inserts", - parts_count_in_partition); + "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts", + parts_count_in_partition, ReadableSize(average_part_size)); } if (k_inactive < 0 && parts_count_in_partition < parts_to_delay_insert) @@ -3537,7 +3548,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert); size_t max_k; size_t k; - if (k_active > k_inactive) + if (k_active > k_inactive && !parts_are_large_enough_in_average) { max_k = parts_to_throw_insert - parts_to_delay_insert; k = k_active + 1; @@ -3554,7 +3565,8 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts); - LOG_INFO(log, "Delaying inserting block by {} ms. because there are {} parts", delay_milliseconds, parts_count_in_partition); + LOG_INFO(log, "Delaying inserting block by {} ms. because there are {} parts and their average size is {}", + delay_milliseconds, parts_count_in_partition, ReadableSize(average_part_size)); if (until) until->tryWait(delay_milliseconds); @@ -3562,6 +3574,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q std::this_thread::sleep_for(std::chrono::milliseconds(static_cast(delay_milliseconds))); } + MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart( const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 1ec92d3ec08..837c63623d2 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -506,8 +506,12 @@ class MergeTreeData : public IStorage, public WithMutableContext size_t getTotalActiveSizeInRows() const; size_t getPartsCount() const; - size_t getMaxPartsCountForPartitionWithState(DataPartState state) const; - size_t getMaxPartsCountForPartition() const; + + /// Returns a pair with: max number of parts in partition across partitions; sum size of parts inside that partition. + /// (if there are multiple partitions with max number of parts, the sum size of parts is returned for arbitrary of them) + std::pair getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const; + std::pair getMaxPartsCountAndSizeForPartition() const; + size_t getMaxInactivePartsCountForPartition() const; /// Get min value of part->info.getDataVersion() for all active parts. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 684a7ceb9ec..e4702fe2890 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -70,6 +70,7 @@ struct Settings; M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \ M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \ M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \ + M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \ M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \ \ diff --git a/tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.reference b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.reference similarity index 100% rename from tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.reference rename to tests/queries_ported/0_stateless/02458_relax_too_many_parts.reference diff --git a/tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.sql b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql similarity index 73% rename from tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.sql rename to tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql index 5ff4851f155..e8aefb4c939 100644 --- a/tests/queries_not_supported/0_stateless/tuple/02458_relax_too_many_parts.sql +++ b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql @@ -1,5 +1,5 @@ DROP STREAM IF EXISTS test; -CREATE STREAM test (x uint64, s string) ENGINE = MergeTree ORDER BY tuple() SETTINGS parts_to_throw_insert = 3; +CREATE STREAM test (x uint64, s string) ENGINE = MergeTree ORDER BY x SETTINGS parts_to_throw_insert = 3; -- The "too many parts" threshold works: SET max_block_size = 1, min_insert_block_size_rows = 1, min_insert_block_size_bytes = 1; @@ -14,7 +14,7 @@ ALTER STREAM test MODIFY SETTING max_avg_part_size_for_too_many_parts = '1M'; -- It works in the same way if parts are small: SYSTEM START MERGES test; -OPTIMIZE STREAM test FINAL; +OPTIMIZE TABLE test FINAL; SYSTEM STOP MERGES test; INSERT INTO test VALUES (5, 'a'); @@ -23,14 +23,14 @@ INSERT INTO test VALUES (7, 'a'); -- { serverError TOO_MANY_PARTS } -- But it allows having more parts if their average size is large: SYSTEM START MERGES test; -OPTIMIZE STREAM test FINAL; +OPTIMIZE TABLE test FINAL; SYSTEM STOP MERGES test; SET max_block_size = 65000, min_insert_block_size_rows = 65000, min_insert_block_size_bytes = '1M'; -INSERT INTO test SELECT number, randomString(1000) FROM numbers(0, 10000); -INSERT INTO test SELECT number, randomString(1000) FROM numbers(10000, 10000); -INSERT INTO test SELECT number, randomString(1000) FROM numbers(20000, 10000); +INSERT INTO test SELECT number, random_string(1000) FROM numbers(0, 10000); +INSERT INTO test SELECT number, random_string(1000) FROM numbers(10000, 10000); +INSERT INTO test SELECT number, random_string(1000) FROM numbers(20000, 10000); -SELECT count(), round(avg(bytes), -6) FROM system.parts WHERE database = currentDatabase() AND stream = 'test' AND active; +SELECT count(), round(avg(bytes), -6) FROM system.parts WHERE database = current_database() AND table = 'test' AND active; DROP STREAM test; From 9098611b6c67b8603f85d80b5b1f9fcd8456caa1 Mon Sep 17 00:00:00 2001 From: yoko Date: Sun, 21 Jan 2024 23:47:45 -0800 Subject: [PATCH 04/11] Fix relaxed "too many parts" threshold porting clickhouse/clickhouse#44021 on Dec 9, 2022 --- src/Storages/MergeTree/MergeTreeData.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 01ce790e98c..5bb57e96190 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3542,13 +3542,13 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q parts_count_in_partition, ReadableSize(average_part_size)); } - if (k_inactive < 0 && parts_count_in_partition < parts_to_delay_insert) + if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average)) return; const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert); size_t max_k; size_t k; - if (k_active > k_inactive && !parts_are_large_enough_in_average) + if (k_active > k_inactive) { max_k = parts_to_throw_insert - parts_to_delay_insert; k = k_active + 1; From c50337c122c56a26aff0e14233bedc4006f97317 Mon Sep 17 00:00:00 2001 From: yoko Date: Sun, 21 Jan 2024 23:49:41 -0800 Subject: [PATCH 05/11] Fast fix: force upper bound for time to delay INSERT porting clickhouse/clickhouse#44916 --- src/Storages/MergeTree/MergeTreeData.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5bb57e96190..03083d54821 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3558,7 +3558,11 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr q max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; k = k_inactive + 1; } - const UInt64 delay_milliseconds = static_cast(::pow(settings->max_delay_to_insert * 1000, static_cast(k) / max_k)); + + const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000); + /// min() as a save guard here + const UInt64 delay_milliseconds + = std::min(max_delay_milliseconds, static_cast(::pow(max_delay_milliseconds, static_cast(k) / max_k))); ProfileEvents::increment(ProfileEvents::DelayedInserts); ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds); From cf47a02f948b04530bbe59097c0531cfdaa94cdc Mon Sep 17 00:00:00 2001 From: yoko Date: Mon, 22 Jan 2024 00:53:21 -0800 Subject: [PATCH 06/11] Fix: insert delay calculation porting ClickHouse/ClickHouse#44954 on Jan 12, 2023 --- src/Storages/MergeTree/MergeTreeData.cpp | 110 +++++++++++------- src/Storages/MergeTree/MergeTreeData.h | 4 +- src/Storages/MergeTree/MergeTreeSettings.h | 3 +- ...rrect_dealy_for_insert_bug_44902.reference | 6 + ...21_incorrect_dealy_for_insert_bug_44902.sh | 24 ++++ 5 files changed, 105 insertions(+), 42 deletions(-) create mode 100644 tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference create mode 100644 tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 03083d54821..76246d1ffaa 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3478,7 +3478,7 @@ std::pair MergeTreeData::getMaxPartsCountAndSizeForPartition() c } -size_t MergeTreeData::getMaxInactivePartsCountForPartition() const +size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const { return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first; } @@ -3499,70 +3499,102 @@ std::optional MergeTreeData::getMinPartDataVersion() const } -void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const +void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const { const auto settings = getSettings(); const auto & query_settings = query_context->getSettingsRef(); const size_t parts_count_in_total = getPartsCount(); + + /// check if have too many parts in total if (parts_count_in_total >= settings->max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in element in config.xml or with per-stream setting.", ErrorCodes::TOO_MANY_PARTS); + throw Exception( + ErrorCodes::TOO_MANY_PARTS, + "Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified " + "with 'max_parts_in_total' setting in element in config.xml or with per-table setting.", + toString(parts_count_in_total)); } - auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition(); - ssize_t k_inactive = -1; - if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0) + size_t outdated_parts_over_threshold = 0; { - size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition(); - if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert) + size_t outdated_parts_count_in_partition = 0; + if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0) + outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition(); + + if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( ErrorCodes::TOO_MANY_PARTS, "Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts", - inactive_parts_count_in_partition); + outdated_parts_count_in_partition); } - k_inactive = ssize_t(inactive_parts_count_in_partition) - ssize_t(settings->inactive_parts_to_delay_insert); + if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert) + outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1; } - - auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; - auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; + auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition(); size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0; - bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts - && average_part_size > settings->max_avg_part_size_for_too_many_parts; - - if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average) + const auto active_parts_to_delay_insert + = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert; + const auto active_parts_to_throw_insert + = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert; + size_t active_parts_over_threshold = 0; { - ProfileEvents::increment(ProfileEvents::RejectedInserts); - throw Exception( - ErrorCodes::TOO_MANY_PARTS, - "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts", - parts_count_in_partition, ReadableSize(average_part_size)); + bool parts_are_large_enough_in_average + = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts; + + if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average) + { + ProfileEvents::increment(ProfileEvents::RejectedInserts); + throw Exception( + ErrorCodes::TOO_MANY_PARTS, + "Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts", + parts_count_in_partition, + ReadableSize(average_part_size)); + } + if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert + && !parts_are_large_enough_in_average) + /// if parts_count == parts_to_delay_insert -> we're 1 part over threshold + active_parts_over_threshold = parts_count_in_partition - active_parts_to_delay_insert + 1; } - if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average)) + /// no need for delay + if (!active_parts_over_threshold && !outdated_parts_over_threshold) return; - const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert); - size_t max_k; - size_t k; - if (k_active > k_inactive) - { - max_k = parts_to_throw_insert - parts_to_delay_insert; - k = k_active + 1; - } - else + UInt64 delay_milliseconds = 0; { - max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; - k = k_inactive + 1; - } + size_t parts_over_threshold = 0; + size_t allowed_parts_over_threshold = 1; + const bool use_active_parts_threshold = (active_parts_over_threshold >= outdated_parts_over_threshold); + if (use_active_parts_threshold) + { + parts_over_threshold = active_parts_over_threshold; + allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert; + } + else + { + parts_over_threshold = outdated_parts_over_threshold; + allowed_parts_over_threshold = outdated_parts_over_threshold; /// if throw threshold is not set, will use max delay + if (settings->inactive_parts_to_throw_insert > 0) + allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; + } - const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000); - /// min() as a save guard here - const UInt64 delay_milliseconds - = std::min(max_delay_milliseconds, static_cast(::pow(max_delay_milliseconds, static_cast(k) / max_k))); + if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold) [[unlikely]] + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Incorrect calculation of {} parts over threshold: allowed_parts_over_threshold={}, parts_over_threshold={}", + (use_active_parts_threshold ? "active" : "inactive"), + allowed_parts_over_threshold, + parts_over_threshold); + + const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000); + double delay_factor = static_cast(parts_over_threshold) / allowed_parts_over_threshold; + const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms; + delay_milliseconds = std::max(min_delay_milliseconds, static_cast(max_delay_milliseconds * delay_factor)); + } ProfileEvents::increment(ProfileEvents::DelayedInserts); ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 837c63623d2..b8b3ecf6810 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -512,7 +512,7 @@ class MergeTreeData : public IStorage, public WithMutableContext std::pair getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const; std::pair getMaxPartsCountAndSizeForPartition() const; - size_t getMaxInactivePartsCountForPartition() const; + size_t getMaxOutdatedPartsCountForPartition() const; /// Get min value of part->info.getDataVersion() for all active parts. /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition. @@ -532,7 +532,7 @@ class MergeTreeData : public IStorage, public WithMutableContext /// If the table contains too many active parts, sleep for a while to give them time to merge. /// If until is non-null, wake up from the sleep earlier if the event happened. - void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const; + void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const; /// Renames temporary part to a permanent part and adds it to the parts set. /// It is assumed that the part does not intersect with existing parts. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index e4702fe2890..84dd7d5a686 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -66,12 +66,13 @@ struct Settings; M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \ \ /** Inserts settings. */ \ - M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \ + M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \ M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \ M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \ M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \ M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \ M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \ + M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \ \ /** Replication settings. */ \ diff --git a/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference b/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference new file mode 100644 index 00000000000..c104ff58aff --- /dev/null +++ b/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference @@ -0,0 +1,6 @@ +0 +300 +500 +750 +1000 +TOO_MANY_PARTS diff --git a/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh b/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh new file mode 100644 index 00000000000..b1efb033d3b --- /dev/null +++ b/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "DROP STREAM IF EXISTS test_02521_insert_delay" +# Create MergeTree with settings which allow to insert maximum 5 parts, on 6th it'll throw TOO_MANY_PARTS +$CLICKHOUSE_CLIENT -q "CREATE STREAM test_02521_insert_delay (key uint32, value string) Engine=MergeTree() ORDER BY key SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=5, max_delay_to_insert=1, min_delay_to_insert_ms=300" +$CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES test_02521_insert_delay" + +# Every delay is increased by max_delay_to_insert*1000/(parts_to_throw_insert - parts_to_delay_insert + 1), here it's 250ms +# 0-indexed INSERT - no delay, 1-indexed INSERT - 300ms instead of 250ms due to min_delay_to_insert_ms +for i in {0..4} +do + query_id="${CLICKHOUSE_DATABASE}_02521_${i}_$RANDOM$RANDOM" + $CLICKHOUSE_CLIENT --query_id="$query_id" -q "INSERT INTO test_02521_insert_delay SELECT number, to_string(number) FROM numbers(${i}, 1)" + $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS" + $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "select ProfileEvents['DelayedInsertsMilliseconds'] as delay from system.query_log where event_date >= yesterday() and query_id = {query_id:string} order by delay desc limit 1" +done + +$CLICKHOUSE_CLIENT -q "INSERT INTO test_02521_insert_delay VALUES(0, 'This query throws error')" 2>&1 | grep -o 'TOO_MANY_PARTS' | head -n 1 + +$CLICKHOUSE_CLIENT -q "DROP STREAM test_02521_insert_delay" From b76e29211cae75b60366841648190c924bf5803b Mon Sep 17 00:00:00 2001 From: yoko Date: Mon, 22 Jan 2024 00:57:12 -0800 Subject: [PATCH 07/11] Update INSERT delay doc with example porting clickhouse/clickhouse#45592 on Jan 26, 2023 --- docs/en/operations/settings/merge-tree-settings.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/en/operations/settings/merge-tree-settings.md b/docs/en/operations/settings/merge-tree-settings.md index a7bba76a05a..165af2f82b0 100644 --- a/docs/en/operations/settings/merge-tree-settings.md +++ b/docs/en/operations/settings/merge-tree-settings.md @@ -97,8 +97,16 @@ max_k = parts_to_throw_insert - parts_to_delay_insert k = 1 + parts_count_in_partition - parts_to_delay_insert delay_milliseconds = pow(max_delay_to_insert * 1000, k / max_k) ``` +For example, if a partition has 299 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, `INSERT` is delayed for `pow( 1 * 1000, (1 + 299 - 150) / (300 - 150) ) = 1000` milliseconds. -For example if a partition has 299 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, `INSERT` is delayed for `pow( 1 * 1000, (1 + 299 - 150) / (300 - 150) ) = 1000` milliseconds. +Starting from version 23.1 formula has been changed to: +```code +allowed_parts_over_threshold = parts_to_throw_insert - parts_to_delay_insert +parts_over_threshold = parts_count_in_partition - parts_to_delay_insert + 1 +delay_milliseconds = max(min_delay_to_insert_ms, (max_delay_to_insert * 1000) * parts_over_threshold / allowed_parts_over_threshold) +``` + +For example, if a partition has 224 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, min_delay_to_insert_ms = 10, `INSERT` is delayed for `max( 10, 1 * 1000 * (224 - 150 + 1) / (300 - 150) ) = 500` milliseconds. ## max_parts_in_total {#max-parts-in-total} From 54f8d084f5666c62e2d65a59855c3f41277b9439 Mon Sep 17 00:00:00 2001 From: yoko Date: Mon, 22 Jan 2024 02:21:35 -0800 Subject: [PATCH 08/11] Relax "too many parts" further porting clickhouse/clickhouse#50856 on Jun 18, 2023 --- src/Loggers/OwnPatternFormatter.cpp | 2 -- src/Storages/MergeTree/MergeTreeData.cpp | 31 +++++++++++----------- src/Storages/MergeTree/MergeTreeData.h | 2 +- src/Storages/MergeTree/MergeTreeSettings.h | 6 ++--- src/Storages/MergeTree/MergeTreeSink.cpp | 9 +++++-- src/Storages/MergeTree/MergeTreeSink.h | 3 ++- 6 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/Loggers/OwnPatternFormatter.cpp b/src/Loggers/OwnPatternFormatter.cpp index 02a2c2e510b..0c2256aaa1b 100644 --- a/src/Loggers/OwnPatternFormatter.cpp +++ b/src/Loggers/OwnPatternFormatter.cpp @@ -4,8 +4,6 @@ #include #include #include -#include -#include #include diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 76246d1ffaa..02ebcfc7e72 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3499,14 +3499,14 @@ std::optional MergeTreeData::getMinPartDataVersion() const } -void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const +void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const { const auto settings = getSettings(); const auto & query_settings = query_context->getSettingsRef(); const size_t parts_count_in_total = getPartsCount(); - /// check if have too many parts in total - if (parts_count_in_total >= settings->max_parts_in_total) + /// Check if we have too many parts in total + if (allow_throw && parts_count_in_total >= settings->max_parts_in_total) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( @@ -3522,7 +3522,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0) outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition(); - if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert) + if (allow_throw && settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( @@ -3545,7 +3545,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts; - if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average) + if (allow_throw && parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average) { ProfileEvents::increment(ProfileEvents::RejectedInserts); throw Exception( @@ -3582,18 +3582,17 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const Contex allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert; } - if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold) [[unlikely]] - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Incorrect calculation of {} parts over threshold: allowed_parts_over_threshold={}, parts_over_threshold={}", - (use_active_parts_threshold ? "active" : "inactive"), - allowed_parts_over_threshold, - parts_over_threshold); - const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000); - double delay_factor = static_cast(parts_over_threshold) / allowed_parts_over_threshold; - const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms; - delay_milliseconds = std::max(min_delay_milliseconds, static_cast(max_delay_milliseconds * delay_factor)); + if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold) + { + delay_milliseconds = max_delay_milliseconds; + } + else + { + double delay_factor = static_cast(parts_over_threshold) / allowed_parts_over_threshold; + const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms; + delay_milliseconds = std::max(min_delay_milliseconds, static_cast(max_delay_milliseconds * delay_factor)); + } } ProfileEvents::increment(ProfileEvents::DelayedInserts); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index b8b3ecf6810..aea8edfbea5 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -532,7 +532,7 @@ class MergeTreeData : public IStorage, public WithMutableContext /// If the table contains too many active parts, sleep for a while to give them time to merge. /// If until is non-null, wake up from the sleep earlier if the event happened. - void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const; + void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const; /// Renames temporary part to a permanent part and adds it to the parts set. /// It is assumed that the part does not intersect with existing parts. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 84dd7d5a686..b2112c9f120 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -66,11 +66,11 @@ struct Settings; M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \ \ /** Inserts settings. */ \ - M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \ + M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \ M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \ - M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \ + M(UInt64, parts_to_throw_insert, 3000, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \ M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \ - M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \ + M(UInt64, max_avg_part_size_for_too_many_parts, 1ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \ M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \ M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index e81dfe57f6c..5d424f29856 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -49,9 +49,9 @@ MergeTreeSink::MergeTreeSink( void MergeTreeSink::onStart() { - /// Only check "too many parts" before write, + /// It's only allowed to throw "too many parts" before write, /// because interrupting long-running INSERT query in the middle is not convenient for users. - storage.delayInsertOrThrowIfNeeded(nullptr, context); + storage.delayInsertOrThrowIfNeeded(nullptr, context, true); } void MergeTreeSink::onFinish() @@ -61,6 +61,9 @@ void MergeTreeSink::onFinish() void MergeTreeSink::consume(Chunk chunk) { + if (num_blocks_processed > 0) + storage.delayInsertOrThrowIfNeeded(nullptr, context, false); + auto block = getHeader().cloneWithColumns(chunk.detachColumns()); if (!storage_snapshot->object_columns.get()->empty()) convertDynamicColumnsToTuples(block, storage_snapshot); @@ -149,6 +152,8 @@ void MergeTreeSink::consume(Chunk chunk) finishDelayedChunk(); delayed_chunk = std::make_unique(); delayed_chunk->partitions = std::move(partitions); + + ++num_blocks_processed; } void MergeTreeSink::finishDelayedChunk() diff --git a/src/Storages/MergeTree/MergeTreeSink.h b/src/Storages/MergeTree/MergeTreeSink.h index f0cbcc2e6b6..c4723d38a43 100644 --- a/src/Storages/MergeTree/MergeTreeSink.h +++ b/src/Storages/MergeTree/MergeTreeSink.h @@ -45,7 +45,8 @@ class MergeTreeSink final : public SinkToStorage size_t max_parts_per_block; ContextPtr context; StorageSnapshotPtr storage_snapshot; - uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token + UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token + UInt64 num_blocks_processed = 0; /// We can delay processing for previous chunk and start writing a new one. struct DelayedChunk; From ae17f431f6490fc5686a2c74b3543834610c11f9 Mon Sep 17 00:00:00 2001 From: yoko Date: Mon, 22 Jan 2024 02:25:31 -0800 Subject: [PATCH 09/11] Attempt to fix the relax_too_many_parts test porting clickhouse/clickhouse#51375 on Jun 28, 2023 --- .../0_stateless/02458_relax_too_many_parts.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql index e8aefb4c939..559ae149746 100644 --- a/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql +++ b/tests/queries_ported/0_stateless/02458_relax_too_many_parts.sql @@ -1,5 +1,5 @@ DROP STREAM IF EXISTS test; -CREATE STREAM test (x uint64, s string) ENGINE = MergeTree ORDER BY x SETTINGS parts_to_throw_insert = 3; +CREATE STREAM test (x uint64, s string) ENGINE = MergeTree ORDER BY x SETTINGS parts_to_throw_insert = 3, max_parts_to_merge_at_once = 1; -- The "too many parts" threshold works: SET max_block_size = 1, min_insert_block_size_rows = 1, min_insert_block_size_bytes = 1; @@ -14,7 +14,7 @@ ALTER STREAM test MODIFY SETTING max_avg_part_size_for_too_many_parts = '1M'; -- It works in the same way if parts are small: SYSTEM START MERGES test; -OPTIMIZE TABLE test FINAL; +OPTIMIZE TABLE test FINAL SETTINGS optimize_throw_if_noop=1; SYSTEM STOP MERGES test; INSERT INTO test VALUES (5, 'a'); @@ -23,7 +23,7 @@ INSERT INTO test VALUES (7, 'a'); -- { serverError TOO_MANY_PARTS } -- But it allows having more parts if their average size is large: SYSTEM START MERGES test; -OPTIMIZE TABLE test FINAL; +OPTIMIZE TABLE test FINAL SETTINGS optimize_throw_if_noop=1; SYSTEM STOP MERGES test; SET max_block_size = 65000, min_insert_block_size_rows = 65000, min_insert_block_size_bytes = '1M'; From d0347db985425590eaff8870541222b56896aa76 Mon Sep 17 00:00:00 2001 From: yoko Date: Mon, 22 Jan 2024 02:27:18 -0800 Subject: [PATCH 10/11] Update merge-tree-settings.md porting clickhouse/clickhouse#53527 on Aug 18, 2023 --- docs/en/operations/settings/merge-tree-settings.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/operations/settings/merge-tree-settings.md b/docs/en/operations/settings/merge-tree-settings.md index 165af2f82b0..648403cbeed 100644 --- a/docs/en/operations/settings/merge-tree-settings.md +++ b/docs/en/operations/settings/merge-tree-settings.md @@ -41,11 +41,11 @@ Possible values: - Any positive integer. -Default value: 300. +Default value: 3000. To achieve maximum performance of `SELECT` queries, it is necessary to minimize the number of parts processed, see [Merge Tree](../../development/architecture.md#merge-tree). -You can set a larger value to 600 (1200), this will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also in case of a merge issue (for example, due to insufficient disk space) you will notice it later than it could be with the original 300. +Prior to 23.6 this setting was set to 300. You can set a higher different value, it will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also in case of a merge issue (for example, due to insufficient disk space) you will notice it later than it could be with the original 300. ## parts_to_delay_insert {#parts-to-delay-insert} From 39525adeda2c621e687d9ab232c7bee4fbe3619b Mon Sep 17 00:00:00 2001 From: yokofly Date: Mon, 22 Jan 2024 16:23:18 -0800 Subject: [PATCH 11/11] fix stateless bash script permission denied: https://github.com/timeplus-io/proton/actions/runs/7613100717/job/20735699428#step:7:1131 --- ...rence => 02521_incorrect_delay_for_insert_bug_44902.reference} | 0 ...bug_44902.sh => 02521_incorrect_delay_for_insert_bug_44902.sh} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/queries_ported/0_stateless/{02521_incorrect_dealy_for_insert_bug_44902.reference => 02521_incorrect_delay_for_insert_bug_44902.reference} (100%) rename tests/queries_ported/0_stateless/{02521_incorrect_dealy_for_insert_bug_44902.sh => 02521_incorrect_delay_for_insert_bug_44902.sh} (100%) mode change 100644 => 100755 diff --git a/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference b/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.reference similarity index 100% rename from tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.reference rename to tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.reference diff --git a/tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh b/tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.sh old mode 100644 new mode 100755 similarity index 100% rename from tests/queries_ported/0_stateless/02521_incorrect_dealy_for_insert_bug_44902.sh rename to tests/queries_ported/0_stateless/02521_incorrect_delay_for_insert_bug_44902.sh