-
Notifications
You must be signed in to change notification settings - Fork 238
feat: Add support for Microsoft Fabric Warehouse #4751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 19 commits
b679716
9a6c575
347d3ed
0ff075c
332ea32
585fb7e
1bbe90e
6895570
9c0a2dd
f40fc4d
5cc30ab
d5f7aa7
873d9f5
50fe5e4
ae7197b
3a06c90
15a9603
145b69b
9c37b9e
e9396e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Fabric | ||
|
||
## Local/Built-in Scheduler | ||
**Engine Adapter Type**: `fabric` | ||
|
||
NOTE: Fabric Warehouse is not recommended for use as the SQLMesh [state connection](../../reference/configuration.md#connections).
|
||
### Installation | ||
#### Microsoft Entra ID / Azure Active Directory Authentication: | ||
``` | ||
pip install "sqlmesh[mssql-odbc]" | ||
``` | ||
|
||
### Connection options | ||
|
||
| Option | Description | Type | Required | | ||
| ----------------- | ------------------------------------------------------------ | :----------: | :------: | | ||
| `type` | Engine type name - must be `fabric` | string | Y | | ||
| `host` | The hostname of the Fabric Warehouse server | string | Y | | ||
| `user`            | The client ID to use for authentication with the Fabric Warehouse server | string | N |
| `password`        | The client secret to use for authentication with the Fabric Warehouse server | string | N |
| `port` | The port number of the Fabric Warehouse server | int | N | | ||
| `database` | The target database | string | N | | ||
| `charset` | The character set used for the connection | string | N | | ||
| `timeout` | The query timeout in seconds. Default: no timeout | int | N | | ||
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N | | ||
| `appname` | The application name to use for the connection | string | N | | ||
| `conn_properties` | The list of connection properties | list[string] | N | | ||
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N | | ||
| `driver` | The driver to use for the connection. Default: pyodbc | string | N | | ||
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N | | ||
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N | |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,13 @@ | |
|
||
logger = logging.getLogger(__name__) | ||
|
||
RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"} | ||
RECOMMENDED_STATE_SYNC_ENGINES = { | ||
"postgres", | ||
"gcp_postgres", | ||
"mysql", | ||
"mssql", | ||
"azuresql", | ||
} | ||
FORBIDDEN_STATE_SYNC_ENGINES = { | ||
# Do not support row-level operations | ||
"spark", | ||
|
@@ -1635,6 +1641,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: | |
return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY} | ||
|
||
|
||
class FabricConnectionConfig(MSSQLConnectionConfig):
    """
    Fabric Connection Configuration.

    Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'.
    It is recommended to use the 'pyodbc' driver for Fabric.
    """

    # Discriminator field for the connection-config union; exposed as `type`
    # in user configuration via the Pydantic alias.
    type_: t.Literal["fabric"] = Field(alias="type", default="fabric")  # type: ignore
    # Class-level metadata used by SQLMesh to render/order this engine in UIs.
    DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric"  # type: ignore
    DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric"  # type: ignore
    DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17  # type: ignore

    # NOTE(review): narrowing the inherited `driver` field to a literal may not
    # take effect when the user omits `driver` entirely — a reviewer reported the
    # parent's default/validation winning in that case. Confirm that an
    # unspecified driver actually resolves to "pyodbc".
    driver: t.Literal["pyodbc"] = "pyodbc"
    autocommit: t.Optional[bool] = True

    @property
    def _engine_adapter(self) -> t.Type[EngineAdapter]:
        # Imported lazily to avoid a circular import at module load time.
        from sqlmesh.core.engine_adapter.fabric import FabricAdapter

        return FabricAdapter

    @property
    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
        # REQUIRES_SET_CATALOG: the engine adapter must issue an explicit
        # catalog switch rather than addressing catalogs inline.
        return {
            "database": self.database,
            "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
        }
|
||
|
||
class SparkConnectionConfig(ConnectionConfig): | ||
""" | ||
Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from __future__ import annotations | ||
|
||
import typing as t | ||
from sqlglot import exp | ||
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter | ||
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery | ||
from sqlmesh.core.engine_adapter.base import EngineAdapter | ||
|
||
if t.TYPE_CHECKING: | ||
from sqlmesh.core._typing import TableName | ||
|
||
|
||
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin | ||
|
||
|
||
class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter):
    """Engine adapter for Microsoft Fabric Warehouse."""

    DIALECT = "fabric"
    SUPPORTS_INDEXES = False
    SUPPORTS_TRANSACTIONS = False
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """Perform insert-overwrite for Fabric using DELETE followed by INSERT.

        Deliberately dispatches to the base ``EngineAdapter`` implementation
        (bypassing the MRO) so that the MERGE-based override defined on
        ``MSSQLEngineAdapter`` is never used — MERGE is not fully supported
        in Fabric. The strategy override is therefore pinned to DELETE_INSERT
        regardless of the value passed in.
        """
        base_impl = EngineAdapter._insert_overwrite_by_condition
        return base_impl(
            self,
            table_name=table_name,
            source_queries=source_queries,
            columns_to_types=columns_to_types,
            where=where,
            insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
            **kwargs,
        )
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# type: ignore | ||
|
||
import typing as t | ||
|
||
import pytest | ||
from sqlglot import exp, parse_one | ||
|
||
from sqlmesh.core.engine_adapter import FabricAdapter | ||
from tests.core.engine_adapter import to_sql_calls | ||
|
||
pytestmark = [pytest.mark.engine, pytest.mark.fabric] | ||
|
||
|
||
@pytest.fixture
def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter:
    """Provide a FabricAdapter wired to a mocked cursor/connection."""
    mocked_adapter = make_mocked_engine_adapter(FabricAdapter)
    return mocked_adapter
|
||
|
||
def test_columns(adapter: FabricAdapter):
    """columns() must map INFORMATION_SCHEMA rows to sqlglot data types."""
    adapter.cursor.fetchall.return_value = [
        ("decimal_ps", "decimal", None, 5, 4),
        ("decimal", "decimal", None, 18, 0),
        ("float", "float", None, 53, None),
        ("char_n", "char", 10, None, None),
        ("varchar_n", "varchar", 10, None, None),
        ("nvarchar_max", "nvarchar", -1, None, None),
    ]

    expected_types = {
        "decimal_ps": "decimal(5, 4)",
        "decimal": "decimal(18, 0)",
        "float": "float(53)",
        "char_n": "char(10)",
        "varchar_n": "varchar(10)",
        "nvarchar_max": "nvarchar(max)",
    }
    assert adapter.columns("db.table") == {
        column: exp.DataType.build(type_sql, dialect=adapter.dialect)
        for column, type_sql in expected_types.items()
    }

    # The adapter must query the uppercase INFORMATION_SCHEMA views.
    adapter.cursor.execute.assert_called_once_with(
        """SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
    )
