diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md
index 52ebdf7793..9d44cd9f62 100644
--- a/docs/guides/configuration.md
+++ b/docs/guides/configuration.md
@@ -767,6 +767,7 @@ These pages describe the connection configuration options for each execution eng
 * [BigQuery](../integrations/engines/bigquery.md)
 * [Databricks](../integrations/engines/databricks.md)
 * [DuckDB](../integrations/engines/duckdb.md)
+* [Fabric](../integrations/engines/fabric.md)
 * [MotherDuck](../integrations/engines/motherduck.md)
 * [MySQL](../integrations/engines/mysql.md)
 * [MSSQL](../integrations/engines/mssql.md)
diff --git a/docs/integrations/engines/fabric.md b/docs/integrations/engines/fabric.md
new file mode 100644
index 0000000000..1dd47fbe11
--- /dev/null
+++ b/docs/integrations/engines/fabric.md
@@ -0,0 +1,50 @@
+# Fabric
+
+## Local/Built-in Scheduler
+**Engine Adapter Type**: `fabric`
+
+NOTE: Fabric Warehouse is not recommended for use as the SQLMesh [state connection](../../reference/configuration.md#connections).
+
+### Installation
+#### Microsoft Entra ID / Azure Active Directory Authentication
+```
+pip install "sqlmesh[mssql-odbc]"
+```
+
+### Connection options
+
+| Option | Description | Type | Required |
+| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
+| `type` | Engine type name - must be `fabric` | string | Y |
+| `host` | The hostname of the Fabric Warehouse server | string | Y |
+| `user` | The client id to use for authentication with the Fabric Warehouse server | string | N |
+| `password` | The client secret to use for authentication with the Fabric Warehouse server | string | N |
+| `port` | The port number of the Fabric Warehouse server | int | N |
+| `database` | The target database | string | N |
+| `charset` | The character set used for the connection | string | N |
+| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
+| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
+| `appname` | The application name to use for the connection | string | N |
+| `conn_properties` | The list of connection properties | list[string] | N |
+| `autocommit` | Whether autocommit mode is enabled. Default: true | bool | N |
+| `driver` | The driver to use for the connection. Only `pyodbc` is supported. Default: pyodbc | string | N |
+| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
+| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
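+
+### Connection example
+
+For reference, a gateway configuration using a service principal might look like the following (the host, client id/secret, and warehouse name are placeholders):
+
+```yaml
+gateways:
+  fabric:
+    connection:
+      type: fabric
+      host: <workspace>.datawarehouse.fabric.microsoft.com
+      user: <client_id>
+      password: <client_secret>
+      database: <warehouse_name>
+      driver_name: ODBC Driver 18 for SQL Server
+      odbc_properties:
+        authentication: ActiveDirectoryServicePrincipal
+```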
diff --git a/docs/integrations/overview.md b/docs/integrations/overview.md
index 5e850afbf6..94b9289d21 100644
--- a/docs/integrations/overview.md
+++ b/docs/integrations/overview.md
@@ -17,6 +17,7 @@ SQLMesh supports the following execution engines for running SQLMesh projects (e
 * [ClickHouse](./engines/clickhouse.md) (clickhouse)
 * [Databricks](./engines/databricks.md) (databricks)
 * [DuckDB](./engines/duckdb.md) (duckdb)
+* [Fabric](./engines/fabric.md) (fabric)
 * [MotherDuck](./engines/motherduck.md) (motherduck)
 * [MSSQL](./engines/mssql.md) (mssql)
 * [MySQL](./engines/mysql.md) (mysql)
diff --git a/mkdocs.yml b/mkdocs.yml
index 56ec348a04..b7ab52e858 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -84,6 +84,7 @@ nav:
     - integrations/engines/clickhouse.md
     - integrations/engines/databricks.md
     - integrations/engines/duckdb.md
+    - integrations/engines/fabric.md
     - integrations/engines/motherduck.md
     - integrations/engines/mssql.md
     - integrations/engines/mysql.md
diff --git a/pyproject.toml b/pyproject.toml
index 046fbee025..c02c5e1565 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -252,6 +252,7 @@ markers = [
     "clickhouse_cloud: test for Clickhouse (cloud mode)",
     "databricks: test for Databricks",
     "duckdb: test for DuckDB",
+    "fabric: test for Fabric",
     "motherduck: test for MotherDuck",
     "mssql: test for MSSQL",
     "mysql: test for MySQL",
diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py
index af84818858..65435376a0 100644
--- a/sqlmesh/core/config/__init__.py
+++ b/sqlmesh/core/config/__init__.py
@@ -10,6 +10,7 @@
     ConnectionConfig as ConnectionConfig,
     DatabricksConnectionConfig as DatabricksConnectionConfig,
     DuckDBConnectionConfig as DuckDBConnectionConfig,
+    FabricConnectionConfig as FabricConnectionConfig,
     GCPPostgresConnectionConfig as GCPPostgresConnectionConfig,
     MotherDuckConnectionConfig as MotherDuckConnectionConfig,
     MSSQLConnectionConfig as MSSQLConnectionConfig,
diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py
index 2c897cd8a5..262efcbd8b 100644
--- a/sqlmesh/core/config/connection.py
+++ b/sqlmesh/core/config/connection.py
@@ -43,7 +43,13 @@
 logger = logging.getLogger(__name__)
 
 
-RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"}
+RECOMMENDED_STATE_SYNC_ENGINES = {
+    "postgres",
+    "gcp_postgres",
+    "mysql",
+    "mssql",
+    "azuresql",
+}
 FORBIDDEN_STATE_SYNC_ENGINES = {
     # Do not support row-level operations
     "spark",
@@ -1635,6 +1641,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
         return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}
 
 
+class FabricConnectionConfig(MSSQLConnectionConfig):
+    """
+    Fabric Connection Configuration.
+    Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric';
+    only the 'pyodbc' driver is supported.
+ """ + + type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore + DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore + DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore + DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore + driver: t.Literal["pyodbc"] = "pyodbc" + autocommit: t.Optional[bool] = True + + @property + def _engine_adapter(self) -> t.Type[EngineAdapter]: + from sqlmesh.core.engine_adapter.fabric import FabricAdapter + + return FabricAdapter + + @property + def _extra_engine_config(self) -> t.Dict[str, t.Any]: + return { + "database": self.database, + "catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY, + } + + class SparkConnectionConfig(ConnectionConfig): """ Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks. diff --git a/sqlmesh/core/engine_adapter/__init__.py b/sqlmesh/core/engine_adapter/__init__.py index 19332dc005..337de39905 100644 --- a/sqlmesh/core/engine_adapter/__init__.py +++ b/sqlmesh/core/engine_adapter/__init__.py @@ -19,6 +19,7 @@ from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter +from sqlmesh.core.engine_adapter.fabric import FabricAdapter DIALECT_TO_ENGINE_ADAPTER = { "hive": SparkEngineAdapter, @@ -35,6 +36,7 @@ "trino": TrinoEngineAdapter, "athena": AthenaEngineAdapter, "risingwave": RisingwaveEngineAdapter, + "fabric": FabricAdapter, } DIALECT_ALIASES = { diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py new file mode 100644 index 0000000000..d7b862d50a --- /dev/null +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import typing as t +from sqlglot import exp +from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter +from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery +from sqlmesh.core.engine_adapter.base import EngineAdapter + +if t.TYPE_CHECKING: + from sqlmesh.core._typing import TableName + + +from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin + + +class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter): + """ + Adapter for Microsoft Fabric. + """ + + DIALECT = "fabric" + SUPPORTS_INDEXES = False + SUPPORTS_TRANSACTIONS = False + INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT + + def _insert_overwrite_by_condition( + self, + table_name: TableName, + source_queries: t.List[SourceQuery], + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + where: t.Optional[exp.Condition] = None, + insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, + **kwargs: t.Any, + ) -> None: + """ + Implements the insert overwrite strategy for Fabric using DELETE and INSERT. + + This method is overridden to avoid the MERGE statement from the parent + MSSQLEngineAdapter, which is not fully supported in Fabric. 
+ """ + return EngineAdapter._insert_overwrite_by_condition( + self, + table_name=table_name, + source_queries=source_queries, + columns_to_types=columns_to_types, + where=where, + insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT, + **kwargs, + ) diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py new file mode 100644 index 0000000000..709df816d2 --- /dev/null +++ b/tests/core/engine_adapter/test_fabric.py @@ -0,0 +1,83 @@ +# type: ignore + +import typing as t + +import pytest +from sqlglot import exp, parse_one + +from sqlmesh.core.engine_adapter import FabricAdapter +from tests.core.engine_adapter import to_sql_calls + +pytestmark = [pytest.mark.engine, pytest.mark.fabric] + + +@pytest.fixture +def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter: + return make_mocked_engine_adapter(FabricAdapter) + + +def test_columns(adapter: FabricAdapter): + adapter.cursor.fetchall.return_value = [ + ("decimal_ps", "decimal", None, 5, 4), + ("decimal", "decimal", None, 18, 0), + ("float", "float", None, 53, None), + ("char_n", "char", 10, None, None), + ("varchar_n", "varchar", 10, None, None), + ("nvarchar_max", "nvarchar", -1, None, None), + ] + + assert adapter.columns("db.table") == { + "decimal_ps": exp.DataType.build("decimal(5, 4)", dialect=adapter.dialect), + "decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect), + "float": exp.DataType.build("float(53)", dialect=adapter.dialect), + "char_n": exp.DataType.build("char(10)", dialect=adapter.dialect), + "varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect), + "nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect), + } + + # Verify that the adapter queries the uppercase INFORMATION_SCHEMA + adapter.cursor.execute.assert_called_once_with( + """SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';""" + ) + + +def test_table_exists(adapter: FabricAdapter): + adapter.cursor.fetchone.return_value = (1,) + assert adapter.table_exists("db.table") + # Verify that the adapter queries the uppercase INFORMATION_SCHEMA + adapter.cursor.execute.assert_called_once_with( + """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';""" + ) + + adapter.cursor.fetchone.return_value = None + assert not adapter.table_exists("db.table") + + +def test_insert_overwrite_by_time_partition(adapter: FabricAdapter): + adapter.insert_overwrite_by_time_partition( + "test_table", + parse_one("SELECT a, b FROM tbl"), + start="2022-01-01", + end="2022-01-02", + time_column="b", + time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")), + columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")}, + ) + + # Fabric adapter should use DELETE/INSERT strategy, not MERGE. 
+    assert to_sql_calls(adapter) == [
+        """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
+        """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
+    ]
+
+
+def test_replace_query(adapter: FabricAdapter):
+    adapter.cursor.fetchone.return_value = (1,)
+    adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"})
+
+    # This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT
+    assert to_sql_calls(adapter) == [
+        """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""",
+        "TRUNCATE TABLE [test_table];",
+        "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
+    ]
diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py
index 0d7df3d724..49b41f150a 100644
--- a/tests/core/test_connection_config.py
+++ b/tests/core/test_connection_config.py
@@ -12,6 +12,7 @@
     ConnectionConfig,
     DatabricksConnectionConfig,
     DuckDBAttachOptions,
     DuckDBConnectionConfig,
+    FabricConnectionConfig,
     GCPPostgresConnectionConfig,
     MotherDuckConnectionConfig,
@@ -1417,3 +1418,85 @@ def test_mssql_pymssql_connection_factory():
     # Clean up the mock module
     if "pymssql" in sys.modules:
         del sys.modules["pymssql"]
+
+
+def test_fabric_connection_config_defaults(make_config):
+    """Test Fabric connection config defaults to pyodbc and autocommit=True."""
+    config = make_config(type="fabric", host="localhost", check_import=False)
+    assert isinstance(config, FabricConnectionConfig)
+    assert config.driver == "pyodbc"
+    assert config.autocommit is True
+
+    # Ensure it creates the FabricAdapter
+    from sqlmesh.core.engine_adapter.fabric import FabricAdapter
+
+    assert isinstance(config.create_engine_adapter(), FabricAdapter)
+
+
+def test_fabric_connection_config_parameter_validation(make_config):
+    """Test Fabric connection config parameter validation."""
+    # Test that FabricConnectionConfig correctly handles pyodbc-specific parameters.
+    config = make_config(
+        type="fabric",
+        host="localhost",
+        driver_name="ODBC Driver 18 for SQL Server",
+        trust_server_certificate=True,
+        encrypt=False,
+        odbc_properties={"Authentication": "ActiveDirectoryServicePrincipal"},
+        check_import=False,
+    )
+    assert isinstance(config, FabricConnectionConfig)
+    assert config.driver == "pyodbc"  # Driver is fixed to pyodbc
+    assert config.driver_name == "ODBC Driver 18 for SQL Server"
+    assert config.trust_server_certificate is True
+    assert config.encrypt is False
+    assert config.odbc_properties == {"Authentication": "ActiveDirectoryServicePrincipal"}
+
+    # Test that specifying a different driver for Fabric raises an error
+    with pytest.raises(ConfigError, match=r"Input should be 'pyodbc'"):
+        make_config(type="fabric", host="localhost", driver="pymssql", check_import=False)
+
+
+def test_fabric_pyodbc_connection_string_generation():
+    """Test that the Fabric pyodbc connection gets invoked with the correct ODBC connection string."""
+    with patch("pyodbc.connect") as mock_pyodbc_connect:
+        # Create a Fabric config
+        config = FabricConnectionConfig(
+            host="testserver.datawarehouse.fabric.microsoft.com",
+            port=1433,
+            database="testdb",
+            user="testuser",
+            password="testpass",
+            driver_name="ODBC Driver 18 for SQL Server",
+            trust_server_certificate=True,
+            encrypt=True,
+            login_timeout=30,
+            check_import=False,
+        )
+
+        # Build the pyodbc connection factory and invoke it
+        factory_with_kwargs = config._connection_factory_with_kwargs
+        factory_with_kwargs()
+
+        # Verify pyodbc.connect was called with the correct connection string
+        mock_pyodbc_connect.assert_called_once()
+        call_args = mock_pyodbc_connect.call_args
+
+        # Check the connection string (first positional argument)
+        conn_str = call_args[0][0]
+        expected_parts = [
+            "DRIVER={ODBC Driver 18 for SQL Server}",
+            "SERVER=testserver.datawarehouse.fabric.microsoft.com,1433",
+            "DATABASE=testdb",
+            "Encrypt=YES",
+            "TrustServerCertificate=YES",
+            "Connection Timeout=30",
+            "UID=testuser",
+            "PWD=testpass",
+        ]
+
+        for part in expected_parts:
+            assert part in conn_str
+
+        # Check the autocommit parameter; it should default to True for Fabric
+        assert call_args[1]["autocommit"] is True