1 change: 1 addition & 0 deletions .github/workflows/_integration-tests.yml
@@ -338,6 +338,7 @@ jobs:
SNOWFLAKE_TEST_QUOTED_DATABASE: ${{ vars.SNOWFLAKE_TEST_QUOTED_DATABASE }}
SNOWFLAKE_TEST_OAUTH_CLIENT_ID: ${{ vars.SNOWFLAKE_TEST_OAUTH_CLIENT_ID }}
SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE: ${{ vars.SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE }}
SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE: ${{ vars.SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE }}
SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET: ${{ secrets.SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET }}
SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN: ${{ secrets.SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN }}
SNOWFLAKE_TEST_PRIVATE_KEY: ${{ secrets.SNOWFLAKE_TEST_PRIVATE_KEY }}
@@ -0,0 +1,7 @@
kind: Features
body: Add basic support for materializing tables in a Glue Catalog Linked Database
with Iceberg REST
time: 2025-10-08T17:08:01.731359-07:00
custom:
Author: colin-rogers-dbt
Issue: "1377"
@@ -18,6 +18,7 @@ class IcebergRestCatalogRelation:
catalog_name: Optional[str] = constants.DEFAULT_ICEBERG_REST_CATALOG.name
table_format: Optional[str] = constants.ICEBERG_TABLE_FORMAT
catalog_linked_database: Optional[str] = None
catalog_linked_database_type: Optional[str] = None # e.g., 'glue' for AWS Glue
external_volume: Optional[str] = None
file_format: Optional[str] = None
target_file_size: Optional[str] = None
@@ -33,6 +34,7 @@ class IcebergRestCatalogIntegration(CatalogIntegration):
allows_writes = True
auto_refresh = None
catalog_linked_database: Optional[str] = None
catalog_linked_database_type: Optional[str] = None
max_data_extension_time_in_days: Optional[int] = None
target_file_size: Optional[str] = None

@@ -43,6 +45,9 @@ def __init__(self, config: CatalogIntegrationConfig) -> None:
self.external_volume: Optional[str] = config.external_volume
if adapter_properties := config.adapter_properties:
self.catalog_linked_database = adapter_properties.get("catalog_linked_database")
self.catalog_linked_database_type = adapter_properties.get(
"catalog_linked_database_type"
)
self.auto_refresh = adapter_properties.get("auto_refresh")
self.target_file_size = adapter_properties.get("target_file_size")
self.max_data_extension_time_in_days = adapter_properties.get(
@@ -79,6 +84,7 @@ def build_relation(self, model: RelationConfig) -> IcebergRestCatalogRelation:
catalog_name=self.name,
external_volume=None,
catalog_linked_database=self.catalog_linked_database,
catalog_linked_database_type=self.catalog_linked_database_type,
auto_refresh=parse_model.auto_refresh(model) or self.auto_refresh,
target_file_size=parse_model.target_file_size(model) or self.target_file_size,
max_data_extension_time_in_days=parse_model.max_data_extension_time_in_days(model)
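
A minimal, self-contained sketch of the flow above, using hypothetical stand-in classes rather than the real dbt-adapters types, to show how the new adapter property travels from the write-integration config onto the relation the macros consume:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubIntegrationConfig:
    """Stand-in for CatalogIntegrationConfig (assumed shape, illustration only)."""
    adapter_properties: dict


@dataclass
class StubIcebergRestRelation:
    """Stand-in for IcebergRestCatalogRelation, reduced to the two CLD fields."""
    catalog_linked_database: Optional[str] = None
    catalog_linked_database_type: Optional[str] = None


def build_relation(config: StubIntegrationConfig) -> StubIcebergRestRelation:
    props = config.adapter_properties or {}
    return StubIcebergRestRelation(
        catalog_linked_database=props.get("catalog_linked_database"),
        # 'glue' later routes create-table SQL to the explicit-schema macro in table.sql
        catalog_linked_database_type=props.get("catalog_linked_database_type"),
    )


relation = build_relation(
    StubIntegrationConfig(
        adapter_properties={
            "catalog_linked_database": "MY_GLUE_CLD",
            "catalog_linked_database_type": "glue",
        }
    )
)
assert relation.catalog_linked_database_type == "glue"
```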
@@ -10,7 +10,12 @@
{%- elif catalog_relation.catalog_type == 'BUILT_IN' -%}
{{ snowflake__create_table_built_in_sql(relation, compiled_code) }}
{%- elif catalog_relation.catalog_type == 'ICEBERG_REST' -%}
{{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }}
{%- if catalog_relation.catalog_linked_database_type is defined and
catalog_relation.catalog_linked_database_type == 'glue' -%}
{{ snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) }}
{%- else -%}
{{ snowflake__create_table_iceberg_rest_sql(relation, compiled_code) }}
{%- endif -%}
{%- else -%}
{% do exceptions.raise_compiler_error('Unexpected model config for: ' ~ relation) %}
{%- endif -%}
@@ -186,14 +191,16 @@ alter iceberg table {{ relation }} resume recluster;

{% macro snowflake__create_table_iceberg_rest_sql(relation, compiled_code) -%}
{#-
Implements CREATE ICEBERG TABLE ... CATALOG('catalog_name') (external REST catalog):
Implements CREATE ICEBERG TABLE for Iceberg REST catalogs with Catalog Linked Databases.
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-rest

Limitations:
- Iceberg does not support temporary tables (use a standard Snowflake table)
- Iceberg REST does not support CREATE OR REPLACE
- Iceberg catalogs do not support table renaming operations
- For existing tables, we must DROP the table first before creating the new one

Note: Iceberg REST writes only work with Catalog Linked Databases (CLD).
-#}

{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}
@@ -222,16 +229,12 @@ alter iceberg table {{ relation }} resume recluster;

{% endif %}

{# Create the table (works for both new and replacement scenarios) #}
{#- All Iceberg REST writes use CLD and support CTAS (except Glue, handled in table.sql) -#}
create iceberg table {{ relation }}
{%- if contract_config.enforced %}
{{ get_table_columns_and_constraints() }}
{%- endif %}
{{ optional('external_volume', catalog_relation.external_volume, "'") }}
{%- if not catalog_relation|attr('catalog_linked_database') -%}
catalog = '{{ catalog_relation.catalog_name }}' -- external REST catalog name
{{ optional('base_location', catalog_relation.base_location, "'") }}
{%- endif %}
{{ optional('target_file_size', catalog_relation.target_file_size, "'") }}
{{ optional('auto_refresh', catalog_relation.auto_refresh) }}
{{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}}
@@ -245,6 +248,63 @@ as (
{%- endmacro %}
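
A minimal Python sketch (assumed names, not adapter code) of the statement sequence the limitations above imply for the non-Glue REST path, where CTAS is available but CREATE OR REPLACE and renames are not:

```python
from typing import List


def iceberg_rest_statements(relation: str, compiled_code: str, table_exists: bool) -> List[str]:
    """Drop-then-create sequence for an Iceberg REST CLD target (illustration only)."""
    statements: List[str] = []
    if table_exists:
        # No CREATE OR REPLACE and no renames, so replacement starts with a drop.
        statements.append(f"drop table if exists {relation}")
    # CTAS is still available on this path, so creation and load happen in one statement.
    statements.append(f"create iceberg table {relation} as ({compiled_code})")
    return statements


print(iceberg_rest_statements("analytics.marts.orders", "select 1 as id", table_exists=True))
```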


{% macro snowflake__create_table_iceberg_rest_with_glue(relation, compiled_code, catalog_relation) -%}
{#-
Creates an Iceberg table in a Catalog Linked Database (e.g., AWS Glue) using explicit column
definitions derived from the compiled query, then loads it with a separate INSERT.
This path is used when the catalog does not support CTAS.
-#}

{# Step 1: Get the schema from the compiled query #}
{% set sql_columns = get_column_schema_from_query(compiled_code) %}

{# Step 2: Create the iceberg table in the CLD with explicit column definitions #}

{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- set row_access_policy = config.get('row_access_policy', default=none) -%}
{%- set table_tag = config.get('table_tag', default=none) -%}

{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}

{# Step 2a: Check if relation exists and drop if necessary (CLD doesn't support CREATE OR REPLACE) #}
{% set existing_relation = adapter.get_relation(database=relation.database, schema=relation.schema, identifier=relation.identifier) %}
{% if existing_relation %}
drop table if exists {{ existing_relation }};
{% endif %}

{# Step 2b: Create the table with explicit column definitions #}
create iceberg table {{ relation }} (
{%- for column in sql_columns -%}
{% if column.data_type == "FIXED" %}
{%- set data_type = "INT" -%}
{% elif "character varying" in column.data_type %}
{%- set data_type = "STRING" -%}
{% else %}
{%- set data_type = column.data_type -%}
{% endif %}
{{ adapter.quote(column.name.lower()) }} {{ data_type }}
{%- if not loop.last %}, {% endif -%}
{% endfor -%}
)
{{ optional('external_volume', catalog_relation.external_volume, "'") }}
{{ optional('target_file_size', catalog_relation.target_file_size, "'") }}
{{ optional('auto_refresh', catalog_relation.auto_refresh) }}
{{ optional('max_data_extension_time_in_days', catalog_relation.max_data_extension_time_in_days)}}
{% if row_access_policy -%} with row access policy {{ row_access_policy }} {%- endif %}
{% if table_tag -%} with tag ({{ table_tag }}) {%- endif %}
{% if copy_grants -%} copy grants {%- endif %}
;

{# Step 3: Insert data from the view (in regular DB) into the table (in CLD) #}
insert into {{ relation }}
{{ compiled_code }};

{%- endmacro %}
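
To make the Glue path above easier to follow, here is a minimal Python sketch (an assumed helper, not adapter code) of the same four steps: infer the column schema, drop any existing table, create with explicit column definitions (normalizing the types Snowflake reports for a query schema), then insert the compiled query's results:

```python
from typing import List, Optional, Tuple


def glue_cld_statements(
    relation: str,
    columns: List[Tuple[str, str]],  # (name, data_type) pairs, as from get_column_schema_from_query
    compiled_code: str,
    table_exists: bool,
    external_volume: Optional[str] = None,
) -> List[str]:
    """Build the drop / create / insert sequence used when CTAS is unavailable (illustration only)."""

    def normalize(data_type: str) -> str:
        # Mirrors the macro: query schemas report integers as FIXED and text as 'character varying(n)'.
        if data_type == "FIXED":
            return "INT"
        if "character varying" in data_type:
            return "STRING"
        return data_type

    statements: List[str] = []
    if table_exists:
        # The CLD does not support CREATE OR REPLACE, so an existing table is dropped first.
        statements.append(f"drop table if exists {relation}")

    column_defs = ", ".join(f'"{name.lower()}" {normalize(dtype)}' for name, dtype in columns)
    create = f"create iceberg table {relation} ({column_defs})"
    if external_volume:
        create += f" external_volume = '{external_volume}'"
    statements.append(create)

    # Data is loaded separately because CTAS cannot target the Glue CLD.
    statements.append(f"insert into {relation} {compiled_code}")
    return statements


print(
    glue_cld_statements(
        "glue_db.myschema.orders",
        [("ID", "FIXED"), ("NAME", "character varying(16777216)")],
        "select 1 as id, 'test' as name",
        table_exists=False,
    )
)
```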


{% macro py_write_table(compiled_code, target_relation) %}

{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}
@@ -68,6 +68,7 @@ def catalogs(self):
"catalog_linked_database": os.getenv(
"SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE"
),
# No catalog_linked_database_type means standard CTAS is used
"max_data_extension_time_in_days": 1,
"target_file_size": "AUTO",
"auto_refresh": "true",
@@ -103,3 +104,99 @@ def test_basic_iceberg_rest_catalog_integration(self, project):
result = run_dbt(["run"])
assert len(result) == 4
run_dbt(["run"])


class TestSnowflakeIcebergRestGlueCatalogIntegration(BaseCatalogIntegrationValidation):

@pytest.fixture(scope="class")
def catalogs(self):
return {
"catalogs": [
{
"name": "glue_iceberg_rest_catalog",
"active_write_integration": "glue_iceberg_rest_catalog_integration",
"write_integrations": [
{
"name": "glue_iceberg_rest_catalog_integration",
"catalog_type": "iceberg_rest",
"table_format": "iceberg",
"adapter_properties": {
"catalog_linked_database": os.getenv(
"SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE"
),
"catalog_linked_database_type": "glue", # Glue requires 4-step process
"max_data_extension_time_in_days": 1,
"target_file_size": "AUTO",
"auto_refresh": "true",
},
}
],
},
]
}

@pytest.fixture(scope="class")
def project_config_update(self):
# Quote schema and identifier (database quoting stays off) for Glue CLD compatibility
return {
"quoting": {
"database": False,
"schema": True,
"identifier": True,
}
}

@pytest.fixture(scope="class", autouse=True)
def setup_glue_schema(self, project):
"""Pre-create schema with quoted lowercase identifier for Glue CLD"""
adapter = project.adapter
glue_database = os.getenv("SNOWFLAKE_TEST_CATALOG_LINKED_DATABASE_GLUE")
schema_name = project.test_schema.lower()

# Create schema with quoted identifier to preserve lowercase
create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS {glue_database}."{schema_name}"'
adapter.execute(create_schema_sql, fetch=False)

yield

# Cleanup: drop schema after test
drop_schema_sql = f'DROP SCHEMA IF EXISTS {glue_database}."{schema_name}"'
try:
adapter.execute(drop_schema_sql, fetch=False)
except Exception:
pass  # Ignore cleanup errors

# AWS Glue requires lowercase identifiers and alphanumeric characters only
@pytest.fixture(scope="class")
def unique_schema(self, request, prefix) -> str:
test_file = request.module.__name__
# We only want the last part of the name
test_file = test_file.split(".")[-1]
unique_schema = f"{prefix}_{test_file}_glue"
# Remove underscores and convert to lowercase for Glue compatibility
return unique_schema.replace("_", "").lower()

@pytest.fixture(scope="class")
def models(self):
# Use different catalog name for Glue models
return {
"models": {
"glue_basic_iceberg_table.sql": """
{{ config(materialized='table',
catalog_name='glue_iceberg_rest_catalog') }}
select 1 as id, 'test' as name, 1.0 as price, '2021-01-01' as test_date
""",
"glue_iceberg_table_with_catalog_config.sql": """
{{ config(materialized='table', catalog_name='glue_iceberg_rest_catalog',
target_file_size='16MB', max_data_extension_time_in_days=1, auto_refresh='true') }}
select 1 as id
""",
}
}

def test_glue_iceberg_rest_catalog_integration(self, project):
"""Test Glue CLD with 4-step table creation process"""
result = run_dbt(["run"])
assert len(result) == 2
# Run again to test update path
run_dbt(["run"])