|
||
|
||
def test_table_exists(adapter: FabricAdapter):
    """table_exists() is truthy iff the INFORMATION_SCHEMA lookup yields a row."""
    # A row comes back -> the table exists.
    adapter.cursor.fetchone.return_value = (1,)
    assert adapter.table_exists("db.table")
    # The adapter must query the uppercase INFORMATION_SCHEMA views.
    adapter.cursor.execute.assert_called_once_with(
        """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
    )

    # No row -> the table does not exist.
    adapter.cursor.fetchone.return_value = None
    assert not adapter.table_exists("db.table")
|
||
|
||
def test_insert_overwrite_by_time_partition(adapter: FabricAdapter):
    """Fabric must implement insert-overwrite as DELETE + INSERT, never MERGE."""
    source_query = parse_one("SELECT a, b FROM tbl")
    adapter.insert_overwrite_by_time_partition(
        "test_table",
        source_query,
        start="2022-01-01",
        end="2022-01-02",
        time_column="b",
        time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")),
        columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")},
    )

    expected_sql = [
        """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
        """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
    ]
    assert to_sql_calls(adapter) == expected_sql
|
||
|
||
def test_replace_query(adapter: FabricAdapter):
    """replace_query keeps the inherited MSSQL behavior: existence check, TRUNCATE, INSERT."""
    adapter.cursor.fetchone.return_value = (1,)
    adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"})

    expected_sql = [
        """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""",
        "TRUNCATE TABLE [test_table];",
        "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
    ]
    assert to_sql_calls(adapter) == expected_sql
Uh oh!
There was an error while loading. Please reload this page.