Skip to content

Commit 8debff7

Browse files
committed
PR Feedback 1
1 parent fc2ed0d commit 8debff7

File tree

5 files changed

+29
-25
lines changed

5 files changed

+29
-25
lines changed

sqlmesh/core/context.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,14 @@ def engine_adapter(self) -> EngineAdapter:
448448
@property
449449
def snapshot_evaluator(self) -> SnapshotEvaluator:
450450
if not self._snapshot_evaluator:
451-
self._snapshot_evaluator = self._create_snapshot_evaluator(log_level=logging.INFO)
451+
self._snapshot_evaluator = SnapshotEvaluator(
452+
{
453+
gateway: adapter.with_settings(log_level=logging.INFO)
454+
for gateway, adapter in self.engine_adapters.items()
455+
},
456+
ddl_concurrent_tasks=self.concurrent_tasks,
457+
selected_gateway=self.selected_gateway,
458+
)
452459

453460
return self._snapshot_evaluator
454461

@@ -1909,7 +1916,7 @@ def _table_diff(
19091916
)
19101917

19111918
return TableDiff(
1912-
adapter=adapter.with_settings(logger.getEffectiveLevel()),
1919+
adapter=adapter.with_settings(log_level=logger.getEffectiveLevel()),
19131920
source=source,
19141921
target=target,
19151922
on=on,
@@ -2960,16 +2967,6 @@ def load_model_tests(
29602967

29612968
return model_tests
29622969

2963-
def _create_snapshot_evaluator(self, **kwargs: t.Any) -> SnapshotEvaluator:
2964-
return SnapshotEvaluator(
2965-
{
2966-
gateway: adapter.with_settings(**kwargs)
2967-
for gateway, adapter in self.engine_adapters.items()
2968-
},
2969-
ddl_concurrent_tasks=self.concurrent_tasks,
2970-
selected_gateway=self.selected_gateway,
2971-
)
2972-
29732970

29742971
class Context(GenericContext[Config]):
29752972
CONFIG_TYPE = Config

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ def __init__(
147147
self._multithreaded = multithreaded
148148
self.correlation_id = correlation_id
149149

150-
def with_settings(self, log_level: int = logging.DEBUG, **kwargs: t.Any) -> EngineAdapter:
150+
def with_settings(self, **kwargs: t.Any) -> EngineAdapter:
151151
adapter = self.__class__(
152152
self._connection_pool,
153153
dialect=self.dialect,
154154
sql_gen_kwargs=self._sql_gen_kwargs,
155155
default_catalog=self._default_catalog,
156-
execute_log_level=log_level,
156+
execute_log_level=kwargs.pop("log_level", self._execute_log_level),
157157
register_comments=self._register_comments,
158158
null_connection=self._extra_config.pop("null_connection", True),
159159
multithreaded=self._multithreaded,

sqlmesh/core/plan/evaluator.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,10 @@ def evaluate(
8989
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
9090
) -> None:
9191
self._circuit_breaker = circuit_breaker
92+
self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
93+
CorrelationId.from_plan_id(plan.plan_id)
94+
)
9295

93-
self.set_correlation_id(CorrelationId.from_plan_id(plan.plan_id))
9496
self.console.start_plan_evaluation(plan)
9597
analytics.collector.on_plan_apply_start(
9698
plan=plan,
@@ -350,13 +352,6 @@ def visit_finalize_environment_stage(
350352
) -> None:
351353
self.state_sync.finalize(plan.environment)
352354

353-
def set_correlation_id(self, correlation_id: CorrelationId) -> None:
354-
for key, adapter in self.snapshot_evaluator.adapters.items():
355-
if correlation_id != adapter.correlation_id:
356-
self.snapshot_evaluator.adapters[key] = adapter.with_settings(
357-
correlation_id=correlation_id
358-
)
359-
360355
def _promote_snapshots(
361356
self,
362357
plan: EvaluatablePlan,

sqlmesh/core/snapshot/evaluator.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
SnapshotTableCleanupTask,
6262
)
6363
from sqlmesh.core.snapshot.definition import parent_snapshots_by_name
64-
from sqlmesh.utils import random_id
64+
from sqlmesh.utils import random_id, CorrelationId
6565
from sqlmesh.utils.concurrency import (
6666
concurrent_apply_to_snapshots,
6767
concurrent_apply_to_values,
@@ -1190,6 +1190,18 @@ def _execute_create(
11901190
)
11911191
adapter.execute(snapshot.model.render_post_statements(**create_render_kwargs))
11921192

1193+
def set_correlation_id(self, correlation_id: CorrelationId) -> SnapshotEvaluator:
1194+
return SnapshotEvaluator(
1195+
{
1196+
gateway: adapter.with_settings(
1197+
log_level=adapter._execute_log_level, correlation_id=correlation_id
1198+
)
1199+
for gateway, adapter in self.adapters.items()
1200+
},
1201+
self.ddl_concurrent_tasks,
1202+
self.selected_gateway,
1203+
)
1204+
11931205

11941206
def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy:
11951207
klass: t.Type

tests/core/test_table_diff.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,9 @@ def test_generated_sql(sushi_context_fixed_date: Context, mocker: MockerFixture)
337337

338338
# make with_settings() return the current instance of engine_adapter so we can still spy on _execute
339339
mocker.patch.object(
340-
engine_adapter, "with_settings", new_callable=lambda: lambda _: engine_adapter
340+
engine_adapter, "with_settings", new_callable=lambda: lambda **kwargs: engine_adapter
341341
)
342-
assert engine_adapter.with_settings(1) == engine_adapter
342+
assert engine_adapter.with_settings() == engine_adapter
343343

344344
spy_execute = mocker.spy(engine_adapter, "_execute")
345345
mocker.patch("sqlmesh.core.engine_adapter.base.random_id", return_value="abcdefgh")

0 commit comments

Comments
 (0)