Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1763,12 +1763,12 @@ Possible values:
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
Changes the behaviour of object storage cluster function or table.

ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.
ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table.

Restrictions:

- Only applied for JOIN subqueries.
- Only if the FROM section uses a object storage cluster function ot table.
- Only if the FROM section uses a object storage cluster function or table.

Possible values:

Expand Down
13 changes: 2 additions & 11 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_max_nodes", 0, 0, "New setting"},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
Expand Down Expand Up @@ -143,24 +145,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
/// RELEASE CLOSED
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.6.5.2000",
{
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
{
Expand Down
2 changes: 1 addition & 1 deletion src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t

DECLARE_SETTING_ENUM(DistributedProductMode)

/// The setting for executing object storage cluster function ot table JOIN sections.
/// The setting for executing object storage cluster function or table JOIN sections.
enum class ObjectStorageClusterJoinMode : uint8_t
{
LOCAL, /// Convert to local query
Expand Down
3 changes: 3 additions & 0 deletions src/Interpreters/InterpreterInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,9 @@ InterpreterInsertQuery::distributedWriteIntoReplicatedMergeTreeFromClusterStorag
if (!src_storage_cluster)
return {};

if (src_storage_cluster->getOriginalClusterName().empty())
return {};

if (!isInsertSelectTrivialEnoughForDistributedExecution(query))
return {};

Expand Down
2 changes: 1 addition & 1 deletion src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent)
/// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
if (table_node && table_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
else if (table_function_node && table_function_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
Expand Down
109 changes: 71 additions & 38 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>

Expand Down Expand Up @@ -112,7 +114,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}

bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
{
Expand All @@ -126,15 +128,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito

auto node_type = node->getNodeType();

if (node_type == type)
if (types.contains(node_type))
{
passed_node = node;
passed_type = node_type;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }
std::optional<QueryTreeNodeType> getType() const { return passed_type; }

private:
QueryTreeNodeType type;
std::unordered_set<QueryTreeNodeType> types;
QueryTreeNodePtr passed_node;
std::optional<QueryTreeNodeType> passed_type;
};

/*
Expand Down Expand Up @@ -216,49 +223,69 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto modified_query_tree = query_tree->clone();
bool need_modify = false;
if (has_join || has_local_columns_in_where)
{
auto modified_query_tree = query_tree->clone();

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
table_function_searcher.visit(modified_query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");

if (has_join)
{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
QueryTreeNodePtr query_tree_distributed;

auto & query_node = modified_query_tree->as<QueryNode &>();

if (has_join)
{
if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION)
{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
}
else
{
auto join_node = query_node.getJoinTree();
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
}
}

// Find add used columns from table function to make proper projection list
// Need to do before changing WHERE condition
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
collector.visit(query_tree);
collector.visit(modified_query_tree);
const auto & columns = collector.getColumns();

auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.resolveProjectionColumns(columns);
auto column_nodes_to_select = std::make_shared<ListNode>();
column_nodes_to_select->getNodes().reserve(columns.size());
for (auto & column : columns)
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
query_node.getProjectionNode() = column_nodes_to_select;

// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
if (has_local_columns_in_where)
{
if (query_node.getPrewhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context);
if (query_node.getWhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context);
}

need_modify = true;
}
query_node.getOrderByNode() = std::make_shared<ListNode>();
query_node.getGroupByNode() = std::make_shared<ListNode>();

if (has_local_columns_in_where)
{
auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.getWhere() = {};
}
if (query_tree_distributed)
{
// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
}

if (need_modify)
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
Expand Down Expand Up @@ -501,25 +528,31 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
SearcherVisitor join_searcher({QueryTreeNodeType::JOIN}, context);
join_searcher.visit(query_info.query_tree);
if (join_searcher.getNode())
has_join = true;

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
table_function_searcher.visit(query_info.query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");

CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
auto & query_node = query_info.query_tree->as<QueryNode &>();
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// Can't use 'WHERE' on remote node if it contains columns from other sources
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;
if (query_node.hasWhere() || query_node.hasPrewhere())
{
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
if (query_node.hasPrewhere())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's quite odd that the original code didn't take prewhere into account...

If those are actually needed, we need to place a mental pin here that after backporting prewhere+row_policy we'll need to add hasRowLevelFilter here as well.

collector_where.visit(query_node.getPrewhere());
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// SELECT x FROM datalake.table WHERE x IN local.table
// Need to modify 'WHERE' on remote node if it contains columns from other sources
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;
}

if (has_join || has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
return table_expression ? table_expression->table_function : nullptr;
}

ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query)
{
auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query);
return table_expression ? table_expression->database_and_table_name : nullptr;
}

ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
{
auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query);
Expand Down
1 change: 1 addition & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct ASTTableExpression;

ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query);
ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);

Expand Down
12 changes: 12 additions & 0 deletions tests/integration/test_database_iceberg/configs/cluster.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>
Loading