Skip to content

Commit 237c765

Browse files
committed
Feat: Allow specifying a minimum number of intervals to include for dev plans with a relative start date
1 parent eeb18fb commit 237c765

File tree

10 files changed

+406
-17
lines changed

10 files changed

+406
-17
lines changed

examples/multi/repo_1/linter/__init__.py

Whitespace-only changes.

sqlmesh/cli/main.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
511511
help="Explain the plan instead of applying it.",
512512
default=None,
513513
)
514+
@click.option(
515+
"--min-intervals",
516+
default=0,
517+
help="In new environments created against a specific time range, ensure that models contain at least this many intervals",
518+
)
514519
@opt.verbose
515520
@click.pass_context
516521
@error_handler

sqlmesh/core/context.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,7 @@ def plan(
12151215
diff_rendered: t.Optional[bool] = None,
12161216
skip_linter: t.Optional[bool] = None,
12171217
explain: t.Optional[bool] = None,
1218+
min_intervals: t.Optional[int] = None,
12181219
) -> Plan:
12191220
"""Interactively creates a plan.
12201221
@@ -1261,6 +1262,8 @@ def plan(
12611262
diff_rendered: Whether the diff should compare raw vs rendered models
12621263
skip_linter: Linter runs by default so this will skip it if enabled
12631264
explain: Whether to explain the plan instead of applying it.
1265+
min_intervals: When populating dev environments with a specific time range, if no missing intervals are matched for the model,
1266+
wind back the model start date until we get at least this many missing intervals to backfill.
12641267
12651268
Returns:
12661269
The populated Plan object.
@@ -1289,6 +1292,7 @@ def plan(
12891292
diff_rendered=diff_rendered,
12901293
skip_linter=skip_linter,
12911294
explain=explain,
1295+
min_intervals=min_intervals,
12921296
)
12931297

12941298
plan = plan_builder.build()
@@ -1338,6 +1342,7 @@ def plan_builder(
13381342
diff_rendered: t.Optional[bool] = None,
13391343
skip_linter: t.Optional[bool] = None,
13401344
explain: t.Optional[bool] = None,
1345+
min_intervals: t.Optional[int] = None,
13411346
) -> PlanBuilder:
13421347
"""Creates a plan builder.
13431348
@@ -1374,6 +1379,8 @@ def plan_builder(
13741379
enable_preview: Indicates whether to enable preview for forward-only models in development environments.
13751380
run: Whether to run latest intervals as part of the plan application.
13761381
diff_rendered: Whether the diff should compare raw vs rendered models
1382+
min_intervals: When populating dev environments with a specific time range, if no missing intervals are matched for the model,
1383+
wind back the model start date until we get at least this many missing intervals to backfill.
13771384
13781385
Returns:
13791386
The plan builder.
@@ -1401,6 +1408,7 @@ def plan_builder(
14011408
"run": run,
14021409
"diff_rendered": diff_rendered,
14031410
"skip_linter": skip_linter,
1411+
"min_intervals": min_intervals,
14041412
}
14051413
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
14061414
k: v for k, v in kwargs.items() if v is not None
@@ -1557,6 +1565,7 @@ def plan_builder(
15571565
console=self.console,
15581566
user_provided_flags=user_provided_flags,
15591567
explain=explain or False,
1568+
min_intervals=min_intervals,
15601569
)
15611570

15621571
def apply(

sqlmesh/core/node.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ class IntervalUnit(str, Enum):
3131
IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
3232
based on the cron schedule of a node. The minimum time delta between a sample set of dates
3333
is used to determine which unit a node's schedule is.
34+
35+
It's designed to align with common partitioning schemes, hence why there is no WEEK unit
36+
because generally tables are not partitioned by week
3437
"""
3538

3639
YEAR = "year"

sqlmesh/core/plan/builder.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class PlanBuilder:
8787
the environment is not finalized.
8888
interval_end_per_model: The mapping from model FQNs to target end dates.
8989
explain: Whether to explain the plan instead of applying it.
90+
min_intervals: When populating dev environments with a specific time range, if no missing intervals are matched for the model,
91+
wind back the model start date until we get at least this many missing intervals to backfill.
92+
9093
"""
9194

9295
def __init__(
@@ -120,6 +123,7 @@ def __init__(
120123
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
121124
console: t.Optional[PlanBuilderConsole] = None,
122125
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
126+
min_intervals: t.Optional[int] = None,
123127
):
124128
self._context_diff = context_diff
125129
self._no_gaps = no_gaps
@@ -176,6 +180,7 @@ def __init__(
176180
)
177181

178182
self._latest_plan: t.Optional[Plan] = None
183+
self._min_intervals = min_intervals
179184

180185
@property
181186
def is_start_and_end_allowed(self) -> bool:
@@ -330,6 +335,7 @@ def build(self) -> Plan:
330335
end_bounded=self._end_bounded,
331336
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
332337
user_provided_flags=self._user_provided_flags,
338+
min_intervals=self._min_intervals,
333339
)
334340
self._latest_plan = plan
335341
return plan
@@ -788,6 +794,9 @@ def _ensure_valid_date_range(self) -> None:
788794
f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')"
789795
)
790796

797+
if not self._is_dev and self._min_intervals:
798+
raise PlanError(f"--min-intervals is only valid for non-production plans")
799+
791800
def _ensure_no_forward_only_revert(self) -> None:
792801
"""Ensures that a previously superseded breaking / non-breaking snapshot is not being
793802
used again to replace an existing forward-only snapshot with the same version.

sqlmesh/core/plan/definition.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class Plan(PydanticModel, frozen=True):
6868

6969
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
7070

71+
min_intervals: t.Optional[int] = None
72+
7173
@cached_property
7274
def start(self) -> TimeLike:
7375
if self.provided_start is not None:
@@ -179,6 +181,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
179181
deployability_index=self.deployability_index,
180182
interval_end_per_model=self.interval_end_per_model,
181183
end_bounded=self.end_bounded,
184+
min_intervals=self.min_intervals,
182185
).items()
183186
if snapshot.is_model and missing
184187
]

sqlmesh/core/snapshot/definition.py

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,26 +1783,40 @@ def missing_intervals(
17831783
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
17841784
ignore_cron: bool = False,
17851785
end_bounded: bool = False,
1786+
min_intervals: t.Optional[int] = None,
17861787
) -> t.Dict[Snapshot, Intervals]:
17871788
"""Returns all missing intervals given a collection of snapshots."""
17881789
if not isinstance(snapshots, dict):
17891790
# Make sure that the mapping is only constructed once
17901791
snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots}
17911792
missing = {}
17921793
cache: t.Dict[str, datetime] = {}
1793-
end_date = end or now_timestamp()
1794-
start_dt = (
1795-
to_datetime(start)
1796-
if start
1797-
else earliest_start_date(snapshots, cache=cache, relative_to=end_date)
1794+
execution_time_dt = (
1795+
to_datetime(execution_time) if execution_time is not None else execution_time
17981796
)
1797+
end_date = end or execution_time_dt or now_timestamp()
17991798
restatements = restatements or {}
18001799
interval_end_per_model = interval_end_per_model or {}
18011800
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
18021801

1802+
fallback_start = earliest_start_date(snapshots, cache=cache, relative_to=end_date)
1803+
18031804
for snapshot in snapshots.values():
18041805
if not snapshot.evaluatable:
18051806
continue
1807+
1808+
start_relative_base = execution_time_dt
1809+
if min_intervals and snapshot.node.end:
1810+
# this is an optimisation to reduce the number of calls to Snapshot.missing_intervals() if the model end date is in the past
1811+
# if we make the start date relative to the model end date instead of the plan end date, we can reach min_intervals missing intervals more quickly
1812+
node_end_dt = to_datetime(snapshot.node.end)
1813+
if to_datetime(end_date) > node_end_dt:
1814+
start_relative_base = node_end_dt
1815+
1816+
start_dt = (
1817+
to_datetime(start, relative_base=start_relative_base) if start else fallback_start
1818+
)
1819+
18061820
snapshot_start_date = start_dt
18071821
snapshot_end_date: TimeLike = end_date
18081822

@@ -1826,17 +1840,35 @@ def missing_intervals(
18261840
if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
18271841
missing_interval_end_date = node_end_date
18281842

1829-
intervals = snapshot.missing_intervals(
1830-
max(
1831-
to_datetime(snapshot_start_date),
1832-
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
1833-
),
1834-
missing_interval_end_date,
1835-
execution_time=execution_time,
1836-
deployability_index=deployability_index,
1837-
ignore_cron=ignore_cron,
1838-
end_bounded=end_bounded,
1839-
)
1843+
inferred_start_date = start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)
1844+
missing_interval_start_date = snapshot_start_date
1845+
1846+
def _get_missing_intervals() -> Intervals:
1847+
return snapshot.missing_intervals(
1848+
max(missing_interval_start_date, inferred_start_date),
1849+
missing_interval_end_date,
1850+
execution_time=execution_time,
1851+
deployability_index=deployability_index,
1852+
ignore_cron=ignore_cron,
1853+
end_bounded=end_bounded,
1854+
)
1855+
1856+
while True:
1857+
intervals = _get_missing_intervals()
1858+
1859+
if (
1860+
min_intervals
1861+
and len(intervals) < min_intervals
1862+
and missing_interval_start_date > inferred_start_date
1863+
):
1864+
# keep winding back the start date until either:
1865+
# - we get min_intervals intervals
1866+
# - we've wound back beyond the model start date which means there are no missing intervals
1867+
missing_interval_start_date = snapshot.node.cron_prev(missing_interval_start_date)
1868+
continue
1869+
1870+
break
1871+
18401872
if intervals:
18411873
missing[snapshot] = intervals
18421874

@@ -1882,7 +1914,7 @@ def compute_missing_intervals(
18821914
Returns:
18831915
A list of all timestamps in this range.
18841916
"""
1885-
if start_ts == end_ts:
1917+
if start_ts >= end_ts:
18861918
return []
18871919

18881920
timestamps = expand_range(start_ts, end_ts, interval_unit)

tests/core/test_context.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2094,6 +2094,7 @@ def test_plan_audit_intervals(tmp_path: pathlib.Path, caplog):
20942094
plan = ctx.plan(
20952095
environment="dev", auto_apply=True, no_prompts=True, start="2025-02-01", end="2025-02-01"
20962096
)
2097+
assert plan.missing_intervals
20972098

20982099
date_snapshot = next(s for s in plan.new_snapshots if "date_example" in s.name)
20992100
timestamp_snapshot = next(s for s in plan.new_snapshots if "timestamp_example" in s.name)
@@ -2304,3 +2305,136 @@ def test_dev_environment_virtual_update_with_environment_statements(tmp_path: Pa
23042305
updated_statements[0].before_all[1]
23052306
== "CREATE TABLE IF NOT EXISTS metrics (metric_name VARCHAR(50), value INT)"
23062307
)
2308+
2309+
2310+
def test_plan_min_intervals(tmp_path: Path):
2311+
init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb")
2312+
2313+
context = Context(
2314+
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
2315+
)
2316+
2317+
current_time = to_datetime("2020-02-01 00:00:01")
2318+
2319+
# initial state of example project
2320+
context.plan(auto_apply=True, execution_time=current_time)
2321+
2322+
(tmp_path / "models" / "daily_model.sql").write_text("""
2323+
MODEL (
2324+
name sqlmesh_example.daily_model,
2325+
kind INCREMENTAL_BY_TIME_RANGE (
2326+
time_column event_date
2327+
),
2328+
start '2020-01-01',
2329+
cron '@daily'
2330+
);
2331+
2332+
select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds
2333+
""")
2334+
2335+
(tmp_path / "models" / "weekly_model.sql").write_text("""
2336+
MODEL (
2337+
name sqlmesh_example.weekly_model,
2338+
kind INCREMENTAL_BY_TIME_RANGE (
2339+
time_column event_date
2340+
),
2341+
start '2020-01-01',
2342+
cron '@weekly'
2343+
);
2344+
2345+
select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds
2346+
""")
2347+
2348+
(tmp_path / "models" / "monthly_model.sql").write_text("""
2349+
MODEL (
2350+
name sqlmesh_example.monthly_model,
2351+
kind INCREMENTAL_BY_TIME_RANGE (
2352+
time_column event_date
2353+
),
2354+
start '2020-01-01',
2355+
cron '@monthly'
2356+
);
2357+
2358+
select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds
2359+
""")
2360+
2361+
context.load()
2362+
2363+
# initial state - backfill from 2020-01-01 -> now() (2020-01-02 00:00:01) on new models
2364+
plan = context.plan(execution_time=current_time)
2365+
2366+
assert to_datetime(plan.start) == to_datetime("2020-01-01 00:00:00")
2367+
assert to_datetime(plan.end) == to_datetime("2020-02-01 00:00:00")
2368+
assert to_datetime(plan.execution_time) == to_datetime("2020-02-01 00:00:01")
2369+
2370+
# check initial intervals - should be full time range between start and execution time
2371+
assert len(plan.missing_intervals) == 3
2372+
assert (
2373+
plan.missing_intervals[0].snapshot_id
2374+
== context.get_snapshot("sqlmesh_example.daily_model", raise_if_missing=True).snapshot_id
2375+
)
2376+
assert [
2377+
(to_datetime(s), to_datetime(e)) for s, e in plan.missing_intervals[0].merged_intervals
2378+
] == [(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))]
2379+
assert (
2380+
plan.missing_intervals[1].snapshot_id
2381+
== context.get_snapshot("sqlmesh_example.monthly_model", raise_if_missing=True).snapshot_id
2382+
)
2383+
assert [
2384+
(to_datetime(s), to_datetime(e)) for s, e in plan.missing_intervals[1].merged_intervals
2385+
] == [(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))]
2386+
assert (
2387+
plan.missing_intervals[2].snapshot_id
2388+
== context.get_snapshot("sqlmesh_example.weekly_model", raise_if_missing=True).snapshot_id
2389+
)
2390+
assert [
2391+
(to_datetime(s), to_datetime(e)) for s, e in plan.missing_intervals[2].merged_intervals
2392+
] == [
2393+
(
2394+
to_datetime("2020-01-01 00:00:00"),
2395+
to_datetime("2020-01-26 00:00:00"),
2396+
) # last week in 2020-01 hasnt fully elapsed yet
2397+
]
2398+
2399+
# now, create a dev env for "1 day ago"
2400+
plan = context.plan(
2401+
environment="pr_env",
2402+
start="1 day ago",
2403+
execution_time=current_time,
2404+
min_intervals=1,
2405+
)
2406+
2407+
# this should pick up last day for daily model, last week for weekly model and last month for the monthly model
2408+
assert len(plan.missing_intervals) == 3
2409+
2410+
assert (
2411+
plan.missing_intervals[0].snapshot_id
2412+
== context.get_snapshot("sqlmesh_example.daily_model", raise_if_missing=True).snapshot_id
2413+
)
2414+
assert [
2415+
(to_datetime(s), to_datetime(e)) for s, e in plan.missing_intervals[0].merged_intervals
2416+
] == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00"))]
2417+
assert (
2418+
plan.missing_intervals[1].snapshot_id
2419+
== context.get_snapshot("sqlmesh_example.monthly_model", raise_if_missing=True).snapshot_id
2420+
)
2421+
assert [
2422+
(to_datetime(s), to_datetime(e)) for s, e in plan.missing_intervals[1].merged_intervals
2423+
] == [
2424+
(
2425+
to_datetime("2020-01-01 00:00:00"), # last completed month
2426+
to_datetime("2020-02-01 00:00:00"),
2427+
)
2428+
]
2429+
assert (
2430+
plan.missing_intervals[2].snapshot_id
2431+
== context.get_snapshot("sqlmesh_example.weekly_model", raise_if_missing=True).snapshot_id
2432+
)
2433+
assert [
2434+
(to_datetime(s), to_datetime(e)) for s, e in plan.missing_intervals[2].merged_intervals
2435+
] == [
2436+
(
2437+
to_datetime("2020-01-19 00:00:00"), # last completed week
2438+
to_datetime("2020-01-26 00:00:00"),
2439+
)
2440+
]

0 commit comments

Comments
 (0)