Skip to content

Commit 054afe5

Browse files
authored
Chore: Update the plan evaluator to use stages (#4732)
1 parent 509d870 commit 054afe5

File tree

11 files changed

+1231
-559
lines changed

11 files changed

+1231
-559
lines changed

sqlmesh/core/plan/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,5 @@
88
from sqlmesh.core.plan.evaluator import (
99
BuiltInPlanEvaluator as BuiltInPlanEvaluator,
1010
PlanEvaluator as PlanEvaluator,
11-
update_intervals_for_new_snapshots as update_intervals_for_new_snapshots,
1211
)
1312
from sqlmesh.core.plan.explainer import PlanExplainer as PlanExplainer

sqlmesh/core/plan/evaluator.py

Lines changed: 193 additions & 362 deletions
Large diffs are not rendered by default.

sqlmesh/core/plan/explainer.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from sqlmesh.core.snapshot.definition import (
1818
SnapshotInfoMixin,
1919
)
20-
from sqlmesh.utils import Verbosity, rich as srich
20+
from sqlmesh.utils import Verbosity, rich as srich, to_snake_case
2121
from sqlmesh.utils.date import to_ts
2222
from sqlmesh.utils.errors import SQLMeshError
2323

@@ -73,7 +73,7 @@ def __init__(
7373
def explain(self, stages: t.List[stages.PlanStage]) -> None:
7474
tree = Tree("[bold]Explained plan[/bold]")
7575
for stage in stages:
76-
handler_name = f"visit_{_to_snake_case(stage.__class__.__name__)}"
76+
handler_name = f"visit_{to_snake_case(stage.__class__.__name__)}"
7777
if not hasattr(self, handler_name):
7878
logger.error("Unexpected stage: %s", stage.__class__.__name__)
7979
continue
@@ -97,6 +97,9 @@ def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateSta
9797
"[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
9898
)
9999
for snapshot in stage.snapshots:
100+
if snapshot.snapshot_id not in stage.snapshots_with_missing_intervals:
101+
continue
102+
100103
is_deployable = (
101104
stage.deployability_index.is_deployable(snapshot)
102105
if self.environment_naming_info.name != c.PROD
@@ -114,7 +117,9 @@ def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateSta
114117

115118
if snapshot.is_view:
116119
create_tree = Tree("Create view if it doesn't exist")
117-
elif snapshot.is_forward_only and snapshot.previous_versions:
120+
elif (
121+
snapshot.is_forward_only and snapshot.previous_versions and not snapshot.is_managed
122+
):
118123
prod_table = snapshot.table_name(True)
119124
create_tree = Tree(
120125
f"Clone {prod_table} into {table_name} and then update its schema if it doesn't exist"
@@ -224,7 +229,7 @@ def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage
224229
"[bold]Delete views in the virtual layer for models that were removed[/bold]"
225230
)
226231
for snapshot in stage.demoted_snapshots:
227-
display_name = self._display_name(snapshot)
232+
display_name = self._display_name(snapshot, stage.demoted_environment_naming_info)
228233
demote_tree.add(display_name)
229234

230235
if stage.promoted_snapshots:
@@ -233,14 +238,31 @@ def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage
233238
tree.add(self._limit_tree(demote_tree))
234239
return tree
235240

241+
def visit_create_snapshot_records_stage(
242+
self, stage: stages.CreateSnapshotRecordsStage
243+
) -> t.Optional[Tree]:
244+
return None
245+
236246
def visit_environment_record_update_stage(
237247
self, stage: stages.EnvironmentRecordUpdateStage
238248
) -> t.Optional[Tree]:
239249
return None
240250

241-
def _display_name(self, snapshot: SnapshotInfoMixin) -> str:
251+
def visit_unpause_stage(self, stage: stages.UnpauseStage) -> t.Optional[Tree]:
252+
return None
253+
254+
def visit_finalize_environment_stage(
255+
self, stage: stages.FinalizeEnvironmentStage
256+
) -> t.Optional[Tree]:
257+
return None
258+
259+
def _display_name(
260+
self,
261+
snapshot: SnapshotInfoMixin,
262+
environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
263+
) -> str:
242264
return snapshot.display_name(
243-
self.environment_naming_info,
265+
environment_naming_info or self.environment_naming_info,
244266
self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
245267
dialect=self.dialect,
246268
)
@@ -273,9 +295,3 @@ def _get_explainer_console(
273295
verbosity=console.verbosity,
274296
console=console.console,
275297
)
276-
277-
278-
def _to_snake_case(name: str) -> str:
279-
return "".join(
280-
f"_{c.lower()}" if c.isupper() and idx != 0 else c.lower() for idx, c in enumerate(name)
281-
)

0 commit comments

Comments
 (0)