Skip to content

Commit 6548867

Browse files
Update offset implementation (#1826)
When querying offset metrics, we typically apply the offset join before aggregation. This is because, post-aggregation, the offset grain is often no longer accessible (e.g., if we have aggregated to year grain, the month grain is no longer available to offset by). This PR updates the logic to apply the offset post-aggregation when the smallest queried grain matches the offset grain (a scenario that ensures the offset grain is still available). This change ensures that the entire period's values are included when offsetting.
1 parent 9f4a569 commit 6548867

File tree

292 files changed

+30386
-34166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

292 files changed

+30386
-34166
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
kind: Fixes
2+
body: Apply offset join after aggregation when the offset grain matches the queried grain and is therefore still available post-aggregation.
3+
This ensures results match user expectations.
4+
time: 2025-09-03T19:52:52.4573-07:00
5+
custom:
6+
Author: courtneyholcomb
7+
Issue: "1826,1758"

.cursorrules

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# MetricFlow Project Rules
2+
3+
This project uses **hatch** for dependency management and testing:
4+
5+
- Dependencies: Use `hatch` commands, not `pip install` directly
6+
- Tests: Use `hatch run dev-env:pytest <path>` instead of `pytest` directly
7+
- Snapshots: Add `--overwrite-snapshots` for new snapshot tests
8+
- Common: `make test` or `hatch run dev-env:pytest tests_metricflow/`

metricflow-semantics/metricflow_semantics/specs/measure_spec.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,8 @@ def custom_offset_window(self) -> Optional[MetricTimeWindow]:
105105
if self.offset_window and not self.offset_window.is_standard_granularity:
106106
return self.offset_window
107107
return None
108+
109+
@property
110+
def uses_offset(self) -> bool:
111+
"""Return True if the measure uses an offset."""
112+
return self.offset_window is not None or self.offset_to_grain is not None

metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -933,3 +933,23 @@ metric:
933933
offset_window: 1 alien_day
934934
alias: bookings_offset
935935
- name: bookings
936+
---
937+
metric:
938+
name: trailing_7_days_bookings
939+
description: "trailing 7 days bookings - cumulative metric with window"
940+
type: cumulative
941+
type_params:
942+
measure: bookings
943+
cumulative_type_params:
944+
window: 7 days
945+
---
946+
metric:
947+
name: trailing_7_days_bookings_offset_1_week
948+
description: "trailing 7 days bookings offset by 1 week - offset_window metric using cumulative input"
949+
type: derived
950+
type_params:
951+
expr: trailing_7_days_bookings_1_week_ago
952+
metrics:
953+
- name: trailing_7_days_bookings
954+
offset_window: 1 week
955+
alias: trailing_7_days_bookings_1_week_ago

metricflow/dataflow/builder/dataflow_plan_builder.py

Lines changed: 80 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,31 +1366,67 @@ def _build_input_measure_spec(
13661366
),
13671367
)
13681368

