-
Notifications
You must be signed in to change notification settings - Fork 236
feat: Add support for Microsoft Fabric Warehouse #4751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
fresioAS
wants to merge
19
commits into
TobikoData:main
Choose a base branch
from
fresioAS:add_fabric_warehouse
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
b679716
feat: Add support for Microsoft Fabric Waerhouse
fresioAS 9a6c575
removing some print statements
fresioAS 347d3ed
adding dialect & handling temp views
fresioAS 0ff075c
isnan error
fresioAS 332ea32
CTEs no qualify
fresioAS 585fb7e
simplifying
fresioAS 1bbe90e
docs & tests
fresioAS 6895570
connection tests
fresioAS 9c0a2dd
remove table_exist and columns
fresioAS f40fc4d
updated tests
fresioAS 5cc30ab
mypy
fresioAS d5f7aa7
ruff
fresioAS 873d9f5
Merge branch 'main' into add_fabric_warehouse
fresioAS 50fe5e4
Update fabric.md
fresioAS ae7197b
Merge branch 'main' into add_fabric_warehouse
fresioAS 3a06c90
Update sqlmesh/core/engine_adapter/fabric.py
fresioAS 15a9603
Merge branch 'main' into add_fabric_warehouse
fresioAS 145b69b
Update fabric.py
fresioAS 9c37b9e
Merge branch 'main' into add_fabric_warehouse
fresioAS File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Fabric | ||
|
||
## Local/Built-in Scheduler | ||
**Engine Adapter Type**: `fabric` | ||
|
||
NOTE: Fabric Warehouse is not recommended to be used for the SQLMesh [state connection](../../reference/configuration.md#connections). | ||
|
||
### Installation | ||
#### Microsoft Entra ID / Azure Active Directory Authentication: | ||
``` | ||
pip install "sqlmesh[mssql-odbc]" | ||
``` | ||
|
||
### Connection options | ||
|
||
| Option | Description | Type | Required | | ||
| ----------------- | ------------------------------------------------------------ | :----------: | :------: | | ||
| `type` | Engine type name - must be `fabric` | string | Y | | ||
| `host` | The hostname of the Fabric Warehouse server | string | Y | | ||
| `user` | The client id to use for authentication with the Fabric Warehouse server | string | N | | ||
| `password` | The client secret to use for authentication with the Fabric Warehouse server | string | N | | ||
| `port` | The port number of the Fabric Warehouse server | int | N | | ||
| `database` | The target database | string | N | | ||
| `charset` | The character set used for the connection | string | N | | ||
| `timeout` | The query timeout in seconds. Default: no timeout | int | N | | ||
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N | | ||
| `appname` | The application name to use for the connection | string | N | | ||
| `conn_properties` | The list of connection properties | list[string] | N | | ||
| `autocommit` | Is autocommit mode enabled. Default: false | bool | N | | ||
| `driver` | The driver to use for the connection. Default: pyodbc | string | N | | ||
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N | | ||
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,13 @@ | |
|
||
logger = logging.getLogger(__name__) | ||
|
||
RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"} | ||
RECOMMENDED_STATE_SYNC_ENGINES = { | ||
"postgres", | ||
"gcp_postgres", | ||
"mysql", | ||
"mssql", | ||
"azuresql", | ||
} | ||
FORBIDDEN_STATE_SYNC_ENGINES = { | ||
# Do not support row-level operations | ||
"spark", | ||
|
@@ -1635,6 +1641,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: | |
return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY} | ||
|
||
|
||
class FabricConnectionConfig(MSSQLConnectionConfig): | ||
""" | ||
Fabric Connection Configuration. | ||
Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'. | ||
It is recommended to use the 'pyodbc' driver for Fabric. | ||
""" | ||
|
||
type_: t.Literal["fabric"] = Field(alias="type", default="fabric") # type: ignore | ||
DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric" # type: ignore | ||
DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric" # type: ignore | ||
DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17 # type: ignore | ||
driver: t.Literal["pyodbc"] = "pyodbc" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This driver literal doesn't seem to work. If I don't manually specify it in my Fabric connection config:
Then the
|
||
autocommit: t.Optional[bool] = True | ||
|
||
@property | ||
def _engine_adapter(self) -> t.Type[EngineAdapter]: | ||
from sqlmesh.core.engine_adapter.fabric import FabricAdapter | ||
|
||
return FabricAdapter | ||
|
||
@property | ||
def _extra_engine_config(self) -> t.Dict[str, t.Any]: | ||
return { | ||
"database": self.database, | ||
"catalog_support": CatalogSupport.REQUIRES_SET_CATALOG, | ||
} | ||
|
||
|
||
class SparkConnectionConfig(ConnectionConfig): | ||
""" | ||
Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks. | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from __future__ import annotations | ||
|
||
import typing as t | ||
from sqlglot import exp | ||
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter | ||
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery | ||
from sqlmesh.core.engine_adapter.base import EngineAdapter | ||
|
||
if t.TYPE_CHECKING: | ||
from sqlmesh.core._typing import TableName | ||
|
||
|
||
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin | ||
|
||
|
||
class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter): | ||
""" | ||
Adapter for Microsoft Fabric. | ||
""" | ||
|
||
DIALECT = "fabric" | ||
SUPPORTS_INDEXES = False | ||
SUPPORTS_TRANSACTIONS = False | ||
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT | ||
|
||
def _insert_overwrite_by_condition( | ||
self, | ||
table_name: TableName, | ||
source_queries: t.List[SourceQuery], | ||
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, | ||
where: t.Optional[exp.Condition] = None, | ||
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None, | ||
**kwargs: t.Any, | ||
) -> None: | ||
""" | ||
Implements the insert overwrite strategy for Fabric using DELETE and INSERT. | ||
|
||
This method is overridden to avoid the MERGE statement from the parent | ||
MSSQLEngineAdapter, which is not fully supported in Fabric. | ||
""" | ||
return EngineAdapter._insert_overwrite_by_condition( | ||
self, | ||
table_name=table_name, | ||
source_queries=source_queries, | ||
columns_to_types=columns_to_types, | ||
where=where, | ||
insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT, | ||
**kwargs, | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# type: ignore | ||
|
||
import typing as t | ||
|
||
import pytest | ||
from sqlglot import exp, parse_one | ||
|
||
from sqlmesh.core.engine_adapter import FabricAdapter | ||
from tests.core.engine_adapter import to_sql_calls | ||
|
||
pytestmark = [pytest.mark.engine, pytest.mark.fabric] | ||
|
||
|
||
@pytest.fixture | ||
def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter: | ||
return make_mocked_engine_adapter(FabricAdapter) | ||
|
||
|
||
def test_columns(adapter: FabricAdapter): | ||
adapter.cursor.fetchall.return_value = [ | ||
("decimal_ps", "decimal", None, 5, 4), | ||
("decimal", "decimal", None, 18, 0), | ||
("float", "float", None, 53, None), | ||
("char_n", "char", 10, None, None), | ||
("varchar_n", "varchar", 10, None, None), | ||
("nvarchar_max", "nvarchar", -1, None, None), | ||
] | ||
|
||
assert adapter.columns("db.table") == { | ||
"decimal_ps": exp.DataType.build("decimal(5, 4)", dialect=adapter.dialect), | ||
"decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect), | ||
"float": exp.DataType.build("float(53)", dialect=adapter.dialect), | ||
"char_n": exp.DataType.build("char(10)", dialect=adapter.dialect), | ||
"varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect), | ||
"nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect), | ||
} | ||
|
||
# Verify that the adapter queries the uppercase INFORMATION_SCHEMA | ||
adapter.cursor.execute.assert_called_once_with( | ||
"""SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';""" | ||
) | ||
|
||
|
||
def test_table_exists(adapter: FabricAdapter): | ||
adapter.cursor.fetchone.return_value = (1,) | ||
assert adapter.table_exists("db.table") | ||
# Verify that the adapter queries the uppercase INFORMATION_SCHEMA | ||
adapter.cursor.execute.assert_called_once_with( | ||
"""SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';""" | ||
) | ||
|
||
adapter.cursor.fetchone.return_value = None | ||
assert not adapter.table_exists("db.table") | ||
|
||
|
||
def test_insert_overwrite_by_time_partition(adapter: FabricAdapter): | ||
adapter.insert_overwrite_by_time_partition( | ||
"test_table", | ||
parse_one("SELECT a, b FROM tbl"), | ||
start="2022-01-01", | ||
end="2022-01-02", | ||
time_column="b", | ||
time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")), | ||
columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")}, | ||
) | ||
|
||
# Fabric adapter should use DELETE/INSERT strategy, not MERGE. | ||
assert to_sql_calls(adapter) == [ | ||
"""DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", | ||
"""INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""", | ||
] | ||
|
||
|
||
def test_replace_query(adapter: FabricAdapter): | ||
adapter.cursor.fetchone.return_value = (1,) | ||
adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"}) | ||
|
||
# This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT | ||
assert to_sql_calls(adapter) == [ | ||
"""SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""", | ||
"TRUNCATE TABLE [test_table];", | ||
"INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", | ||
] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.