From c0b5b069907e84268aa3625f209a92470d0bd101 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 8 Oct 2025 17:02:20 -0700 Subject: [PATCH 1/3] Add support for Iceberg REST w Glue databases --- .../snowflake/catalogs/_iceberg_rest.py | 6 + .../macros/relations/table/create.sql | 76 ++++++++++-- .../test_iceberg_rest_catalog_integrations.py | 111 ++++++++++++++++++ 3 files changed, 186 insertions(+), 7 deletions(-) diff --git a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py index acd98c1d1..65ba3a188 100644 --- a/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py +++ b/dbt-snowflake/src/dbt/adapters/snowflake/catalogs/_iceberg_rest.py @@ -18,6 +18,7 @@ class IcebergRestCatalogRelation: catalog_name: Optional[str] = constants.DEFAULT_ICEBERG_REST_CATALOG.name table_format: Optional[str] = constants.ICEBERG_TABLE_FORMAT catalog_linked_database: Optional[str] = None + catalog_linked_database_type: Optional[str] = None # e.g., 'glue' for AWS Glue external_volume: Optional[str] = None file_format: Optional[str] = None target_file_size: Optional[str] = None @@ -33,6 +34,7 @@ class IcebergRestCatalogIntegration(CatalogIntegration): allows_writes = True auto_refresh = None catalog_linked_database: Optional[str] = None + catalog_linked_database_type: Optional[str] = None max_data_extension_time_in_days: Optional[int] = None target_file_size: Optional[str] = None @@ -43,6 +45,9 @@ def __init__(self, config: CatalogIntegrationConfig) -> None: self.external_volume: Optional[str] = config.external_volume if adapter_properties := config.adapter_properties: self.catalog_linked_database = adapter_properties.get("catalog_linked_database") + self.catalog_linked_database_type = adapter_properties.get( + "catalog_linked_database_type" + ) self.auto_refresh = adapter_properties.get("auto_refresh") self.target_file_size = adapter_properties.get("target_file_size") self.max_data_extension_time_in_days = adapter_properties.get( @@ -79,6 +84,7 @@ def build_relation(self, model: RelationConfig) -> IcebergRestCatalogRelation: catalog_name=self.name, external_volume=None, catalog_linked_database=self.catalog_linked_database, + catalog_linked_database_type=self.catalog_linked_database_type, auto_refresh=parse_model.auto_refresh(model) or self.auto_refresh, target_file_size=parse_model.target_file_size(model) or self.target_file_size, max_data_extension_time_in_days=parse_model.max_data_extension_time_in_days(model) diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql index 7567923aa..7ebab72e8 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql @@ -10,7 +10,12 @@ {%- elif catalog_relation.catalog_type == 'BUILT_IN' -%} {{ snowflake__create_table_built_in_sql(relation, compiled_code) }} {%- elif catalog_relation.catalog_type == 'ICEBERG_REST' -%} - {{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }} + {%- if catalog_relation.catalog_linked_database_type is defined and + catalog_relation.catalog_linked_database_type == 'glue' -%} + {{ snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) }} + {%- else -%} + {{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }} + {%- endif -%} {%- else -%} {% do exceptions.raise_compiler_error('Unexpected model config for: ' ~ relation) %} {%- endif -%} @@ -186,7 +191,7 @@ alter iceberg table {{ relation }} resume recluster; {% macro snowflake__create_table_iceberg_rest_sql(relation, compiled_code) -%} {#- - Implements CREATE ICEBERG TABLE ... CATALOG('catalog_name') (external REST catalog): + Implements CREATE ICEBERG TABLE for Iceberg REST catalogs with Catalog Linked Databases. https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-rest Limitations: @@ -194,6 +199,10 @@ alter iceberg table {{ relation }} resume recluster; - Iceberg REST does not support CREATE OR REPLACE - Iceberg catalogs do not support table renaming operations - For existing tables, we must DROP the table first before creating the new one + + Note: Iceberg REST writes only work with Catalog Linked Databases (CLD). + Most CLDs support CTAS. For AWS Glue CLD (which doesn't support CTAS), + a 4-step process is handled at the materialization level (see table.sql). -#} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} @@ -222,16 +231,12 @@ alter iceberg table {{ relation }} resume recluster; {% endif %} -{# Create the table (works for both new and replacement scenarios) #} +{#- All Iceberg REST writes use CLD and support CTAS (except Glue, handled in table.sql) -#} create iceberg table {{ relation }} {%- if contract_config.enforced %} {{ get_table_columns_and_constraints() }} {%- endif %} {{ optional('external_volume', catalog_relation.external_volume, "'") }} - {%- if not catalog_relation|attr('catalog_linked_database') -%} - catalog = '{{ catalog_relation.catalog_name }}' -- external REST catalog name - {{ optional('base_location', catalog_relation.base_location, "'") }} - {%- endif %} {{ optional('target_file_size', catalog_relation.target_file_size, "'") }} {{ optional('auto_refresh', catalog_relation.auto_refresh) }} {{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}} @@ -245,6 +250,63 @@ as ( {%- endmacro %} +{% macro snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) -%} +{#- + Creates an Iceberg table for Catalog Linked Databases (e.g., AWS Glue) with explicit column definitions. + This is used when CTAS is not supported. + + This macro is specifically for CLD where we need to create the table with an explicit schema + because CTAS is not available. +-#} + +{# Step 1: Get the schema from the compiled query #} +{% set sql_columns = get_column_schema_from_query(compiled_code) %} + +{# Step 2: Create the iceberg table in the CLD with explicit column definitions #} + +{%- set copy_grants = config.get('copy_grants', default=false) -%} +{%- set row_access_policy = config.get('row_access_policy', default=none) -%} +{%- set table_tag = config.get('table_tag', default=none) -%} + +{%- set sql_header = config.get('sql_header', none) -%} +{{ sql_header if sql_header is not none }} + +{# Check if relation exists and drop if necessary (CLD doesn't support CREATE OR REPLACE) #} +{% set existing_relation = adapter.get_relation(database=relation.database, schema=relation.schema, identifier=relation.identifier) %} +{% if existing_relation %} + drop table if exists {{ existing_relation }}; +{% endif %} + +{# Create the table with explicit column definitions #} +create iceberg table {{ relation }} ( + {%- for column in sql_columns -%} + {% if column.data_type == "FIXED" %} + {%- set data_type = "INT" -%} + {% elif "character varying" in column.data_type %} + {%- set data_type = "STRING" -%} + {% else %} + {%- set data_type = column.data_type -%} + {% endif %} + {{ adapter.quote(column.name.lower()) }} {{ data_type }} + {%- if not loop.last %}, {% endif -%} + {% endfor -%} +) +{{ optional('external_volume', catalog_relation.external_volume, "'") }} +{{ optional('target_file_size', catalog_relation.target_file_size, "'") }} +{{ optional('auto_refresh', catalog_relation.auto_refresh) }} +{{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}} +{% if row_access_policy -%} with row access policy {{ row_access_policy }} {%- endif %} +{% if table_tag -%} with tag ({{ table_tag }}) {%- endif %} +{% if copy_grants -%} copy grants {%- endif %} +; + +{# Step 3: Insert data from the view (in regular DB) into the table (in CLD) #} +insert into {{ relation }} + {{ compiled_code }}; + +{%- endmacro %} + + {% macro py_write_table(compiled_code, target_relation) %} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} diff --git a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py index a057f077f..30c91dd29 100644 --- a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py +++ b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py @@ -51,6 +51,7 @@ class TestSnowflakeIcebergRestCatalogIntegration(BaseCatalogIntegrationValidation): + """Test for non-Glue Catalog Linked Databases (e.g., Polaris) that support CTAS""" @pytest.fixture(scope="class") def catalogs(self): @@ -68,6 +69,7 @@ def catalogs(self): "catalog_linked_database": os.getenv( "SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE" ), + # No catalog_linked_database_type means standard CTAS is used "max_data_extension_time_in_days": 1, "target_file_size": "AUTO", "auto_refresh": "true", @@ -103,3 +105,112 @@ def test_basic_iceberg_rest_catalog_integration(self, project): result = run_dbt(["run"]) assert len(result) == 4 run_dbt(["run"]) + + +class TestSnowflakeIcebergRestGlueCatalogIntegration(BaseCatalogIntegrationValidation): + """Test for AWS Glue Catalog Linked Database that doesn't support CTAS + + Note: AWS Glue CLDs in Snowflake have several limitations: + 1. Require lowercase unquoted identifiers OR double-quoted identifiers + 2. May not support SHOW SCHEMAS and other metadata operations + 3. Require specific IAM permissions and AWS configuration + + This test is skipped by default. To test Glue CLD functionality: + 1. Set up a properly configured AWS Glue CLD in Snowflake + 2. Ensure proper IAM roles and permissions + 3. Manually create a schema in lowercase + 4. Remove the @pytest.mark.skip decorator and run the test + """ + + @pytest.fixture(scope="class") + def catalogs(self): + return { + "catalogs": [ + { + "name": "glue_iceberg_rest_catalog", + "active_write_integration": "glue_iceberg_rest_catalog_integration", + "write_integrations": [ + { + "name": "glue_iceberg_rest_catalog_integration", + "catalog_type": "iceberg_rest", + "table_format": "iceberg", + "adapter_properties": { + "catalog_linked_database": os.getenv( + "SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE" + ), + "catalog_linked_database_type": "glue", # Glue requires 4-step process + "max_data_extension_time_in_days": 1, + "target_file_size": "AUTO", + "auto_refresh": "true", + }, + } + ], + }, + ] + } + + @pytest.fixture(scope="class") + def project_config_update(self): + # Force quoting for Glue CLD compatibility + return { + "quoting": { + "database": False, + "schema": True, + "identifier": True, + } + } + + @pytest.fixture(scope="class", autouse=True) + def setup_glue_schema(self, project): + """Pre-create schema with quoted lowercase identifier for Glue CLD""" + adapter = project.adapter + glue_database = os.getenv("SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE") + schema_name = project.test_schema.lower() + + # Create schema with quoted identifier to preserve lowercase + create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS {glue_database}."{schema_name}"' + adapter.execute(create_schema_sql, fetch=False) + + yield + + # Cleanup: drop schema after test + drop_schema_sql = f'DROP SCHEMA IF EXISTS {glue_database}."{schema_name}"' + try: + adapter.execute(drop_schema_sql, fetch=False) + except: + pass # Ignore cleanup errors + + # AWS Glue requires lowercase identifiers and alphanumeric characters only + @pytest.fixture(scope="class") + def unique_schema(self, request, prefix) -> str: + test_file = request.module.__name__ + # We only want the last part of the name + test_file = test_file.split(".")[-1] + unique_schema = f"{prefix}_{test_file}_glue" + # Remove underscores and convert to lowercase for Glue compatibility + return unique_schema.replace("_", "").lower() + + @pytest.fixture(scope="class") + def models(self): + # Use different catalog name for Glue models + return { + "models": { + "glue_basic_iceberg_table.sql": """ + {{ config(materialized='table', + catalog_name='glue_iceberg_rest_catalog') }} + select 1 as id, 'test' as name, 1.0 as price, '2021-01-01' as test_date + """, + "glue_iceberg_table_with_catalog_config.sql": """ + {{ config(materialized='table', catalog_name='glue_iceberg_rest_catalog', + target_file_size='16MB', max_data_extension_time_in_days=1, auto_refresh='true') }} + select 1 as id + """, + } + } + + def test_glue_iceberg_rest_catalog_integration(self, project): + """Test Glue CLD with 4-step table creation process""" + result = run_dbt(["run"]) + assert len(result) == 2 + # Run again to test update path + run_dbt(["run"]) From 142dd40812ff3fac7af47967322125ad7c7bb1b0 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 8 Oct 2025 17:08:23 -0700 Subject: [PATCH 2/3] update github actions and add changie --- .github/workflows/_integration-tests.yml | 1 + .../.changes/unreleased/Features-20251008-170801.yaml | 7 +++++++ 2 files changed, 8 insertions(+) create mode 100644 dbt-snowflake/.changes/unreleased/Features-20251008-170801.yaml diff --git a/.github/workflows/_integration-tests.yml b/.github/workflows/_integration-tests.yml index 25e69c8db..23f075c18 100644 --- a/.github/workflows/_integration-tests.yml +++ b/.github/workflows/_integration-tests.yml @@ -338,6 +338,7 @@ jobs: SNOWFLAKE_TEST_QUOTED_DATABASE: ${{ vars.SNOWFLAKE_TEST_QUOTED_DATABASE }} SNOWFLAKE_TEST_OAUTH_CLIENT_ID: ${{ vars.SNOWFLAKE_TEST_OAUTH_CLIENT_ID }} SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE: ${{ vars.SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE }} + SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE: ${{ vars.SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE }} SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET: ${{ secrets.SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET }} SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN: ${{ secrets.SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN }} SNOWFLAKE_TEST_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_TEST_PRIVATE_KEY }} diff --git a/dbt-snowflake/.changes/unreleased/Features-20251008-170801.yaml b/dbt-snowflake/.changes/unreleased/Features-20251008-170801.yaml new file mode 100644 index 000000000..3449c6a07 --- /dev/null +++ b/dbt-snowflake/.changes/unreleased/Features-20251008-170801.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Add basic support for materializing tables in a Glue Catalog Linked Database + with Iceberg REST +time: 2025-10-08T17:08:01.731359-07:00 +custom: + Author: colin-rogers-dbt + Issue: "1377" From f436f98c8b29f2c6dde77ecaab45cccb7b307925 Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 14 Oct 2025 10:13:07 -0700 Subject: [PATCH 3/3] clean up code comments --- .../snowflake/macros/relations/table/create.sql | 6 ++---- .../test_iceberg_rest_catalog_integrations.py | 14 -------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql index 7ebab72e8..1ac7f18e6 100644 --- a/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt-snowflake/src/dbt/include/snowflake/macros/relations/table/create.sql @@ -201,8 +201,6 @@ alter iceberg table {{ relation }} resume recluster; - For existing tables, we must DROP the table first before creating the new one Note: Iceberg REST writes only work with Catalog Linked Databases (CLD). - Most CLDs support CTAS. For AWS Glue CLD (which doesn't support CTAS), - a 4-step process is handled at the materialization level (see table.sql). -#} {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} @@ -271,13 +269,13 @@ as ( {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none }} -{# Check if relation exists and drop if necessary (CLD doesn't support CREATE OR REPLACE) #} +{# Step 2a: Check if relation exists and drop if necessary (CLD doesn't support CREATE OR REPLACE) #} {% set existing_relation = adapter.get_relation(database=relation.database, schema=relation.schema, identifier=relation.identifier) %} {% if existing_relation %} drop table if exists {{ existing_relation }}; {% endif %} -{# Create the table with explicit column definitions #} +{# Step 2b: Create the table with explicit column definitions #} create iceberg table {{ relation }} ( {%- for column in sql_columns -%} {% if column.data_type == "FIXED" %} diff --git a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py index 30c91dd29..0ae55254e 100644 --- a/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py +++ b/dbt-snowflake/tests/functional/adapter/catalog_integrations/test_iceberg_rest_catalog_integrations.py @@ -51,7 +51,6 @@ class TestSnowflakeIcebergRestCatalogIntegration(BaseCatalogIntegrationValidation): - """Test for non-Glue Catalog Linked Databases (e.g., Polaris) that support CTAS""" @pytest.fixture(scope="class") def catalogs(self): @@ -108,19 +107,6 @@ def test_basic_iceberg_rest_catalog_integration(self, project): class TestSnowflakeIcebergRestGlueCatalogIntegration(BaseCatalogIntegrationValidation): - """Test for AWS Glue Catalog Linked Database that doesn't support CTAS - - Note: AWS Glue CLDs in Snowflake have several limitations: - 1. Require lowercase unquoted identifiers OR double-quoted identifiers - 2. May not support SHOW SCHEMAS and other metadata operations - 3. Require specific IAM permissions and AWS configuration - - This test is skipped by default. To test Glue CLD functionality: - 1. Set up a properly configured AWS Glue CLD in Snowflake - 2. Ensure proper IAM roles and permissions - 3. Manually create a schema in lowercase - 4. Remove the @pytest.mark.skip decorator and run the test - """ @pytest.fixture(scope="class") def catalogs(self):