Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
dd8764e
Some clarifications, improvements to GroupedRollingWindows in cudf-po…
Matt711 Aug 22, 2025
c60a6e2
clean up
Matt711 Aug 22, 2025
a94b6ab
Merge branch 'branch-25.10' into imp/polars/over
Matt711 Aug 22, 2025
57c6b4b
Merge branch 'branch-25.10' into imp/polars/over
Matt711 Aug 26, 2025
3fda0a0
Support rank(...).over(...) expressions in cudf-polars
Matt711 Aug 27, 2025
23e639b
merge conflict
Matt711 Aug 27, 2025
4ceedf2
implement dense, min, max, and average
Matt711 Aug 28, 2025
f7301bb
Merge branch 'branch-25.10' into fea/polars/rank-over
Matt711 Aug 28, 2025
d0522b0
add version guards
Matt711 Aug 28, 2025
1fc7711
Merge branch 'branch-25.10' into fea/polars/rank-over
Matt711 Aug 28, 2025
e3ee3bd
use a single groupby scan
Matt711 Aug 28, 2025
a197348
Merge branch 'branch-25.10' into fea/polars/rank-over
Matt711 Aug 28, 2025
8694426
Merge branch 'branch-25.10' into fea/polars/rank-over
Matt711 Sep 4, 2025
099a03c
address review
Matt711 Sep 4, 2025
fea62a7
Support ordered grouped windows in cudf-polars
Matt711 Sep 4, 2025
0bfe8bc
Merge branch 'branch-25.10' into fea/polars/over-expr-with-orderby
Matt711 Sep 4, 2025
14d637f
Merge branch 'branch-25.10' into fea/polars/over-expr-with-orderby
Matt711 Sep 5, 2025
b17d867
catch streaming test warning
Matt711 Sep 5, 2025
ba233bf
Merge branch 'branch-25.10' into fea/polars/over-expr-with-orderby
Matt711 Sep 5, 2025
24b27f8
Merge branch 'branch-25.10' into fea/polars/over-expr-with-orderby
Matt711 Sep 5, 2025
5693e76
make unary op implementations easier to support
Matt711 Sep 5, 2025
5ecaedc
not yet
Matt711 Sep 5, 2025
1bec1b7
Support forward/backward filling null values in a grouped window context
Matt711 Sep 5, 2025
1a298d5
Support cum_sum(...).over(...) expressions in cudf-polars
Matt711 Sep 5, 2025
b4b7764
Merge branch 'branch-25.10' into fea/polars/cum-sum-over
Matt711 Sep 6, 2025
bfefc98
address review
Matt711 Sep 6, 2025
a0eab9e
code coverage
Matt711 Sep 6, 2025
c8f89ae
by exprs can contain tuples
Matt711 Sep 6, 2025
d25e0c3
code coverage
Matt711 Sep 7, 2025
ac65b75
mean not avg
Matt711 Sep 7, 2025
16f8982
Merge branch 'branch-25.10' into fea/polars/cum-sum-over
Matt711 Sep 8, 2025
2a7a8a8
merge conflict
Matt711 Sep 8, 2025
d13beaa
Merge branch 'branch-25.10' into fea/polars/cum-sum-over
Matt711 Sep 9, 2025
d157a24
merge conflict
Matt711 Oct 8, 2025
f1989a9
Merge branch 'branch-25.12' into fea/polars/cum-sum-over
Matt711 Oct 8, 2025
c361978
simplify test
Matt711 Oct 8, 2025
48fb152
Merge branch 'branch-25.12' into fea/polars/cum-sum-over
Matt711 Oct 8, 2025
f174799
Merge branch 'branch-25.12' into fea/polars/cum-sum-over
Matt711 Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/expressions/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class FillNullWithStrategyOp(UnaryOp):
policy: plc.replace.ReplacePolicy = plc.replace.ReplacePolicy.PRECEDING


@dataclass(frozen=True)
class CumSumOp(UnaryOp):
    """Dispatch tag for ``cum_sum`` window expressions; adds no fields of its own."""


