Skip to content

Commit 718ea0f

Browse files
committed
Feat: Allow specifying a minimum number of missing intervals to check
1 parent 2006664 commit 718ea0f

File tree

10 files changed

+354
-8
lines changed

10 files changed

+354
-8
lines changed

examples/multi/repo_1/linter/__init__.py

Whitespace-only changes.

sqlmesh/cli/main.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
513513
help="Explain the plan instead of applying it.",
514514
default=None,
515515
)
516+
@click.option(
517+
"--min-intervals",
518+
default=0,
519+
help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date",
520+
)
516521
@opt.verbose
517522
@click.pass_context
518523
@error_handler

sqlmesh/core/context.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,7 @@ def plan(
12151215
diff_rendered: t.Optional[bool] = None,
12161216
skip_linter: t.Optional[bool] = None,
12171217
explain: t.Optional[bool] = None,
1218+
min_intervals: t.Optional[int] = None,
12181219
) -> Plan:
12191220
"""Interactively creates a plan.
12201221
@@ -1261,6 +1262,8 @@ def plan(
12611262
diff_rendered: Whether the diff should compare raw vs rendered models
12621263
skip_linter: Linter runs by default so this will skip it if enabled
12631264
explain: Whether to explain the plan instead of applying it.
1265+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
1266+
on every model when checking for missing intervals
12641267
12651268
Returns:
12661269
The populated Plan object.
@@ -1289,6 +1292,7 @@ def plan(
12891292
diff_rendered=diff_rendered,
12901293
skip_linter=skip_linter,
12911294
explain=explain,
1295+
min_intervals=min_intervals,
12921296
)
12931297

12941298
plan = plan_builder.build()
@@ -1338,6 +1342,7 @@ def plan_builder(
13381342
diff_rendered: t.Optional[bool] = None,
13391343
skip_linter: t.Optional[bool] = None,
13401344
explain: t.Optional[bool] = None,
1345+
min_intervals: t.Optional[int] = None,
13411346
) -> PlanBuilder:
13421347
"""Creates a plan builder.
13431348
@@ -1374,6 +1379,8 @@ def plan_builder(
13741379
enable_preview: Indicates whether to enable preview for forward-only models in development environments.
13751380
run: Whether to run latest intervals as part of the plan application.
13761381
diff_rendered: Whether the diff should compare raw vs rendered models
1382+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
1383+
on every model when checking for missing intervals
13771384
13781385
Returns:
13791386
The plan builder.
@@ -1401,6 +1408,7 @@ def plan_builder(
14011408
"run": run,
14021409
"diff_rendered": diff_rendered,
14031410
"skip_linter": skip_linter,
1411+
"min_intervals": min_intervals,
14041412
}
14051413
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
14061414
k: v for k, v in kwargs.items() if v is not None
@@ -1557,6 +1565,7 @@ def plan_builder(
15571565
console=self.console,
15581566
user_provided_flags=user_provided_flags,
15591567
explain=explain or False,
1568+
min_intervals=min_intervals,
15601569
)
15611570

15621571
def apply(

sqlmesh/core/node.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ class IntervalUnit(str, Enum):
3131
IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
3232
based on the cron schedule of a node. The minimum time delta between a sample set of dates
3333
is used to determine which unit a node's schedule is.
34+
35+
It's designed to align with common partitioning schemes, hence there is no WEEK unit
36+
because generally tables are not partitioned by week
3437
"""
3538

3639
YEAR = "year"

sqlmesh/core/plan/builder.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class PlanBuilder:
8787
the environment is not finalized.
8888
interval_end_per_model: The mapping from model FQNs to target end dates.
8989
explain: Whether to explain the plan instead of applying it.
90+
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
91+
on every model when checking for missing intervals
9092
"""
9193

9294
def __init__(
@@ -120,6 +122,7 @@ def __init__(
120122
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
121123
console: t.Optional[PlanBuilderConsole] = None,
122124
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
125+
min_intervals: t.Optional[int] = None,
123126
):
124127
self._context_diff = context_diff
125128
self._no_gaps = no_gaps
@@ -176,6 +179,7 @@ def __init__(
176179
)
177180

178181
self._latest_plan: t.Optional[Plan] = None
182+
self._min_intervals = min_intervals
179183

180184
@property
181185
def is_start_and_end_allowed(self) -> bool:
@@ -330,6 +334,7 @@ def build(self) -> Plan:
330334
end_bounded=self._end_bounded,
331335
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
332336
user_provided_flags=self._user_provided_flags,
337+
min_intervals=self._min_intervals,
333338
)
334339
self._latest_plan = plan
335340
return plan

sqlmesh/core/plan/definition.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class Plan(PydanticModel, frozen=True):
5858
deployability_index: DeployabilityIndex
5959
restatements: t.Dict[SnapshotId, Interval]
6060
interval_end_per_model: t.Optional[t.Dict[str, int]]
61+
min_intervals: t.Optional[int] = None
6162

6263
selected_models_to_backfill: t.Optional[t.Set[str]] = None
6364
"""Models that have been explicitly selected for backfill by a user."""
@@ -179,6 +180,7 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
179180
deployability_index=self.deployability_index,
180181
interval_end_per_model=self.interval_end_per_model,
181182
end_bounded=self.end_bounded,
183+
min_intervals=self.min_intervals,
182184
).items()
183185
if snapshot.is_model and missing
184186
]

