From b67971665bb2ab916dd5e0f2e0a9256b4dcc643a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fredh=C3=B8i?=
Date: Tue, 17 Jun 2025 00:45:12 +0200
Subject: [PATCH 01/16] feat: Add support for Microsoft Fabric Warehouse

---
 sqlmesh/core/config/connection.py           |  22 ++
 sqlmesh/core/engine_adapter/__init__.py     |   4 +
 .../core/engine_adapter/fabric_warehouse.py | 233 ++++++++++++++++++
 3 files changed, 259 insertions(+)
 create mode 100644 sqlmesh/core/engine_adapter/fabric_warehouse.py

diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py
index b3ed3bc34f..3452ee5ba8 100644
--- a/sqlmesh/core/config/connection.py
+++ b/sqlmesh/core/config/connection.py
@@ -1587,6 +1587,28 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
         return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}
 
 
+class FabricWarehouseConnectionConfig(MSSQLConnectionConfig):
+    """
+    Fabric Warehouse Connection Configuration. Inherits most settings from MSSQLConnectionConfig.
+    """
+
+    type_: t.Literal["fabric_warehouse"] = Field(alias="type", default="fabric_warehouse")  # type: ignore
+    autocommit: t.Optional[bool] = True
+
+    @property
+    def _engine_adapter(self) -> t.Type[EngineAdapter]:
+        from sqlmesh.core.engine_adapter.fabric_warehouse import FabricWarehouseAdapter
+
+        return FabricWarehouseAdapter
+
+    @property
+    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
+        return {
+            "database": self.database,
+            "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
+        }
+
+
 class SparkConnectionConfig(ConnectionConfig):
     """
     Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks.
diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py
index 19332dc005..b876c3b924 100644
--- a/sqlmesh/core/engine_adapter/__init__.py
+++ b/sqlmesh/core/engine_adapter/__init__.py
@@ -19,6 +19,7 @@
 from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
 from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
 from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter
+from sqlmesh.core.engine_adapter.fabric_warehouse import FabricWarehouseAdapter
 
 DIALECT_TO_ENGINE_ADAPTER = {
     "hive": SparkEngineAdapter,
@@ -35,6 +36,7 @@
     "trino": TrinoEngineAdapter,
     "athena": AthenaEngineAdapter,
     "risingwave": RisingwaveEngineAdapter,
+    "fabric_warehouse": FabricWarehouseAdapter,
 }
 
 DIALECT_ALIASES = {
@@ -45,9 +47,11 @@
 def create_engine_adapter(
     connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any
 ) -> EngineAdapter:
+    print(kwargs)
     dialect = dialect.lower()
     dialect = DIALECT_ALIASES.get(dialect, dialect)
     engine_adapter = DIALECT_TO_ENGINE_ADAPTER.get(dialect)
+    print(engine_adapter)
     if engine_adapter is None:
         return EngineAdapter(connection_factory, dialect, **kwargs)
     if engine_adapter is EngineAdapterWithIndexSupport:
diff --git a/sqlmesh/core/engine_adapter/fabric_warehouse.py b/sqlmesh/core/engine_adapter/fabric_warehouse.py
new file mode 100644
index 0000000000..037f827366
--- /dev/null
+++ b/sqlmesh/core/engine_adapter/fabric_warehouse.py
@@ -0,0 +1,233 @@
+from __future__ import annotations
+
+import typing as t
+from sqlglot import exp
+from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
+from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
+
+if t.TYPE_CHECKING:
+    from sqlmesh.core._typing import SchemaName, TableName
+    from sqlmesh.core.engine_adapter._typing import QueryOrDF
+
+
+class FabricWarehouseAdapter(MSSQLEngineAdapter):
+    """
+    Adapter for Microsoft
Fabric Warehouses. + """ + + DIALECT = "tsql" + SUPPORTS_INDEXES = False + SUPPORTS_TRANSACTIONS = False + + INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT + + def __init__(self, *args: t.Any, **kwargs: t.Any): + self.database = kwargs.get("database") + + super().__init__(*args, **kwargs) + + if not self.database: + raise ValueError( + "The 'database' parameter is required in the connection config for the FabricWarehouseAdapter." + ) + try: + self.execute(f"USE [{self.database}]") + except Exception as e: + raise RuntimeError(f"Failed to set database context to '{self.database}'. Reason: {e}") + + def _get_schema_name(self, name: t.Union[TableName, SchemaName]) -> str: + """Extracts the schema name from a sqlglot object or string.""" + table = exp.to_table(name) + schema_part = table.db + + if isinstance(schema_part, exp.Identifier): + return schema_part.name + if isinstance(schema_part, str): + return schema_part + + if schema_part is None and table.this and table.this.is_identifier: + return table.this.name + + raise ValueError(f"Could not determine schema name from '{name}'") + + def create_schema(self, schema: SchemaName) -> None: + """ + Creates a schema in a Microsoft Fabric Warehouse. + + Overridden to handle Fabric's specific T-SQL requirements. + T-SQL's `CREATE SCHEMA` command does not support `IF NOT EXISTS`, so this + implementation first checks for the schema's existence in the + `INFORMATION_SCHEMA.SCHEMATA` view. + """ + sql = ( + exp.select("1") + .from_(f"{self.database}.INFORMATION_SCHEMA.SCHEMATA") + .where(f"SCHEMA_NAME = '{schema}'") + ) + if self.fetchone(sql): + return + self.execute(f"USE [{self.database}]") + self.execute(f"CREATE SCHEMA [{schema}]") + + def _create_table_from_columns( + self, + table_name: TableName, + columns_to_types: t.Dict[str, exp.DataType], + primary_key: t.Optional[t.Tuple[str, ...]] = None, + exists: bool = True, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + **kwargs: t.Any, + ) -> None: + """ + Creates a table, ensuring the schema exists first and that all + object names are fully qualified with the database. + """ + table_exp = exp.to_table(table_name) + schema_name = self._get_schema_name(table_name) + + self.create_schema(schema_name) + + fully_qualified_table_name = f"[{self.database}].[{schema_name}].[{table_exp.name}]" + + column_defs = ", ".join( + f"[{col}] {kind.sql(dialect=self.dialect)}" for col, kind in columns_to_types.items() + ) + + create_table_sql = f"CREATE TABLE {fully_qualified_table_name} ({column_defs})" + + if not exists: + self.execute(create_table_sql) + return + + if not self.table_exists(table_name): + self.execute(create_table_sql) + + if table_description and self.comments_enabled: + qualified_table_for_comment = self._fully_qualify(table_name) + self._create_table_comment(qualified_table_for_comment, table_description) + if column_descriptions and self.comments_enabled: + self._create_column_comments(qualified_table_for_comment, column_descriptions) + + def table_exists(self, table_name: TableName) -> bool: + """ + Checks if a table exists. + + Overridden to query the uppercase `INFORMATION_SCHEMA` required + by case-sensitive Fabric environments. 
+ """ + table = exp.to_table(table_name) + schema = self._get_schema_name(table_name) + + sql = ( + exp.select("1") + .from_("INFORMATION_SCHEMA.TABLES") + .where(f"TABLE_NAME = '{table.alias_or_name}'") + .where(f"TABLE_SCHEMA = '{schema}'") + ) + + result = self.fetchone(sql, quote_identifiers=True) + + return result[0] == 1 if result else False + + def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table: + """Ensures an object name is prefixed with the configured database.""" + table = exp.to_table(name) + return exp.Table(this=table.this, db=table.db, catalog=exp.to_identifier(self.database)) + + def create_view( + self, + view_name: TableName, + query_or_df: QueryOrDF, + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + replace: bool = True, + materialized: bool = False, + materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, + **create_kwargs: t.Any, + ) -> None: + """ + Creates a view from a query or DataFrame. + + Overridden to ensure that the view name and all tables referenced + in the source query are fully qualified with the database name, + as required by Fabric. + """ + view_schema = self._get_schema_name(view_name) + self.create_schema(view_schema) + + qualified_view_name = self._fully_qualify(view_name) + + if isinstance(query_or_df, exp.Expression): + for table in query_or_df.find_all(exp.Table): + if not table.catalog: + qualified_table = self._fully_qualify(table) + table.replace(qualified_table) + + return super().create_view( + qualified_view_name, + query_or_df, + columns_to_types, + replace, + materialized, + table_description=table_description, + column_descriptions=column_descriptions, + view_properties=view_properties, + **create_kwargs, + ) + + def columns( + self, table_name: TableName, include_pseudo_columns: bool = False + ) -> t.Dict[str, exp.DataType]: + """ + Fetches column names and types for the target table. + + Overridden to query the uppercase `INFORMATION_SCHEMA.COLUMNS` view + required by case-sensitive Fabric environments. + """ + table = exp.to_table(table_name) + schema = self._get_schema_name(table_name) + sql = ( + exp.select("COLUMN_NAME", "DATA_TYPE") + .from_(f"{self.database}.INFORMATION_SCHEMA.COLUMNS") + .where(f"TABLE_NAME = '{table.name}'") + .where(f"TABLE_SCHEMA = '{schema}'") + .order_by("ORDINAL_POSITION") + ) + df = self.fetchdf(sql) + return { + str(row.COLUMN_NAME): exp.DataType.build(str(row.DATA_TYPE), dialect=self.dialect) + for row in df.itertuples() + } + + def _insert_overwrite_by_condition( + self, + table_name: TableName, + source_queries: t.List[SourceQuery], + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + where: t.Optional[exp.Condition] = None, + insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, + **kwargs: t.Any, + ) -> None: + """ + Implements the insert overwrite strategy for Fabric. + + Overridden to enforce a `DELETE`/`INSERT` strategy, as Fabric's + `MERGE` statement has limitations. 
+ """ + + columns_to_types = columns_to_types or self.columns(table_name) + + self.delete_from(table_name, where=where or exp.true()) + + for source_query in source_queries: + with source_query as query: + query = self._order_projections_and_filter(query, columns_to_types) + self._insert_append_query( + table_name, + query, + columns_to_types=columns_to_types, + order_projections=False, + ) From 9a6c5755086afdf634f63ff3b0969cdace7a9ee9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Tue, 17 Jun 2025 00:51:12 +0200 Subject: [PATCH 02/16] removing some print statements --- sqlmesh/core/engine_adapter/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index b876c3b924..27a2be1e32 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -47,11 +47,9 @@ def create_engine_adapter( connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any ) -> EngineAdapter: - print(kwargs) dialect = dialect.lower() dialect = DIALECT_ALIASES.get(dialect, dialect) engine_adapter = DIALECT_TO_ENGINE_ADAPTER.get(dialect) - print(engine_adapter) if engine_adapter is None: return EngineAdapter(connection_factory, dialect, **kwargs) if engine_adapter is EngineAdapterWithIndexSupport: From 347d3ed69bf96eaeb736b3569c068963f2fa3b24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Wed, 18 Jun 2025 00:10:54 +0200 Subject: [PATCH 03/16] adding dialect & handling temp views --- sqlmesh/core/config/connection.py | 16 +- sqlmesh/core/engine_adapter/__init__.py | 6 +- sqlmesh/core/engine_adapter/fabric.py | 482 ++++++++++++++++++ .../core/engine_adapter/fabric_warehouse.py | 233 --------- 4 files changed, 497 insertions(+), 240 deletions(-) create mode 100644 sqlmesh/core/engine_adapter/fabric.py delete mode 100644 sqlmesh/core/engine_adapter/fabric_warehouse.py diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 3452ee5ba8..5cbd35487c 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1587,22 +1587,28 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY} -class FabricWarehouseConnectionConfig(MSSQLConnectionConfig): +class FabricConnectionConfig(MSSQLConnectionConfig): """ - Fabric Warehouse Connection Configuration. Inherits most settings from MSSQLConnectionConfig. + Fabric Connection Configuration. + + Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'. + It is recommended to use the 'pyodbc' driver for Fabric. """ - type_: t.Literal["fabric_warehouse"] = Field(alias="type", default="fabric_warehouse") # type: ignore + type_: t.Literal["fabric"] = Field(alias="type", default="fabric") autocommit: t.Optional[bool] = True @property def _engine_adapter(self) -> t.Type[EngineAdapter]: - from sqlmesh.core.engine_adapter.fabric_warehouse import FabricWarehouseAdapter + # This is the crucial link to the adapter you already created. + from sqlmesh.core.engine_adapter.fabric import FabricAdapter - return FabricWarehouseAdapter + return FabricAdapter @property def _extra_engine_config(self) -> t.Dict[str, t.Any]: + # This ensures the 'database' name from the config is passed + # to the FabricAdapter's constructor. 
return { "database": self.database, "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG, diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index 27a2be1e32..c8b8299bd1 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -19,7 +19,7 @@ from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter -from sqlmesh.core.engine_adapter.fabric_warehouse import FabricWarehouseAdapter +from sqlmesh.core.engine_adapter.fabric import FabricAdapter DIALECT_TO_ENGINE_ADAPTER = { "hive": SparkEngineAdapter, @@ -36,7 +36,7 @@ "trino": TrinoEngineAdapter, "athena": AthenaEngineAdapter, "risingwave": RisingwaveEngineAdapter, - "fabric_warehouse": FabricWarehouseAdapter, + "fabric": FabricAdapter, } DIALECT_ALIASES = { @@ -47,9 +47,11 @@ def create_engine_adapter( connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any ) -> EngineAdapter: + print(kwargs) dialect = dialect.lower() dialect = DIALECT_ALIASES.get(dialect, dialect) engine_adapter = DIALECT_TO_ENGINE_ADAPTER.get(dialect) + print(engine_adapter) if engine_adapter is None: return EngineAdapter(connection_factory, dialect, **kwargs) if engine_adapter is EngineAdapterWithIndexSupport: diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py new file mode 100644 index 0000000000..4865c3c8f5 --- /dev/null +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -0,0 +1,482 @@ +from __future__ import annotations + +import typing as t +from sqlglot import exp +from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter +from sqlmesh.core.engine_adapter.shared import ( + InsertOverwriteStrategy, + SourceQuery, + DataObject, + DataObjectType, +) +import logging +from sqlmesh.core.dialect import to_schema + +logger = logging.getLogger(__name__) +if t.TYPE_CHECKING: + from sqlmesh.core._typing import SchemaName, TableName + from sqlmesh.core.engine_adapter._typing import QueryOrDF + + +class FabricAdapter(MSSQLEngineAdapter): + """ + Adapter for Microsoft Fabric. + """ + + DIALECT = "fabric" + SUPPORTS_INDEXES = False + SUPPORTS_TRANSACTIONS = False + + INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT + + def __init__(self, *args: t.Any, **kwargs: t.Any): + self.database = kwargs.get("database") + + super().__init__(*args, **kwargs) + + if not self.database: + raise ValueError( + "The 'database' parameter is required in the connection config for the FabricWarehouseAdapter." + ) + try: + self.execute(f"USE [{self.database}]") + except Exception as e: + raise RuntimeError(f"Failed to set database context to '{self.database}'. Reason: {e}") + + def _get_schema_name(self, name: t.Union[str, exp.Table, exp.Identifier]) -> t.Optional[str]: + """ + Safely extracts the schema name from a table or schema name, which can be + a string or a sqlglot expression. + + Fabric requires database names to be explicitly specified in many contexts, + including referencing schemas in INFORMATION_SCHEMA. This function helps + in extracting the schema part correctly from potentially qualified names. 
+ """ + table = exp.to_table(name) + + if table.this and table.this.name.startswith("#"): + return None + + schema_part = table.db + + if not schema_part: + return None + + if isinstance(schema_part, exp.Identifier): + return schema_part.name + if isinstance(schema_part, str): + return schema_part + + raise TypeError(f"Unexpected type for schema part: {type(schema_part)}") + + def _get_data_objects( + self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None + ) -> t.List[DataObject]: + """ + Returns all the data objects that exist in the given schema and database. + + Overridden to query `INFORMATION_SCHEMA.TABLES` with explicit database qualification + and preserved casing using `quoted=True`. + """ + import pandas as pd + + catalog = self.get_current_catalog() + + from_table = exp.Table( + this=exp.to_identifier("TABLES", quoted=True), + db=exp.to_identifier("INFORMATION_SCHEMA", quoted=True), + catalog=exp.to_identifier(self.database), + ) + + query = ( + exp.select( + exp.column("TABLE_NAME").as_("name"), + exp.column("TABLE_SCHEMA").as_("schema_name"), + exp.case() + .when(exp.column("TABLE_TYPE").eq("BASE TABLE"), exp.Literal.string("TABLE")) + .else_(exp.column("TABLE_TYPE")) + .as_("type"), + ) + .from_(from_table) + .where(exp.column("TABLE_SCHEMA").eq(str(to_schema(schema_name).db).strip("[]"))) + ) + if object_names: + query = query.where( + exp.column("TABLE_NAME").isin(*(name.strip("[]") for name in object_names)) + ) + + dataframe: pd.DataFrame = self.fetchdf(query) + + return [ + DataObject( + catalog=catalog, + schema=row.schema_name, + name=row.name, + type=DataObjectType.from_str(row.type), + ) + for row in dataframe.itertuples() + ] + + def create_schema( + self, + schema_name: SchemaName, + ignore_if_exists: bool = True, + warn_on_error: bool = True, + **kwargs: t.Any, + ) -> None: + """ + Creates a schema in a Microsoft Fabric Warehouse. + + Overridden to handle Fabric's specific T-SQL requirements. + T-SQL's `CREATE SCHEMA` command does not support `IF NOT EXISTS` directly + as part of the statement in all contexts, and error messages suggest + issues with batching or preceding statements like USE. + """ + if schema_name is None: + return + + schema_name_str = ( + schema_name.name if isinstance(schema_name, exp.Identifier) else str(schema_name) + ) + + if not schema_name_str: + logger.warning("Attempted to create a schema with an empty name. Skipping.") + return + + schema_name_str = schema_name_str.strip('[]"').rstrip(".") + + if not schema_name_str: + logger.warning( + "Attempted to create a schema with an empty name after sanitization. Skipping." + ) + return + + try: + if self.schema_exists(schema_name_str): + if ignore_if_exists: + return + raise RuntimeError(f"Schema '{schema_name_str}' already exists.") + except Exception as e: + if warn_on_error: + logger.warning(f"Failed to check for existence of schema '{schema_name_str}': {e}") + else: + raise + + try: + create_sql = f"CREATE SCHEMA [{schema_name_str}]" + self.execute(create_sql) + except Exception as e: + if "already exists" in str(e).lower() or "There is already an object named" in str(e): + if ignore_if_exists: + return + raise RuntimeError(f"Schema '{schema_name_str}' already exists.") from e + else: + if warn_on_error: + logger.warning(f"Failed to create schema {schema_name_str}. 
Reason: {e}") + else: + raise RuntimeError(f"Failed to create schema {schema_name_str}.") from e + + def _create_table_from_columns( + self, + table_name: TableName, + columns_to_types: t.Dict[str, exp.DataType], + primary_key: t.Optional[t.Tuple[str, ...]] = None, + exists: bool = True, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + **kwargs: t.Any, + ) -> None: + """ + Creates a table, ensuring the schema exists first and that all + object names are fully qualified with the database. + """ + table_exp = exp.to_table(table_name) + schema_name = self._get_schema_name(table_name) + + self.create_schema(schema_name) + + fully_qualified_table_name = f"[{self.database}].[{schema_name}].[{table_exp.name}]" + + column_defs = ", ".join( + f"[{col}] {kind.sql(dialect=self.dialect)}" for col, kind in columns_to_types.items() + ) + + create_table_sql = f"CREATE TABLE {fully_qualified_table_name} ({column_defs})" + + if not exists: + self.execute(create_table_sql) + return + + if not self.table_exists(table_name): + self.execute(create_table_sql) + + if table_description and self.comments_enabled: + qualified_table_for_comment = self._fully_qualify(table_name) + self._create_table_comment(qualified_table_for_comment, table_description) + if column_descriptions and self.comments_enabled: + self._create_column_comments(qualified_table_for_comment, column_descriptions) + + def table_exists(self, table_name: TableName) -> bool: + """ + Checks if a table exists. + + Overridden to query the uppercase `INFORMATION_SCHEMA` required + by case-sensitive Fabric environments. + """ + table = exp.to_table(table_name) + schema = self._get_schema_name(table_name) + + sql = ( + exp.select("1") + .from_("INFORMATION_SCHEMA.TABLES") + .where(f"TABLE_NAME = '{table.alias_or_name}'") + .where(f"TABLE_SCHEMA = '{schema}'") + ) + + result = self.fetchone(sql, quote_identifiers=True) + + return result[0] == 1 if result else False + + def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table: + """ + Ensures an object name is prefixed with the configured database and schema. + + Overridden to prevent qualification for temporary objects (starting with # or ##). + Temporary objects should not be qualified with database or schema in T-SQL. + """ + table = exp.to_table(name) + + if ( + table.this + and isinstance(table.this, exp.Identifier) + and (table.this.name.startswith("#")) + ): + temp_identifier = exp.Identifier(this=table.this.this, quoted=True) + return exp.Table(this=temp_identifier) + + schema = self._get_schema_name(name) + + return exp.Table( + this=table.this, + db=exp.to_identifier(schema) if schema else None, + catalog=exp.to_identifier(self.database), + ) + + def create_view( + self, + view_name: TableName, + query_or_df: QueryOrDF, + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + replace: bool = True, + materialized: bool = False, + materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, + **create_kwargs: t.Any, + ) -> None: + """ + Creates a view from a query or DataFrame. + + Overridden to ensure that the view name and all tables referenced + in the source query are fully qualified with the database name, + as required by Fabric. 
+ """ + view_schema = self._get_schema_name(view_name) + self.create_schema(view_schema) + + qualified_view_name = self._fully_qualify(view_name) + + if isinstance(query_or_df, exp.Expression): + for table in query_or_df.find_all(exp.Table): + if not table.catalog: + qualified_table = self._fully_qualify(table) + table.replace(qualified_table) + + return super().create_view( + qualified_view_name, + query_or_df, + columns_to_types, + replace, + materialized, + table_description=table_description, + column_descriptions=column_descriptions, + view_properties=view_properties, + **create_kwargs, + ) + + def columns( + self, table_name: TableName, include_pseudo_columns: bool = False + ) -> t.Dict[str, exp.DataType]: + table = exp.to_table(table_name) + schema = self._get_schema_name(table_name) + + if ( + not schema + and table.this + and isinstance(table.this, exp.Identifier) + and table.this.name.startswith("__temp_") + ): + schema = "dbo" + + if not schema: + logger.warning( + f"Cannot fetch columns for table '{table_name}' without a schema name in Fabric." + ) + return {} + + from_table = exp.Table( + this=exp.to_identifier("COLUMNS", quoted=True), + db=exp.to_identifier("INFORMATION_SCHEMA", quoted=True), + catalog=exp.to_identifier(self.database), + ) + + sql = ( + exp.select( + "COLUMN_NAME", + "DATA_TYPE", + "CHARACTER_MAXIMUM_LENGTH", + "NUMERIC_PRECISION", + "NUMERIC_SCALE", + ) + .from_(from_table) + .where(f"TABLE_NAME = '{table.name.strip('[]')}'") + .where(f"TABLE_SCHEMA = '{schema.strip('[]')}'") + .order_by("ORDINAL_POSITION") + ) + + df = self.fetchdf(sql) + + def build_var_length_col( + column_name: str, + data_type: str, + character_maximum_length: t.Optional[int] = None, + numeric_precision: t.Optional[int] = None, + numeric_scale: t.Optional[int] = None, + ) -> t.Tuple[str, str]: + data_type = data_type.lower() + + char_len_int = ( + int(character_maximum_length) if character_maximum_length is not None else None + ) + prec_int = int(numeric_precision) if numeric_precision is not None else None + scale_int = int(numeric_scale) if numeric_scale is not None else None + + if data_type in self.VARIABLE_LENGTH_DATA_TYPES and char_len_int is not None: + if char_len_int > 0: + return (column_name, f"{data_type}({char_len_int})") + if char_len_int == -1: + return (column_name, f"{data_type}(max)") + if ( + data_type in ("decimal", "numeric") + and prec_int is not None + and scale_int is not None + ): + return (column_name, f"{data_type}({prec_int}, {scale_int})") + if data_type == "float" and prec_int is not None: + return (column_name, f"{data_type}({prec_int})") + + return (column_name, data_type) + + columns_raw = [ + ( + row.COLUMN_NAME, + row.DATA_TYPE, + getattr(row, "CHARACTER_MAXIMUM_LENGTH", None), + getattr(row, "NUMERIC_PRECISION", None), + getattr(row, "NUMERIC_SCALE", None), + ) + for row in df.itertuples() + ] + + columns_processed = [build_var_length_col(*row) for row in columns_raw] + + return { + column_name: exp.DataType.build(data_type, dialect=self.dialect) + for column_name, data_type in columns_processed + } + + def create_schema( + self, + schema_name: SchemaName, + ignore_if_exists: bool = True, + warn_on_error: bool = True, + **kwargs: t.Any, + ) -> None: + if schema_name is None: + return + + schema_exp = to_schema(schema_name) + simple_schema_name_str = None + if schema_exp.db: + simple_schema_name_str = exp.to_identifier(schema_exp.db).name + + if not simple_schema_name_str: + logger.warning( + f"Could not determine simple schema name from 
'{schema_name}'. Skipping schema creation." + ) + return + + if ignore_if_exists: + try: + if self.schema_exists(simple_schema_name_str): + return + except Exception as e: + if warn_on_error: + logger.warning( + f"Failed to check for existence of schema '{simple_schema_name_str}': {e}" + ) + else: + raise + elif self.schema_exists(simple_schema_name_str): + raise RuntimeError(f"Schema '{simple_schema_name_str}' already exists.") + + try: + create_sql = f"CREATE SCHEMA [{simple_schema_name_str}]" + self.execute(create_sql) + except Exception as e: + error_message = str(e).lower() + if ( + "already exists" in error_message + or "there is already an object named" in error_message + ): + if ignore_if_exists: + return + raise RuntimeError( + f"Schema '{simple_schema_name_str}' already exists due to race condition." + ) from e + else: + if warn_on_error: + logger.warning(f"Failed to create schema {simple_schema_name_str}. Reason: {e}") + else: + raise RuntimeError(f"Failed to create schema {simple_schema_name_str}.") from e + + def _insert_overwrite_by_condition( + self, + table_name: TableName, + source_queries: t.List[SourceQuery], + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + where: t.Optional[exp.Condition] = None, + insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, + **kwargs: t.Any, + ) -> None: + """ + Implements the insert overwrite strategy for Fabric. + + Overridden to enforce a `DELETE`/`INSERT` strategy, as Fabric's + `MERGE` statement has limitations. + """ + + columns_to_types = columns_to_types or self.columns(table_name) + + self.delete_from(table_name, where=where or exp.true()) + + for source_query in source_queries: + with source_query as query: + query = self._order_projections_and_filter(query, columns_to_types) + self._insert_append_query( + table_name, + query, + columns_to_types=columns_to_types, + order_projections=False, + ) diff --git a/sqlmesh/core/engine_adapter/fabric_warehouse.py b/sqlmesh/core/engine_adapter/fabric_warehouse.py deleted file mode 100644 index 037f827366..0000000000 --- a/sqlmesh/core/engine_adapter/fabric_warehouse.py +++ /dev/null @@ -1,233 +0,0 @@ -from __future__ import annotations - -import typing as t -from sqlglot import exp -from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter -from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery - -if t.TYPE_CHECKING: - from sqlmesh.core._typing import SchemaName, TableName - from sqlmesh.core.engine_adapter._typing import QueryOrDF - - -class FabricWarehouseAdapter(MSSQLEngineAdapter): - """ - Adapter for Microsoft Fabric Warehouses. - """ - - DIALECT = "tsql" - SUPPORTS_INDEXES = False - SUPPORTS_TRANSACTIONS = False - - INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT - - def __init__(self, *args: t.Any, **kwargs: t.Any): - self.database = kwargs.get("database") - - super().__init__(*args, **kwargs) - - if not self.database: - raise ValueError( - "The 'database' parameter is required in the connection config for the FabricWarehouseAdapter." - ) - try: - self.execute(f"USE [{self.database}]") - except Exception as e: - raise RuntimeError(f"Failed to set database context to '{self.database}'. 
Reason: {e}") - - def _get_schema_name(self, name: t.Union[TableName, SchemaName]) -> str: - """Extracts the schema name from a sqlglot object or string.""" - table = exp.to_table(name) - schema_part = table.db - - if isinstance(schema_part, exp.Identifier): - return schema_part.name - if isinstance(schema_part, str): - return schema_part - - if schema_part is None and table.this and table.this.is_identifier: - return table.this.name - - raise ValueError(f"Could not determine schema name from '{name}'") - - def create_schema(self, schema: SchemaName) -> None: - """ - Creates a schema in a Microsoft Fabric Warehouse. - - Overridden to handle Fabric's specific T-SQL requirements. - T-SQL's `CREATE SCHEMA` command does not support `IF NOT EXISTS`, so this - implementation first checks for the schema's existence in the - `INFORMATION_SCHEMA.SCHEMATA` view. - """ - sql = ( - exp.select("1") - .from_(f"{self.database}.INFORMATION_SCHEMA.SCHEMATA") - .where(f"SCHEMA_NAME = '{schema}'") - ) - if self.fetchone(sql): - return - self.execute(f"USE [{self.database}]") - self.execute(f"CREATE SCHEMA [{schema}]") - - def _create_table_from_columns( - self, - table_name: TableName, - columns_to_types: t.Dict[str, exp.DataType], - primary_key: t.Optional[t.Tuple[str, ...]] = None, - exists: bool = True, - table_description: t.Optional[str] = None, - column_descriptions: t.Optional[t.Dict[str, str]] = None, - **kwargs: t.Any, - ) -> None: - """ - Creates a table, ensuring the schema exists first and that all - object names are fully qualified with the database. - """ - table_exp = exp.to_table(table_name) - schema_name = self._get_schema_name(table_name) - - self.create_schema(schema_name) - - fully_qualified_table_name = f"[{self.database}].[{schema_name}].[{table_exp.name}]" - - column_defs = ", ".join( - f"[{col}] {kind.sql(dialect=self.dialect)}" for col, kind in columns_to_types.items() - ) - - create_table_sql = f"CREATE TABLE {fully_qualified_table_name} ({column_defs})" - - if not exists: - self.execute(create_table_sql) - return - - if not self.table_exists(table_name): - self.execute(create_table_sql) - - if table_description and self.comments_enabled: - qualified_table_for_comment = self._fully_qualify(table_name) - self._create_table_comment(qualified_table_for_comment, table_description) - if column_descriptions and self.comments_enabled: - self._create_column_comments(qualified_table_for_comment, column_descriptions) - - def table_exists(self, table_name: TableName) -> bool: - """ - Checks if a table exists. - - Overridden to query the uppercase `INFORMATION_SCHEMA` required - by case-sensitive Fabric environments. 
- """ - table = exp.to_table(table_name) - schema = self._get_schema_name(table_name) - - sql = ( - exp.select("1") - .from_("INFORMATION_SCHEMA.TABLES") - .where(f"TABLE_NAME = '{table.alias_or_name}'") - .where(f"TABLE_SCHEMA = '{schema}'") - ) - - result = self.fetchone(sql, quote_identifiers=True) - - return result[0] == 1 if result else False - - def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table: - """Ensures an object name is prefixed with the configured database.""" - table = exp.to_table(name) - return exp.Table(this=table.this, db=table.db, catalog=exp.to_identifier(self.database)) - - def create_view( - self, - view_name: TableName, - query_or_df: QueryOrDF, - columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, - replace: bool = True, - materialized: bool = False, - materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, - table_description: t.Optional[str] = None, - column_descriptions: t.Optional[t.Dict[str, str]] = None, - view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, - **create_kwargs: t.Any, - ) -> None: - """ - Creates a view from a query or DataFrame. - - Overridden to ensure that the view name and all tables referenced - in the source query are fully qualified with the database name, - as required by Fabric. - """ - view_schema = self._get_schema_name(view_name) - self.create_schema(view_schema) - - qualified_view_name = self._fully_qualify(view_name) - - if isinstance(query_or_df, exp.Expression): - for table in query_or_df.find_all(exp.Table): - if not table.catalog: - qualified_table = self._fully_qualify(table) - table.replace(qualified_table) - - return super().create_view( - qualified_view_name, - query_or_df, - columns_to_types, - replace, - materialized, - table_description=table_description, - column_descriptions=column_descriptions, - view_properties=view_properties, - **create_kwargs, - ) - - def columns( - self, table_name: TableName, include_pseudo_columns: bool = False - ) -> t.Dict[str, exp.DataType]: - """ - Fetches column names and types for the target table. - - Overridden to query the uppercase `INFORMATION_SCHEMA.COLUMNS` view - required by case-sensitive Fabric environments. - """ - table = exp.to_table(table_name) - schema = self._get_schema_name(table_name) - sql = ( - exp.select("COLUMN_NAME", "DATA_TYPE") - .from_(f"{self.database}.INFORMATION_SCHEMA.COLUMNS") - .where(f"TABLE_NAME = '{table.name}'") - .where(f"TABLE_SCHEMA = '{schema}'") - .order_by("ORDINAL_POSITION") - ) - df = self.fetchdf(sql) - return { - str(row.COLUMN_NAME): exp.DataType.build(str(row.DATA_TYPE), dialect=self.dialect) - for row in df.itertuples() - } - - def _insert_overwrite_by_condition( - self, - table_name: TableName, - source_queries: t.List[SourceQuery], - columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, - where: t.Optional[exp.Condition] = None, - insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, - **kwargs: t.Any, - ) -> None: - """ - Implements the insert overwrite strategy for Fabric. - - Overridden to enforce a `DELETE`/`INSERT` strategy, as Fabric's - `MERGE` statement has limitations. 
- """ - - columns_to_types = columns_to_types or self.columns(table_name) - - self.delete_from(table_name, where=where or exp.true()) - - for source_query in source_queries: - with source_query as query: - query = self._order_projections_and_filter(query, columns_to_types) - self._insert_append_query( - table_name, - query, - columns_to_types=columns_to_types, - order_projections=False, - ) From 0ff075ce2b3f3ebac0de9f2603101108db99924e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Wed, 18 Jun 2025 11:21:47 +0200 Subject: [PATCH 04/16] isnan error --- sqlmesh/core/config/connection.py | 5 +- sqlmesh/core/engine_adapter/__init__.py | 2 - sqlmesh/core/engine_adapter/fabric.py | 160 ++++++++++-------------- 3 files changed, 65 insertions(+), 102 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 5cbd35487c..cc26e63242 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1595,20 +1595,17 @@ class FabricConnectionConfig(MSSQLConnectionConfig): It is recommended to use the 'pyodbc' driver for Fabric. """ - type_: t.Literal["fabric"] = Field(alias="type", default="fabric") + type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore autocommit: t.Optional[bool] = True @property def _engine_adapter(self) -> t.Type[EngineAdapter]: - # This is the crucial link to the adapter you already created. from sqlmesh.core.engine_adapter.fabric import FabricAdapter return FabricAdapter @property def _extra_engine_config(self) -> t.Dict[str, t.Any]: - # This ensures the 'database' name from the config is passed - # to the FabricAdapter's constructor. return { "database": self.database, "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG, diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index c8b8299bd1..337de39905 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -47,11 +47,9 @@ def create_engine_adapter( connection_factory: t.Callable[[], t.Any], dialect: str, **kwargs: t.Any ) -> EngineAdapter: - print(kwargs) dialect = dialect.lower() dialect = DIALECT_ALIASES.get(dialect, dialect) engine_adapter = DIALECT_TO_ENGINE_ADAPTER.get(dialect) - print(engine_adapter) if engine_adapter is None: return EngineAdapter(connection_factory, dialect, **kwargs) if engine_adapter is EngineAdapterWithIndexSupport: diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 4865c3c8f5..1f21ffbf26 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -43,7 +43,7 @@ def __init__(self, *args: t.Any, **kwargs: t.Any): except Exception as e: raise RuntimeError(f"Failed to set database context to '{self.database}'. Reason: {e}") - def _get_schema_name(self, name: t.Union[str, exp.Table, exp.Identifier]) -> t.Optional[str]: + def _get_schema_name(self, name: t.Union[str, exp.Table]) -> t.Optional[str]: """ Safely extracts the schema name from a table or schema name, which can be a string or a sqlglot expression. @@ -112,14 +112,31 @@ def _get_data_objects( catalog=catalog, schema=row.schema_name, name=row.name, - type=DataObjectType.from_str(row.type), + type=DataObjectType.from_str(str(row.type)), ) for row in dataframe.itertuples() ] + def schema_exists(self, schema_name: SchemaName) -> bool: + """ + Checks if a schema exists. 
+ """ + schema = exp.to_table(schema_name).db + if not schema: + return False + + sql = ( + exp.select("1") + .from_("INFORMATION_SCHEMA.SCHEMATA") + .where(f"SCHEMA_NAME = '{schema}'") + .where(f"CATALOG_NAME = '{self.database}'") + ) + result = self.fetchone(sql, quote_identifiers=True) + return result[0] == 1 if result else False + def create_schema( self, - schema_name: SchemaName, + schema_name: t.Optional[SchemaName], ignore_if_exists: bool = True, warn_on_error: bool = True, **kwargs: t.Any, @@ -128,53 +145,51 @@ def create_schema( Creates a schema in a Microsoft Fabric Warehouse. Overridden to handle Fabric's specific T-SQL requirements. - T-SQL's `CREATE SCHEMA` command does not support `IF NOT EXISTS` directly - as part of the statement in all contexts, and error messages suggest - issues with batching or preceding statements like USE. """ - if schema_name is None: + if not schema_name: return - schema_name_str = ( - schema_name.name if isinstance(schema_name, exp.Identifier) else str(schema_name) - ) - - if not schema_name_str: - logger.warning("Attempted to create a schema with an empty name. Skipping.") - return - - schema_name_str = schema_name_str.strip('[]"').rstrip(".") + schema_exp = to_schema(schema_name) + simple_schema_name_str = exp.to_identifier(schema_exp.db).name if schema_exp.db else None - if not schema_name_str: + if not simple_schema_name_str: logger.warning( - "Attempted to create a schema with an empty name after sanitization. Skipping." + f"Could not determine simple schema name from '{schema_name}'. Skipping schema creation." ) return try: - if self.schema_exists(schema_name_str): + if self.schema_exists(simple_schema_name_str): if ignore_if_exists: return - raise RuntimeError(f"Schema '{schema_name_str}' already exists.") + raise RuntimeError(f"Schema '{simple_schema_name_str}' already exists.") except Exception as e: if warn_on_error: - logger.warning(f"Failed to check for existence of schema '{schema_name_str}': {e}") + logger.warning( + f"Failed to check for existence of schema '{simple_schema_name_str}': {e}" + ) else: raise try: - create_sql = f"CREATE SCHEMA [{schema_name_str}]" + create_sql = f"CREATE SCHEMA [{simple_schema_name_str}]" self.execute(create_sql) except Exception as e: - if "already exists" in str(e).lower() or "There is already an object named" in str(e): + error_message = str(e).lower() + if ( + "already exists" in error_message + or "there is already an object named" in error_message + ): if ignore_if_exists: return - raise RuntimeError(f"Schema '{schema_name_str}' already exists.") from e + raise RuntimeError( + f"Schema '{simple_schema_name_str}' already exists due to race condition." + ) from e else: if warn_on_error: - logger.warning(f"Failed to create schema {schema_name_str}. Reason: {e}") + logger.warning(f"Failed to create schema {simple_schema_name_str}. 
Reason: {e}") else: - raise RuntimeError(f"Failed to create schema {schema_name_str}.") from e + raise RuntimeError(f"Failed to create schema {simple_schema_name_str}.") from e def _create_table_from_columns( self, @@ -251,7 +266,7 @@ def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table: and isinstance(table.this, exp.Identifier) and (table.this.name.startswith("#")) ): - temp_identifier = exp.Identifier(this=table.this.this, quoted=True) + temp_identifier = exp.Identifier(this=table.this.name, quoted=True) return exp.Table(this=temp_identifier) schema = self._get_schema_name(name) @@ -308,6 +323,8 @@ def create_view( def columns( self, table_name: TableName, include_pseudo_columns: bool = False ) -> t.Dict[str, exp.DataType]: + import numpy as np + table = exp.to_table(table_name) schema = self._get_schema_name(table_name) @@ -346,6 +363,7 @@ def columns( ) df = self.fetchdf(sql) + df = df.replace({np.nan: None}) def build_var_length_col( column_name: str, @@ -356,11 +374,9 @@ def build_var_length_col( ) -> t.Tuple[str, str]: data_type = data_type.lower() - char_len_int = ( - int(character_maximum_length) if character_maximum_length is not None else None - ) - prec_int = int(numeric_precision) if numeric_precision is not None else None - scale_int = int(numeric_scale) if numeric_scale is not None else None + char_len_int = character_maximum_length + prec_int = numeric_precision + scale_int = numeric_scale if data_type in self.VARIABLE_LENGTH_DATA_TYPES and char_len_int is not None: if char_len_int > 0: @@ -378,79 +394,31 @@ def build_var_length_col( return (column_name, data_type) - columns_raw = [ - ( - row.COLUMN_NAME, - row.DATA_TYPE, - getattr(row, "CHARACTER_MAXIMUM_LENGTH", None), - getattr(row, "NUMERIC_PRECISION", None), - getattr(row, "NUMERIC_SCALE", None), + def _to_optional_int(val: t.Any) -> t.Optional[int]: + """Safely convert DataFrame values to Optional[int] for mypy.""" + if val is None: + return None + try: + return int(val) + except (ValueError, TypeError): + return None + + columns_processed = [ + build_var_length_col( + str(row.COLUMN_NAME), + str(row.DATA_TYPE), + _to_optional_int(row.CHARACTER_MAXIMUM_LENGTH), + _to_optional_int(row.NUMERIC_PRECISION), + _to_optional_int(row.NUMERIC_SCALE), ) for row in df.itertuples() ] - columns_processed = [build_var_length_col(*row) for row in columns_raw] - return { column_name: exp.DataType.build(data_type, dialect=self.dialect) for column_name, data_type in columns_processed } - def create_schema( - self, - schema_name: SchemaName, - ignore_if_exists: bool = True, - warn_on_error: bool = True, - **kwargs: t.Any, - ) -> None: - if schema_name is None: - return - - schema_exp = to_schema(schema_name) - simple_schema_name_str = None - if schema_exp.db: - simple_schema_name_str = exp.to_identifier(schema_exp.db).name - - if not simple_schema_name_str: - logger.warning( - f"Could not determine simple schema name from '{schema_name}'. Skipping schema creation." 
- ) - return - - if ignore_if_exists: - try: - if self.schema_exists(simple_schema_name_str): - return - except Exception as e: - if warn_on_error: - logger.warning( - f"Failed to check for existence of schema '{simple_schema_name_str}': {e}" - ) - else: - raise - elif self.schema_exists(simple_schema_name_str): - raise RuntimeError(f"Schema '{simple_schema_name_str}' already exists.") - - try: - create_sql = f"CREATE SCHEMA [{simple_schema_name_str}]" - self.execute(create_sql) - except Exception as e: - error_message = str(e).lower() - if ( - "already exists" in error_message - or "there is already an object named" in error_message - ): - if ignore_if_exists: - return - raise RuntimeError( - f"Schema '{simple_schema_name_str}' already exists due to race condition." - ) from e - else: - if warn_on_error: - logger.warning(f"Failed to create schema {simple_schema_name_str}. Reason: {e}") - else: - raise RuntimeError(f"Failed to create schema {simple_schema_name_str}.") from e - def _insert_overwrite_by_condition( self, table_name: TableName, From 332ea32caa898c8f4e98b28d4ecfb381a553ef73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Thu, 19 Jun 2025 13:04:54 +0200 Subject: [PATCH 05/16] CTEs no qualify --- sqlmesh/core/engine_adapter/fabric.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 1f21ffbf26..9f37e8b14f 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -303,7 +303,14 @@ def create_view( qualified_view_name = self._fully_qualify(view_name) if isinstance(query_or_df, exp.Expression): + # CTEs should not be qualified with the database name. + cte_names = {cte.alias_or_name for cte in query_or_df.find_all(exp.CTE)} + for table in query_or_df.find_all(exp.Table): + if table.this.name in cte_names: + continue + + # Qualify all other tables that don't already have a catalog. 
if not table.catalog: qualified_table = self._fully_qualify(table) table.replace(qualified_table) From 585fb7e403b950034ac3cd97c7ec516e5fe54095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Mon, 23 Jun 2025 20:44:43 +0200 Subject: [PATCH 06/16] simplifying --- sqlmesh/core/config/connection.py | 9 +- sqlmesh/core/engine_adapter/fabric.py | 392 +++----------------------- 2 files changed, 40 insertions(+), 361 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index cc26e63242..9e95e9ae78 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -43,7 +43,14 @@ logger = logging.getLogger(__name__) -RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"} +RECOMMENDED_STATE_SYNC_ENGINES = { + "postgres", + "gcp_postgres", + "mysql", + "mssql", + "azuresql", + "fabric", +} FORBIDDEN_STATE_SYNC_ENGINES = { # Do not support row-level operations "spark", diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 9f37e8b14f..a4eb30a91d 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -3,19 +3,10 @@ import typing as t from sqlglot import exp from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter -from sqlmesh.core.engine_adapter.shared import ( - InsertOverwriteStrategy, - SourceQuery, - DataObject, - DataObjectType, -) -import logging -from sqlmesh.core.dialect import to_schema +from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery -logger = logging.getLogger(__name__) if t.TYPE_CHECKING: - from sqlmesh.core._typing import SchemaName, TableName - from sqlmesh.core.engine_adapter._typing import QueryOrDF + from sqlmesh.core._typing import TableName class FabricAdapter(MSSQLEngineAdapter): @@ -26,334 +17,35 @@ class FabricAdapter(MSSQLEngineAdapter): DIALECT = "fabric" SUPPORTS_INDEXES = False SUPPORTS_TRANSACTIONS = False - INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT - def __init__(self, *args: t.Any, **kwargs: t.Any): - self.database = kwargs.get("database") - - super().__init__(*args, **kwargs) - - if not self.database: - raise ValueError( - "The 'database' parameter is required in the connection config for the FabricWarehouseAdapter." - ) - try: - self.execute(f"USE [{self.database}]") - except Exception as e: - raise RuntimeError(f"Failed to set database context to '{self.database}'. Reason: {e}") - - def _get_schema_name(self, name: t.Union[str, exp.Table]) -> t.Optional[str]: - """ - Safely extracts the schema name from a table or schema name, which can be - a string or a sqlglot expression. - - Fabric requires database names to be explicitly specified in many contexts, - including referencing schemas in INFORMATION_SCHEMA. This function helps - in extracting the schema part correctly from potentially qualified names. - """ - table = exp.to_table(name) - - if table.this and table.this.name.startswith("#"): - return None - - schema_part = table.db - - if not schema_part: - return None - - if isinstance(schema_part, exp.Identifier): - return schema_part.name - if isinstance(schema_part, str): - return schema_part - - raise TypeError(f"Unexpected type for schema part: {type(schema_part)}") - - def _get_data_objects( - self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None - ) -> t.List[DataObject]: - """ - Returns all the data objects that exist in the given schema and database. 
- - Overridden to query `INFORMATION_SCHEMA.TABLES` with explicit database qualification - and preserved casing using `quoted=True`. - """ - import pandas as pd - - catalog = self.get_current_catalog() - - from_table = exp.Table( - this=exp.to_identifier("TABLES", quoted=True), - db=exp.to_identifier("INFORMATION_SCHEMA", quoted=True), - catalog=exp.to_identifier(self.database), - ) - - query = ( - exp.select( - exp.column("TABLE_NAME").as_("name"), - exp.column("TABLE_SCHEMA").as_("schema_name"), - exp.case() - .when(exp.column("TABLE_TYPE").eq("BASE TABLE"), exp.Literal.string("TABLE")) - .else_(exp.column("TABLE_TYPE")) - .as_("type"), - ) - .from_(from_table) - .where(exp.column("TABLE_SCHEMA").eq(str(to_schema(schema_name).db).strip("[]"))) - ) - if object_names: - query = query.where( - exp.column("TABLE_NAME").isin(*(name.strip("[]") for name in object_names)) - ) - - dataframe: pd.DataFrame = self.fetchdf(query) - - return [ - DataObject( - catalog=catalog, - schema=row.schema_name, - name=row.name, - type=DataObjectType.from_str(str(row.type)), - ) - for row in dataframe.itertuples() - ] - - def schema_exists(self, schema_name: SchemaName) -> bool: - """ - Checks if a schema exists. - """ - schema = exp.to_table(schema_name).db - if not schema: - return False - - sql = ( - exp.select("1") - .from_("INFORMATION_SCHEMA.SCHEMATA") - .where(f"SCHEMA_NAME = '{schema}'") - .where(f"CATALOG_NAME = '{self.database}'") - ) - result = self.fetchone(sql, quote_identifiers=True) - return result[0] == 1 if result else False - - def create_schema( - self, - schema_name: t.Optional[SchemaName], - ignore_if_exists: bool = True, - warn_on_error: bool = True, - **kwargs: t.Any, - ) -> None: - """ - Creates a schema in a Microsoft Fabric Warehouse. - - Overridden to handle Fabric's specific T-SQL requirements. - """ - if not schema_name: - return - - schema_exp = to_schema(schema_name) - simple_schema_name_str = exp.to_identifier(schema_exp.db).name if schema_exp.db else None - - if not simple_schema_name_str: - logger.warning( - f"Could not determine simple schema name from '{schema_name}'. Skipping schema creation." - ) - return - - try: - if self.schema_exists(simple_schema_name_str): - if ignore_if_exists: - return - raise RuntimeError(f"Schema '{simple_schema_name_str}' already exists.") - except Exception as e: - if warn_on_error: - logger.warning( - f"Failed to check for existence of schema '{simple_schema_name_str}': {e}" - ) - else: - raise - - try: - create_sql = f"CREATE SCHEMA [{simple_schema_name_str}]" - self.execute(create_sql) - except Exception as e: - error_message = str(e).lower() - if ( - "already exists" in error_message - or "there is already an object named" in error_message - ): - if ignore_if_exists: - return - raise RuntimeError( - f"Schema '{simple_schema_name_str}' already exists due to race condition." - ) from e - else: - if warn_on_error: - logger.warning(f"Failed to create schema {simple_schema_name_str}. Reason: {e}") - else: - raise RuntimeError(f"Failed to create schema {simple_schema_name_str}.") from e - - def _create_table_from_columns( - self, - table_name: TableName, - columns_to_types: t.Dict[str, exp.DataType], - primary_key: t.Optional[t.Tuple[str, ...]] = None, - exists: bool = True, - table_description: t.Optional[str] = None, - column_descriptions: t.Optional[t.Dict[str, str]] = None, - **kwargs: t.Any, - ) -> None: - """ - Creates a table, ensuring the schema exists first and that all - object names are fully qualified with the database. 
- """ - table_exp = exp.to_table(table_name) - schema_name = self._get_schema_name(table_name) - - self.create_schema(schema_name) - - fully_qualified_table_name = f"[{self.database}].[{schema_name}].[{table_exp.name}]" - - column_defs = ", ".join( - f"[{col}] {kind.sql(dialect=self.dialect)}" for col, kind in columns_to_types.items() - ) - - create_table_sql = f"CREATE TABLE {fully_qualified_table_name} ({column_defs})" - - if not exists: - self.execute(create_table_sql) - return - - if not self.table_exists(table_name): - self.execute(create_table_sql) - - if table_description and self.comments_enabled: - qualified_table_for_comment = self._fully_qualify(table_name) - self._create_table_comment(qualified_table_for_comment, table_description) - if column_descriptions and self.comments_enabled: - self._create_column_comments(qualified_table_for_comment, column_descriptions) - def table_exists(self, table_name: TableName) -> bool: """ Checks if a table exists. - Overridden to query the uppercase `INFORMATION_SCHEMA` required + Querying the uppercase `INFORMATION_SCHEMA` required by case-sensitive Fabric environments. """ table = exp.to_table(table_name) - schema = self._get_schema_name(table_name) - sql = ( exp.select("1") .from_("INFORMATION_SCHEMA.TABLES") .where(f"TABLE_NAME = '{table.alias_or_name}'") - .where(f"TABLE_SCHEMA = '{schema}'") + .where(f"TABLE_SCHEMA = '{table.db}'") ) result = self.fetchone(sql, quote_identifiers=True) return result[0] == 1 if result else False - def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table: - """ - Ensures an object name is prefixed with the configured database and schema. - - Overridden to prevent qualification for temporary objects (starting with # or ##). - Temporary objects should not be qualified with database or schema in T-SQL. - """ - table = exp.to_table(name) - - if ( - table.this - and isinstance(table.this, exp.Identifier) - and (table.this.name.startswith("#")) - ): - temp_identifier = exp.Identifier(this=table.this.name, quoted=True) - return exp.Table(this=temp_identifier) - - schema = self._get_schema_name(name) - - return exp.Table( - this=table.this, - db=exp.to_identifier(schema) if schema else None, - catalog=exp.to_identifier(self.database), - ) - - def create_view( - self, - view_name: TableName, - query_or_df: QueryOrDF, - columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, - replace: bool = True, - materialized: bool = False, - materialized_properties: t.Optional[t.Dict[str, t.Any]] = None, - table_description: t.Optional[str] = None, - column_descriptions: t.Optional[t.Dict[str, str]] = None, - view_properties: t.Optional[t.Dict[str, exp.Expression]] = None, - **create_kwargs: t.Any, - ) -> None: - """ - Creates a view from a query or DataFrame. - - Overridden to ensure that the view name and all tables referenced - in the source query are fully qualified with the database name, - as required by Fabric. - """ - view_schema = self._get_schema_name(view_name) - self.create_schema(view_schema) - - qualified_view_name = self._fully_qualify(view_name) - - if isinstance(query_or_df, exp.Expression): - # CTEs should not be qualified with the database name. - cte_names = {cte.alias_or_name for cte in query_or_df.find_all(exp.CTE)} - - for table in query_or_df.find_all(exp.Table): - if table.this.name in cte_names: - continue - - # Qualify all other tables that don't already have a catalog. 
- if not table.catalog: - qualified_table = self._fully_qualify(table) - table.replace(qualified_table) - - return super().create_view( - qualified_view_name, - query_or_df, - columns_to_types, - replace, - materialized, - table_description=table_description, - column_descriptions=column_descriptions, - view_properties=view_properties, - **create_kwargs, - ) - def columns( - self, table_name: TableName, include_pseudo_columns: bool = False + self, + table_name: TableName, + include_pseudo_columns: bool = True, ) -> t.Dict[str, exp.DataType]: - import numpy as np + """Fabric doesn't support describe so we query INFORMATION_SCHEMA.""" table = exp.to_table(table_name) - schema = self._get_schema_name(table_name) - - if ( - not schema - and table.this - and isinstance(table.this, exp.Identifier) - and table.this.name.startswith("__temp_") - ): - schema = "dbo" - - if not schema: - logger.warning( - f"Cannot fetch columns for table '{table_name}' without a schema name in Fabric." - ) - return {} - - from_table = exp.Table( - this=exp.to_identifier("COLUMNS", quoted=True), - db=exp.to_identifier("INFORMATION_SCHEMA", quoted=True), - catalog=exp.to_identifier(self.database), - ) sql = ( exp.select( @@ -363,14 +55,14 @@ def columns( "NUMERIC_PRECISION", "NUMERIC_SCALE", ) - .from_(from_table) - .where(f"TABLE_NAME = '{table.name.strip('[]')}'") - .where(f"TABLE_SCHEMA = '{schema.strip('[]')}'") - .order_by("ORDINAL_POSITION") + .from_("INFORMATION_SCHEMA.COLUMNS") + .where(f"TABLE_NAME = '{table.name}'") ) + database_name = table.db + if database_name: + sql = sql.where(f"TABLE_SCHEMA = '{database_name}'") - df = self.fetchdf(sql) - df = df.replace({np.nan: None}) + columns_raw = self.fetchall(sql, quote_identifiers=True) def build_var_length_col( column_name: str, @@ -378,52 +70,32 @@ def build_var_length_col( character_maximum_length: t.Optional[int] = None, numeric_precision: t.Optional[int] = None, numeric_scale: t.Optional[int] = None, - ) -> t.Tuple[str, str]: + ) -> tuple: data_type = data_type.lower() - - char_len_int = character_maximum_length - prec_int = numeric_precision - scale_int = numeric_scale - - if data_type in self.VARIABLE_LENGTH_DATA_TYPES and char_len_int is not None: - if char_len_int > 0: - return (column_name, f"{data_type}({char_len_int})") - if char_len_int == -1: - return (column_name, f"{data_type}(max)") if ( - data_type in ("decimal", "numeric") - and prec_int is not None - and scale_int is not None + data_type in self.VARIABLE_LENGTH_DATA_TYPES + and character_maximum_length is not None + and character_maximum_length > 0 + ): + return (column_name, f"{data_type}({character_maximum_length})") + if ( + data_type in ("varbinary", "varchar", "nvarchar") + and character_maximum_length is not None + and character_maximum_length == -1 ): - return (column_name, f"{data_type}({prec_int}, {scale_int})") - if data_type == "float" and prec_int is not None: - return (column_name, f"{data_type}({prec_int})") + return (column_name, f"{data_type}(max)") + if data_type in ("decimal", "numeric"): + return (column_name, f"{data_type}({numeric_precision}, {numeric_scale})") + if data_type == "float": + return (column_name, f"{data_type}({numeric_precision})") return (column_name, data_type) - def _to_optional_int(val: t.Any) -> t.Optional[int]: - """Safely convert DataFrame values to Optional[int] for mypy.""" - if val is None: - return None - try: - return int(val) - except (ValueError, TypeError): - return None - - columns_processed = [ - build_var_length_col( - 
str(row.COLUMN_NAME), - str(row.DATA_TYPE), - _to_optional_int(row.CHARACTER_MAXIMUM_LENGTH), - _to_optional_int(row.NUMERIC_PRECISION), - _to_optional_int(row.NUMERIC_SCALE), - ) - for row in df.itertuples() - ] + columns = [build_var_length_col(*row) for row in columns_raw] return { column_name: exp.DataType.build(data_type, dialect=self.dialect) - for column_name, data_type in columns_processed + for column_name, data_type in columns } def _insert_overwrite_by_condition( @@ -448,7 +120,7 @@ def _insert_overwrite_by_condition( for source_query in source_queries: with source_query as query: - query = self._order_projections_and_filter(query, columns_to_types) + query = self._order_projections_and_filter(query, columns_to_types, where=where) self._insert_append_query( table_name, query, From 1bbe90e633b90d0e0fd3b7683f3094858f29f6d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Mon, 23 Jun 2025 22:27:59 +0200 Subject: [PATCH 07/16] docs & tests --- docs/integrations/engines/fabric.md | 30 +++++++++ docs/integrations/overview.md | 1 + mkdocs.yml | 1 + pyproject.toml | 1 + sqlmesh/core/config/connection.py | 2 +- sqlmesh/core/engine_adapter/fabric.py | 4 +- tests/core/engine_adapter/test_fabric.py | 83 ++++++++++++++++++++++++ 7 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 docs/integrations/engines/fabric.md create mode 100644 tests/core/engine_adapter/test_fabric.py diff --git a/docs/integrations/engines/fabric.md b/docs/integrations/engines/fabric.md new file mode 100644 index 0000000000..aca9c32eed --- /dev/null +++ b/docs/integrations/engines/fabric.md @@ -0,0 +1,30 @@ +# Fabric + +## Local/Built-in Scheduler +**Engine Adapter Type**: `fabric` + +### Installation +#### Microsoft Entra ID / Azure Active Directory Authentication: +``` +pip install "sqlmesh[mssql-odbc]" +``` + +### Connection options + +| Option | Description | Type | Required | +| ----------------- | ------------------------------------------------------------ | :----------: | :------: | +| `type` | Engine type name - must be `fabric` | string | Y | +| `host` | The hostname of the Fabric Warehouse server | string | Y | +| `user` | The client id to use for authentication with the Fabric Warehouse server | string | N | +| `password` | The client secret to use for authentication with the Fabric Warehouse server | string | N | +| `port` | The port number of the Fabric Warehouse server | int | N | +| `database` | The target database | string | N | +| `charset` | The character set used for the connection | string | N | +| `timeout` | The query timeout in seconds. Default: no timeout | int | N | +| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N | +| `appname` | The application name to use for the connection | string | N | +| `conn_properties` | The list of connection properties | list[string] | N | +| `autocommit` | Is autocommit mode enabled. Default: false | bool | N | +| `driver` | The driver to use for the connection. Default: pyodbc | string | N | +| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N | +| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). 
| dict | N | \ No newline at end of file diff --git a/docs/integrations/overview.md b/docs/integrations/overview.md index 9f829ceab7..c23fe0fc47 100644 --- a/docs/integrations/overview.md +++ b/docs/integrations/overview.md @@ -17,6 +17,7 @@ SQLMesh supports the following execution engines for running SQLMesh projects: * [ClickHouse](./engines/clickhouse.md) * [Databricks](./engines/databricks.md) * [DuckDB](./engines/duckdb.md) +* [Fabric](./engines/fabric.md) * [MotherDuck](./engines/motherduck.md) * [MSSQL](./engines/mssql.md) * [MySQL](./engines/mysql.md) diff --git a/mkdocs.yml b/mkdocs.yml index 56ec348a04..b7ab52e858 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -84,6 +84,7 @@ nav: - integrations/engines/clickhouse.md - integrations/engines/databricks.md - integrations/engines/duckdb.md + - integrations/engines/fabric.md - integrations/engines/motherduck.md - integrations/engines/mssql.md - integrations/engines/mysql.md diff --git a/pyproject.toml b/pyproject.toml index ea20c21e74..c8eeaec3e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -249,6 +249,7 @@ markers = [ "clickhouse_cloud: test for Clickhouse (cloud mode)", "databricks: test for Databricks", "duckdb: test for DuckDB", + "fabric: test for Fabric", "motherduck: test for MotherDuck", "mssql: test for MSSQL", "mysql: test for MySQL", diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 9e95e9ae78..a6aaa96b4a 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -49,7 +49,6 @@ "mysql", "mssql", "azuresql", - "fabric", } FORBIDDEN_STATE_SYNC_ENGINES = { # Do not support row-level operations @@ -1603,6 +1602,7 @@ class FabricConnectionConfig(MSSQLConnectionConfig): """ type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore + driver: t.Literal["pyodbc"] = "pyodbc" autocommit: t.Optional[bool] = True @property diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index a4eb30a91d..44cc8bcfb3 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -31,8 +31,10 @@ def table_exists(self, table_name: TableName) -> bool: exp.select("1") .from_("INFORMATION_SCHEMA.TABLES") .where(f"TABLE_NAME = '{table.alias_or_name}'") - .where(f"TABLE_SCHEMA = '{table.db}'") ) + database_name = table.db + if database_name: + sql = sql.where(f"TABLE_SCHEMA = '{database_name}'") result = self.fetchone(sql, quote_identifiers=True) diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py new file mode 100644 index 0000000000..623bbe6653 --- /dev/null +++ b/tests/core/engine_adapter/test_fabric.py @@ -0,0 +1,83 @@ +# type: ignore + +import typing as t + +import pytest +from sqlglot import exp, parse_one + +from sqlmesh.core.engine_adapter import FabricAdapter +from tests.core.engine_adapter import to_sql_calls + +pytestmark = [pytest.mark.engine, pytest.mark.fabric] + + +@pytest.fixture +def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter: + return make_mocked_engine_adapter(FabricAdapter) + + +def test_columns(adapter: FabricAdapter): + adapter.cursor.fetchall.return_value = [ + ("decimal_ps", "decimal", None, 5, 4), + ("decimal", "decimal", None, 18, 0), + ("float", "float", None, 53, None), + ("char_n", "char", 10, None, None), + ("varchar_n", "varchar", 10, None, None), + ("nvarchar_max", "nvarchar", -1, None, None), + ] + + assert adapter.columns("db.table") == { + "decimal_ps": exp.DataType.build("decimal(5, 4)", 
dialect=adapter.dialect), + "decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect), + "float": exp.DataType.build("float(53)", dialect=adapter.dialect), + "char_n": exp.DataType.build("char(10)", dialect=adapter.dialect), + "varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect), + "nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect), + } + + # Verify that the adapter queries the uppercase INFORMATION_SCHEMA + adapter.cursor.execute.assert_called_once_with( + """SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';""" + ) + + +def test_table_exists(adapter: FabricAdapter): + adapter.cursor.fetchone.return_value = (1,) + assert adapter.table_exists("db.table") + # Verify that the adapter queries the uppercase INFORMATION_SCHEMA + adapter.cursor.execute.assert_called_once_with( + """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';""" + ) + + adapter.cursor.fetchone.return_value = None + assert not adapter.table_exists("db.table") + + +def test_insert_overwrite_by_time_partition(adapter: FabricAdapter): + adapter.insert_overwrite_by_time_partition( + "test_table", + parse_one("SELECT a, b FROM tbl"), + start="2022-01-01", + end="2022-01-02", + time_column="b", + time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")), + columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")}, + ) + + # Fabric adapter should use DELETE/INSERT strategy, not MERGE. + assert to_sql_calls(adapter) == [ + """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", + """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a], [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", + ] + + +def test_replace_query(adapter: FabricAdapter): + adapter.cursor.fetchone.return_value = (1,) + adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"}) + + # This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT + assert to_sql_calls(adapter) == [ + """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""", + "TRUNCATE TABLE [test_table];", + "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", + ] From 689557028b08f2130eb44fcb53b838a7bd4a9779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Mon, 23 Jun 2025 23:29:03 +0200 Subject: [PATCH 08/16] connection tests --- docs/guides/configuration.md | 1 + sqlmesh/core/config/__init__.py | 1 + sqlmesh/core/engine_adapter/fabric.py | 30 +++++----- tests/core/test_connection_config.py | 83 +++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 17 deletions(-) diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index 361171d937..06aa3298ce 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -598,6 +598,7 @@ These pages describe the connection configuration options for each execution eng * [BigQuery](../integrations/engines/bigquery.md) * [Databricks](../integrations/engines/databricks.md) * [DuckDB](../integrations/engines/duckdb.md) +* [Fabric](../integrations/engines/fabric.md) * [MotherDuck](../integrations/engines/motherduck.md) * [MySQL](../integrations/engines/mysql.md) * [MSSQL](../integrations/engines/mssql.md) diff --git a/sqlmesh/core/config/__init__.py 
b/sqlmesh/core/config/__init__.py index af84818858..65435376a0 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -10,6 +10,7 @@ ConnectionConfig as ConnectionConfig, DatabricksConnectionConfig as DatabricksConnectionConfig, DuckDBConnectionConfig as DuckDBConnectionConfig, + FabricConnectionConfig as FabricConnectionConfig, GCPPostgresConnectionConfig as GCPPostgresConnectionConfig, MotherDuckConnectionConfig as MotherDuckConnectionConfig, MSSQLConnectionConfig as MSSQLConnectionConfig, diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 44cc8bcfb3..f0a025607a 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -4,6 +4,7 @@ from sqlglot import exp from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery +from sqlmesh.core.engine_adapter.base import EngineAdapter if t.TYPE_CHECKING: from sqlmesh.core._typing import TableName @@ -110,22 +111,17 @@ def _insert_overwrite_by_condition( **kwargs: t.Any, ) -> None: """ - Implements the insert overwrite strategy for Fabric. + Implements the insert overwrite strategy for Fabric using DELETE and INSERT. - Overridden to enforce a `DELETE`/`INSERT` strategy, as Fabric's - `MERGE` statement has limitations. + This method is overridden to avoid the MERGE statement from the parent + MSSQLEngineAdapter, which is not fully supported in Fabric. """ - - columns_to_types = columns_to_types or self.columns(table_name) - - self.delete_from(table_name, where=where or exp.true()) - - for source_query in source_queries: - with source_query as query: - query = self._order_projections_and_filter(query, columns_to_types, where=where) - self._insert_append_query( - table_name, - query, - columns_to_types=columns_to_types, - order_projections=False, - ) + return EngineAdapter._insert_overwrite_by_condition( + self, + table_name=table_name, + source_queries=source_queries, + columns_to_types=columns_to_types, + where=where, + insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT, + **kwargs, + ) diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py index ba33cb010b..daa2fc77d3 100644 --- a/tests/core/test_connection_config.py +++ b/tests/core/test_connection_config.py @@ -12,6 +12,7 @@ ConnectionConfig, DatabricksConnectionConfig, DuckDBAttachOptions, + FabricConnectionConfig, DuckDBConnectionConfig, GCPPostgresConnectionConfig, MotherDuckConnectionConfig, @@ -1392,3 +1393,85 @@ def test_mssql_pymssql_connection_factory(): # Clean up the mock module if "pymssql" in sys.modules: del sys.modules["pymssql"] + + +def test_fabric_connection_config_defaults(make_config): + """Test Fabric connection config defaults to pyodbc and autocommit=True.""" + config = make_config(type="fabric", host="localhost", check_import=False) + assert isinstance(config, FabricConnectionConfig) + assert config.driver == "pyodbc" + assert config.autocommit is True + + # Ensure it creates the FabricAdapter + from sqlmesh.core.engine_adapter.fabric import FabricAdapter + + assert isinstance(config.create_engine_adapter(), FabricAdapter) + + +def test_fabric_connection_config_parameter_validation(make_config): + """Test Fabric connection config parameter validation.""" + # Test that FabricConnectionConfig correctly handles pyodbc-specific parameters. 
+ config = make_config( + type="fabric", + host="localhost", + driver_name="ODBC Driver 18 for SQL Server", + trust_server_certificate=True, + encrypt=False, + odbc_properties={"Authentication": "ActiveDirectoryServicePrincipal"}, + check_import=False, + ) + assert isinstance(config, FabricConnectionConfig) + assert config.driver == "pyodbc" # Driver is fixed to pyodbc + assert config.driver_name == "ODBC Driver 18 for SQL Server" + assert config.trust_server_certificate is True + assert config.encrypt is False + assert config.odbc_properties == {"Authentication": "ActiveDirectoryServicePrincipal"} + + # Test that specifying a different driver for Fabric raises an error + with pytest.raises(ConfigError, match=r"Input should be 'pyodbc'"): + make_config(type="fabric", host="localhost", driver="pymssql", check_import=False) + + +def test_fabric_pyodbc_connection_string_generation(): + """Test that the Fabric pyodbc connection gets invoked with the correct ODBC connection string.""" + with patch("pyodbc.connect") as mock_pyodbc_connect: + # Create a Fabric config + config = FabricConnectionConfig( + host="testserver.datawarehouse.fabric.microsoft.com", + port=1433, + database="testdb", + user="testuser", + password="testpass", + driver_name="ODBC Driver 18 for SQL Server", + trust_server_certificate=True, + encrypt=True, + login_timeout=30, + check_import=False, + ) + + # Get the connection factory with kwargs and call it + factory_with_kwargs = config._connection_factory_with_kwargs + connection = factory_with_kwargs() + + # Verify pyodbc.connect was called with the correct connection string + mock_pyodbc_connect.assert_called_once() + call_args = mock_pyodbc_connect.call_args + + # Check the connection string (first argument) + conn_str = call_args[0][0] + expected_parts = [ + "DRIVER={ODBC Driver 18 for SQL Server}", + "SERVER=testserver.datawarehouse.fabric.microsoft.com,1433", + "DATABASE=testdb", + "Encrypt=YES", + "TrustServerCertificate=YES", + "Connection Timeout=30", + "UID=testuser", + "PWD=testpass", + ] + + for part in expected_parts: + assert part in conn_str + + # Check autocommit parameter, should default to True for Fabric + assert call_args[1]["autocommit"] is True From 9c0a2dd36de66e993f2bd6845b4e8d9046efce82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Tue, 24 Jun 2025 15:08:59 +0200 Subject: [PATCH 09/16] remove table_exist and columns --- sqlmesh/core/engine_adapter/fabric.py | 81 ------------------------ tests/core/engine_adapter/test_fabric.py | 30 +++++++-- 2 files changed, 24 insertions(+), 87 deletions(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index f0a025607a..5725d3060a 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -20,87 +20,6 @@ class FabricAdapter(MSSQLEngineAdapter): SUPPORTS_TRANSACTIONS = False INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT - def table_exists(self, table_name: TableName) -> bool: - """ - Checks if a table exists. - - Querying the uppercase `INFORMATION_SCHEMA` required - by case-sensitive Fabric environments. 
- """ - table = exp.to_table(table_name) - sql = ( - exp.select("1") - .from_("INFORMATION_SCHEMA.TABLES") - .where(f"TABLE_NAME = '{table.alias_or_name}'") - ) - database_name = table.db - if database_name: - sql = sql.where(f"TABLE_SCHEMA = '{database_name}'") - - result = self.fetchone(sql, quote_identifiers=True) - - return result[0] == 1 if result else False - - def columns( - self, - table_name: TableName, - include_pseudo_columns: bool = True, - ) -> t.Dict[str, exp.DataType]: - """Fabric doesn't support describe so we query INFORMATION_SCHEMA.""" - - table = exp.to_table(table_name) - - sql = ( - exp.select( - "COLUMN_NAME", - "DATA_TYPE", - "CHARACTER_MAXIMUM_LENGTH", - "NUMERIC_PRECISION", - "NUMERIC_SCALE", - ) - .from_("INFORMATION_SCHEMA.COLUMNS") - .where(f"TABLE_NAME = '{table.name}'") - ) - database_name = table.db - if database_name: - sql = sql.where(f"TABLE_SCHEMA = '{database_name}'") - - columns_raw = self.fetchall(sql, quote_identifiers=True) - - def build_var_length_col( - column_name: str, - data_type: str, - character_maximum_length: t.Optional[int] = None, - numeric_precision: t.Optional[int] = None, - numeric_scale: t.Optional[int] = None, - ) -> tuple: - data_type = data_type.lower() - if ( - data_type in self.VARIABLE_LENGTH_DATA_TYPES - and character_maximum_length is not None - and character_maximum_length > 0 - ): - return (column_name, f"{data_type}({character_maximum_length})") - if ( - data_type in ("varbinary", "varchar", "nvarchar") - and character_maximum_length is not None - and character_maximum_length == -1 - ): - return (column_name, f"{data_type}(max)") - if data_type in ("decimal", "numeric"): - return (column_name, f"{data_type}({numeric_precision}, {numeric_scale})") - if data_type == "float": - return (column_name, f"{data_type}({numeric_precision})") - - return (column_name, data_type) - - columns = [build_var_length_col(*row) for row in columns_raw] - - return { - column_name: exp.DataType.build(data_type, dialect=self.dialect) - for column_name, data_type in columns - } - def _insert_overwrite_by_condition( self, table_name: TableName, diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 623bbe6653..80aea0c989 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -53,7 +53,9 @@ def test_table_exists(adapter: FabricAdapter): assert not adapter.table_exists("db.table") -def test_insert_overwrite_by_time_partition(adapter: FabricAdapter): +def test_insert_overwrite_by_time_partition( + adapter: FabricAdapter, assert_exp_eq +): # Add assert_exp_eq fixture adapter.insert_overwrite_by_time_partition( "test_table", parse_one("SELECT a, b FROM tbl"), @@ -64,11 +66,27 @@ def test_insert_overwrite_by_time_partition(adapter: FabricAdapter): columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")}, ) - # Fabric adapter should use DELETE/INSERT strategy, not MERGE. 
- assert to_sql_calls(adapter) == [ - """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", - """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a], [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", - ] + # Get the list of generated SQL strings + actual_sql_calls = to_sql_calls(adapter) + + # There should be two calls: DELETE and INSERT + assert len(actual_sql_calls) == 2 + + # Assert the DELETE statement is correct (string comparison is fine for this simple one) + assert ( + actual_sql_calls[0] + == "DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';" + ) + + # Assert the INSERT statement is semantically correct + expected_insert_sql = """ + INSERT INTO [test_table] ([a], [b]) + SELECT [a], [b] FROM (SELECT [a], [b] FROM [tbl]) AS [_subquery] + WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02'; + """ + + # Use assert_exp_eq to compare the parsed SQL expressions + assert_exp_eq(actual_sql_calls[1], expected_insert_sql) def test_replace_query(adapter: FabricAdapter): From f40fc4d0d44e6835da7d9ede4aee96e51506e1d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Wed, 25 Jun 2025 08:52:33 +0200 Subject: [PATCH 10/16] updated tests --- sqlmesh/core/config/connection.py | 4 +++- tests/core/engine_adapter/test_fabric.py | 30 +++++------------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index a6aaa96b4a..16ae80424b 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1596,12 +1596,14 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: class FabricConnectionConfig(MSSQLConnectionConfig): """ Fabric Connection Configuration. - Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'. It is recommended to use the 'pyodbc' driver for Fabric. 
""" type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore + DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" + DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" + DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 driver: t.Literal["pyodbc"] = "pyodbc" autocommit: t.Optional[bool] = True diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 80aea0c989..709df816d2 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -53,9 +53,7 @@ def test_table_exists(adapter: FabricAdapter): assert not adapter.table_exists("db.table") -def test_insert_overwrite_by_time_partition( - adapter: FabricAdapter, assert_exp_eq -): # Add assert_exp_eq fixture +def test_insert_overwrite_by_time_partition(adapter: FabricAdapter): adapter.insert_overwrite_by_time_partition( "test_table", parse_one("SELECT a, b FROM tbl"), @@ -66,27 +64,11 @@ def test_insert_overwrite_by_time_partition( columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")}, ) - # Get the list of generated SQL strings - actual_sql_calls = to_sql_calls(adapter) - - # There should be two calls: DELETE and INSERT - assert len(actual_sql_calls) == 2 - - # Assert the DELETE statement is correct (string comparison is fine for this simple one) - assert ( - actual_sql_calls[0] - == "DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';" - ) - - # Assert the INSERT statement is semantically correct - expected_insert_sql = """ - INSERT INTO [test_table] ([a], [b]) - SELECT [a], [b] FROM (SELECT [a], [b] FROM [tbl]) AS [_subquery] - WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02'; - """ - - # Use assert_exp_eq to compare the parsed SQL expressions - assert_exp_eq(actual_sql_calls[1], expected_insert_sql) + # Fabric adapter should use DELETE/INSERT strategy, not MERGE. 
+ assert to_sql_calls(adapter) == [ + """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", + """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", + ] def test_replace_query(adapter: FabricAdapter): From 5cc30ab63aa95fa0fa48f47b0a4b576807fcb2a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Wed, 25 Jun 2025 10:54:41 +0200 Subject: [PATCH 11/16] mypy --- sqlmesh/core/config/connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 16ae80424b..1505e26080 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1601,9 +1601,9 @@ class FabricConnectionConfig(MSSQLConnectionConfig): """ type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore - DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" - DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" - DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 + DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore + DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore + DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore driver: t.Literal["pyodbc"] = "pyodbc" autocommit: t.Optional[bool] = True From d5f7aa77ee15525e1c0247cb58947c37c0dddef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredh=C3=B8i?= Date: Wed, 25 Jun 2025 11:10:04 +0200 Subject: [PATCH 12/16] ruff --- sqlmesh/core/config/connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 1505e26080..e9bab2185b 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1601,9 +1601,9 @@ class FabricConnectionConfig(MSSQLConnectionConfig): """ type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore - DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore - DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore - DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore + DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore + DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore + DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore driver: t.Literal["pyodbc"] = "pyodbc" autocommit: t.Optional[bool] = True From 50fe5e4f881ed949bbb0879c767b0c3202ebb168 Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Wed, 25 Jun 2025 16:11:25 +0200 Subject: [PATCH 13/16] Update fabric.md --- docs/integrations/engines/fabric.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/integrations/engines/fabric.md b/docs/integrations/engines/fabric.md index aca9c32eed..1dd47fbe11 100644 --- a/docs/integrations/engines/fabric.md +++ b/docs/integrations/engines/fabric.md @@ -3,6 +3,8 @@ ## Local/Built-in Scheduler **Engine Adapter Type**: `fabric` +NOTE: Fabric Warehouse is not recommended to be used for the SQLMesh [state connection](../../reference/configuration.md#connections). + ### Installation #### Microsoft Entra ID / Azure Active Directory Authentication: ``` @@ -27,4 +29,4 @@ pip install "sqlmesh[mssql-odbc]" | `autocommit` | Is autocommit mode enabled. Default: false | bool | N | | `driver` | The driver to use for the connection. 
Default: pyodbc | string | N | | `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N | -| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N | \ No newline at end of file +| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N | From 3a06c909d59d8c19d856d82897739e1368dc650c Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Wed, 2 Jul 2025 13:28:52 +0200 Subject: [PATCH 14/16] Update sqlmesh/core/engine_adapter/fabric.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mattias Thalén --- sqlmesh/core/engine_adapter/fabric.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 5725d3060a..97322641bd 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -10,7 +10,9 @@ from sqlmesh.core._typing import TableName -class FabricAdapter(MSSQLEngineAdapter): +from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin + +class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter): """ Adapter for Microsoft Fabric. """ From 145b69b62a4dae082b19ff7006bc5f3cd0376ba3 Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Wed, 2 Jul 2025 14:39:03 +0200 Subject: [PATCH 15/16] Update fabric.py --- sqlmesh/core/engine_adapter/fabric.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 97322641bd..d7b862d50a 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -12,6 +12,7 @@ from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin + class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter): """ Adapter for Microsoft Fabric. From e9396e83138f15dbbffcbfb8465437b5b2914e2f Mon Sep 17 00:00:00 2001 From: Andreas <65893109+fresioAS@users.noreply.github.com> Date: Thu, 10 Jul 2025 16:53:22 +0200 Subject: [PATCH 16/16] Update sqlmesh/core/config/connection.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mattias Thalén --- sqlmesh/core/config/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 988ad4769c..262efcbd8b 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1665,7 +1665,7 @@ def _engine_adapter(self) -> t.Type[EngineAdapter]: def _extra_engine_config(self) -> t.Dict[str, t.Any]: return { "database": self.database, - "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG, + "catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY, }
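
---

For reference, a minimal, illustrative usage sketch of the `FabricConnectionConfig` introduced by this series, mirroring the unit tests above. The host, database, and credential values are placeholders, and `check_import=False` is used here only (as in the tests) to skip the pyodbc import check; note that for this engine the driver is pinned to `pyodbc` and `autocommit` defaults to `True`.

```python
# Illustrative sketch: construct the Fabric connection config directly and
# resolve it to its engine adapter, the same way the unit tests exercise it.
from sqlmesh.core.config.connection import FabricConnectionConfig
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

config = FabricConnectionConfig(
    host="myworkspace.datawarehouse.fabric.microsoft.com",  # placeholder host
    database="my_warehouse",                                # placeholder warehouse
    user="client-id",                                       # Entra ID client id (placeholder)
    password="client-secret",                               # Entra ID client secret (placeholder)
    driver_name="ODBC Driver 18 for SQL Server",
    odbc_properties={"Authentication": "ActiveDirectoryServicePrincipal"},
    check_import=False,  # skip the pyodbc import check, as the tests do
)

assert config.driver == "pyodbc"   # the driver is fixed to pyodbc for Fabric
assert config.autocommit is True   # autocommit defaults to True for this engine

adapter = config.create_engine_adapter()
assert isinstance(adapter, FabricAdapter)  # resolves to the new FabricAdapter
```

In a SQLMesh project, the same options would be supplied under a gateway connection with `type: fabric`, per the connection options documented in `docs/integrations/engines/fabric.md`.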