1369+
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
1370+
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
1371+
)
1372+
smallest_queried_agg_time_grain: Optional[ExpandedTimeGranularity] = None
13691373
before_aggregation_time_spine_join_description = None
1370-
# If querying an offset metric, join to time spine.
1374+
after_aggregation_time_spine_join_description = None
13711375
if child_metric_offset_window is not None or child_metric_offset_to_grain is not None:
1372-
before_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
1376+
if child_metric_offset_window is not None:
1377+
offset_grain_name = child_metric_offset_window.granularity
1378+
if ExpandedTimeGranularity.is_standard_granularity_name(offset_grain_name):
1379+
offset_grain = ExpandedTimeGranularity.from_time_granularity(TimeGranularity(offset_grain_name))
1380+
else:
1381+
offset_grain = ExpandedTimeGranularity(
1382+
name=offset_grain_name,
1383+
base_granularity=self._get_base_grain_for_custom_grain(offset_grain_name),
1384+
)
1385+
else:
1386+
assert (
1387+
child_metric_offset_to_grain is not None
1388+
), "Offset to grain must be specified if no offset window is specified."
1389+
offset_grain = ExpandedTimeGranularity.from_time_granularity(child_metric_offset_to_grain)
1390+
1391+
# Determine the smallest queried agg time dimension grain (this is the grain we'll aggregate to)
1392+
if len(queried_agg_time_dimension_specs) == 0:
1393+
raise ValueError(
1394+
"No agg_time_dimension requested in offset metric query. This should have been validated earlier."
1395+
)
1396+
smallest_queried_agg_time_grain = self._sort_by_base_granularity(queried_agg_time_dimension_specs)[
1397+
0
1398+
].time_granularity
1399+
1400+
# If the smallest queried grain is equal to the offset grain, join after aggregation. Otherwise, the grain
1401+
# needed for offset will not be available anymore, so join before aggregation.
1402+
join_to_time_spine_description = JoinToTimeSpineDescription(
13731403
join_type=SqlJoinType.INNER,
13741404
offset_window=child_metric_offset_window,
13751405
offset_to_grain=child_metric_offset_to_grain,
13761406
)
1377-
1378-
# Even if the measure is configured to join to time spine, if there's no agg_time_dimension in the query,
1379-
# there's no need to join to the time spine since all time will be aggregated.
1380-
after_aggregation_time_spine_join_description = None
1381-
if input_measure.join_to_timespine:
13821407
if (
1383-
len(
1384-
queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
1385-
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
1386-
)
1387-
)
1388-
> 0
1408+
offset_grain
1409+
and smallest_queried_agg_time_grain == offset_grain
1410+
and not offset_grain.is_custom_granularity # custom offset window has special logic handled later
13891411
):
1412+
after_aggregation_time_spine_join_description = join_to_time_spine_description
1413+
else:
1414+
before_aggregation_time_spine_join_description = join_to_time_spine_description
1415+
1416+
# Measures configured to join to time spine will join to time spine after aggregation using LEFT OUTER JOIN.
1417+
# If there's no agg_time_dimension in the query, skip time spine join since all time will be aggregated.
1418+
# If we already need to join to time spine after aggregation due to offset, and the measure is also configured
1419+
# to join to time spine, update to use LEFT OUTER JOIN.
1420+
if input_measure.join_to_timespine and (len(queried_agg_time_dimension_specs) > 0):
1421+
if after_aggregation_time_spine_join_description is not None:
13901422
after_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
13911423
join_type=SqlJoinType.LEFT_OUTER,
1392-
offset_window=None,
1393-
offset_to_grain=None,
1424+
offset_window=after_aggregation_time_spine_join_description.offset_window,
1425+
offset_to_grain=after_aggregation_time_spine_join_description.offset_to_grain,
1426+
)
1427+
else:
1428+
after_aggregation_time_spine_join_description = JoinToTimeSpineDescription(
1429+
join_type=SqlJoinType.LEFT_OUTER, offset_window=None, offset_to_grain=None
13941430
)
13951431

