Skip to content

Commit 61455f2

Browse files
authored
Feat: Allow specifying a minimum number of intervals to include for each model in a plan (#4780)
1 parent 6c98222 commit 61455f2

File tree

15 files changed

+583
-81
lines changed

15 files changed

+583
-81
lines changed

examples/multi/repo_1/linter/__init__.py

Whitespace-only changes.

sqlmesh/cli/main.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
520520
help="Explain the plan instead of applying it.",
521521
default=None,
522522
)
523+
@click.option(
524+
"--min-intervals",
525+
default=0,
526+
help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date",
527+
)
523528
@opt.verbose
524529
@click.pass_context
525530
@error_handler

sqlmesh/core/console.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,8 +2073,8 @@ def _prompt_backfill(
20732073
if not plan_builder.override_end:
20742074
if plan.provided_end:
20752075
blank_meaning = f"'{time_like_to_str(plan.provided_end)}'"
2076-
elif plan.interval_end_per_model:
2077-
max_end = max(plan.interval_end_per_model.values())
2076+
elif plan.end_override_per_model:
2077+
max_end = max(plan.end_override_per_model.values())
20782078
blank_meaning = f"'{time_like_to_str(max_end)}'"
20792079
else:
20802080
blank_meaning = "now"

sqlmesh/core/context.py

Lines changed: 113 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from pathlib import Path
4747
from shutil import rmtree
4848
from types import MappingProxyType
49+
from datetime import datetime
4950

5051
from sqlglot import Dialect, exp
5152
from sqlglot.helper import first
@@ -125,6 +126,8 @@
125126
format_tz_datetime,
126127
now_timestamp,
127128
now,
129+
to_datetime,
130+
make_exclusive,
128131
)
129132
from sqlmesh.utils.errors import (
130133
CircuitBreakerError,
@@ -1222,6 +1225,7 @@ def plan(
12221225
diff_rendered: t.Optional[bool] = None,
12231226
skip_linter: t.Optional[bool] = None,
12241227
explain: t.Optional[bool] = None,
1228+
min_intervals: t.Optional[int] = None,
12251229
) -> Plan:
12261230
"""Interactively creates a plan.
12271231
@@ -1268,6 +1272,8 @@ def plan(
12681272
diff_rendered: Whether the diff should compare raw vs rendered models
12691273
skip_linter: Linter runs by default so this will skip it if enabled
12701274
explain: Whether to explain the plan instead of applying it.
1275+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
1276+
on every model when checking for missing intervals
12711277
12721278
Returns:
12731279
The populated Plan object.
@@ -1296,6 +1302,7 @@ def plan(
12961302
diff_rendered=diff_rendered,
12971303
skip_linter=skip_linter,
12981304
explain=explain,
1305+
min_intervals=min_intervals,
12991306
)
13001307

13011308
plan = plan_builder.build()
@@ -1345,6 +1352,7 @@ def plan_builder(
13451352
diff_rendered: t.Optional[bool] = None,
13461353
skip_linter: t.Optional[bool] = None,
13471354
explain: t.Optional[bool] = None,
1355+
min_intervals: t.Optional[int] = None,
13481356
) -> PlanBuilder:
13491357
"""Creates a plan builder.
13501358
@@ -1381,6 +1389,8 @@ def plan_builder(
13811389
enable_preview: Indicates whether to enable preview for forward-only models in development environments.
13821390
run: Whether to run latest intervals as part of the plan application.
13831391
diff_rendered: Whether the diff should compare raw vs rendered models
1392+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
1393+
on every model when checking for missing intervals
13841394
13851395
Returns:
13861396
The plan builder.
@@ -1408,6 +1418,7 @@ def plan_builder(
14081418
"run": run,
14091419
"diff_rendered": diff_rendered,
14101420
"skip_linter": skip_linter,
1421+
"min_intervals": min_intervals,
14111422
}
14121423
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
14131424
k: v for k, v in kwargs.items() if v is not None
@@ -1530,6 +1541,16 @@ def plan_builder(
15301541
# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
15311542
self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values())
15321543

1544+
start_override_per_model = self._calculate_start_override_per_model(
1545+
min_intervals,
1546+
start or default_start,
1547+
end or default_end,
1548+
execution_time or now(),
1549+
backfill_models,
1550+
snapshots,
1551+
max_interval_end_per_model,
1552+
)
1553+
15331554
return self.PLAN_BUILDER_TYPE(
15341555
context_diff=context_diff,
15351556
start=start,
@@ -1560,7 +1581,8 @@ def plan_builder(
15601581
),
15611582
end_bounded=not run,
15621583
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1563-
interval_end_per_model=max_interval_end_per_model,
1584+
start_override_per_model=start_override_per_model,
1585+
end_override_per_model=max_interval_end_per_model,
15641586
console=self.console,
15651587
user_provided_flags=user_provided_flags,
15661588
explain=explain or False,
@@ -2850,15 +2872,15 @@ def _plan_preview_enabled(self) -> bool:
28502872
def _get_plan_default_start_end(
28512873
self,
28522874
snapshots: t.Dict[str, Snapshot],
2853-
max_interval_end_per_model: t.Dict[str, int],
2875+
max_interval_end_per_model: t.Dict[str, datetime],
28542876
backfill_models: t.Optional[t.Set[str]],
28552877
modified_model_names: t.Set[str],
28562878
execution_time: t.Optional[TimeLike] = None,
28572879
) -> t.Tuple[t.Optional[int], t.Optional[int]]:
28582880
if not max_interval_end_per_model:
28592881
return None, None
28602882

2861-
default_end = max(max_interval_end_per_model.values())
2883+
default_end = to_timestamp(max(max_interval_end_per_model.values()))
28622884
default_start: t.Optional[int] = None
28632885
# Infer the default start by finding the smallest interval start that corresponds to the default end.
28642886
for model_name in backfill_models or modified_model_names or max_interval_end_per_model:
@@ -2887,19 +2909,101 @@ def _get_plan_default_start_end(
28872909

28882910
return default_start, default_end
28892911

2912+
def _calculate_start_override_per_model(
2913+
self,
2914+
min_intervals: t.Optional[int],
2915+
plan_start: t.Optional[TimeLike],
2916+
plan_end: t.Optional[TimeLike],
2917+
plan_execution_time: TimeLike,
2918+
backfill_model_fqns: t.Optional[t.Set[str]],
2919+
snapshots_by_model_fqn: t.Dict[str, Snapshot],
2920+
end_override_per_model: t.Optional[t.Dict[str, datetime]],
2921+
) -> t.Dict[str, datetime]:
2922+
if not min_intervals or not backfill_model_fqns or not plan_start:
2923+
# If there are no models to backfill, there are no intervals to consider for backfill, so we don't need to consider a minimum number
2924+
# If the plan doesn't have a start date, all intervals are considered already so we don't need to consider a minimum number
2925+
# If we don't have a minimum number of intervals to consider, then we don't need to adjust the start date on a per-model basis
2926+
return {}
2927+
2928+
start_overrides: t.Dict[str, datetime] = {}
2929+
end_override_per_model = end_override_per_model or {}
2930+
2931+
plan_execution_time_dt = to_datetime(plan_execution_time)
2932+
plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt)
2933+
plan_end_dt = to_datetime(
2934+
plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt
2935+
)
2936+
2937+
# we need to take the DAG into account so that parent models can be expanded to cover at least as much as their children
2938+
# for example, A(hourly) <- B(daily)
2939+
# if min_intervals=1, A would have 1 hour and B would have 1 day
2940+
# but B depends on A so in order for B to have 1 valid day, A needs to be expanded to 24 hours
2941+
backfill_dag: DAG[str] = DAG()
2942+
for fqn in backfill_model_fqns:
2943+
backfill_dag.add(
2944+
fqn,
2945+
[
2946+
p.name
2947+
for p in snapshots_by_model_fqn[fqn].parents
2948+
if p.name in backfill_model_fqns
2949+
],
2950+
)
2951+
2952+
# start from the leaf nodes and work back towards the root because the min_start at the root node is determined by the calculated starts in the leaf nodes
2953+
reversed_dag = backfill_dag.reversed
2954+
graph = reversed_dag.graph
2955+
2956+
for model_fqn in reversed_dag:
2957+
# Get the earliest start from all immediate children of this snapshot
2958+
# this works because topological ordering guarantees that they've already been visited
2959+
# and we always set a start override
2960+
min_child_start = min(
2961+
[start_overrides[immediate_child_fqn] for immediate_child_fqn in graph[model_fqn]],
2962+
default=plan_start_dt,
2963+
)
2964+
2965+
snapshot = snapshots_by_model_fqn.get(model_fqn)
2966+
2967+
if not snapshot:
2968+
continue
2969+
2970+
starting_point = end_override_per_model.get(model_fqn, plan_end_dt)
2971+
if node_end := snapshot.node.end:
2972+
# if we don't do this and the node end is a *date* (as opposed to a timestamp),
2973+
# we end up incorrectly winding back an extra day
2974+
node_end_dt = make_exclusive(node_end)
2975+
2976+
if node_end_dt < plan_end_dt:
2977+
# if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals
2978+
# instead of the plan end. If we use the plan end, we will return intervals in the future which are invalid
2979+
starting_point = node_end_dt
2980+
2981+
snapshot_start = snapshot.node.cron_floor(starting_point)
2982+
2983+
for _ in range(min_intervals):
2984+
# wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date
2985+
snapshot_start = snapshot.node.cron_prev(snapshot_start)
2986+
2987+
start_overrides[model_fqn] = min(min_child_start, snapshot_start)
2988+
2989+
return start_overrides
2990+
28902991
def _get_max_interval_end_per_model(
28912992
self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]]
2892-
) -> t.Dict[str, int]:
2993+
) -> t.Dict[str, datetime]:
28932994
models_for_interval_end = (
28942995
self._get_models_for_interval_end(snapshots, backfill_models)
28952996
if backfill_models is not None
28962997
else None
28972998
)
2898-
return self.state_sync.max_interval_end_per_model(
2899-
c.PROD,
2900-
models=models_for_interval_end,
2901-
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
2902-
)
2999+
return {
3000+
model_fqn: to_datetime(ts)
3001+
for model_fqn, ts in self.state_sync.max_interval_end_per_model(
3002+
c.PROD,
3003+
models=models_for_interval_end,
3004+
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
3005+
).items()
3006+
}
29033007

