feat: Add support for Microsoft Fabric Warehouse #4751
base: main
Changes from 13 commits
@@ -0,0 +1,30 @@

# Fabric

## Local/Built-in Scheduler
**Engine Adapter Type**: `fabric`

### Installation
#### Microsoft Entra ID / Azure Active Directory Authentication:
```
pip install "sqlmesh[mssql-odbc]"
```

### Connection options

| Option            | Description                                                  |     Type     | Required |
| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
| `type`            | Engine type name - must be `fabric`                          | string       | Y        |
| `host`            | The hostname of the Fabric Warehouse server                  | string       | Y        |
| `user`            | The client ID to use for authentication with the Fabric Warehouse server | string | N |
| `password`        | The client secret to use for authentication with the Fabric Warehouse server | string | N |
| `port`            | The port number of the Fabric Warehouse server               | int          | N        |
| `database`        | The target database                                          | string       | N        |
| `charset`         | The character set used for the connection                    | string       | N        |
| `timeout`         | The query timeout in seconds. Default: no timeout            | int          | N        |
| `login_timeout`   | The timeout for connection and login in seconds. Default: 60 | int          | N        |
| `appname`         | The application name to use for the connection               | string       | N        |
| `conn_properties` | The list of connection properties                            | list[string] | N        |
| `autocommit`      | Whether autocommit mode is enabled. Default: false           | bool         | N        |
| `driver`          | The driver to use for the connection. Default: pyodbc        | string       | N        |
| `driver_name`     | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |
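
A gateway definition wiring these options together might look like the following `config.yaml` fragment; the host, client ID, secret, and database values are illustrative placeholders, not values taken from this PR:

```yaml
gateways:
  fabric:
    connection:
      type: fabric
      host: myserver.datawarehouse.fabric.microsoft.com  # placeholder hostname
      user: <client_id>          # service principal client ID (placeholder)
      password: <client_secret>  # service principal secret (placeholder)
      database: my_warehouse     # placeholder database name
```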
```diff
@@ -43,7 +43,13 @@

 logger = logging.getLogger(__name__)

-RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"}
+RECOMMENDED_STATE_SYNC_ENGINES = {
+    "postgres",
+    "gcp_postgres",
+    "mysql",
+    "mssql",
+    "azuresql",
+}
 FORBIDDEN_STATE_SYNC_ENGINES = {
     # Do not support row-level operations
     "spark",

@@ -1635,6 +1641,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
         return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}


+class FabricConnectionConfig(MSSQLConnectionConfig):
+    """
+    Fabric Connection Configuration.
+    Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'.
+    It is recommended to use the 'pyodbc' driver for Fabric.
+    """
+
+    type_: t.Literal["fabric"] = Field(alias="type", default="fabric")  # type: ignore
+    DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric"  # type: ignore
+    DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric"  # type: ignore
+    DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17  # type: ignore
+
+    driver: t.Literal["pyodbc"] = "pyodbc"
+    autocommit: t.Optional[bool] = True
+
+    @property
+    def _engine_adapter(self) -> t.Type[EngineAdapter]:
+        from sqlmesh.core.engine_adapter.fabric import FabricAdapter
+
+        return FabricAdapter
+
+    @property
+    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
+        return {
+            "database": self.database,
+            "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
+        }
+
+
 class SparkConnectionConfig(ConnectionConfig):
     """
     Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks.
```

Review discussion on the `driver` field:

> This driver literal doesn't seem to work. If I don't manually specify it in my Fabric connection config: […] Then the […]

> @mattiasthalen has this been addressed?

> Not in this PR, but it's there in my branch: 4412fc9. Not sure if that's the way to do it.

> i will update this PR later today

> Nice to see you back @fresioAS! There's an issue in sqlglot's tsql dialect: the DDL for varchar without precision creates a varchar(1), which affects one of the integration tests. I'm just about to leave for vacation and won't have time to handle it. But I'll be back on Sunday.

> @mattiasthalen Ok. will take a look this week. Summer time. I'm off on vacation again on Monday 😄
@@ -0,0 +1,46 @@

```python
from __future__ import annotations

import typing as t

from sqlglot import exp

from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
from sqlmesh.core.engine_adapter.base import EngineAdapter

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import TableName


class FabricAdapter(MSSQLEngineAdapter):
    """
    Adapter for Microsoft Fabric.
    """

    DIALECT = "fabric"
    SUPPORTS_INDEXES = False
    SUPPORTS_TRANSACTIONS = False
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Implements the insert overwrite strategy for Fabric using DELETE and INSERT.

        This method is overridden to avoid the MERGE statement from the parent
        MSSQLEngineAdapter, which is not fully supported in Fabric.
        """
        return EngineAdapter._insert_overwrite_by_condition(
            self,
            table_name=table_name,
            source_queries=source_queries,
            columns_to_types=columns_to_types,
            where=where,
            insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
            **kwargs,
        )
```
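
The override above routes Fabric's insert-overwrite through two plain statements instead of MERGE. As a rough standalone sketch of that DELETE + INSERT pattern (in-memory rows stand in for a warehouse table; `insert_overwrite` is a hypothetical helper written for illustration, not part of SQLMesh):

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def insert_overwrite(
    table: List[Row],
    new_rows: List[Row],
    where: Callable[[Row], bool],
) -> List[Row]:
    """Mimic DELETE ... WHERE <cond> followed by INSERT INTO ... SELECT.

    Two separate statements rather than one atomic MERGE, mirroring
    SUPPORTS_TRANSACTIONS = False in the adapter above.
    """
    kept = [row for row in table if not where(row)]  # DELETE FROM table WHERE ...
    return kept + list(new_rows)  # INSERT INTO table SELECT ...


table = [{"b": "2021-12-31"}, {"b": "2022-01-01"}]
result = insert_overwrite(
    table,
    [{"b": "2022-01-02"}],
    lambda r: "2022-01-01" <= r["b"] <= "2022-01-02",
)
assert result == [{"b": "2021-12-31"}, {"b": "2022-01-02"}]
```

The row outside the date range survives the delete; the row inside it is replaced by the new partition's rows, which is exactly the shape the time-partition test below asserts in SQL.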
@@ -0,0 +1,83 @@

```python
# type: ignore

import typing as t

import pytest
from sqlglot import exp, parse_one

from sqlmesh.core.engine_adapter import FabricAdapter
from tests.core.engine_adapter import to_sql_calls

pytestmark = [pytest.mark.engine, pytest.mark.fabric]


@pytest.fixture
def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter:
    return make_mocked_engine_adapter(FabricAdapter)


def test_columns(adapter: FabricAdapter):
    adapter.cursor.fetchall.return_value = [
        ("decimal_ps", "decimal", None, 5, 4),
        ("decimal", "decimal", None, 18, 0),
        ("float", "float", None, 53, None),
        ("char_n", "char", 10, None, None),
        ("varchar_n", "varchar", 10, None, None),
        ("nvarchar_max", "nvarchar", -1, None, None),
    ]

    assert adapter.columns("db.table") == {
        "decimal_ps": exp.DataType.build("decimal(5, 4)", dialect=adapter.dialect),
        "decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect),
        "float": exp.DataType.build("float(53)", dialect=adapter.dialect),
        "char_n": exp.DataType.build("char(10)", dialect=adapter.dialect),
        "varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect),
        "nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect),
    }

    # Verify that the adapter queries the uppercase INFORMATION_SCHEMA
    adapter.cursor.execute.assert_called_once_with(
        """SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
    )


def test_table_exists(adapter: FabricAdapter):
    adapter.cursor.fetchone.return_value = (1,)
    assert adapter.table_exists("db.table")
    # Verify that the adapter queries the uppercase INFORMATION_SCHEMA
    adapter.cursor.execute.assert_called_once_with(
        """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
    )

    adapter.cursor.fetchone.return_value = None
    assert not adapter.table_exists("db.table")


def test_insert_overwrite_by_time_partition(adapter: FabricAdapter):
    adapter.insert_overwrite_by_time_partition(
        "test_table",
        parse_one("SELECT a, b FROM tbl"),
        start="2022-01-01",
        end="2022-01-02",
        time_column="b",
        time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")),
        columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")},
    )

    # Fabric adapter should use DELETE/INSERT strategy, not MERGE.
    assert to_sql_calls(adapter) == [
        """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
        """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
    ]


def test_replace_query(adapter: FabricAdapter):
    adapter.cursor.fetchone.return_value = (1,)
    adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"})

    # This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT
    assert to_sql_calls(adapter) == [
        """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""",
        "TRUNCATE TABLE [test_table];",
        "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
    ]
```
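
These tests rely on a mocked cursor rather than a live Fabric connection. The same pattern can be reproduced with only the standard library; `FakeAdapter` below is a simplified stand-in for the mocked engine adapter written for illustration, not SQLMesh code:

```python
from unittest.mock import MagicMock


class FakeAdapter:
    """Minimal stand-in: records every SQL string passed to cursor.execute."""

    def __init__(self) -> None:
        self.cursor = MagicMock()

    def table_exists(self, name: str) -> bool:
        # The real adapter renders this query via sqlglot; here it is a plain f-string.
        self.cursor.execute(
            f"SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = '{name}';"
        )
        return self.cursor.fetchone() is not None


adapter = FakeAdapter()
adapter.cursor.fetchone.return_value = (1,)
assert adapter.table_exists("test_table")

adapter.cursor.fetchone.return_value = None
assert not adapter.table_exists("test_table")
```

Setting `cursor.fetchone.return_value` drives the branch under test, and `cursor.execute` call assertions (as in `assert_called_once_with` above) verify the exact SQL emitted, without any database.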