13961432
return MetricInputMeasureSpec(
@@ -1544,7 +1580,7 @@ def __get_required_linkable_specs(
15441580

15451581
return required_linkable_specs
15461582

1547-
def _build_time_spine_join_node_for_measure_config(
1583+
def _build_time_spine_join_node_for_after_aggregation(
15481584
self,
15491585
join_description: JoinToTimeSpineDescription,
15501586
measure_reference: MeasureReference,
@@ -1554,11 +1590,6 @@ def _build_time_spine_join_node_for_measure_config(
15541590
after_aggregation_where_filter_specs: Sequence[WhereFilterSpec],
15551591
time_range_constraint: Optional[TimeRangeConstraint],
15561592
) -> DataflowPlanNode:
1557-
assert join_description.join_type is SqlJoinType.LEFT_OUTER, (
1558-
f"Expected {SqlJoinType.LEFT_OUTER} for joining to time spine after aggregation. Remove this if "
1559-
f"there's a new use case."
1560-
)
1561-
15621593
# Find filters that contain only metric_time or agg_time_dimension. They will be applied to the time spine table.
15631594
agg_time_only_filters: List[WhereFilterSpec] = []
15641595
non_agg_time_filters: List[WhereFilterSpec] = []
@@ -1584,6 +1615,8 @@ def _build_time_spine_join_node_for_measure_config(
15841615
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
15851616
join_on_time_dimension_spec=join_spec,
15861617
join_type=join_description.join_type,
1618+
standard_offset_window=join_description.standard_offset_window,
1619+
offset_to_grain=join_description.offset_to_grain,
15871620
)
15881621

15891622
# Since new rows might have been added due to time spine join, re-apply constraints here. Only re-apply filters
@@ -1654,15 +1687,14 @@ def _build_time_spine_join_node_for_nested_offset(
16541687
)
16551688
return output_node
16561689

1657-
def _build_time_spine_join_node_for_offset(
1690+
def _build_time_spine_join_node_for_before_aggregation(
16581691
self,
16591692
join_description: JoinToTimeSpineDescription,
16601693
measure_properties: MeasureSpecProperties,
16611694
queried_agg_time_dimension_specs: Tuple[TimeDimensionSpec, ...],
16621695
metric_source_node: DataflowPlanNode,
16631696
use_offset_custom_granularity_node: bool,
16641697
) -> DataflowPlanNode:
1665-
"""Build a node to join to the time spine for a measure with the `join_to_timespine: true` YAML config."""
16661698
assert join_description.join_type is SqlJoinType.INNER, (
16671699
f"Expected {SqlJoinType.INNER} for joining to time spine before aggregation. Remove this if there's a "
16681700
f"new use case."
@@ -1761,6 +1793,15 @@ def _build_aggregated_measure_from_measure_source_node(
17611793
before_aggregation_time_spine_join_description = (
17621794
metric_input_measure_spec.before_aggregation_time_spine_join_description
17631795
)
1796+
after_aggregation_time_spine_join_description = (
1797+
metric_input_measure_spec.after_aggregation_time_spine_join_description
1798+
)
1799+
uses_offset = (
1800+
before_aggregation_time_spine_join_description
1801+
and before_aggregation_time_spine_join_description.uses_offset
1802+
) or (
1803+
after_aggregation_time_spine_join_description and after_aggregation_time_spine_join_description.uses_offset
1804+
)
17641805

17651806
if measure_recipe is None:
17661807
logger.debug(
@@ -1772,8 +1813,7 @@ def _build_aggregated_measure_from_measure_source_node(
17721813
)
17731814
measure_time_constraint = (
17741815
(cumulative_metric_adjusted_time_constraint or predicate_pushdown_state.time_range_constraint)
1775-
# If joining to time spine for time offset, constraints will be applied after that join.
1776-
if not before_aggregation_time_spine_join_description
1816+
if not uses_offset # Time constraints will be applied after offset
17771817
else None
17781818
)
17791819
if measure_time_constraint is None:
@@ -1805,10 +1845,14 @@ def _build_aggregated_measure_from_measure_source_node(
18051845
f"Unable to join all items in request. Measure: {measure_spec.element_name}; Specs to join: {required_linkable_specs}"
18061846
)
18071847

1808-
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
1809-
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
1848+
queried_agg_time_dimension_specs = tuple(
1849+
queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
1850+
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
1851+
)
1852+
)
1853+
base_queried_agg_time_dimension_specs = tuple(
1854+
TimeDimensionSpec.with_base_grains(queried_agg_time_dimension_specs)
18101855
)
1811-
base_queried_agg_time_dimension_specs = TimeDimensionSpec.with_base_grains(queried_agg_time_dimension_specs)
18121856

18131857
# If a cumulative metric is queried with metric_time / agg_time_dimension, join over time range.
18141858
# Otherwise, the measure will be aggregated over all time.
@@ -1821,11 +1865,7 @@ def _build_aggregated_measure_from_measure_source_node(
18211865
grain_to_date=cumulative_grain_to_date,
18221866
# Note: we use the original constraint here because the JoinOverTimeRangeNode will eventually get
18231867
# rendered with an interval that expands the join window
1824-
time_range_constraint=(
1825-
predicate_pushdown_state.time_range_constraint
1826-
if not before_aggregation_time_spine_join_description
1827-
else None
1828-
),
1868+
time_range_constraint=(predicate_pushdown_state.time_range_constraint if not uses_offset else None),
18291869
)
18301870

18311871
# If querying an offset metric, join to time spine before aggregation.
@@ -1836,10 +1876,10 @@ def _build_aggregated_measure_from_measure_source_node(
18361876
== {before_aggregation_time_spine_join_description.custom_offset_window.granularity}
18371877
)
18381878
if before_aggregation_time_spine_join_description and queried_agg_time_dimension_specs:
1839-
unaggregated_measure_node = self._build_time_spine_join_node_for_offset(
1879+
unaggregated_measure_node = self._build_time_spine_join_node_for_before_aggregation(
18401880
join_description=before_aggregation_time_spine_join_description,
18411881
measure_properties=measure_properties,
1842-
queried_agg_time_dimension_specs=tuple(queried_agg_time_dimension_specs),
1882+
queried_agg_time_dimension_specs=queried_agg_time_dimension_specs,
18431883
metric_source_node=unaggregated_measure_node,
18441884
use_offset_custom_granularity_node=use_offset_custom_granularity_node,
18451885
)
@@ -1874,21 +1914,16 @@ def _build_aggregated_measure_from_measure_source_node(
18741914
parent_node=unaggregated_measure_node, metric_input_measure_specs=(metric_input_measure_spec,)
18751915
)
18761916

1877-
# Joining to time spine after aggregation is for measures that specify `join_to_timespine: true` in the YAML spec.
1878-
after_aggregation_time_spine_join_description = (
1879-
metric_input_measure_spec.after_aggregation_time_spine_join_description
1880-
)
1881-
queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_measure(
1882-
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
1883-
)
18841917
if after_aggregation_time_spine_join_description and queried_agg_time_dimension_specs:
1885-
return self._build_time_spine_join_node_for_measure_config(
1918+
return self._build_time_spine_join_node_for_after_aggregation(
18861919
join_description=after_aggregation_time_spine_join_description,
18871920
measure_reference=measure_spec.reference,
18881921
measure_source_node=aggregate_measures_node,
1889-
queried_agg_time_dimension_specs=tuple(queried_agg_time_dimension_specs),
1922+
queried_agg_time_dimension_specs=queried_agg_time_dimension_specs,
18901923
queried_linkable_specs=queried_linkable_specs.as_tuple,
1891-
after_aggregation_where_filter_specs=metric_input_measure_spec.filter_spec_set.after_measure_aggregation_filter_specs,
1924+
after_aggregation_where_filter_specs=(
1925+
metric_input_measure_spec.filter_spec_set.after_measure_aggregation_filter_specs
1926+
),
18921927
time_range_constraint=predicate_pushdown_state.time_range_constraint,
18931928
)
18941929

tests_metricflow/generate_snapshots.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -81,34 +81,34 @@ class MetricFlowTestCredentialSetForAllEngines(FrozenBaseModel): # noqa: D101
8181
@property
8282
def as_configurations(self) -> Sequence[MetricFlowEngineConfiguration]: # noqa: D102
8383
return (
84-
MetricFlowEngineConfiguration(
85-
engine=SqlEngine.DUCKDB,
86-
credential_set=self.duck_db,
87-
),
88-
MetricFlowEngineConfiguration(
89-
engine=SqlEngine.REDSHIFT,
90-
credential_set=self.redshift,
91-
),
92-
MetricFlowEngineConfiguration(
93-
engine=SqlEngine.SNOWFLAKE,
94-
credential_set=self.snowflake,
95-
),
96-
MetricFlowEngineConfiguration(
97-
engine=SqlEngine.BIGQUERY,
98-
credential_set=self.big_query,
99-
),
84+
# MetricFlowEngineConfiguration(
85+
# engine=SqlEngine.DUCKDB,
86+
# credential_set=self.duck_db,
87+
# ),
88+
# MetricFlowEngineConfiguration(
89+
# engine=SqlEngine.REDSHIFT,
90+
# credential_set=self.redshift,
91+
# ),
92+
# MetricFlowEngineConfiguration(
93+
# engine=SqlEngine.SNOWFLAKE,
94+
# credential_set=self.snowflake,
95+
# ),
96+
# MetricFlowEngineConfiguration(
97+
# engine=SqlEngine.BIGQUERY,
98+
# credential_set=self.big_query,
99+
# ),
100100
MetricFlowEngineConfiguration(
101101
engine=SqlEngine.DATABRICKS,
102102
credential_set=self.databricks,
103103
),
104-
MetricFlowEngineConfiguration(
105-
engine=SqlEngine.POSTGRES,
106-
credential_set=self.postgres,
107-
),
108-
MetricFlowEngineConfiguration(
109-
engine=SqlEngine.TRINO,
110-
credential_set=self.trino,
111-
),
104+
# MetricFlowEngineConfiguration(
105+
# engine=SqlEngine.POSTGRES,
106+
# credential_set=self.postgres,
107+
# ),
108+
# MetricFlowEngineConfiguration(
109+
# engine=SqlEngine.TRINO,
110+
# credential_set=self.trino,
111+
# ),
112112
)
113113

114114

0 commit comments

Comments
 (0)