29043008
@staticmethod
29053009
def _get_models_for_interval_end(

sqlmesh/core/node.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ class IntervalUnit(str, Enum):
3131
IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
3232
based on the cron schedule of a node. The minimum time delta between a sample set of dates
3333
is used to determine which unit a node's schedule is.
34+
35+
It's designed to align with common partitioning schemes, hence why there is no WEEK unit
36+
because generally tables are not partitioned by week
3437
"""
3538

3639
YEAR = "year"

sqlmesh/core/plan/builder.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import typing as t
66
from collections import defaultdict
77
from functools import cached_property
8+
from datetime import datetime
89

910

1011
from sqlmesh.core.console import PlanBuilderConsole, get_console
@@ -85,7 +86,8 @@ class PlanBuilder:
8586
ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
8687
environment state, or to use whatever snapshots are in the current environment state even if
8788
the environment is not finalized.
88-
interval_end_per_model: The mapping from model FQNs to target end dates.
89+
start_override_per_model: A mapping of model FQNs to target start dates.
90+
end_override_per_model: A mapping of model FQNs to target end dates.
8991
explain: Whether to explain the plan instead of applying it.
9092
"""
9193

@@ -117,7 +119,8 @@ def __init__(
117119
end_bounded: bool = False,
118120
ensure_finalized_snapshots: bool = False,
119121
explain: bool = False,
120-
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
122+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
123+
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
121124
console: t.Optional[PlanBuilderConsole] = None,
122125
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
123126
):
@@ -133,7 +136,8 @@ def __init__(
133136
self._enable_preview = enable_preview
134137
self._end_bounded = end_bounded
135138
self._ensure_finalized_snapshots = ensure_finalized_snapshots
136-
self._interval_end_per_model = interval_end_per_model
139+
self._start_override_per_model = start_override_per_model
140+
self._end_override_per_model = end_override_per_model
137141
self._environment_ttl = environment_ttl
138142
self._categorizer_config = categorizer_config or CategorizerConfig()
139143
self._auto_categorization_enabled = auto_categorization_enabled
@@ -280,7 +284,11 @@ def build(self) -> Plan:
280284
self._adjust_new_snapshot_intervals()
281285

282286
deployability_index = (
283-
DeployabilityIndex.create(self._context_diff.snapshots.values(), start=self._start)
287+
DeployabilityIndex.create(
288+
self._context_diff.snapshots.values(),
289+
start=self._start,
290+
start_override_per_model=self._start_override_per_model,
291+
)
284292
if self._is_dev
285293
else DeployabilityIndex.all_deployable()
286294
)
@@ -291,11 +299,11 @@ def build(self) -> Plan:
291299
)
292300
models_to_backfill = self._build_models_to_backfill(dag, restatements)
293301

294-
interval_end_per_model = self._interval_end_per_model
295-
if interval_end_per_model and self.override_end:
302+
end_override_per_model = self._end_override_per_model
303+
if end_override_per_model and self.override_end:
296304
# If the end date was provided explicitly by a user, then interval end for each individual
297305
# model should be ignored.
298-
interval_end_per_model = None
306+
end_override_per_model = None
299307

300308
# this deliberately uses the passed in self._execution_time and not self.execution_time cached property
301309
# the reason is because that there can be a delay between the Plan being built and the Plan being actually run,
@@ -322,7 +330,8 @@ def build(self) -> Plan:
322330
indirectly_modified=indirectly_modified,
323331
deployability_index=deployability_index,
324332
restatements=restatements,
325-
interval_end_per_model=interval_end_per_model,
333+
start_override_per_model=self._start_override_per_model,
334+
end_override_per_model=end_override_per_model,
326335
selected_models_to_backfill=self._backfill_models,
327336
models_to_backfill=models_to_backfill,
328337
effective_from=self._effective_from,

sqlmesh/core/plan/definition.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class Plan(PydanticModel, frozen=True):
5757

5858
deployability_index: DeployabilityIndex
5959
restatements: t.Dict[SnapshotId, Interval]
60-
interval_end_per_model: t.Optional[t.Dict[str, int]]
60+
start_override_per_model: t.Optional[t.Dict[str, datetime]]
61+
end_override_per_model: t.Optional[t.Dict[str, datetime]]
6162

6263
selected_models_to_backfill: t.Optional[t.Set[str]] = None
6364
"""Models that have been explicitly selected for backfill by a user."""
@@ -177,7 +178,8 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
177178
execution_time=self.execution_time,
178179
restatements=self.restatements,
179180
deployability_index=self.deployability_index,
180-
interval_end_per_model=self.interval_end_per_model,
181+
start_override_per_model=self.start_override_per_model,
182+
end_override_per_model=self.end_override_per_model,
181183
end_bounded=self.end_bounded,
182184
).items()
183185
if snapshot.is_model and missing
@@ -265,7 +267,8 @@ def to_evaluatable(self) -> EvaluatablePlan:
265267
removed_snapshots=sorted(self.context_diff.removed_snapshots),
266268
requires_backfill=self.requires_backfill,
267269
models_to_backfill=self.models_to_backfill,
268-
interval_end_per_model=self.interval_end_per_model,
270+
start_override_per_model=self.start_override_per_model,
271+
end_override_per_model=self.end_override_per_model,
269272
execution_time=self.execution_time,
270273
disabled_restatement_models={
271274
s.name
@@ -303,7 +306,8 @@ class EvaluatablePlan(PydanticModel):
303306
removed_snapshots: t.List[SnapshotId]
304307
requires_backfill: bool
305308
models_to_backfill: t.Optional[t.Set[str]] = None
306-
interval_end_per_model: t.Optional[t.Dict[str, int]] = None
309+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None
310+
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None
307311
execution_time: t.Optional[TimeLike] = None
308312
disabled_restatement_models: t.Set[str]
309313
environment_statements: t.Optional[t.List[EnvironmentStatements]] = None

sqlmesh/core/plan/evaluator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ def visit_audit_only_run_stage(
256256
plan.end,
257257
execution_time=plan.execution_time,
258258
end_bounded=plan.end_bounded,
259-
interval_end_per_model=plan.interval_end_per_model,
259+
start_override_per_model=plan.start_override_per_model,
260+
end_override_per_model=plan.end_override_per_model,
260261
)
261262

262263
if completion_status.is_failure:

sqlmesh/core/plan/stages.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,8 @@ def _missing_intervals(
524524
},
525525
deployability_index=deployability_index,
526526
end_bounded=plan.end_bounded,
527-
interval_end_per_model=plan.interval_end_per_model,
527+
start_override_per_model=plan.start_override_per_model,
528+
end_override_per_model=plan.end_override_per_model,
528529
)
529530

530531
def _get_audit_only_snapshots(

0 commit comments

Comments
 (0)