Feat: Allow specifying a minimum number of intervals to include for each model in a plan #4780

Merged
merged 2 commits on Jul 4, 2025
5 changes: 5 additions & 0 deletions sqlmesh/cli/main.py
@@ -520,6 +520,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
help="Explain the plan instead of applying it.",
default=None,
)
@click.option(
"--min-intervals",
default=0,
help="For every model, ensure at least this many intervals are covered by a missing intervals check regardless of the plan start date",
)
@opt.verbose
@click.pass_context
@error_handler
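For illustration, here is one way the new option might be exercised (the project path, environment name, and interval count are invented for the example, not taken from this PR): on the command line as `sqlmesh plan my_dev --min-intervals 7`, or through the Python API as sketched below.

```python
from sqlmesh import Context

# Minimal sketch: build a plan that guarantees at least 7 intervals are checked
# for missing intervals on every model, even if the plan start date is later.
# "./my_project" and "my_dev" are placeholder values.
ctx = Context(paths=["./my_project"])
plan = ctx.plan("my_dev", min_intervals=7)
ctx.apply(plan)
```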
4 changes: 2 additions & 2 deletions sqlmesh/core/console.py
@@ -2073,8 +2073,8 @@ def _prompt_backfill(
if not plan_builder.override_end:
if plan.provided_end:
blank_meaning = f"'{time_like_to_str(plan.provided_end)}'"
elif plan.interval_end_per_model:
max_end = max(plan.interval_end_per_model.values())
elif plan.end_override_per_model:
max_end = max(plan.end_override_per_model.values())
blank_meaning = f"'{time_like_to_str(max_end)}'"
else:
blank_meaning = "now"
122 changes: 113 additions & 9 deletions sqlmesh/core/context.py
@@ -46,6 +46,7 @@
from pathlib import Path
from shutil import rmtree
from types import MappingProxyType
from datetime import datetime

from sqlglot import Dialect, exp
from sqlglot.helper import first
@@ -125,6 +126,8 @@
format_tz_datetime,
now_timestamp,
now,
to_datetime,
make_exclusive,
)
from sqlmesh.utils.errors import (
CircuitBreakerError,
@@ -1222,6 +1225,7 @@ def plan(
diff_rendered: t.Optional[bool] = None,
skip_linter: t.Optional[bool] = None,
explain: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
) -> Plan:
"""Interactively creates a plan.

@@ -1268,6 +1272,8 @@
diff_rendered: Whether the diff should compare raw vs rendered models
skip_linter: Linter runs by default so this will skip it if enabled
explain: Whether to explain the plan instead of applying it.
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
on every model when checking for missing intervals

Returns:
The populated Plan object.
@@ -1296,6 +1302,7 @@
diff_rendered=diff_rendered,
skip_linter=skip_linter,
explain=explain,
min_intervals=min_intervals,
)

plan = plan_builder.build()
@@ -1345,6 +1352,7 @@ def plan_builder(
diff_rendered: t.Optional[bool] = None,
skip_linter: t.Optional[bool] = None,
explain: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
) -> PlanBuilder:
"""Creates a plan builder.

@@ -1381,6 +1389,8 @@
enable_preview: Indicates whether to enable preview for forward-only models in development environments.
run: Whether to run latest intervals as part of the plan application.
diff_rendered: Whether the diff should compare raw vs rendered models
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
on every model when checking for missing intervals

Returns:
The plan builder.
@@ -1408,6 +1418,7 @@ def plan_builder(
"run": run,
"diff_rendered": diff_rendered,
"skip_linter": skip_linter,
"min_intervals": min_intervals,
}
user_provided_flags: t.Dict[str, UserProvidedFlags] = {
k: v for k, v in kwargs.items() if v is not None
@@ -1530,6 +1541,16 @@ def plan_builder(
# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model.
self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values())

start_override_per_model = self._calculate_start_override_per_model(
min_intervals,
start or default_start,
end or default_end,
execution_time or now(),
backfill_models,
snapshots,
max_interval_end_per_model,
)

return self.PLAN_BUILDER_TYPE(
context_diff=context_diff,
start=start,
@@ -1560,7 +1581,8 @@ def plan_builder(
),
end_bounded=not run,
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
interval_end_per_model=max_interval_end_per_model,
start_override_per_model=start_override_per_model,

Review comment (Member): Should the name be consistent with interval_end_per_model? I don't care which one it is, but I feel like they represent a similar thing and should be named similarly.

Reply (Collaborator, author): I've renamed interval_end_per_model to end_override_per_model on the Plan to match.

end_override_per_model=max_interval_end_per_model,
console=self.console,
user_provided_flags=user_provided_flags,
explain=explain or False,
@@ -2850,15 +2872,15 @@ def _plan_preview_enabled(self) -> bool:
def _get_plan_default_start_end(
self,
snapshots: t.Dict[str, Snapshot],
max_interval_end_per_model: t.Dict[str, int],
max_interval_end_per_model: t.Dict[str, datetime],
backfill_models: t.Optional[t.Set[str]],
modified_model_names: t.Set[str],
execution_time: t.Optional[TimeLike] = None,
) -> t.Tuple[t.Optional[int], t.Optional[int]]:
if not max_interval_end_per_model:
return None, None

default_end = max(max_interval_end_per_model.values())
default_end = to_timestamp(max(max_interval_end_per_model.values()))

Review comment (Member): Just curious, why did we change this to datetime only to convert back to timestamp later?

Reply (Collaborator, author): Rome wasn't built in a day, and the rest of the code in that method was ints. One day I hope we will use proper types internally and push TimeLike and co. back to the edges / user-input handling only.


default_start: t.Optional[int] = None
# Infer the default start by finding the smallest interval start that corresponds to the default end.
for model_name in backfill_models or modified_model_names or max_interval_end_per_model:
@@ -2887,19 +2909,101 @@

return default_start, default_end

def _calculate_start_override_per_model(
self,
min_intervals: t.Optional[int],
plan_start: t.Optional[TimeLike],
plan_end: t.Optional[TimeLike],
plan_execution_time: TimeLike,
backfill_model_fqns: t.Optional[t.Set[str]],
snapshots_by_model_fqn: t.Dict[str, Snapshot],
end_override_per_model: t.Optional[t.Dict[str, datetime]],
) -> t.Dict[str, datetime]:
if not min_intervals or not backfill_model_fqns or not plan_start:
# If there are no models to backfill, there are no intervals to consider for backfill, so we don't need to consider a minimum number
# If the plan doesn't have a start date, all intervals are considered already, so we don't need to consider a minimum number
# If we don't have a minimum number of intervals to consider, then we don't need to adjust the start date on a per-model basis
return {}

start_overrides: t.Dict[str, datetime] = {}
end_override_per_model = end_override_per_model or {}

plan_execution_time_dt = to_datetime(plan_execution_time)
plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt)
plan_end_dt = to_datetime(
plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt
)

# we need to take the DAG into account so that parent models can be expanded to cover at least as much as their children
# for example, A(hourly) <- B(daily)
# if min_intervals=1, A would have 1 hour and B would have 1 day
# but B depends on A so in order for B to have 1 valid day, A needs to be expanded to 24 hours
backfill_dag: DAG[str] = DAG()
for fqn in backfill_model_fqns:
backfill_dag.add(
fqn,
[
p.name
for p in snapshots_by_model_fqn[fqn].parents
if p.name in backfill_model_fqns
],
)

# start from the leaf nodes and work back towards the root because the min_start at the root node is determined by the calculated starts in the leaf nodes
reversed_dag = backfill_dag.reversed
graph = reversed_dag.graph

for model_fqn in reversed_dag:
# Get the earliest start from all immediate children of this snapshot
# this works because topological ordering guarantees that they've already been visited
# and we always set a start override
min_child_start = min(
[start_overrides[immediate_child_fqn] for immediate_child_fqn in graph[model_fqn]],
default=plan_start_dt,
)

snapshot = snapshots_by_model_fqn.get(model_fqn)

if not snapshot:
continue

starting_point = end_override_per_model.get(model_fqn, plan_end_dt)
if node_end := snapshot.node.end:
# if we don't do this and the node end is a *date* (as opposed to a timestamp),
# we end up incorrectly winding back an extra day
node_end_dt = make_exclusive(node_end)

if node_end_dt < plan_end_dt:
# if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals
# instead of the plan end. If we use the plan end, we will return intervals in the future which are invalid
starting_point = node_end_dt

snapshot_start = snapshot.node.cron_floor(starting_point)

for _ in range(min_intervals):
# wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date
snapshot_start = snapshot.node.cron_prev(snapshot_start)

start_overrides[model_fqn] = min(min_child_start, snapshot_start)

return start_overrides
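
To make the traversal above concrete, here is a self-contained sketch of the same idea using fixed-width intervals in place of cron schedules (the model names, schedules, and dates are invented for illustration; none of this uses sqlmesh internals):

```python
from datetime import datetime, timedelta

# A is hourly and feeds B, which is daily; we want at least one interval per model.
interval = {"A": timedelta(hours=1), "B": timedelta(days=1)}
children = {"A": ["B"], "B": []}  # reversed DAG: A's immediate child is B

plan_end = datetime(2025, 7, 4)
min_intervals = 1

start_overrides: dict[str, datetime] = {}
# Visit leaves first (reverse topological order) so a parent can see the starts
# already chosen for its children.
for model in ["B", "A"]:
    # Wind the model's own start back by min_intervals of its interval width.
    own_start = plan_end - min_intervals * interval[model]
    # Expand further if any child already needs an earlier start.
    child_starts = [start_overrides[child] for child in children[model]]
    start_overrides[model] = min([own_start, *child_starts])

print(start_overrides)
# B -> 2025-07-03 00:00 (one day back); A alone would only go back to 2025-07-03 23:00,
# but it is expanded to B's start so that B's single day is fully covered upstream.
```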

def _get_max_interval_end_per_model(
self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]]
) -> t.Dict[str, int]:
) -> t.Dict[str, datetime]:
models_for_interval_end = (
self._get_models_for_interval_end(snapshots, backfill_models)
if backfill_models is not None
else None
)
return self.state_sync.max_interval_end_per_model(
c.PROD,
models=models_for_interval_end,
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
)
return {
model_fqn: to_datetime(ts)
for model_fqn, ts in self.state_sync.max_interval_end_per_model(
c.PROD,
models=models_for_interval_end,
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
).items()
}
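
For a rough sense of the conversion this helper now performs, assuming the state sync reports each model's max interval end as an epoch-millisecond timestamp (the model name below is invented):

```python
from datetime import datetime, timezone

raw_ends = {'"memory"."sushi"."orders"': 1_751_587_200_000}  # epoch milliseconds
ends_as_datetime = {
    fqn: datetime.fromtimestamp(ms / 1000, tz=timezone.utc) for fqn, ms in raw_ends.items()
}
# -> {'"memory"."sushi"."orders"': 2025-07-04 00:00:00+00:00}
```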

@staticmethod
def _get_models_for_interval_end(
3 changes: 3 additions & 0 deletions sqlmesh/core/node.py
@@ -31,6 +31,9 @@ class IntervalUnit(str, Enum):
IntervalUnit can be one of 5 types, YEAR, MONTH, DAY, HOUR, MINUTE. The unit is inferred
based on the cron schedule of a node. The minimum time delta between a sample set of dates
is used to determine which unit a node's schedule is.

It's designed to align with common partitioning schemes, which is why there is no WEEK unit:
tables are generally not partitioned by week.
"""

YEAR = "year"
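The inference described in that docstring can be sketched as follows (a standalone approximation using croniter, with thresholds that are assumptions rather than the actual IntervalUnit implementation):

```python
from datetime import datetime, timedelta
from croniter import croniter

def infer_interval_unit(cron: str, samples: int = 5) -> str:
    """Sample consecutive cron ticks and map the smallest gap to a unit."""
    it = croniter(cron, datetime(2025, 1, 1))
    ticks = [it.get_next(datetime) for _ in range(samples)]
    smallest = min(later - earlier for earlier, later in zip(ticks, ticks[1:]))
    if smallest < timedelta(hours=1):
        return "minute"
    if smallest < timedelta(days=1):
        return "hour"
    if smallest < timedelta(days=28):
        return "day"  # weekly schedules land here too, since there is no WEEK unit
    if smallest < timedelta(days=365):
        return "month"
    return "year"

print(infer_interval_unit("0 * * * *"))    # hour
print(infer_interval_unit("0 0 * * MON"))  # day
```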
25 changes: 17 additions & 8 deletions sqlmesh/core/plan/builder.py
@@ -5,6 +5,7 @@
import typing as t
from collections import defaultdict
from functools import cached_property
from datetime import datetime


from sqlmesh.core.console import PlanBuilderConsole, get_console
@@ -85,7 +86,8 @@ class PlanBuilder:
ensure_finalized_snapshots: Whether to compare against snapshots from the latest finalized
environment state, or to use whatever snapshots are in the current environment state even if
the environment is not finalized.
interval_end_per_model: The mapping from model FQNs to target end dates.
start_override_per_model: A mapping of model FQNs to target start dates.
end_override_per_model: A mapping of model FQNs to target end dates.
explain: Whether to explain the plan instead of applying it.
"""

@@ -117,7 +119,8 @@ def __init__(
end_bounded: bool = False,
ensure_finalized_snapshots: bool = False,
explain: bool = False,
interval_end_per_model: t.Optional[t.Dict[str, int]] = None,
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
console: t.Optional[PlanBuilderConsole] = None,
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
):
@@ -133,7 +136,8 @@ def __init__(
self._enable_preview = enable_preview
self._end_bounded = end_bounded
self._ensure_finalized_snapshots = ensure_finalized_snapshots
self._interval_end_per_model = interval_end_per_model
self._start_override_per_model = start_override_per_model
self._end_override_per_model = end_override_per_model
self._environment_ttl = environment_ttl
self._categorizer_config = categorizer_config or CategorizerConfig()
self._auto_categorization_enabled = auto_categorization_enabled
@@ -280,7 +284,11 @@ def build(self) -> Plan:
self._adjust_new_snapshot_intervals()

deployability_index = (
DeployabilityIndex.create(self._context_diff.snapshots.values(), start=self._start)
DeployabilityIndex.create(
self._context_diff.snapshots.values(),
start=self._start,
start_override_per_model=self._start_override_per_model,
)
if self._is_dev
else DeployabilityIndex.all_deployable()
)
@@ -291,11 +299,11 @@
)
models_to_backfill = self._build_models_to_backfill(dag, restatements)

interval_end_per_model = self._interval_end_per_model
if interval_end_per_model and self.override_end:
end_override_per_model = self._end_override_per_model
if end_override_per_model and self.override_end:
# If the end date was provided explicitly by a user, then interval end for each individual
# model should be ignored.
interval_end_per_model = None
end_override_per_model = None

# this deliberately uses the passed in self._execution_time and not self.execution_time cached property
# the reason is that there can be a delay between the Plan being built and the Plan being actually run,
@@ -322,7 +330,8 @@
indirectly_modified=indirectly_modified,
deployability_index=deployability_index,
restatements=restatements,
interval_end_per_model=interval_end_per_model,
start_override_per_model=self._start_override_per_model,
end_override_per_model=end_override_per_model,
selected_models_to_backfill=self._backfill_models,
models_to_backfill=models_to_backfill,
effective_from=self._effective_from,
12 changes: 8 additions & 4 deletions sqlmesh/core/plan/definition.py
@@ -57,7 +57,8 @@ class Plan(PydanticModel, frozen=True):

deployability_index: DeployabilityIndex
restatements: t.Dict[SnapshotId, Interval]
interval_end_per_model: t.Optional[t.Dict[str, int]]
start_override_per_model: t.Optional[t.Dict[str, datetime]]
end_override_per_model: t.Optional[t.Dict[str, datetime]]

selected_models_to_backfill: t.Optional[t.Set[str]] = None
"""Models that have been explicitly selected for backfill by a user."""
@@ -177,7 +178,8 @@ def missing_intervals(self) -> t.List[SnapshotIntervals]:
execution_time=self.execution_time,
restatements=self.restatements,
deployability_index=self.deployability_index,
interval_end_per_model=self.interval_end_per_model,
start_override_per_model=self.start_override_per_model,
end_override_per_model=self.end_override_per_model,
end_bounded=self.end_bounded,
).items()
if snapshot.is_model and missing
@@ -265,7 +267,8 @@ def to_evaluatable(self) -> EvaluatablePlan:
removed_snapshots=sorted(self.context_diff.removed_snapshots),
requires_backfill=self.requires_backfill,
models_to_backfill=self.models_to_backfill,
interval_end_per_model=self.interval_end_per_model,
start_override_per_model=self.start_override_per_model,
end_override_per_model=self.end_override_per_model,
execution_time=self.execution_time,
disabled_restatement_models={
s.name
@@ -303,7 +306,8 @@ class EvaluatablePlan(PydanticModel):
removed_snapshots: t.List[SnapshotId]
requires_backfill: bool
models_to_backfill: t.Optional[t.Set[str]] = None
interval_end_per_model: t.Optional[t.Dict[str, int]] = None
start_override_per_model: t.Optional[t.Dict[str, datetime]] = None
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None
execution_time: t.Optional[TimeLike] = None
disabled_restatement_models: t.Set[str]
environment_statements: t.Optional[t.List[EnvironmentStatements]] = None
3 changes: 2 additions & 1 deletion sqlmesh/core/plan/evaluator.py
@@ -256,7 +256,8 @@ def visit_audit_only_run_stage(
plan.end,
execution_time=plan.execution_time,
end_bounded=plan.end_bounded,
interval_end_per_model=plan.interval_end_per_model,
start_override_per_model=plan.start_override_per_model,
end_override_per_model=plan.end_override_per_model,
)

if completion_status.is_failure:
3 changes: 2 additions & 1 deletion sqlmesh/core/plan/stages.py
@@ -524,7 +524,8 @@ def _missing_intervals(
},
deployability_index=deployability_index,
end_bounded=plan.end_bounded,
interval_end_per_model=plan.interval_end_per_model,
start_override_per_model=plan.start_override_per_model,
end_override_per_model=plan.end_override_per_model,
)

def _get_audit_only_snapshots(