Skip to content

feat: Add support for Microsoft Fabric Warehouse #4751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b679716
feat: Add support for Microsoft Fabric Waerhouse
fresioAS Jun 16, 2025
9a6c575
removing some print statements
fresioAS Jun 16, 2025
347d3ed
adding dialect & handling temp views
fresioAS Jun 17, 2025
0ff075c
isnan error
fresioAS Jun 18, 2025
332ea32
CTEs no qualify
fresioAS Jun 19, 2025
585fb7e
simplifying
fresioAS Jun 23, 2025
1bbe90e
docs & tests
fresioAS Jun 23, 2025
6895570
connection tests
fresioAS Jun 23, 2025
9c0a2dd
remove table_exist and columns
fresioAS Jun 24, 2025
f40fc4d
updated tests
fresioAS Jun 25, 2025
5cc30ab
mypy
fresioAS Jun 25, 2025
d5f7aa7
ruff
fresioAS Jun 25, 2025
873d9f5
Merge branch 'main' into add_fabric_warehouse
fresioAS Jun 25, 2025
50fe5e4
Update fabric.md
fresioAS Jun 25, 2025
ae7197b
Merge branch 'main' into add_fabric_warehouse
fresioAS Jun 27, 2025
3a06c90
Update sqlmesh/core/engine_adapter/fabric.py
fresioAS Jul 2, 2025
15a9603
Merge branch 'main' into add_fabric_warehouse
fresioAS Jul 2, 2025
145b69b
Update fabric.py
fresioAS Jul 2, 2025
9c37b9e
Merge branch 'main' into add_fabric_warehouse
fresioAS Jul 3, 2025
ecf3e7b
Add Fabric to integration tests
erindru Jul 3, 2025
481c9a7
Merge remote-tracking branch 'fresioAS/add_fabric_warehouse' into fea…
mattiasthalen Jul 10, 2025
ec87592
Merge remote-tracking branch 'erindru/fresioas-sqlmesh-fabric-integra…
mattiasthalen Jul 10, 2025
9127bda
feat(tests): add fabric timestamp handling in dialects test
mattiasthalen Jul 10, 2025
deb9321
fix: update catalog support configuration in FabricConnectionConfig
mattiasthalen Jul 10, 2025
4412fc9
fix(mssql): update driver selection logic to allow enforcing pyodbc i…
mattiasthalen Jul 10, 2025
6ac197e
fix(fabric): Skip test_value_normalization for TIMESTAMPTZ
mattiasthalen Jul 11, 2025
173e0ac
Manually set sqlglot to dev branch.
mattiasthalen Jul 11, 2025
933a765
Merge branch 'main' into feat/add-fabric-engine
mattiasthalen Jul 11, 2025
e154241
feat: Add support for Microsoft Fabric Waerhouse
fresioAS Jun 16, 2025
2bdd417
removing some print statements
fresioAS Jun 16, 2025
cbe3bdc
adding dialect & handling temp views
fresioAS Jun 17, 2025
0080583
isnan error
fresioAS Jun 18, 2025
bded0d0
CTEs no qualify
fresioAS Jun 19, 2025
51753f1
simplifying
fresioAS Jun 23, 2025
55d7314
docs & tests
fresioAS Jun 23, 2025
1f37a4b
connection tests
fresioAS Jun 23, 2025
5cb0e4f
remove table_exist and columns
fresioAS Jun 24, 2025
8253545
updated tests
fresioAS Jun 25, 2025
6a54905
mypy
fresioAS Jun 25, 2025
6f1a575
ruff
fresioAS Jun 25, 2025
c2d10a2
Update fabric.md
fresioAS Jun 25, 2025
cd9f261
Update sqlmesh/core/engine_adapter/fabric.py
fresioAS Jul 2, 2025
1eb623a
Update fabric.py
fresioAS Jul 2, 2025
5113ef4
Update sqlmesh/core/config/connection.py
fresioAS Jul 10, 2025
d17677e
Add Fabric to integration tests
erindru Jul 3, 2025
c89351a
added mattiasthalen fork changes
fresioAS Jul 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ These pages describe the connection configuration options for each execution eng
* [BigQuery](../integrations/engines/bigquery.md)
* [Databricks](../integrations/engines/databricks.md)
* [DuckDB](../integrations/engines/duckdb.md)
* [Fabric](../integrations/engines/fabric.md)
* [MotherDuck](../integrations/engines/motherduck.md)
* [MySQL](../integrations/engines/mysql.md)
* [MSSQL](../integrations/engines/mssql.md)
Expand Down
30 changes: 30 additions & 0 deletions docs/integrations/engines/fabric.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Fabric

