41
41
from sqlmesh .utils .concurrency import NodeExecutionFailedError
42
42
from sqlmesh .utils .errors import PlanError , SQLMeshError
43
43
from sqlmesh .utils .dag import DAG
44
- from sqlmesh .utils import CorrelationId
45
44
from sqlmesh .utils .date import now
46
45
47
46
logger = logging .getLogger (__name__ )
@@ -73,7 +72,7 @@ class BuiltInPlanEvaluator(PlanEvaluator):
73
72
def __init__ (
74
73
self ,
75
74
state_sync : StateSync ,
76
- create_scheduler : t .Callable [[t .Iterable [Snapshot ], t . Optional [ CorrelationId ] ], Scheduler ],
75
+ create_scheduler : t .Callable [[t .Iterable [Snapshot ], SnapshotEvaluator ], Scheduler ],
77
76
default_catalog : t .Optional [str ],
78
77
console : t .Optional [Console ] = None ,
79
78
):
@@ -231,9 +230,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
231
230
self .console .log_success ("SKIP: No model batches to execute" )
232
231
return
233
232
234
- scheduler = self .create_scheduler (
235
- stage .all_snapshots .values (), CorrelationId .from_plan_id (plan .plan_id )
236
- )
233
+ scheduler = self .create_scheduler (stage .all_snapshots .values (), self .snapshot_evaluator )
237
234
errors , _ = scheduler .run_merged_intervals (
238
235
merged_intervals = stage .snapshot_to_intervals ,
239
236
deployability_index = stage .deployability_index ,
@@ -254,7 +251,7 @@ def visit_audit_only_run_stage(
254
251
return
255
252
256
253
# If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
257
- scheduler = self .create_scheduler (audit_snapshots , CorrelationId . from_plan_id ( plan . plan_id ) )
254
+ scheduler = self .create_scheduler (audit_snapshots , self . snapshot_evaluator )
258
255
completion_status = scheduler .audit (
259
256
plan .environment ,
260
257
plan .start ,
0 commit comments