-
Notifications
You must be signed in to change notification settings - Fork 239
Feat: Allow specifying a minimum number of intervals to include for each model in a plan #4780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ | |
from pathlib import Path | ||
from shutil import rmtree | ||
from types import MappingProxyType | ||
from datetime import datetime | ||
|
||
from sqlglot import Dialect, exp | ||
from sqlglot.helper import first | ||
|
@@ -125,6 +126,8 @@ | |
format_tz_datetime, | ||
now_timestamp, | ||
now, | ||
to_datetime, | ||
make_exclusive, | ||
) | ||
from sqlmesh.utils.errors import ( | ||
CircuitBreakerError, | ||
|
@@ -1222,6 +1225,7 @@ def plan( | |
diff_rendered: t.Optional[bool] = None, | ||
skip_linter: t.Optional[bool] = None, | ||
explain: t.Optional[bool] = None, | ||
min_intervals: t.Optional[int] = None, | ||
) -> Plan: | ||
"""Interactively creates a plan. | ||
|
||
|
@@ -1268,6 +1272,8 @@ def plan( | |
diff_rendered: Whether the diff should compare raw vs rendered models | ||
skip_linter: Linter runs by default so this will skip it if enabled | ||
explain: Whether to explain the plan instead of applying it. | ||
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered | ||
on every model when checking for missing intervals | ||
|
||
Returns: | ||
The populated Plan object. | ||
|
@@ -1296,6 +1302,7 @@ def plan( | |
diff_rendered=diff_rendered, | ||
skip_linter=skip_linter, | ||
explain=explain, | ||
min_intervals=min_intervals, | ||
) | ||
|
||
plan = plan_builder.build() | ||
|
@@ -1345,6 +1352,7 @@ def plan_builder( | |
diff_rendered: t.Optional[bool] = None, | ||
skip_linter: t.Optional[bool] = None, | ||
explain: t.Optional[bool] = None, | ||
min_intervals: t.Optional[int] = None, | ||
) -> PlanBuilder: | ||
"""Creates a plan builder. | ||
|
||
|
@@ -1381,6 +1389,8 @@ def plan_builder( | |
enable_preview: Indicates whether to enable preview for forward-only models in development environments. | ||
run: Whether to run latest intervals as part of the plan application. | ||
diff_rendered: Whether the diff should compare raw vs rendered models | ||
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered | ||
on every model when checking for missing intervals | ||
|
||
Returns: | ||
The plan builder. | ||
|
@@ -1408,6 +1418,7 @@ def plan_builder( | |
"run": run, | ||
"diff_rendered": diff_rendered, | ||
"skip_linter": skip_linter, | ||
"min_intervals": min_intervals, | ||
} | ||
user_provided_flags: t.Dict[str, UserProvidedFlags] = { | ||
k: v for k, v in kwargs.items() if v is not None | ||
|
@@ -1530,6 +1541,16 @@ def plan_builder( | |
# Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. | ||
self.state_sync.refresh_snapshot_intervals(context_diff.snapshots.values()) | ||
|
||
start_override_per_model = self._calculate_start_override_per_model( | ||
min_intervals, | ||
start or default_start, | ||
end or default_end, | ||
execution_time or now(), | ||
backfill_models, | ||
snapshots, | ||
max_interval_end_per_model, | ||
) | ||
|
||
return self.PLAN_BUILDER_TYPE( | ||
context_diff=context_diff, | ||
start=start, | ||
|
@@ -1560,7 +1581,8 @@ def plan_builder( | |
), | ||
end_bounded=not run, | ||
ensure_finalized_snapshots=self.config.plan.use_finalized_state, | ||
interval_end_per_model=max_interval_end_per_model, | ||
start_override_per_model=start_override_per_model, | ||
end_override_per_model=max_interval_end_per_model, | ||
console=self.console, | ||
user_provided_flags=user_provided_flags, | ||
explain=explain or False, | ||
|
@@ -2850,15 +2872,15 @@ def _plan_preview_enabled(self) -> bool: | |
def _get_plan_default_start_end( | ||
self, | ||
snapshots: t.Dict[str, Snapshot], | ||
max_interval_end_per_model: t.Dict[str, int], | ||
max_interval_end_per_model: t.Dict[str, datetime], | ||
backfill_models: t.Optional[t.Set[str]], | ||
modified_model_names: t.Set[str], | ||
execution_time: t.Optional[TimeLike] = None, | ||
) -> t.Tuple[t.Optional[int], t.Optional[int]]: | ||
if not max_interval_end_per_model: | ||
return None, None | ||
|
||
default_end = max(max_interval_end_per_model.values()) | ||
default_end = to_timestamp(max(max_interval_end_per_model.values())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, why did we change this to datetime only to convert back to timestamp later? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rome wasnt built in a day and the rest of the code in that method was ints. One day I hope we will use proper types internally and push |
||
default_start: t.Optional[int] = None | ||
# Infer the default start by finding the smallest interval start that corresponds to the default end. | ||
for model_name in backfill_models or modified_model_names or max_interval_end_per_model: | ||
|
@@ -2887,19 +2909,101 @@ def _get_plan_default_start_end( | |
|
||
return default_start, default_end | ||
|
||
def _calculate_start_override_per_model( | ||
self, | ||
min_intervals: t.Optional[int], | ||
plan_start: t.Optional[TimeLike], | ||
plan_end: t.Optional[TimeLike], | ||
plan_execution_time: TimeLike, | ||
backfill_model_fqns: t.Optional[t.Set[str]], | ||
snapshots_by_model_fqn: t.Dict[str, Snapshot], | ||
end_override_per_model: t.Optional[t.Dict[str, datetime]], | ||
) -> t.Dict[str, datetime]: | ||
if not min_intervals or not backfill_model_fqns or not plan_start: | ||
# If there are no models to backfill, there are no intervals to consider for backfill, so we dont need to consider a minimum number | ||
# If the plan doesnt have a start date, all intervals are considered already so we dont need to consider a minimum number | ||
# If we dont have a minimum number of intervals to consider, then we dont need to adjust the start date on a per-model basis | ||
return {} | ||
|
||
start_overrides: t.Dict[str, datetime] = {} | ||
end_override_per_model = end_override_per_model or {} | ||
|
||
plan_execution_time_dt = to_datetime(plan_execution_time) | ||
plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt) | ||
plan_end_dt = to_datetime( | ||
plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt | ||
) | ||
|
||
# we need to take the DAG into account so that parent models can be expanded to cover at least as much as their children | ||
# for example, A(hourly) <- B(daily) | ||
# if min_intervals=1, A would have 1 hour and B would have 1 day | ||
# but B depends on A so in order for B to have 1 valid day, A needs to be expanded to 24 hours | ||
backfill_dag: DAG[str] = DAG() | ||
for fqn in backfill_model_fqns: | ||
backfill_dag.add( | ||
fqn, | ||
[ | ||
p.name | ||
for p in snapshots_by_model_fqn[fqn].parents | ||
if p.name in backfill_model_fqns | ||
], | ||
) | ||
|
||
# start from the leaf nodes and work back towards the root because the min_start at the root node is determined by the calculated starts in the leaf nodes | ||
reversed_dag = backfill_dag.reversed | ||
graph = reversed_dag.graph | ||
|
||
for model_fqn in reversed_dag: | ||
# Get the earliest start from all immediate children of this snapshot | ||
# this works because topological ordering guarantees that they've already been visited | ||
# and we always set a start override | ||
min_child_start = min( | ||
[start_overrides[immediate_child_fqn] for immediate_child_fqn in graph[model_fqn]], | ||
default=plan_start_dt, | ||
) | ||
|
||
snapshot = snapshots_by_model_fqn.get(model_fqn) | ||
|
||
if not snapshot: | ||
continue | ||
|
||
starting_point = end_override_per_model.get(model_fqn, plan_end_dt) | ||
if node_end := snapshot.node.end: | ||
# if we dont do this, if the node end is a *date* (as opposed to a timestamp) | ||
# we end up incorrectly winding back an extra day | ||
node_end_dt = make_exclusive(node_end) | ||
|
||
if node_end_dt < plan_end_dt: | ||
# if the model has an end date that has already elapsed, use that as a starting point for calculating min_intervals | ||
# instead of the plan end. If we use the plan end, we will return intervals in the future which are invalid | ||
starting_point = node_end_dt | ||
|
||
snapshot_start = snapshot.node.cron_floor(starting_point) | ||
|
||
for _ in range(min_intervals): | ||
# wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date | ||
snapshot_start = snapshot.node.cron_prev(snapshot_start) | ||
|
||
start_overrides[model_fqn] = min(min_child_start, snapshot_start) | ||
|
||
return start_overrides | ||
|
||
def _get_max_interval_end_per_model( | ||
self, snapshots: t.Dict[str, Snapshot], backfill_models: t.Optional[t.Set[str]] | ||
) -> t.Dict[str, int]: | ||
) -> t.Dict[str, datetime]: | ||
models_for_interval_end = ( | ||
self._get_models_for_interval_end(snapshots, backfill_models) | ||
if backfill_models is not None | ||
else None | ||
) | ||
return self.state_sync.max_interval_end_per_model( | ||
c.PROD, | ||
models=models_for_interval_end, | ||
ensure_finalized_snapshots=self.config.plan.use_finalized_state, | ||
) | ||
return { | ||
model_fqn: to_datetime(ts) | ||
for model_fqn, ts in self.state_sync.max_interval_end_per_model( | ||
c.PROD, | ||
models=models_for_interval_end, | ||
ensure_finalized_snapshots=self.config.plan.use_finalized_state, | ||
).items() | ||
} | ||
|
||
@staticmethod | ||
def _get_models_for_interval_end( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the name be consistent with
interval_end_per_model
? I don't care which one is it, but I feel like they represent similar thing and should be named similarly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've renamed
interval_end_per_model
toend_override_per_model
on the Plan to match