diff --git a/dbt/include/clickhouse/macros/adapters.sql b/dbt/include/clickhouse/macros/adapters.sql index 6ae897d7..2c2c4a0f 100644 --- a/dbt/include/clickhouse/macros/adapters.sql +++ b/dbt/include/clickhouse/macros/adapters.sql @@ -79,12 +79,27 @@ {%- endcall %} {% endmacro %} -{% macro clickhouse__make_temp_relation(base_relation, suffix) %} - {% set tmp_identifier = base_relation.identifier ~ suffix %} - {% set tmp_relation = base_relation.incorporate( +{% macro clickhouse__make_intermediate_relation(base_relation, suffix='__dbt_tmp') %} + {%- set intermediate_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%} + {%- set intermediate_relation = base_relation.incorporate(path={"identifier": intermediate_identifier}) -%} + {{ return(intermediate_relation) }} +{%- endmacro %} + +{% macro clickhouse__make_backup_relation(base_relation, backup_relation_type, suffix='__dbt_backup') %} + {%- set backup_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%} + {%- set backup_relation = base_relation.incorporate( + path={"identifier": backup_identifier}, + type=backup_relation_type + ) -%} + {{ return(backup_relation) }} +{%- endmacro %} + +{% macro clickhouse__make_temp_relation(base_relation, suffix='__dbt_tmp') %} + {%- set tmp_identifier = base_relation.identifier ~ suffix ~ '_' ~ invocation_id.replace('-', '_') -%} + {%- set tmp_relation = base_relation.incorporate( path={"identifier": tmp_identifier, "schema": None}) -%} - {% do return(tmp_relation) %} -{% endmacro %} + {{ return(tmp_relation) }} +{%- endmacro %} {% macro clickhouse__generate_database_name(custom_database_name=none, node=none) -%} diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql index e84e8396..a9ee17fb 100644 --- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -31,7 +31,7 @@ {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} {% endif %} {% endif %} - {% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %} + {%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%} -- drop the temp relations if they exist already in the database {{ drop_relation_if_exists(preexisting_intermediate_relation) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql index ef31a76c..fb973e68 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql @@ -34,17 +34,12 @@ {{ create_schema(target_relation_local) }} {%- set intermediate_relation = make_intermediate_relation(target_relation_local)-%} {%- set distributed_intermediate_relation = make_intermediate_relation(target_relation)-%} - {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%} - {%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - {%- set view_relation = default__make_temp_relation(target_relation, '__dbt_view_tmp') -%} - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - {{ drop_relation_if_exists(view_relation) }} {{ drop_relation_if_exists(distributed_intermediate_relation) }} + + {%- set view_relation = make_intermediate_relation(target_relation, '__dbt_view_tmp') -%} + {{ drop_relation_if_exists(view_relation) }} {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ run_hooks(pre_hooks, inside_transaction=True) }} @@ -74,6 +69,12 @@ {% endcall %} {% else %} + {% if existing_relation is none %} + {{ drop_relation_if_exists(existing_relation) }} + {% do run_query(create_distributed_table(target_relation, target_relation_local)) %} + {% set existing_relation = target_relation %} + {% endif %} + {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} {%- if on_schema_change != 'ignore' %} @@ -99,6 +100,11 @@ {% endif %} {% if need_swap %} + {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%} + {%- set distributed_backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + {{ drop_relation_if_exists(preexisting_backup_relation) }} {% if False %} {% do adapter.rename_relation(intermediate_relation, backup_relation) %} {% do exchange_tables_atomic(backup_relation, target_relation_local) %} @@ -108,7 +114,7 @@ {% endif %} -- Structure could have changed, need to update distributed table from replaced local table - {% set target_relation_new = target_relation.incorporate(path={"identifier": target_relation.identifier + '_temp'}) %} + {%- set target_relation_new = make_intermediate_relation(target_relation) -%} {{ drop_relation_if_exists(target_relation_new) }} {% do run_query(create_distributed_table(target_relation_new, target_relation_local)) %} @@ -124,6 +130,8 @@ {% do to_drop.append(distributed_backup_relation) %} {% endif %} + {{ drop_relation_if_exists(view_relation) }} + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} {% do apply_grants(target_relation_local, grant_config, should_revoke=should_revoke) %} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index c4dcb1b8..80530e4e 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -200,10 +200,9 @@ {% macro clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, is_distributed=False) %} - {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier - + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} + {%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%} {{ drop_relation_if_exists(new_data_relation) }} - {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%} + {%- set distributed_new_data_relation = make_intermediate_relation(existing_relation, '__dbt_distributed_new_data') -%} {%- set inserting_relation = new_data_relation -%} @@ -223,18 +222,18 @@ {% call statement('delete_existing_data') %} {% if is_distributed %} {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} - delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) + delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ inserting_relation }}) {% else %} - delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} - from {{ inserting_relation }}) + delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }} + from {{ inserting_relation }}) {% endif %} {%- if incremental_predicates %} {% for predicate in incremental_predicates %} and {{ predicate }} {% endfor %} {%- endif -%} - {{ adapter.get_model_query_settings(model) }} + {{ adapter.get_model_query_settings(model) }} {% endcall %} {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} @@ -247,8 +246,7 @@ {% endmacro %} {% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %} - {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name'] - + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} + {%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%} {{ drop_relation_if_exists(new_data_relation) }} {% call statement('create_new_data_temp') -%} {{ get_create_table_as_sql(False, new_data_relation, sql) }} diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index 1ef0e8b5..7aad977c 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -208,7 +208,7 @@ {%- endmacro %} {% macro clickhouse__insert_into(target_relation, sql, has_contract) %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_columns = adapter.get_column_schema_from_query(sql) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} insert into {{ target_relation }}