From 38a45e4e46ceb2b2cf04ac9405535f0e181a8f12 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 14 Oct 2025 20:12:33 +0200 Subject: [PATCH 1/6] Join for tables --- src/Planner/PlannerJoinTree.cpp | 2 +- src/Storages/IStorageCluster.cpp | 51 +++++++++++++------ .../extractTableFunctionFromSelectQuery.cpp | 6 +++ .../extractTableFunctionFromSelectQuery.h | 1 + 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index eed732ef6728..568865c881cf 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent) /// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed /// Hopefully there is no other case when we read from Distributed up to FetchColumns. 
- if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) + if (table_node && table_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); else if (table_function_node && table_function_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 076ec5b5a28b..a507b3d85a19 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -112,7 +113,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) { @@ -126,15 +127,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); - if (node_type == type) + if (types.contains(node_type)) + { passed_node = node; + passed_type = node_type; + } } QueryTreeNodePtr getNode() const { return passed_node; } + std::optional getType() const { return passed_type; } private: - QueryTreeNodeType type; + std::unordered_set types; QueryTreeNodePtr passed_node; + std::optional passed_type; }; /* @@ -219,7 +225,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto modified_query_tree = query_tree->clone(); bool need_modify = false; - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); table_function_searcher.visit(query_tree); auto table_function_node = table_function_searcher.getNode(); if (!table_function_node) @@ -227,17 
+233,28 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( if (has_join) { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); + QueryTreeNodePtr query_tree_distributed; + + auto & query_node = modified_query_tree->as(); + + if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + } + else + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } // Find add used columns from table function to make proper projection list CollectUsedColumnsForSourceVisitor collector(table_function_node, context); collector.visit(query_tree); const auto & columns = collector.getColumns(); - auto & query_node = modified_query_tree->as(); query_node.resolveProjectionColumns(columns); auto column_nodes_to_select = std::make_shared(); column_nodes_to_select->getNodes().reserve(columns.size()); @@ -501,25 +518,27 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + SearcherVisitor join_searcher({QueryTreeNodeType::JOIN}, context); join_searcher.visit(query_info.query_tree); if (join_searcher.getNode()) has_join = true; - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + SearcherVisitor 
table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); table_function_searcher.visit(query_info.query_tree); auto table_function_node = table_function_searcher.getNode(); if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); - CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); auto & query_node = query_info.query_tree->as(); if (query_node.hasWhere()) + { + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); collector_where.visit(query_node.getWhere()); - // Can't use 'WHERE' on remote node if it contains columns from other sources - if (!collector_where.getColumns().empty()) - has_local_columns_in_where = true; + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + has_local_columns_in_where = true; + } if (has_join || has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index 8477798b62b1..064f538eeae7 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) return table_expression ? table_expression->table_function : nullptr; } +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? 
table_expression->database_and_table_name : nullptr; +} + ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) { auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 9834f3dc7573..2a845477df82 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -10,6 +10,7 @@ struct ASTTableExpression; ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); From 43825ba9239471fcac2b3123d48db49a162d937e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 15 Oct 2025 17:21:12 +0200 Subject: [PATCH 2/6] Some fixes --- src/Storages/IStorageCluster.cpp | 80 +++++++++------- tests/integration/test_s3_cluster/test.py | 14 +++ .../integration/test_storage_iceberg/test.py | 91 +++++++++++++++++++ 3 files changed, 152 insertions(+), 33 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index a507b3d85a19..85ca80c5d85a 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -222,37 +223,40 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto modified_query_tree = query_tree->clone(); - bool need_modify = false; + if (has_join || has_local_columns_in_where) + { + auto modified_query_tree = query_tree->clone(); - SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); - 
table_function_searcher.visit(query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + table_function_searcher.visit(modified_query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); - if (has_join) - { QueryTreeNodePtr query_tree_distributed; auto & query_node = modified_query_tree->as(); - if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) - { - auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); - query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); - } - else + if (has_join) { - auto join_node = query_node.getJoinTree(); - query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + } + else + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } } // Find add used columns from table function to make proper projection list + // Need to do before changing WHERE condition CollectUsedColumnsForSourceVisitor collector(table_function_node, context); - collector.visit(query_tree); + collector.visit(modified_query_tree); const auto & columns = 
collector.getColumns(); query_node.resolveProjectionColumns(columns); @@ -262,20 +266,26 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); query_node.getProjectionNode() = column_nodes_to_select; - // Left only table function to send on cluster nodes - modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + if (has_local_columns_in_where) + { + if (query_node.getPrewhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context); + if (query_node.getWhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context); + } + + query_node.getOrderByNode() = std::make_shared(); + query_node.getGroupByNode() = std::make_shared(); - need_modify = true; - } + if (query_tree_distributed) + { + // Keep only the left table or table function as the join tree of the query sent to cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + } - if (has_local_columns_in_where) - { - auto & query_node = modified_query_tree->as(); - query_node.getWhere() = {}; + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); } - if (need_modify) - query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); return; } case ObjectStorageClusterJoinMode::GLOBAL: @@ -530,12 +540,16 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); auto & query_node = query_info.query_tree->as(); - if (query_node.hasWhere()) + if (query_node.hasWhere() || query_node.hasPrewhere()) { CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); - collector_where.visit(query_node.getWhere()); + if (query_node.hasPrewhere()) + collector_where.visit(query_node.getPrewhere()); 
+ if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); - // Can't use 'WHERE' on remote node if it contains columns from other sources + // SELECT x FROM datalake.table WHERE x IN local.table + // Need to modify 'WHERE' on remote node if it contains columns from other sources if (!collector_where.getColumns().empty()) has_local_columns_in_where = true; } diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index a4a306f0d34c..e8d73c24dcbb 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1163,6 +1163,20 @@ def test_joins(started_cluster): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 + def test_graceful_shutdown(started_cluster): node = started_cluster.instances["s0_0_0"] diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index bcce89ac357a..d091512b6a40 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3492,3 +3492,94 @@ def compare_selects(query): compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL") compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL") compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL") + + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +def test_cluster_joins(started_cluster, storage_type): 
+ instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str, table_name): + return execute_spark_query_general( + spark, + started_cluster, + storage_type, + table_name, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 'john'), + (2, 'jack') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME_2} ( + id INT, + second_name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME_2 + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME_2} VALUES + (1, 'dow'), + (2, 'sparrow') + """, TABLE_NAME_2 + ) + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True + ) + + creation_expression_2 = get_creation_expression( + storage_type, TABLE_NAME_2, started_cluster, table_function=True, run_on_cluster=True + ) + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN {creation_expression_2} AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tsparrow\njohn\tdow\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" From 4e9ac7e6d57279f910f8190776f6e1e1975468a5 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 15 Oct 2025 18:10:10 +0200 Subject: [PATCH 3/6] Add test for tables --- 
.../integration/test_database_iceberg/test.py | 87 ++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 6fa8113f5e37..81cc0d67ca68 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -14,12 +14,13 @@ import pytz from minio import Minio from pyiceberg.catalog import load_catalog -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( DoubleType, + LongType, FloatType, NestedField, StringType, @@ -27,6 +28,7 @@ TimestampType, TimestamptzType ) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm from helpers.config_cluster import minio_secret_key, minio_access_key @@ -609,3 +611,86 @@ def test_table_with_slash(started_cluster): create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1}) assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n" + + +def test_cluster_joins(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_join_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + table_name_2 = f"{test_ref}_table_2" + + root_namespace = f"{test_ref}_namespace" + + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(root_namespace) + + 
schema = Schema( + NestedField( + field_id=1, + name="tag", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="name", + field_type=StringType(), + required=False, + ), + ) + table = create_table(catalog, root_namespace, table_name, schema, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}] + df = pa.Table.from_pylist(data) + table.append(df) + + schema2 = Schema( + NestedField( + field_id=1, + name="id", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="second_name", + field_type=StringType(), + required=False, + ), + ) + table2 = create_table(catalog, root_namespace, table_name_2, schema2, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}] + df = pa.Table.from_pylist(data) + table2.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tSparrow\nJohn\tDow\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + ) + ORDER BY ALL + SETTINGS object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" From 3dd9b46b087efa4c5912b81733f8261c0c8a3271 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 16 Oct 2025 11:47:16 +0200 Subject: [PATCH 4/6] Fix stateless tests --- src/Core/SettingsChangesHistory.cpp | 13 ++----------- src/Interpreters/InterpreterInsertQuery.cpp | 3 +++ 2 files changed, 5 insertions(+), 11 
deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 5915d3f3714d..5f6aa56e1929 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -48,6 +48,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_max_nodes", 0, 0, "New setting"}, {"allow_retries_in_cluster_requests", false, false, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { @@ -143,13 +145,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"}, {"allow_experimental_insert_into_iceberg", false, false, "New setting."}, /// RELEASE CLOSED - {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"}, - {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, - {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"}, - {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, - {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, - {"object_storage_cluster", "", "", "New setting"}, - {"object_storage_max_nodes", 0, 0, "New setting"}, }); addSettingsChanges(settings_changes_history, "25.6.5.2000", { @@ -157,10 +152,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"}, {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in 
Parquet by default"}, - {"object_storage_cluster", "", "", "New setting"}, - {"object_storage_max_nodes", 0, 0, "New setting"}, - {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, - {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 5b52cdbb9920..0a118070c5e6 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -772,6 +772,9 @@ InterpreterInsertQuery::distributedWriteIntoReplicatedMergeTreeFromClusterStorag if (!src_storage_cluster) return {}; + if (src_storage_cluster->getOriginalClusterName().empty()) + return {}; + if (!isInsertSelectTrivialEnoughForDistributedExecution(query)) return {}; From f5215f29c4f75776edea827dc54c7b9e50464f71 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 20 Oct 2025 14:21:24 +0200 Subject: [PATCH 5/6] Fix typo --- src/Core/Settings.cpp | 4 ++-- src/Core/SettingsEnums.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 8ef7a64bad9e..4bcb3f704efd 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1763,12 +1763,12 @@ Possible values: DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( Changes the behaviour of object storage cluster function or table. -ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. +ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. 
when the query for an object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table. Restrictions: - Only applied for JOIN subqueries. -- Only if the FROM section uses a object storage cluster function ot table. +- Only if the FROM section uses an object storage cluster function or table. Possible values: diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index d4472e339edf..9be08214e1a7 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) -/// The setting for executing object storage cluster function ot table JOIN sections. +/// The setting for executing object storage cluster function or table JOIN sections. enum class ObjectStorageClusterJoinMode : uint8_t { LOCAL, /// Convert to local query From 023e38d6bb7af7cc7361fe01202acfbd4a9dd6ce Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 20 Oct 2025 18:45:53 +0200 Subject: [PATCH 6/6] Add tests with local table --- .../test_database_iceberg/configs/cluster.xml | 12 +++++ .../integration/test_database_iceberg/test.py | 47 +++++++++++++++++-- .../integration/test_storage_iceberg/test.py | 44 ++++++++++++++++- 3 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 tests/integration/test_database_iceberg/configs/cluster.xml diff --git a/tests/integration/test_database_iceberg/configs/cluster.xml b/tests/integration/test_database_iceberg/configs/cluster.xml new file mode 100644 index 000000000000..b9638e40bc1e --- /dev/null +++ b/tests/integration/test_database_iceberg/configs/cluster.xml @@ -0,0 +1,12 @@ + + + + + + node1 + 9000 + + + + + \ No newline at end of file diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 81cc0d67ca68..b5004b395120 100644 --- a/tests/integration/test_database_iceberg/test.py +++ 
b/tests/integration/test_database_iceberg/test.py @@ -188,10 +188,11 @@ def started_cluster(): cluster = ClickHouseCluster(__file__) cluster.add_instance( "node1", - main_configs=["configs/backups.xml"], + main_configs=["configs/backups.xml", "configs/cluster.xml"], user_configs=[], stay_alive=True, with_iceberg_catalog=True, + with_zookeeper=True, ) logging.info("Starting cluster...") @@ -619,6 +620,7 @@ def test_cluster_joins(started_cluster): test_ref = f"test_join_tables_{uuid.uuid4()}" table_name = f"{test_ref}_table" table_name_2 = f"{test_ref}_table_2" + table_name_local = f"{test_ref}_table_local" root_namespace = f"{test_ref}_namespace" @@ -665,6 +667,9 @@ def test_cluster_joins(started_cluster): df = pa.Table.from_pylist(data) table2.append(df) + node.query(f"CREATE TABLE `{table_name_local}` (id Int64, second_name String) ENGINE = Memory()") + node.query(f"INSERT INTO `{table_name_local}` VALUES (1, 'Silver'), (2, 'Black')") + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) res = node.query( @@ -674,7 +679,9 @@ def test_cluster_joins(started_cluster): JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 ON t1.tag=t2.id ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) @@ -689,7 +696,41 @@ def test_cluster_joins(started_cluster): FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` ) ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN `{table_name_local}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == 
"Jack\tBlack\nJohn\tSilver\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index d091512b6a40..a25e747c119e 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3500,6 +3500,7 @@ def test_cluster_joins(started_cluster, storage_type): spark = started_cluster.spark_session TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_LOCAL = "test_cluster_joins_local_" + storage_type + "_" + get_uuid_str() def execute_spark_query(query: str, table_name): return execute_spark_query_general( @@ -3556,6 +3557,9 @@ def execute_spark_query(query: str, table_name): storage_type, TABLE_NAME_2, started_cluster, table_function=True, run_on_cluster=True ) + instance.query(f"CREATE TABLE `{TABLE_NAME_LOCAL}` (id Int64, second_name String) ENGINE = Memory()") + instance.query(f"INSERT INTO `{TABLE_NAME_LOCAL}` VALUES (1, 'silver'), (2, 'black')") + res = instance.query( f""" SELECT t1.name,t2.second_name @@ -3563,7 +3567,9 @@ def execute_spark_query(query: str, table_name): JOIN {creation_expression_2} AS t2 ON t1.tag=t2.id ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ ) @@ -3578,7 +3584,41 @@ def execute_spark_query(query: str, table_name): FROM {creation_expression_2} ) ORDER BY ALL - SETTINGS object_storage_cluster_join_mode='local' + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == 
"jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN `{TABLE_NAME_LOCAL}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njohn\tsilver\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' """ )