Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,27 @@
{%- endcall %}
{% endmacro %}

{% macro clickhouse__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_relation = base_relation.incorporate(
{% macro clickhouse__make_intermediate_relation(base_relation, suffix='__dbt_tmp') %}
{%- set intermediate_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
{%- set intermediate_relation = base_relation.incorporate(path={"identifier": intermediate_identifier}) -%}
{{ return(intermediate_relation) }}
{%- endmacro %}

{% macro clickhouse__make_backup_relation(base_relation, backup_relation_type, suffix='__dbt_backup') %}
{%- set backup_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
{%- set backup_relation = base_relation.incorporate(
path={"identifier": backup_identifier},
type=backup_relation_type
) -%}
{{ return(backup_relation) }}
{%- endmacro %}

{% macro clickhouse__make_temp_relation(base_relation, suffix='__dbt_tmp') %}
{%- set tmp_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%}
{%- set tmp_relation = base_relation.incorporate(
path={"identifier": tmp_identifier, "schema": None}) -%}
{% do return(tmp_relation) %}
{% endmacro %}
{{ return(tmp_relation) }}
{%- endmacro %}


{% macro clickhouse__generate_database_name(custom_database_name=none, node=none) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
{% endif %}
{% endif %}
{% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %}
{%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,12 @@
{{ create_schema(target_relation_local) }}
{%- set intermediate_relation = make_intermediate_relation(target_relation_local)-%}
{%- set distributed_intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%}
{%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{%- set view_relation = default__make_temp_relation(target_relation, '__dbt_view_tmp') -%}

{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
{{ drop_relation_if_exists(view_relation) }}
{{ drop_relation_if_exists(distributed_intermediate_relation) }}

{%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%}
{{ drop_relation_if_exists(view_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
Expand Down Expand Up @@ -74,6 +69,12 @@
{% endcall %}

{% else %}
{% if existing_relation is none %}
{{ drop_relation_if_exists(existing_relation) }}
{% do run_query(create_distributed_table(target_relation, target_relation_local)) %}
{% set existing_relation = target_relation %}
{% endif %}

{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{%- if on_schema_change != 'ignore' %}
Expand All @@ -99,6 +100,11 @@
{% endif %}

{% if need_swap %}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%}
{%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
{% if False %}
{% do adapter.rename_relation(intermediate_relation, backup_relation) %}
{% do exchange_tables_atomic(backup_relation, target_relation_local) %}
Expand All @@ -108,7 +114,7 @@
{% endif %}

-- Structure could have changed, need to update distributed table from replaced local table
{% set target_relation_new = target_relation.incorporate(path={"identifier": target_relation.identifier + '_temp'}) %}
{%- set target_relation_new = make_intermediate_relation(target_relation) -%}
{{ drop_relation_if_exists(target_relation_new) }}
{% do run_query(create_distributed_table(target_relation_new, target_relation_local)) %}

Expand All @@ -124,6 +130,8 @@
{% do to_drop.append(distributed_backup_relation) %}
{% endif %}

{{ drop_relation_if_exists(view_relation) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do apply_grants(target_relation_local, grant_config, should_revoke=should_revoke) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,9 @@


{% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%}
{{ drop_relation_if_exists(new_data_relation) }}
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
{%- set distributed_new_data_relation = make_intermediate_relation(existing_relation, '__dbt_distributed_new_data') -%}

{%- set inserting_relation = new_data_relation -%}

Expand All @@ -223,18 +222,18 @@
{% call statement('delete_existing_data') %}
{% if is_distributed %}
{% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% else %}
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
from {{ inserting_relation }})
{% endif %}
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
Expand All @@ -247,8 +246,7 @@
{% endmacro %}

{% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%}
{{ drop_relation_if_exists(new_data_relation) }}
{% call statement('create_new_data_temp') -%}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@
{%- endmacro %}

{% macro clickhouse__insert_into(target_relation, sql, has_contract) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_columns = adapter.get_column_schema_from_query(sql) -%}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why this change is required? why change the columns getter from the target relation to the SQL?

{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

insert into {{ target_relation }}
Expand Down