Skip to content

Commit 1e56590

Browse files
committed
Widen parent start date override if it ends up being later than a child start date override
1 parent 1fb7b50 commit 1e56590

File tree

2 files changed

+196
-7
lines changed

2 files changed

+196
-7
lines changed

sqlmesh/core/context.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2925,7 +2925,7 @@ def _calculate_start_override_per_model(
29252925
# If we dont have a minimum number of intervals to consider, then we dont need to adjust the start date on a per-model basis
29262926
return {}
29272927

2928-
start_overrides = {}
2928+
start_overrides: t.Dict[str, datetime] = {}
29292929
end_override_per_model = end_override_per_model or {}
29302930

29312931
plan_execution_time_dt = to_datetime(plan_execution_time)
@@ -2934,14 +2934,42 @@ def _calculate_start_override_per_model(
29342934
plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt
29352935
)
29362936

2937-
for model_fqn in backfill_model_fqns:
2937+
# we need to take the DAG into account so that parent models can be expanded to cover at least as much as their children
2938+
# for example, A(hourly) <- B(daily)
2939+
# if min_intervals=1, A would have 1 hour and B would have 1 day
2940+
# but B depends on A so in order for B to have 1 valid day, A needs to be expanded to 24 hours
2941+
backfill_dag: DAG[str] = DAG()
2942+
for fqn in backfill_model_fqns:
2943+
backfill_dag.add(
2944+
fqn,
2945+
[
2946+
p.name
2947+
for p in snapshots_by_model_fqn[fqn].parents
2948+
if p.name in backfill_model_fqns
2949+
],
2950+
)
2951+
2952+
# start from the leaf nodes and work back towards the root because the min_start at the root node is determined by the calculated starts in the leaf nodes
2953+
reversed_dag = backfill_dag.reversed
2954+
graph = reversed_dag.graph
2955+
2956+
for model_fqn in reversed_dag:
2957+
# Get the earliest start from all immediate children of this snapshot
2958+
# this works because topological ordering guarantees that they've already been visited
2959+
# and we always set a start override
2960+
min_child_start = min(
2961+
[start_overrides[immediate_child_fqn] for immediate_child_fqn in graph[model_fqn]],
2962+
default=plan_start_dt,
2963+
)
2964+
29382965
snapshot = snapshots_by_model_fqn.get(model_fqn)
2966+
29392967
if not snapshot:
29402968
continue
29412969

29422970
starting_point = end_override_per_model.get(model_fqn, plan_end_dt)
29432971
if node_end := snapshot.node.end:
2944-
# if we dont do this, if the node end is a date (as opposed to a timestamp)
2972+
# if we dont do this, if the node end is a *date* (as opposed to a timestamp)
29452973
# we end up incorrectly winding back an extra day
29462974
node_end_dt = make_exclusive(node_end)
29472975

@@ -2956,10 +2984,7 @@ def _calculate_start_override_per_model(
29562984
# wind back the starting point by :min_intervals intervals to arrive at the minimum snapshot start date
29572985
snapshot_start = snapshot.node.cron_prev(snapshot_start)
29582986

2959-
# only consider this an override if the wound-back start date is earlier than the plan start date
2960-
# if it isnt then the plan already covers :min_intervals intervals for this snapshot
2961-
if snapshot_start < plan_start_dt:
2962-
start_overrides[model_fqn] = snapshot_start
2987+
start_overrides[model_fqn] = min(min_child_start, snapshot_start)
29632988

29642989
return start_overrides
29652990

tests/core/test_context.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2567,3 +2567,167 @@ def _get_missing_intervals(plan: Plan, name: str) -> t.List[t.Tuple[datetime, da
25672567
) == [
25682568
(to_datetime("2020-01-18 00:00:00"), to_datetime("2020-01-18 23:59:59.999999")),
25692569
]
2570+
2571+
2572+
def test_plan_min_intervals_adjusted_for_downstream(tmp_path: Path):
2573+
"""
2574+
Scenario:
2575+
A(hourly) <- B(daily) <- C(weekly)
2576+
<- D(two-hourly)
2577+
E(monthly)
2578+
2579+
We need to ensure that :min_intervals covers at least :min_intervals of all downstream models for the dag to be valid
2580+
In this scenario, if min_intervals=1:
2581+
- A would need to cover at least (7 days * 24 hours) because its downstream model C is weekly. It should also be unaffected by its sibling, E
2582+
- B would need to cover at least 7 days because its downstream model C is weekly
2583+
- C would need to cover at least 1 week because min_intervals: 1
2584+
- D would need to cover at least 2 hours because min_intervals: 1 and should be unaffected by C
2585+
- E is unrelated to A, B, C and D so would need to cover 1 month to satisfy min_intervals: 1.
2586+
- It also ensures that each tree branch has a unique cumulative date, because
2587+
if the dag is iterated purely in topological order with a global min date it would set A to 1 month instead of 1 week
2588+
"""
2589+
2590+
init_example_project(tmp_path, engine_type="duckdb", dialect="duckdb")
2591+
2592+
context = Context(
2593+
paths=tmp_path, config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
2594+
)
2595+
2596+
current_time = to_datetime("2020-02-01 00:00:01")
2597+
2598+
# initial state of example project
2599+
context.plan(auto_apply=True, execution_time=current_time)
2600+
2601+
(tmp_path / "models" / "hourly_model.sql").write_text("""
2602+
MODEL (
2603+
name sqlmesh_example.hourly_model,
2604+
kind INCREMENTAL_BY_TIME_RANGE (
2605+
time_column start_dt,
2606+
batch_size 1
2607+
),
2608+
start '2020-01-01',
2609+
cron '@hourly'
2610+
);
2611+
2612+
select @start_dt as start_dt, @end_dt as end_dt;
2613+
""")
2614+
2615+
(tmp_path / "models" / "two_hourly_model.sql").write_text("""
2616+
MODEL (
2617+
name sqlmesh_example.two_hourly_model,
2618+
kind INCREMENTAL_BY_TIME_RANGE (
2619+
time_column start_dt
2620+
),
2621+
start '2020-01-01',
2622+
cron '0 */2 * * *'
2623+
);
2624+
2625+
select start_dt, end_dt from sqlmesh_example.hourly_model where start_dt between @start_dt and @end_dt;
2626+
""")
2627+
2628+
(tmp_path / "models" / "unrelated_monthly_model.sql").write_text("""
2629+
MODEL (
2630+
name sqlmesh_example.unrelated_monthly_model,
2631+
kind INCREMENTAL_BY_TIME_RANGE (
2632+
time_column start_dt
2633+
),
2634+
start '2020-01-01',
2635+
cron '@monthly'
2636+
);
2637+
2638+
select @start_dt as start_dt, @end_dt as end_dt;
2639+
""")
2640+
2641+
(tmp_path / "models" / "daily_model.sql").write_text("""
2642+
MODEL (
2643+
name sqlmesh_example.daily_model,
2644+
kind INCREMENTAL_BY_TIME_RANGE (
2645+
time_column start_dt
2646+
),
2647+
start '2020-01-01',
2648+
cron '@daily'
2649+
);
2650+
2651+
select start_dt, end_dt from sqlmesh_example.hourly_model where start_dt between @start_dt and @end_dt;
2652+
""")
2653+
2654+
(tmp_path / "models" / "weekly_model.sql").write_text("""
2655+
MODEL (
2656+
name sqlmesh_example.weekly_model,
2657+
kind INCREMENTAL_BY_TIME_RANGE (
2658+
time_column start_dt
2659+
),
2660+
start '2020-01-01',
2661+
cron '@weekly'
2662+
);
2663+
2664+
select start_dt, end_dt from sqlmesh_example.daily_model where start_dt between @start_dt and @end_dt;
2665+
""")
2666+
2667+
context.load()
2668+
2669+
# create a dev env for "1 day ago" with min_intervals=1
2670+
# this should force a week's worth of intervals for every model
2671+
plan = context.plan(
2672+
environment="pr_env",
2673+
start="1 day ago",
2674+
execution_time=current_time,
2675+
min_intervals=1,
2676+
)
2677+
2678+
def _get_missing_intervals(name: str) -> t.List[t.Tuple[datetime, datetime]]:
2679+
snapshot_id = context.get_snapshot(name, raise_if_missing=True).snapshot_id
2680+
snapshot_intervals = next(
2681+
si for si in plan.missing_intervals if si.snapshot_id == snapshot_id
2682+
)
2683+
return [(to_datetime(s), to_datetime(e)) for s, e in snapshot_intervals.merged_intervals]
2684+
2685+
# We only operate on completed intervals, so given the current_time this is the range of the last completed week
2686+
_get_missing_intervals("sqlmesh_example.weekly_model") == [
2687+
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-26 00:00:00"))
2688+
]
2689+
2690+
# The daily model needs to cover the week, so it gets its start date moved back to line up
2691+
_get_missing_intervals("sqlmesh_example.daily_model") == [
2692+
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2693+
]
2694+
2695+
# The hourly model needs to cover both the daily model and the weekly model, so it also gets its start date moved back to line up with the weekly model
2696+
assert _get_missing_intervals("sqlmesh_example.hourly_model") == [
2697+
(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2698+
]
2699+
2700+
# The two-hourly model only needs to cover 2 hours and should be unaffected by the fact its sibling node has a weekly child node
2701+
# However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1
2702+
assert _get_missing_intervals("sqlmesh_example.two_hourly_model") == [
2703+
(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2704+
]
2705+
2706+
# The unrelated model has no upstream constraints, so its start date doesnt get moved to line up with the weekly model
2707+
# However it still gets backfilled for 24 hours because the plan start is 1 day and this satisfies min_intervals: 1
2708+
_get_missing_intervals("sqlmesh_example.unrelated_monthly_model") == [
2709+
(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-02-01 00:00:00"))
2710+
]
2711+
2712+
# Check that actually running the plan produces the correct result, since missing intervals are re-calculated in the evaluator
2713+
context.apply(plan)
2714+
2715+
assert context.engine_adapter.fetchall(
2716+
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.weekly_model"
2717+
) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-25 23:59:59.999999"))]
2718+
2719+
assert context.engine_adapter.fetchall(
2720+
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.daily_model"
2721+
) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2722+
2723+
assert context.engine_adapter.fetchall(
2724+
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.hourly_model"
2725+
) == [(to_datetime("2020-01-19 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2726+
2727+
assert context.engine_adapter.fetchall(
2728+
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.two_hourly_model"
2729+
) == [(to_datetime("2020-01-31 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]
2730+
2731+
assert context.engine_adapter.fetchall(
2732+
"select min(start_dt), max(end_dt) from sqlmesh_example__pr_env.unrelated_monthly_model"
2733+
) == [(to_datetime("2020-01-01 00:00:00"), to_datetime("2020-01-31 23:59:59.999999"))]

0 commit comments

Comments
 (0)