Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add cluster by config in Snowflake for dynamic tables
time: 2025-08-30T12:08:02.458481+02:00
custom:
Author: nazliander
Issue: "706"
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from dataclasses import dataclass
from typing import Optional, Dict, Any, TYPE_CHECKING
from typing import Optional, Dict, Any, TYPE_CHECKING, Union

from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import ComponentName
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self

from dbt.adapters.snowflake.parse_model import cluster_by
Copy link
Contributor

@colin-rogers-dbt colin-rogers-dbt Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this import is causing issues and it doesn't appear used

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using on line 105 simply I was lazy to write a new parser or change the old one.

I guess I should have used a generic type, not a | 🤦 I will try to solve it somewhere this week.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like should have been compatible with Python 3.9, I was using the wrong Union notation with |. I pushed a fixup commit now. At least locally I can see unit tests are passing. 👍

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase

if TYPE_CHECKING:
Expand Down Expand Up @@ -45,6 +46,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
- snowflake_warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table
- refresh_mode: specifies the refresh type for the dynamic table
- initialize: specifies the behavior of the initial refresh of the dynamic table
- cluster_by: specifies the columns to cluster on

There are currently no non-configurable parameters.
"""
Expand All @@ -59,6 +61,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
initialize: Optional[Initialize] = Initialize.default()
row_access_policy: Optional[str] = None
table_tag: Optional[str] = None
cluster_by: Optional[Union[str, list[str]]] = None

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
Expand All @@ -79,6 +82,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
"initialize": config_dict.get("initialize"),
"row_access_policy": config_dict.get("row_access_policy"),
"table_tag": config_dict.get("table_tag"),
"cluster_by": config_dict.get("cluster_by"),
}

return super().from_dict(kwargs_dict) # type:ignore
Expand All @@ -98,6 +102,7 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
"row_access_policy"
),
"table_tag": relation_config.config.extra.get("table_tag"), # type:ignore
"cluster_by": cluster_by(relation_config),
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"): # type:ignore
Expand All @@ -122,6 +127,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str,
"refresh_mode": dynamic_table.get("refresh_mode"),
"row_access_policy": dynamic_table.get("row_access_policy"),
"table_tag": dynamic_table.get("table_tag"),
"cluster_by": dynamic_table.get("cluster_by"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
{{ optional('initialize', dynamic_table.initialize) }}
{{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }}
{{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
as (
{{ sql }}
)
Expand Down Expand Up @@ -74,6 +75,7 @@
{{ optional('initialize', dynamic_table.initialize) }}
{{ optional('row_access_policy', dynamic_table.row_access_policy) }}
{{ optional('table_tag', dynamic_table.table_tag) }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
as (
{{ sql }}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ create or replace dynamic table {{ relation }}
{{ optional('initialize', dynamic_table.initialize) }}
{{ optional('with row access policy', dynamic_table.row_access_policy, equals_char='') }}
{{ optional('with tag', dynamic_table.table_tag, quote_char='(', equals_char='') }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
as (
{{ sql }}
)
Expand Down Expand Up @@ -87,6 +88,7 @@ create or replace dynamic iceberg table {{ relation }}
{{ optional('initialize', dynamic_table.initialize) }}
{{ optional('row_access_policy', dynamic_table.row_access_policy) }}
{{ optional('table_tag', dynamic_table.table_tag) }}
{{ optional('cluster by', dynamic_table.cluster_by, quote_char='(', equals_char='') }}
as (
{{ sql }}
)
Expand Down
123 changes: 123 additions & 0 deletions dbt-snowflake/tests/functional/adapter/test_cluster_by.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import pytest
from dbt.tests.util import check_table_does_exist, run_dbt

_SEED_CSV = """
id,first_name,last_name,email,product_id
1,Jack,Hunter,jhunter0@foo.bar,1
2,Kathryn,Walker,kwalker1@foo.bar,1
3,Gerald,Ryan,gryan2@foo.bar,3
4,Jack,Hunter,jhunter1@foo.bar,4
5,Kathryn,Walker,kwalker2@foo.bar,5
6,Gerald,Ryan,gryan3@foo.bar,6
""".lstrip()


class TestClusterBy:
@pytest.fixture(scope="class")
def seeds(self):
return {"seed.csv": _SEED_CSV}

@pytest.fixture(scope="class")
def models(self, dbt_profile_target):
warehouse_name = dbt_profile_target["warehouse"]

_DYNAMIC_TABLE_1_SQL = f"""
{{{{ config(materialized='dynamic_table', snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select * from {{{{ ref('seed') }}}}
""".lstrip()

_DYNAMIC_TABLE_2_SQL = f"""
{{{{ config(materialized='dynamic_table', cluster_by=['last_name'], snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select * from {{{{ ref('dynamic_table_1') }}}}
""".lstrip()

_DYNAMIC_TABLE_3_SQL = f"""
{{{{ config(materialized='dynamic_table', cluster_by=['last_name', 'first_name'], snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select
last_name,
first_name,
count(*) as count
from {{{{ ref('seed') }}}}
group by 1, 2
""".lstrip()

_DYNAMIC_TABLE_4_SQL = f"""
{{{{ config(materialized='dynamic_table', cluster_by=['last_name', 'product_id % 3'], snowflake_warehouse='{warehouse_name}', target_lag='1 minute') }}}}
select
last_name,
first_name,
product_id,
count(*) as count
from {{{{ ref('seed') }}}}
group by 1, 2, 3
""".lstrip()

return {
"dynamic_table_1.sql": _DYNAMIC_TABLE_1_SQL,
"dynamic_table_2.sql": _DYNAMIC_TABLE_2_SQL,
"dynamic_table_3.sql": _DYNAMIC_TABLE_3_SQL,
"dynamic_table_4.sql": _DYNAMIC_TABLE_4_SQL,
}

def test_snowflake_dynamic_table_cluster_by(self, project):

run_dbt(["seed"])

db_with_schema = f"{project.database}.{project.test_schema}"

check_table_does_exist(
project.adapter, f"{db_with_schema}.{self._available_models_in_setup()['seed_table']}"
)

run_dbt()

# Check that all dynamic tables exist
check_table_does_exist(
project.adapter,
f"{db_with_schema}.{self._available_models_in_setup()['dynamic_table_1']}",
)
check_table_does_exist(
project.adapter,
f"{db_with_schema}.{self._available_models_in_setup()['dynamic_table_2']}",
)
check_table_does_exist(
project.adapter,
f"{db_with_schema}.{self._available_models_in_setup()['dynamic_table_3']}",
)
check_table_does_exist(
project.adapter,
f"{db_with_schema}.{self._available_models_in_setup()['dynamic_table_4']}",
)

with project.adapter.connection_named("__test"):
# Check if cluster_by is applied to dynamic_table_2 (should cluster by last_name)
cluster_by = self._get_dynamic_table_ddl(
project, self._available_models_in_setup()["dynamic_table_2"]
)
assert "CLUSTER BY (LAST_NAME)" in cluster_by.upper()

# Check if cluster_by is applied to dynamic_table_3 (should cluster by last_name, first_name)
cluster_by = self._get_dynamic_table_ddl(
project, self._available_models_in_setup()["dynamic_table_3"]
)
assert "CLUSTER BY (LAST_NAME, FIRST_NAME)" in cluster_by.upper()

# Check if cluster_by is applied to dynamic_table_4 (should cluster by last_name, product_id % 3)
cluster_by = self._get_dynamic_table_ddl(
project, self._available_models_in_setup()["dynamic_table_4"]
)
assert "CLUSTER BY (LAST_NAME, PRODUCT_ID % 3)" in cluster_by.upper()

def _get_dynamic_table_ddl(self, project, table_name: str) -> str:
ddl_query = f"SELECT GET_DDL('DYNAMIC_TABLE', '{project.database}.{project.test_schema}.{table_name}')"
ddl = project.run_sql(ddl_query, fetch="one")
return ddl[0]

def _available_models_in_setup(self) -> dict[str, str]:
return dict(
seed_table="SEED",
dynamic_table_1="DYNAMIC_TABLE_1",
dynamic_table_2="DYNAMIC_TABLE_2",
dynamic_table_3="DYNAMIC_TABLE_3",
dynamic_table_4="DYNAMIC_TABLE_4",
)
Loading