## Local/Built-in Scheduler
**Engine Adapter Type**: `fabric`

### Installation
#### Microsoft Entra ID / Azure Active Directory Authentication:
```
pip install "sqlmesh[mssql-odbc]"
```

### Connection options

| Option | Description | Type | Required |
| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
| `type` | Engine type name - must be `fabric` | string | Y |
| `host` | The hostname of the Fabric Warehouse server | string | Y |
| `user` | The client id to use for authentication with the Fabric Warehouse server | string | N |
| `password` | The client secret to use for authentication with the Fabric Warehouse server | string | N |
| `port` | The port number of the Fabric Warehouse server | int | N |
| `database` | The target database | string | N |
| `charset` | The character set used for the connection | string | N |
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
| `appname` | The application name to use for the connection | string | N |
| `conn_properties` | The list of connection properties | list[string] | N |
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N |
| `driver` | The driver to use for the connection. Default: pyodbc | string | N |
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
1 change: 1 addition & 0 deletions docs/integrations/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ SQLMesh supports the following execution engines for running SQLMesh projects (e
* [ClickHouse](./engines/clickhouse.md) (clickhouse)
* [Databricks](./engines/databricks.md) (databricks)
* [DuckDB](./engines/duckdb.md) (duckdb)
* [Fabric](./engines/fabric.md) (fabric)
* [MotherDuck](./engines/motherduck.md) (motherduck)
* [MSSQL](./engines/mssql.md) (mssql)
* [MySQL](./engines/mysql.md) (mysql)
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ nav:
- integrations/engines/clickhouse.md
- integrations/engines/databricks.md
- integrations/engines/duckdb.md
- integrations/engines/fabric.md
- integrations/engines/motherduck.md
- integrations/engines/mssql.md
- integrations/engines/mysql.md
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ markers = [
"clickhouse_cloud: test for Clickhouse (cloud mode)",
"databricks: test for Databricks",
"duckdb: test for DuckDB",
"fabric: test for Fabric",
"motherduck: test for MotherDuck",
"mssql: test for MSSQL",
"mysql: test for MySQL",
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ConnectionConfig as ConnectionConfig,
DatabricksConnectionConfig as DatabricksConnectionConfig,
DuckDBConnectionConfig as DuckDBConnectionConfig,
FabricConnectionConfig as FabricConnectionConfig,
GCPPostgresConnectionConfig as GCPPostgresConnectionConfig,
MotherDuckConnectionConfig as MotherDuckConnectionConfig,
MSSQLConnectionConfig as MSSQLConnectionConfig,
Expand Down
36 changes: 35 additions & 1 deletion sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@

logger = logging.getLogger(__name__)

RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"}
RECOMMENDED_STATE_SYNC_ENGINES = {
"postgres",
"gcp_postgres",
"mysql",
"mssql",
"azuresql",
}
FORBIDDEN_STATE_SYNC_ENGINES = {
# Do not support row-level operations
"spark",
Expand Down Expand Up @@ -1635,6 +1641,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}


class FabricConnectionConfig(MSSQLConnectionConfig):
"""
Fabric Connection Configuration.
Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'.
It is recommended to use the 'pyodbc' driver for Fabric.
"""

type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore
DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore
DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore
DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore
driver: t.Literal["pyodbc"] = "pyodbc"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This driver literal doesn't seem to work. If I don't manually specify it in my Fabric connection config:

gateways:
  fabric:
    connection:
      type: fabric
      #driver: pyodbc

Then the pymssql check still happens and SQLMesh aborts with:

Error: Failed to import the 'pymssql' engine library.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattiasthalen has this been addressed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR, but it's there in my branch. 4412fc9

Not sure if that's the way to do it ☺️

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will update this PR later today

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to see you back @fresioAS! ☺️

There's an issue in sqlglot's tsql dialect. The ddl for varchar without precision creates a varchar(1), that affects one of the integration tests.

I'm just about to leave for vacation and won't have time to handle it. But I'll be back on Sunday.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattiasthalen Ok. will take a look this week. Summer time. I'm off on vacation again on Monday 😄

autocommit: t.Optional[bool] = True

@property
def _engine_adapter(self) -> t.Type[EngineAdapter]:
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

return FabricAdapter

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {
"database": self.database,
"catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
}


class SparkConnectionConfig(ConnectionConfig):
"""
Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks.
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

DIALECT_TO_ENGINE_ADAPTER = {
"hive": SparkEngineAdapter,
Expand All @@ -35,6 +36,7 @@
"trino": TrinoEngineAdapter,
"athena": AthenaEngineAdapter,
"risingwave": RisingwaveEngineAdapter,
"fabric": FabricAdapter,
}

DIALECT_ALIASES = {
Expand Down
46 changes: 46 additions & 0 deletions sqlmesh/core/engine_adapter/fabric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import annotations

import typing as t
from sqlglot import exp
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
from sqlmesh.core.engine_adapter.base import EngineAdapter

if t.TYPE_CHECKING:
from sqlmesh.core._typing import TableName


class FabricAdapter(MSSQLEngineAdapter):
"""
Adapter for Microsoft Fabric.
"""

DIALECT = "fabric"
SUPPORTS_INDEXES = False
SUPPORTS_TRANSACTIONS = False
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT

def _insert_overwrite_by_condition(
self,
table_name: TableName,
source_queries: t.List[SourceQuery],
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
where: t.Optional[exp.Condition] = None,
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
**kwargs: t.Any,
) -> None:
"""
Implements the insert overwrite strategy for Fabric using DELETE and INSERT.

This method is overridden to avoid the MERGE statement from the parent
MSSQLEngineAdapter, which is not fully supported in Fabric.
"""
return EngineAdapter._insert_overwrite_by_condition(
self,
table_name=table_name,
source_queries=source_queries,
columns_to_types=columns_to_types,
where=where,
insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
**kwargs,
)
83 changes: 83 additions & 0 deletions tests/core/engine_adapter/test_fabric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# type: ignore

import typing as t

import pytest
from sqlglot import exp, parse_one

from sqlmesh.core.engine_adapter import FabricAdapter
from tests.core.engine_adapter import to_sql_calls

pytestmark = [pytest.mark.engine, pytest.mark.fabric]


@pytest.fixture
def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter:
return make_mocked_engine_adapter(FabricAdapter)


def test_columns(adapter: FabricAdapter):
adapter.cursor.fetchall.return_value = [
("decimal_ps", "decimal", None, 5, 4),
("decimal", "decimal", None, 18, 0),
("float", "float", None, 53, None),
("char_n", "char", 10, None, None),
("varchar_n", "varchar", 10, None, None),
("nvarchar_max", "nvarchar", -1, None, None),
]

assert adapter.columns("db.table") == {
"decimal_ps": exp.DataType.build("decimal(5, 4)", dialect=adapter.dialect),
"decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect),
"float": exp.DataType.build("float(53)", dialect=adapter.dialect),
"char_n": exp.DataType.build("char(10)", dialect=adapter.dialect),
"varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect),
"nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect),
}

# Verify that the adapter queries the uppercase INFORMATION_SCHEMA
adapter.cursor.execute.assert_called_once_with(
"""SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
)


def test_table_exists(adapter: FabricAdapter):
adapter.cursor.fetchone.return_value = (1,)
assert adapter.table_exists("db.table")
# Verify that the adapter queries the uppercase INFORMATION_SCHEMA
adapter.cursor.execute.assert_called_once_with(
"""SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
)

adapter.cursor.fetchone.return_value = None
assert not adapter.table_exists("db.table")


def test_insert_overwrite_by_time_partition(adapter: FabricAdapter):
adapter.insert_overwrite_by_time_partition(
"test_table",
parse_one("SELECT a, b FROM tbl"),
start="2022-01-01",
end="2022-01-02",
time_column="b",
time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")),
columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")},
)

# Fabric adapter should use DELETE/INSERT strategy, not MERGE.
assert to_sql_calls(adapter) == [
"""DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
"""INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
]


def test_replace_query(adapter: FabricAdapter):
adapter.cursor.fetchone.return_value = (1,)
adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"})

# This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT
assert to_sql_calls(adapter) == [
"""SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""",
"TRUNCATE TABLE [test_table];",
"INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
]
83 changes: 83 additions & 0 deletions tests/core/test_connection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ConnectionConfig,
DatabricksConnectionConfig,
DuckDBAttachOptions,
FabricConnectionConfig,
DuckDBConnectionConfig,
GCPPostgresConnectionConfig,
MotherDuckConnectionConfig,
Expand Down Expand Up @@ -1417,3 +1418,85 @@ def test_mssql_pymssql_connection_factory():
# Clean up the mock module
if "pymssql" in sys.modules:
del sys.modules["pymssql"]


def test_fabric_connection_config_defaults(make_config):
"""Test Fabric connection config defaults to pyodbc and autocommit=True."""
config = make_config(type="fabric", host="localhost", check_import=False)
assert isinstance(config, FabricConnectionConfig)
assert config.driver == "pyodbc"
assert config.autocommit is True

# Ensure it creates the FabricAdapter
from sqlmesh.core.engine_adapter.fabric import FabricAdapter

assert isinstance(config.create_engine_adapter(), FabricAdapter)


def test_fabric_connection_config_parameter_validation(make_config):
"""Test Fabric connection config parameter validation."""
# Test that FabricConnectionConfig correctly handles pyodbc-specific parameters.
config = make_config(
type="fabric",
host="localhost",
driver_name="ODBC Driver 18 for SQL Server",
trust_server_certificate=True,
encrypt=False,
odbc_properties={"Authentication": "ActiveDirectoryServicePrincipal"},
check_import=False,
)
assert isinstance(config, FabricConnectionConfig)
assert config.driver == "pyodbc" # Driver is fixed to pyodbc
assert config.driver_name == "ODBC Driver 18 for SQL Server"
assert config.trust_server_certificate is True
assert config.encrypt is False
assert config.odbc_properties == {"Authentication": "ActiveDirectoryServicePrincipal"}

# Test that specifying a different driver for Fabric raises an error
with pytest.raises(ConfigError, match=r"Input should be 'pyodbc'"):
make_config(type="fabric", host="localhost", driver="pymssql", check_import=False)


def test_fabric_pyodbc_connection_string_generation():
"""Test that the Fabric pyodbc connection gets invoked with the correct ODBC connection string."""
with patch("pyodbc.connect") as mock_pyodbc_connect:
# Create a Fabric config
config = FabricConnectionConfig(
host="testserver.datawarehouse.fabric.microsoft.com",
port=1433,
database="testdb",
user="testuser",
password="testpass",
driver_name="ODBC Driver 18 for SQL Server",
trust_server_certificate=True,
encrypt=True,
login_timeout=30,
check_import=False,
)

# Get the connection factory with kwargs and call it
factory_with_kwargs = config._connection_factory_with_kwargs
connection = factory_with_kwargs()

# Verify pyodbc.connect was called with the correct connection string
mock_pyodbc_connect.assert_called_once()
call_args = mock_pyodbc_connect.call_args

# Check the connection string (first argument)
conn_str = call_args[0][0]
expected_parts = [
"DRIVER={ODBC Driver 18 for SQL Server}",
"SERVER=testserver.datawarehouse.fabric.microsoft.com,1433",
"DATABASE=testdb",
"Encrypt=YES",
"TrustServerCertificate=YES",
"Connection Timeout=30",
"UID=testuser",
"PWD=testpass",
]

for part in expected_parts:
assert part in conn_str

# Check autocommit parameter, should default to True for Fabric
assert call_args[1]["autocommit"] is True