def to_request(
value: expr.Expr, orderby: Column, df: DataFrame
) -> plc.rolling.RollingRequest:
Expand Down Expand Up @@ -241,7 +246,8 @@ def __init__(
isinstance(named_expr.value, (expr.Len, expr.Agg))
or (
isinstance(named_expr.value, expr.UnaryFunction)
and named_expr.value.name in {"rank", "fill_null_with_strategy"}
and named_expr.value.name
in {"rank", "fill_null_with_strategy", "cum_sum"}
)
)
]
Expand All @@ -265,7 +271,7 @@ def __init__(
if isinstance(v, expr.Agg)
or (
isinstance(v, expr.UnaryFunction)
and v.name in {"rank", "fill_null_with_strategy"}
and v.name in {"rank", "fill_null_with_strategy", "cum_sum"}
)
]
self.by_count = len(by_expr)
Expand Down Expand Up @@ -393,6 +399,41 @@ def _(
dtypes = [ne.value.dtype for ne in named_exprs]
return names, dtypes, tables

@_apply_unary_op.register
def _(
    self,
    op: CumSumOp,
    df: DataFrame,
    _: plc.groupby.GroupBy,
) -> tuple[list[str], list[DataType], list[plc.Table]]:
    """
    Run one groupby sum-scan covering every ``cum_sum`` window expression.

    Parameters
    ----------
    op
        Supplies the named expressions to scan, the order index used to
        gather their inputs, and the local grouper the scan runs on.
    df
        Frame against which each expression's first child is evaluated.
    _
        Grouper argument required by the dispatch signature; it is not
        read here (the scan uses ``op.local_grouper``).

    Returns
    -------
    tuple
        Output names, output dtypes, and the tables returned by the scan.
    """
    targets = op.named_exprs

    # Evaluate the operand of each cum_sum in frame context, then gather the
    # raw columns by the window order index before scanning.
    operands = [
        target.value.children[0].evaluate(df, context=ExecutionContext.FRAME).obj
        for target in targets
    ]
    gathered = self._gather_columns(
        operands,
        op.order_index,
        cudf_polars_column=False,
    )

    # A single sum aggregation object is shared by every request.
    sum_agg = plc.aggregation.sum()
    scan_requests: list[plc.groupby.GroupByRequest] = [
        plc.groupby.GroupByRequest(column, [sum_agg])
        for _target, column in zip(targets, gathered, strict=True)
    ]
    result_names: list[str] = [target.name for target in targets]
    result_dtypes: list[DataType] = [target.value.dtype for target in targets]

    scan_grouper = op.local_grouper
    assert isinstance(scan_grouper, plc.groupby.GroupBy)
    _keys, scanned = scan_grouper.scan(scan_requests)

    return result_names, result_dtypes, scanned

def _reorder_to_input(
self,
row_id: plc.Column,
Expand Down Expand Up @@ -444,6 +485,7 @@ def _split_named_expr(
unary_window_ops: dict[str, list[expr.NamedExpr]] = {
"rank": [],
"fill_null_with_strategy": [],
"cum_sum": [],
}

for ne in self.named_aggs:
Expand Down Expand Up @@ -733,6 +775,40 @@ def do_evaluate( # noqa: D102
)
)

if cum_named := unary_window_ops["cum_sum"]:
order_index = self._build_window_order_index(
by_cols,
row_id=row_id,
order_by_col=order_by_col if self._order_by_expr is not None else None,
ob_desc=self.options[2] if self._order_by_expr is not None else False,
ob_nulls_last=self.options[3]
if self._order_by_expr is not None
else False,
)
by_cols_for_scan = self._gather_columns(by_cols, order_index)
local = self._sorted_grouper(by_cols_for_scan)
names, dtypes, tables = self._apply_unary_op(
CumSumOp(
named_exprs=cum_named,
order_index=order_index,
by_cols_for_scan=by_cols_for_scan,
local_grouper=local,
),
df,
grouper,
)
broadcasted_cols.extend(
self._reorder_to_input(
row_id,
by_cols,
df.num_rows,
tables,
names,
dtypes,
order_index=order_index,
)
)

# Create a temporary DataFrame with the broadcasted columns named by their
# placeholder names from agg decomposition, then evaluate the post-expression.
df = DataFrame(broadcasted_cols)
Expand Down
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/utils/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def decompose_single_agg(
if isinstance(agg, expr.UnaryFunction) and agg.name in {
"rank",
"fill_null_with_strategy",
"cum_sum",
}:
if context != ExecutionContext.WINDOW:
raise NotImplementedError(
Expand Down
26 changes: 26 additions & 0 deletions python/cudf_polars/tests/expressions/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,29 @@ def test_fill_over(
def test_fill_null_with_mean_over_unsupported(df: pl.LazyFrame) -> None:
    """A mean-strategy ``fill_null`` under ``over`` must raise at IR translation."""
    windowed = pl.col("x").fill_null(strategy="mean").over("g")
    assert_ir_translation_raises(df.select(windowed), NotImplementedError)


@pytest.mark.parametrize(
    "expr,group_key",
    [
        (pl.col("x"), "g"),
        (pl.when((pl.col("x") % 4) == 1).then(None).otherwise(pl.col("x")), "g"),
        (pl.col("x"), "g_null"),
    ],
)
@pytest.mark.parametrize("order_by", [None, ["g2", pl.col("x2") * 2]])
def test_cum_sum_over(
    df: pl.LazyFrame,
    *,
    expr: pl.Expr,
    group_key: str,
    order_by: None | list[str | pl.Expr],
) -> None:
    """Compare GPU and CPU results for ``cum_sum().over(...)``.

    Parametrized over the summed expression, the grouping key, and an
    optional ``order_by`` list.
    """
    windowed = expr.cum_sum().over(group_key, order_by=order_by)
    assert_gpu_result_equal(df.select(windowed))
Loading