Skip to content

Commit 39caa0f

Browse files
authored
Feat: Tag BigQuery queries with their correlation ID as label (#4861)
1 parent e2a406f commit 39caa0f

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ def _job_params(self) -> t.Dict[str, t.Any]:
134134
}
135135
if self._extra_config.get("maximum_bytes_billed"):
136136
params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed")
137+
if self.correlation_id:
138+
# BigQuery label keys must be lowercase
139+
key = self.correlation_id.job_type.value.lower()
140+
params["labels"] = {key: self.correlation_id.job_id}
137141
return params
138142

139143
@property
@@ -204,6 +208,11 @@ def _begin_session(self, properties: SessionProperties) -> None:
204208
"Invalid value for `session_properties.query_label`. Must be an array or tuple."
205209
)
206210

211+
if self.correlation_id:
212+
parsed_query_label.append(
213+
(self.correlation_id.job_type.value.lower(), self.correlation_id.job_id)
214+
)
215+
207216
if parsed_query_label:
208217
query_label_str = ",".join([":".join(label) for label in parsed_query_label])
209218
query = f'SET @@query_label = "{query_label_str}";SELECT 1;'

tests/core/engine_adapter/integration/test_integration_bigquery.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
from sqlmesh.core.engine_adapter.shared import DataObject
1212
import sqlmesh.core.dialect as d
1313
from sqlmesh.core.model import SqlModel, load_sql_based_model
14-
from sqlmesh.core.plan import Plan
14+
from sqlmesh.core.plan import Plan, BuiltInPlanEvaluator
1515
from sqlmesh.core.table_diff import TableDiff
16+
from sqlmesh.utils import CorrelationId
1617
from tests.core.engine_adapter.integration import TestContext
1718
from pytest import FixtureRequest
1819
from tests.core.engine_adapter.integration import (
@@ -447,3 +448,33 @@ def test_materialized_view_evaluation(ctx: TestContext, engine_adapter: BigQuery
447448

448449
df = engine_adapter.fetchdf(f"SELECT * FROM {mview_name.sql(dialect=ctx.dialect)}")
449450
assert df["col"][0] == 2
451+
452+
453+
def test_correlation_id_in_job_labels(ctx: TestContext):
454+
model_name = ctx.table("test")
455+
456+
sqlmesh = ctx.create_context()
457+
sqlmesh.upsert_model(
458+
load_sql_based_model(d.parse(f"MODEL (name {model_name}, kind FULL); SELECT 1 AS col"))
459+
)
460+
461+
# Create a plan evaluator and a plan to evaluate
462+
plan_evaluator = BuiltInPlanEvaluator(
463+
sqlmesh.state_sync,
464+
sqlmesh.snapshot_evaluator,
465+
sqlmesh.create_scheduler,
466+
sqlmesh.default_catalog,
467+
)
468+
plan: Plan = sqlmesh.plan_builder("prod", skip_tests=True).build()
469+
470+
# Evaluate the plan and retrieve the plan evaluator's adapter
471+
plan_evaluator.evaluate(plan.to_evaluatable())
472+
adapter = t.cast(BigQueryEngineAdapter, plan_evaluator.snapshot_evaluator.adapter)
473+
474+
# Case 1: Ensure that the correlation id is set in the underlying adapter
475+
assert adapter.correlation_id is not None
476+
477+
# Case 2: Ensure that the correlation id is set in the job labels
478+
labels = adapter._job_params.get("labels")
479+
correlation_id = CorrelationId.from_plan_id(plan.plan_id)
480+
assert labels == {correlation_id.job_type.value.lower(): correlation_id.job_id}

0 commit comments

Comments
 (0)