sqlmesh/core/snapshot/definition.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1790,6 +1790,7 @@ def missing_intervals(
17901790
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
17911791
ignore_cron: bool = False,
17921792
end_bounded: bool = False,
1793+
min_intervals: t.Optional[int] = None,
17931794
) -> t.Dict[Snapshot, Intervals]:
17941795
"""Returns all missing intervals given a collection of snapshots."""
17951796
if not isinstance(snapshots, dict):
@@ -1798,11 +1799,12 @@ def missing_intervals(
17981799
missing = {}
17991800
cache: t.Dict[str, datetime] = {}
18001801
end_date = end or now_timestamp()
1801-
start_dt = (
1802-
to_datetime(start)
1803-
if start
1804-
else earliest_start_date(snapshots, cache=cache, relative_to=end_date)
1802+
execution_time_dt = (
1803+
to_datetime(execution_time) if execution_time is not None else execution_time
18051804
)
1805+
fallback_start_dt = earliest_start_date(snapshots, cache=cache, relative_to=end_date)
1806+
start_dt = to_datetime(start, relative_base=execution_time_dt) if start else fallback_start_dt
1807+
end_date = end or execution_time_dt or now_timestamp()
18061808
restatements = restatements or {}
18071809
interval_end_per_model = interval_end_per_model or {}
18081810
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
@@ -1833,10 +1835,24 @@ def missing_intervals(
18331835
if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
18341836
missing_interval_end_date = node_end_date
18351837

1838+
missing_interval_start_date = snapshot_start_date
1839+
if min_intervals:
1840+
# we need to ensure that the time range submitted to snapshot.missing_intervals covers at least :min_intervals intervals.
1841+
# To do that, we can wind back the model cron by :min_intervals intervals and check if the start date we were going to send
1842+
# is early enough to cover those intervals. If it isn't, we widen it; otherwise we keep it as-is
1843+
min_interval_start = snapshot.node.cron_floor(missing_interval_end_date)
1844+
for _ in range(min_intervals):
1845+
min_interval_start = snapshot.node.cron_prev(min_interval_start)
1846+
1847+
if min_interval_start < snapshot_start_date:
1848+
missing_interval_start_date = min_interval_start
1849+
1850+
inferred_start_date = start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)
1851+
18361852
intervals = snapshot.missing_intervals(
18371853
max(
1838-
to_datetime(snapshot_start_date),
1839-
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
1854+
missing_interval_start_date,
1855+
inferred_start_date,
18401856
),
18411857
missing_interval_end_date,
18421858
execution_time=execution_time,
@@ -1889,7 +1905,7 @@ def compute_missing_intervals(
18891905
Returns:
18901906
A list of all timestamps in this range.
18911907
"""
1892-
if start_ts == end_ts:
1908+
if start_ts >= end_ts:
18931909
return []
18941910

18951911
timestamps = expand_range(start_ts, end_ts, interval_unit)

tests/core/test_context.py

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import pathlib
33
import typing as t
44
import re
5-
from datetime import date, timedelta
5+
from datetime import date, timedelta, datetime
66
from tempfile import TemporaryDirectory
77
from unittest.mock import PropertyMock, call, patch
88

@@ -36,6 +36,7 @@
3636
from sqlmesh.core.dialect import parse, schema_
3737
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
3838
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements
39+
from sqlmesh.core.plan.definition import Plan
3940
from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
4041
from sqlmesh.core.model import load_sql_based_model, model, SqlModel, Model
4142
from sqlmesh.core.model.cache import OptimizedQueryCache
@@ -2094,6 +2095,7 @@ def test_plan_audit_intervals(tmp_path: pathlib.Path, caplog):
20942095
plan = ctx.plan(
20952096
environment="dev", auto_apply=True, no_prompts=True, start="2025-02-01", end="2025-02-01"
20962097
)
2098+
assert plan.missing_intervals
20972099

20982100
date_snapshot = next(s for s in plan.new_snapshots if "date_example" in s.name)
20992101
timestamp_snapshot = next(s for s in plan.new_snapshots if "timestamp_example" in s.name)
@@ -2304,3 +2306,114 @@ def test_dev_environment_virtual_update_with_environment_statements(tmp_path: Pa
23042306
updated_statements[0].before_all[1]
23052307
== "CREATE TABLE IF NOT EXISTS metrics (metric_name VARCHAR(50), value INT)"
23062308
)
2309+
2310+
2311+
def test_plan_min_intervals(tmp_path: Path):
2312+
init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb")
2313+
2314+
context = Context(
2315+
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
2316+
)
2317+
2318+
current_time = to_datetime("2020-02-01 00:00:01")
2319+
2320+
# initial state of example project
2321+
context.plan(auto_apply=True, execution_time=current_time)
2322+
2323+
(tmp_path / "models" / "daily_model.sql").write_text("""
2324+
MODEL (
2325+
name sqlmesh_example.daily_model,
2326+
kind INCREMENTAL_BY_TIME_RANGE (
2327+
time_column event_date
2328+
),
2329+
start '2020-01-01',
2330+
cron '@daily'
2331+
);
2332+
2333+
select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds
2334+
""")
2335+
2336+
(tmp_path / "models" / "weekly_model.sql").write_text("""
2337+
MODEL (
2338+
name sqlmesh_example.weekly_model,
2339+
kind INCREMENTAL_BY_TIME_RANGE (
2340+
time_column event_date
2341+
),
2342+
start '2020-01-01',
2343+
cron '@weekly'
2344+
);
2345+
2346+
select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds
2347+
""")
2348+
2349+
(tmp_path / "models" / "monthly_model.sql").write_text("""
2350+
MODEL (
2351+
name sqlmesh_example.monthly_model,
2352+
kind INCREMENTAL_BY_TIME_RANGE (
2353+
time_column event_date
2354+
),
2355+
start '2020-01-01',
2356+
cron '@monthly'
2357+
);
2358+
2359+
select * from sqlmesh_example.incremental_model where event_date between @start_ds and @end_ds
2360+
""")
2361+
2362+
context.load()
2363+
2364+
# initial state - backfill from 2020-01-01 -> now() (2020-02-01 00:00:01) on new models
2365+
plan = context.plan(execution_time=current_time)
2366+
2367+
assert to_datetime(plan.start) == to_datetime("2020-01-01 00:00:00")
2368+
assert to_datetime(plan.end) == to_datetime("2020-02-01 00:00:00")
2369+
assert to_datetime(plan.execution_time) == to_datetime("2020-02-01 00:00:01")
2370+
2371+
def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, datetime]]:
2372+
snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id
2373+
snapshot_intervals = next(
2374+
si for si in plan.missing_intervals if si.snapshot_id == snapshot_id
2375+
)
2376+
return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals]
2377+
2378+
# check initial intervals - should be full time range between start and execution time
2379+
assert len(plan.missing_intervals) == 3
2380+
2381+
assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [
2382+
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2383+
]
2384+
assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [
2385+
(
2386+
to_datetime("2020-01-01 00:00:00"),
2387+
to_datetime("2020-01-26 00:00:00"),
2388+
) # last week in 2020-01 hasn't fully elapsed yet
2389+
]
2390+
assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [
2391+
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2392+
]
2393+
2394+
# now, create a dev env for "1 day ago"
2395+
plan = context.plan(
2396+
environment="pr_env",
2397+
start="1 day ago",
2398+
execution_time=current_time,
2399+
min_intervals=1,
2400+
)
2401+
2402+
# this should pick up last day for daily model, last week for weekly model and last month for the monthly model
2403+
assert len(plan.missing_intervals) == 3
2404+
2405+
assert _get_missing_intervals(plan, "sqlmesh_example.daily_model") == [
2406+
(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2407+
]
2408+
assert _get_missing_intervals(plan, "sqlmesh_example.weekly_model") == [
2409+
(
2410+
to_datetime("2020-01-19 00:00:00"), # last completed week
2411+
to_datetime("2020-01-26 00:00:00"),
2412+
)
2413+
]
2414+
assert _get_missing_intervals(plan, "sqlmesh_example.monthly_model") == [
2415+
(
2416+
to_datetime("2020-01-01 00:00:00"), # last completed month
2417+
to_datetime("2020-02-01 00:00:00"),
2418+
)
2419+
]

tests/core/test_plan.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3289,3 +3289,34 @@ def test_environment_statements_change_allows_dev_environment_creation(make_snap
32893289
assert plan is not None
32903290
assert plan.context_diff.has_environment_statements_changes
32913291
assert plan.context_diff.environment_statements == environment_statements
3292+
3293+
3294+
def test_min_intervals(make_snapshot):
3295+
snapshot_a = make_snapshot(
3296+
SqlModel(name="a", query=parse_one("select 1, ds"), dialect="duckdb")
3297+
)
3298+
3299+
context_diff = ContextDiff(
3300+
environment="test_environment",
3301+
is_new_environment=True,
3302+
is_unfinalized_environment=False,
3303+
normalize_environment_name=True,
3304+
create_from="prod",
3305+
create_from_env_exists=True,
3306+
added={snapshot_a.snapshot_id},
3307+
removed_snapshots={},
3308+
modified_snapshots={},
3309+
snapshots={},
3310+
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
3311+
previous_plan_id=None,
3312+
previously_promoted_snapshot_ids=set(),
3313+
previous_finalized_snapshots=None,
3314+
previous_gateway_managed_virtual_layer=False,
3315+
gateway_managed_virtual_layer=False,
3316+
)
3317+
3318+
prod_plan = PlanBuilder(context_diff, min_intervals=1).build()
3319+
dev_plan = PlanBuilder(context_diff, start="1 day ago", is_dev=True, min_intervals=1).build()
3320+
3321+
assert prod_plan.min_intervals == 1
3322+
assert dev_plan.min_intervals == 1

0 commit comments

Comments
 (0)