Skip to content

Commit 0ed5945

Browse files
committed
Feat: Allow specifying a minimum number of intervals per model to include in a plan
1 parent f43a4c3 commit 0ed5945

File tree

12 files changed

+324
-10
lines changed

12 files changed

+324
-10
lines changed

examples/multi/repo_1/linter/__init__.py

Whitespace-only changes.

sqlmesh/cli/main.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
513513
help="Explain the plan instead of applying it.",
514514
default=None,
515515
)
516+
@click.option(
517+
"--min-intervals",
518+
default=0,
519+
help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date",
520+
)
516521
@opt.verbose
517522
@click.pass_context
518523
@error_handler

sqlmesh/core/context.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from pathlib import Path
4747
from shutil import rmtree
4848
from types import MappingProxyType
49+
from datetime import datetime
4950

5051
from sqlglot import Dialect, exp
5152
from sqlglot.helper import first
@@ -126,6 +127,8 @@
126127
format_tz_datetime,
127128
now_timestamp,
128129
now,
130+
to_datetime,
131+
make_exclusive,
129132
)
130133
from sqlmesh.utils.errors import (
131134
CircuitBreakerError,
@@ -1215,6 +1218,7 @@ def plan(
12151218
diff_rendered: t.Optional[bool] = None,
12161219
skip_linter: t.Optional[bool] = None,
12171220
explain: t.Optional[bool] = None,
1221+
min_intervals: t.Optional[int] = None,
12181222
) -> Plan:
12191223
"""Interactively creates a plan.
12201224
@@ -1261,6 +1265,8 @@ def plan(
12611265
diff_rendered: Whether the diff should compare raw vs rendered models
12621266
skip_linter: Linter runs by default so this will skip it if enabled
12631267
explain: Whether to explain the plan instead of applying it.
1268+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
1269+
on every model when checking for missing intervals
12641270
12651271
Returns:
12661272
The populated Plan object.
@@ -1289,6 +1295,7 @@ def plan(
12891295
diff_rendered=diff_rendered,
12901296
skip_linter=skip_linter,
12911297
explain=explain,
1298+
min_intervals=min_intervals,
12921299
)
12931300

12941301
plan = plan_builder.build()
@@ -1338,6 +1345,7 @@ def plan_builder(
13381345
diff_rendered: t.Optional[bool] = None,
13391346
skip_linter: t.Optional[bool] = None,
13401347
explain: t.Optional[bool] = None,
1348+
min_intervals: t.Optional[int] = None,
13411349
) -> PlanBuilder:
13421350
"""Creates a plan builder.
13431351
@@ -1374,6 +1382,8 @@ def plan_builder(
13741382
enable_preview: Indicates whether to enable preview for forward-only models in development environments.
13751383
run: Whether to run latest intervals as part of the plan application.
13761384
diff_rendered: Whether the diff should compare raw vs rendered models
1385+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
1386+
on every model when checking for missing intervals
13771387
13781388
Returns:
13791389
The plan builder.
@@ -1401,6 +1411,7 @@ def plan_builder(
14011411
"run": run,
14021412
"diff_rendered": diff_rendered,
14031413
"skip_linter": skip_linter,
1414+
"min_intervals": min_intervals,
14041415
}
14051416
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
14061417
k: v for k, v in kwargs.items() if v is not None
@@ -1523,6 +1534,15 @@ def plan_builder(
15231534
# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
15241535
self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values())
15251536

1537+
start_override_per_model = self._calculate_start_override_per_model(
1538+
min_intervals,
1539+
start or default_start,
1540+
end or default_end,
1541+
execution_time or now(),
1542+
backfill_models,
1543+
snapshots,
1544+
)
1545+
15261546
return self.PLAN_BUILDER_TYPE(
15271547
context_diff=context_diff,
15281548
start=start,
@@ -1553,6 +1573,7 @@ def plan_builder(
15531573
),
15541574
end_bounded=not run,
15551575
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1576+
start_override_per_model=start_override_per_model,
15561577
interval_end_per_model=max_interval_end_per_model,
15571578
console=self.console,
15581579
user_provided_flags=user_provided_flags,
@@ -2864,6 +2885,58 @@ def _get_plan_default_start_end(
28642885

28652886
return default_start, default_end
28662887

2888+
def _calculate_start_override_per_model(
2889+
self,
2890+
min_intervals: t.Optional[int],
2891+
plan_start: t.Optional[TimeLike],
2892+
plan_end: t.Optional[TimeLike],
2893+
plan_execution_time: TimeLike,
2894+
backfill_model_fqns: t.Optional[t.Set[str]],
2895+
snapshots_by_model_fqn: t.Dict[str, Snapshot],
2896+
) -> t.Dict[str, datetime]:
2897+
if not min_intervals or not backfill_model_fqns or not plan_start:
2898+
# If there are no models to backfill, there are no intervals to consider for backfill, so we dont need to consider a minimum number
2899+
# If the plan doesnt have a start date, all intervals are considered already so we dont need to consider a minimum number
2900+
# If we dont have a minimum number of intervals to consider, then we dont need to adjust the start date on a per-model basis
2901+
return {}
2902+
2903+
start_overrides = {}
2904+
2905+
plan_execution_time_dt = to_datetime(plan_execution_time)
2906+
plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt)
2907+
plan_end_dt = to_datetime(
2908+
plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt
2909+
)
2910+
2911+
for model_fqn in backfill_model_fqns:
2912+
snapshot = snapshots_by_model_fqn.get(model_fqn)
2913+
if not snapshot:
2914+
continue
2915+
2916+
starting_point = plan_end_dt
2917+
if node_end := snapshot.node.end:
2918+
# if we dont do this, if the node end is a date (as opposed to a timestamp)
2919+
# we end up incorrectly winding back an extra day
2920+
node_end_dt = make_exclusive(node_end)
2921+
2922+
if node_end_dt < plan_end_dt:
2923+
# if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals
2924+
# instead of the plan end. If we use the plan end, we will return intervals in the future which are invalid
2925+
starting_point = node_end_dt
2926+
2927+
snapshot_start = snapshot.node.cron_floor(starting_point)
2928+
2929+
for _ in range(min_intervals):
2930+
# wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date
2931+
snapshot_start = snapshot.node.cron_prev(snapshot_start)
2932+
2933+
# only consider this an override if the wound-back start date is earlier than the plan start date
2934+
# if it isnt then the plan already covers :min_intervals intervals for this snapshot
2935+
if snapshot_start < plan_start_dt:
2936+
start_overrides[model_fqn] = snapshot_start
2937+
2938+
return start_overrides
2939+
28672940
def _get_max_interval_end_per_model(
28682941
self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]]
28692942
) -> t.Dict[str, int]:

sqlmesh/core/node.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ class IntervalUnit(str, Enum):
3131
IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
3232
based on the cron schedule of a node. The minimum time delta between a sample set of dates
3333
is used to determine which unit a node's schedule is.
34+
35+
It's designed to align with common partitioning schemes, hence why there is no WEEK unit
36+
because generally tables are not partitioned by week
3437
"""
3538

3639
YEAR = "year"

sqlmesh/core/plan/builder.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import typing as t
66
from collections import defaultdict
77
from functools import cached_property
8+
from datetime import datetime
89

910

1011
from sqlmesh.core.console import PlanBuilderConsole, get_console
@@ -85,6 +86,7 @@ class PlanBuilder:
8586
ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
8687
environment state, or to use whatever snapshots are in the current environment state even if
8788
the environment is not finalized.
89+
start_override_per_model: The mapping from model FQN's to model-specific start dates
8890
interval_end_per_model: The mapping from model FQNs to target end dates.
8991
explain: Whether to explain the plan instead of applying it.
9092
"""
@@ -117,6 +119,7 @@ def __init__(
117119
end_bounded: bool = False,
118120
ensure_finalized_snapshots: bool = False,
119121
explain: bool = False,
122+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
120123
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
121124
console: t.Optional[PlanBuilderConsole] = None,
122125
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
@@ -133,6 +136,7 @@ def __init__(
133136
self._enable_preview = enable_preview
134137
self._end_bounded = end_bounded
135138
self._ensure_finalized_snapshots = ensure_finalized_snapshots
139+
self._start_override_per_model = start_override_per_model
136140
self._interval_end_per_model = interval_end_per_model
137141
self._environment_ttl = environment_ttl
138142
self._categorizer_config = categorizer_config or CategorizerConfig()
@@ -280,7 +284,11 @@ def build(self) -> Plan:
280284
self._adjust_new_snapshot_intervals()
281285

282286
deployability_index = (
283-
DeployabilityIndex.create(self._context_diff.snapshots.values(), start=self._start)
287+
DeployabilityIndex.create(
288+
self._context_diff.snapshots.values(),
289+
start=self._start,
290+
start_override_per_model=self._start_override_per_model,
291+
)
284292
if self._is_dev
285293
else DeployabilityIndex.all_deployable()
286294
)
@@ -322,6 +330,7 @@ def build(self) -> Plan:
322330
indirectly_modified=indirectly_modified,
323331
deployability_index=deployability_index,
324332
restatements=restatements,
333+
start_override_per_model=self._start_override_per_model,
325334
interval_end_per_model=interval_end_per_model,
326335
selected_models_to_backfill=self._backfill_models,
327336
models_to_backfill=models_to_backfill,

sqlmesh/core/plan/definition.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Plan(PydanticModel, frozen=True):
5757

5858
deployability_index: DeployabilityIndex
5959
restatements: t.Dict[SnapshotId, Interval]
60+
start_override_per_model: t.Optional[t.Dict[str, datetime]]
6061
interval_end_per_model: t.Optional[t.Dict[str, int]]
6162

6263
selected_models_to_backfill: t.Optional[t.Set[str]] = None
@@ -177,6 +178,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
177178
execution_time=self.execution_time,
178179
restatements=self.restatements,
179180
deployability_index=self.deployability_index,
181+
start_override_per_model=self.start_override_per_model,
180182
interval_end_per_model=self.interval_end_per_model,
181183
end_bounded=self.end_bounded,
182184
).items()
@@ -265,6 +267,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
265267
removed_snapshots=sorted(self.context_diff.removed_snapshots),
266268
requires_backfill=self.requires_backfill,
267269
models_to_backfill=self.models_to_backfill,
270+
start_override_per_model=self.start_override_per_model,
268271
interval_end_per_model=self.interval_end_per_model,
269272
execution_time=self.execution_time,
270273
disabled_restatement_models={
@@ -303,6 +306,7 @@ class EvaluatablePlan(PydanticModel):
303306
removed_snapshots: t.List[SnapshotId]
304307
requires_backfill: bool
305308
models_to_backfill: t.Optional[t.Set[str]] = None
309+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None
306310
interval_end_per_model: t.Optional[t.Dict[str, int]] = None
307311
execution_time: t.Optional[TimeLike] = None
308312
disabled_restatement_models: t.Set[str]

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ def _missing_intervals(
524524
},
525525
deployability_index=deployability_index,
526526
end_bounded=plan.end_bounded,
527+
start_override_per_model=plan.start_override_per_model,
527528
interval_end_per_model=plan.interval_end_per_model,
528529
)
529530

sqlmesh/core/scheduler.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22
import logging
33
import typing as t
4+
from datetime import datetime
45
from sqlglot import exp
56
from sqlmesh.core import constants as c
67
from sqlmesh.core.console import Console, get_console
@@ -704,6 +705,7 @@ def merged_missing_intervals(
704705
execution_time: t.Optional[TimeLike] = None,
705706
deployability_index: t.Optional[DeployabilityIndex] = None,
706707
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
708+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
707709
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
708710
ignore_cron: bool = False,
709711
end_bounded: bool = False,
@@ -722,6 +724,8 @@ def merged_missing_intervals(
722724
execution_time: The date/time reference to use for execution time. Defaults to now.
723725
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
724726
restatements: A set of snapshot names being restated.
727+
start_override_per_model: A mapping of model FQNs to start dates, where the start date for calculating intervals
728+
should be different from the plan start date
725729
interval_end_per_model: The mapping from model FQNs to target end dates.
726730
ignore_cron: Whether to ignore the node's cron schedule.
727731
end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
@@ -737,6 +741,7 @@ def merged_missing_intervals(
737741
deployability_index=deployability_index,
738742
execution_time=execution_time or now_timestamp(),
739743
restatements=restatements,
744+
start_override_per_model=start_override_per_model,
740745
interval_end_per_model=interval_end_per_model,
741746
ignore_cron=ignore_cron,
742747
end_bounded=end_bounded,
@@ -751,6 +756,7 @@ def compute_interval_params(
751756
deployability_index: t.Optional[DeployabilityIndex] = None,
752757
execution_time: t.Optional[TimeLike] = None,
753758
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
759+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
754760
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
755761
ignore_cron: bool = False,
756762
end_bounded: bool = False,
@@ -769,6 +775,8 @@ def compute_interval_params(
769775
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
770776
execution_time: The date/time reference to use for execution time.
771777
restatements: A dict of snapshot names being restated and their intervals.
778+
start_override_per_model: A mapping of model FQNs to start dates, where the start date for calculating intervals
779+
should be different from the plan start date
772780
interval_end_per_model: The mapping from model FQNs to target end dates.
773781
ignore_cron: Whether to ignore the node's cron schedule.
774782
end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
@@ -786,6 +794,7 @@ def compute_interval_params(
786794
execution_time=execution_time,
787795
restatements=restatements,
788796
deployability_index=deployability_index,
797+
start_override_per_model=start_override_per_model,
789798
interval_end_per_model=interval_end_per_model,
790799
ignore_cron=ignore_cron,
791800
end_bounded=end_bounded,

sqlmesh/core/snapshot/definition.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,14 +1458,16 @@ def none_deployable(cls) -> DeployabilityIndex:
14581458
def create(
14591459
cls,
14601460
snapshots: t.Dict[SnapshotId, Snapshot] | t.Collection[Snapshot],
1461-
start: t.Optional[TimeLike] = None,
1461+
start: t.Optional[TimeLike] = None, # plan start
1462+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
14621463
) -> DeployabilityIndex:
14631464
if not isinstance(snapshots, dict):
14641465
snapshots = {s.snapshot_id: s for s in snapshots}
14651466

14661467
deployability_mapping: t.Dict[SnapshotId, bool] = {}
14671468
children_deployability_mapping: t.Dict[SnapshotId, bool] = {}
14681469
representative_shared_version_ids: t.Set[SnapshotId] = set()
1470+
start_override_per_model = start_override_per_model or {}
14691471

14701472
start_date_cache: t.Optional[t.Dict[str, datetime]] = {}
14711473

@@ -1488,12 +1490,12 @@ def create(
14881490
snapshot.is_model and snapshot.model.auto_restatement_cron is not None
14891491
)
14901492

1493+
snapshot_start = start_override_per_model.get(
1494+
node.name, start_date(snapshot, snapshots.values(), cache=start_date_cache)
1495+
)
1496+
14911497
is_valid_start = (
1492-
snapshot.is_valid_start(
1493-
start, start_date(snapshot, snapshots.values(), start_date_cache)
1494-
)
1495-
if start is not None
1496-
else True
1498+
snapshot.is_valid_start(start, snapshot_start) if start is not None else True
14971499
)
14981500

14991501
if (
@@ -1789,6 +1791,7 @@ def missing_intervals(
17891791
execution_time: t.Optional[TimeLike] = None,
17901792
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
17911793
deployability_index: t.Optional[DeployabilityIndex] = None,
1794+
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
17921795
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
17931796
ignore_cron: bool = False,
17941797
end_bounded: bool = False,
@@ -1806,13 +1809,16 @@ def missing_intervals(
18061809
else earliest_start_date(snapshots, cache=cache, relative_to=end_date)
18071810
)
18081811
restatements = restatements or {}
1812+
start_override_per_model = start_override_per_model or {}
18091813
interval_end_per_model = interval_end_per_model or {}
18101814
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
18111815

18121816
for snapshot in snapshots.values():
18131817
if not snapshot.evaluatable:
18141818
continue
1815-
snapshot_start_date = start_dt
1819+
1820+
snapshot_start_override = start_override_per_model.get(snapshot.name, None)
1821+
snapshot_start_date = snapshot_start_override or start_dt
18161822
snapshot_end_date: TimeLike = end_date
18171823

18181824
restated_interval = restatements.get(snapshot.snapshot_id)
@@ -1891,7 +1897,7 @@ def compute_missing_intervals(
18911897
Returns:
18921898
A list of all timestamps in this range.
18931899
"""
1894-
if start_ts == end_ts:
1900+
if start_ts >= end_ts:
18951901
return []
18961902

18971903
timestamps = expand_range(start_ts, end_ts, interval_unit)

0 commit comments

Comments
 (0)