Skip to content

Commit da8f70b

Browse files
committed
chore: update to version and add session mode support
This commit updates the Databricks adapter to version 1.10.15+session, introducing support for session mode execution. Key changes include: - Added `DatabricksSessionHandle` and `SessionCursorWrapper` for handling SparkSession-based execution. - Enhanced `DatabricksCredentials` to manage connection methods and validate session mode configurations. - Updated connection management to support session mode, including automatic selection of submission methods for Python models. - Improve SparkSession retrieval in Databricks adapter. This commit enhances the `DatabricksSessionHandle` and `SessionPythonJobHelper` classes to improve the retrieval of the existing SparkSession. It introduces multiple methods to obtain the SparkSession, ensuring compatibility with various Databricks environments. Additionally, it refactors method signatures for consistency and readability.
1 parent ec229f8 commit da8f70b

9 files changed

Lines changed: 1508 additions & 248 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version = "1.11.4"
1+
version = "1.10.15-5"

dbt/adapters/databricks/connections.py

Lines changed: 184 additions & 83 deletions
Large diffs are not rendered by default.

dbt/adapters/databricks/credentials.py

Lines changed: 161 additions & 57 deletions
Large diffs are not rendered by default.

dbt/adapters/databricks/handle.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,47 @@ def translate_bindings(bindings: Optional[Sequence[Any]]) -> Optional[Sequence[A
288288
return list(map(lambda x: float(x) if isinstance(x, decimal.Decimal) else x, bindings))
289289
return None
290290

291+
@staticmethod
292+
def format_bindings_for_sql(bindings: Optional[Sequence[Any]]) -> Optional[Sequence[str]]:
293+
"""
294+
Format bindings as SQL literals for string substitution in session mode.
295+
296+
This method properly quotes string values and handles special cases to ensure
297+
SQL injection safety and correct SQL syntax. Used when executing SQL via
298+
SparkSession.sql() which doesn't support parameterized queries.
299+
300+
Args:
301+
bindings: Sequence of binding values (strings, numbers, None, etc.)
302+
303+
Returns:
304+
Sequence of SQL literal strings, or None if bindings is None/empty
305+
"""
306+
if not bindings:
307+
return None
308+
309+
formatted = []
310+
for value in bindings:
311+
if value is None:
312+
formatted.append("NULL")
313+
elif isinstance(value, bool):
314+
formatted.append("TRUE" if value else "FALSE")
315+
elif isinstance(value, str):
316+
# Escape single quotes by doubling them, then wrap in quotes
317+
escaped = value.replace("'", "''")
318+
formatted.append(f"'{escaped}'")
319+
elif isinstance(value, (int, float, decimal.Decimal)):
320+
# Numbers don't need quotes
321+
if isinstance(value, decimal.Decimal):
322+
formatted.append(str(float(value)))
323+
else:
324+
formatted.append(str(value))
325+
else:
326+
# For other types, convert to string and quote
327+
escaped = str(value).replace("'", "''")
328+
formatted.append(f"'{escaped}'")
329+
330+
return formatted
331+
291332
@staticmethod
292333
def clean_sql(sql: str) -> str:
293334
cleaned = sql.strip()

dbt/adapters/databricks/impl.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
AllPurposeClusterPythonJobHelper,
5858
JobClusterPythonJobHelper,
5959
ServerlessClusterPythonJobHelper,
60+
SessionPythonJobHelper,
6061
WorkflowPythonJobHelper,
6162
)
6263
from dbt.adapters.databricks.relation import (
@@ -801,6 +802,7 @@ def python_submission_helpers(self) -> dict[str, type[PythonJobHelper]]:
801802
"all_purpose_cluster": AllPurposeClusterPythonJobHelper,
802803
"serverless_cluster": ServerlessClusterPythonJobHelper,
803804
"workflow_job": WorkflowPythonJobHelper,
805+
"session": SessionPythonJobHelper,
804806
}
805807

806808
@log_code_execution
@@ -809,6 +811,17 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str) -> AdapterRe
809811
"user_folder_for_python",
810812
self.behavior.use_user_folder_for_python.setting, # type: ignore[attr-defined]
811813
)
814+
815+
# Auto-select session submission when in session mode
816+
from dbt.adapters.databricks.credentials import DatabricksCredentials
817+
from dbt.adapters.databricks.logging import logger
818+
819+
creds = cast(DatabricksCredentials, self.config.credentials)
820+
if creds.is_session_mode:
821+
if parsed_model["config"].get("submission_method") is None:
822+
parsed_model["config"]["submission_method"] = "session"
823+
logger.debug("Auto-selected 'session' submission method for Python model")
824+
812825
return super().submit_python_job(parsed_model, compiled_code)
813826

814827
@available

0 commit comments

Comments
 (0)