Skip to content

Commit fe45d1e

Browse files
authored
Merge branch 'branch-25.12' into fea/chunked-pq-reader-with-deletion-vector
2 parents f79e019 + 81adc6d commit fe45d1e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+465
-403
lines changed

cpp/examples/parquet_io/io_source.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024, NVIDIA CORPORATION.
2+
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -69,7 +69,7 @@ struct pinned_allocator : public std::allocator<T> {
6969
return static_cast<T*>(ptr);
7070
}
7171

72-
void deallocate(T* ptr, std::size_t n)
72+
void deallocate(T* ptr, std::size_t n) noexcept
7373
{
7474
mr.deallocate_async(ptr, n * sizeof(T), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream);
7575
}

cpp/include/cudf/detail/utilities/host_vector.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class rmm_host_allocator {
154154
* It is the responsibility of the caller to destroy
155155
* the objects stored at \p p.
156156
*/
157-
inline void deallocate(pointer p, size_type cnt)
157+
inline void deallocate(pointer p, size_type cnt) noexcept
158158
{
159159
mr.deallocate_async(p, cnt * sizeof(value_type), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream);
160160
}

cpp/include/cudf_test/stream_checking_resource_adaptor.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res
9494
* @param bytes Size of the allocation
9595
* @param stream Stream on which to perform the deallocation
9696
*/
97-
void do_deallocate(void* ptr, std::size_t bytes, rmm::cuda_stream_view stream) override
97+
void do_deallocate(void* ptr, std::size_t bytes, rmm::cuda_stream_view stream) noexcept override
9898
{
9999
verify_stream(stream);
100100
upstream_.deallocate_async(ptr, bytes, rmm::CUDA_ALLOCATION_ALIGNMENT, stream);

cpp/src/io/parquet/delta_binary.cuh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ constexpr int max_delta_mini_block_size = 64;
4747
// batch of size `values_per_mb`. The largest value for values_per_miniblock among the
4848
// major writers seems to be 64, so 2 * 64 should be good. We save the first value separately
4949
// since it is not encoded in the first mini-block.
50-
constexpr int delta_rolling_buf_size = 2 * max_delta_mini_block_size;
50+
// The extra 1 is for the first value, from the block header. It's not stored in the buffer, but it
51+
// still impacts buffer indexing and we need to account for it to avoid race conditions.
52+
constexpr int delta_rolling_buf_size = (2 * max_delta_mini_block_size) + 1;
5153

5254
/**
5355
* @brief Read a ULEB128 varint integer

cpp/src/utilities/host_memory.cpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class fixed_pinned_pool_memory_resource {
9696
void deallocate_async(void* ptr,
9797
std::size_t bytes,
9898
std::size_t alignment,
99-
cuda::stream_ref stream)
99+
cuda::stream_ref stream) noexcept
100100
{
101101
if (bytes <= pool_size_ && ptr >= pool_begin_ && ptr < pool_end_) {
102102
pool_->deallocate_async(ptr, bytes, alignment, stream);
@@ -105,14 +105,14 @@ class fixed_pinned_pool_memory_resource {
105105
}
106106
}
107107

108-
void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream)
108+
void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) noexcept
109109
{
110110
return deallocate_async(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream);
111111
}
112112

113113
void deallocate(void* ptr,
114114
std::size_t bytes,
115-
std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT)
115+
std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept
116116
{
117117
deallocate_async(ptr, bytes, alignment, stream_);
118118
#if CCCL_MAJOR_VERSION > 3 || (CCCL_MAJOR_VERSION == 3 && CCCL_MINOR_VERSION >= 1)
@@ -156,7 +156,7 @@ class fixed_pinned_pool_memory_resource {
156156
return this->allocate(bytes, alignment);
157157
}
158158

159-
void deallocate_sync(void* ptr, std::size_t bytes, std::size_t alignment)
159+
void deallocate_sync(void* ptr, std::size_t bytes, std::size_t alignment) noexcept
160160
{
161161
return this->deallocate(ptr, bytes, alignment);
162162
}
@@ -166,7 +166,10 @@ class fixed_pinned_pool_memory_resource {
166166
return this->allocate_async(bytes, alignment, stream);
167167
}
168168

169-
void deallocate(rmm::cuda_stream_view stream, void* ptr, std::size_t bytes, std::size_t alignment)
169+
void deallocate(rmm::cuda_stream_view stream,
170+
void* ptr,
171+
std::size_t bytes,
172+
std::size_t alignment) noexcept
170173
{
171174
return this->deallocate_async(ptr, bytes, alignment, stream);
172175
}
@@ -260,7 +263,7 @@ class new_delete_memory_resource {
260263

261264
void deallocate(void* ptr,
262265
std::size_t bytes,
263-
std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT)
266+
std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept
264267
{
265268
rmm::detail::aligned_host_deallocate(
266269
ptr, bytes, alignment, [](void* ptr) { ::operator delete(ptr); });
@@ -269,12 +272,12 @@ class new_delete_memory_resource {
269272
void deallocate_async(void* ptr,
270273
std::size_t bytes,
271274
std::size_t alignment,
272-
[[maybe_unused]] cuda::stream_ref stream)
275+
[[maybe_unused]] cuda::stream_ref stream) noexcept
273276
{
274277
deallocate(ptr, bytes, alignment);
275278
}
276279

277-
void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream)
280+
void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) noexcept
278281
{
279282
deallocate(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT);
280283
}
@@ -294,7 +297,7 @@ class new_delete_memory_resource {
294297
return this->allocate(bytes, alignment);
295298
}
296299

297-
void deallocate_sync(void* ptr, std::size_t bytes, std::size_t alignment)
300+
void deallocate_sync(void* ptr, std::size_t bytes, std::size_t alignment) noexcept
298301
{
299302
return this->deallocate(ptr, bytes, alignment);
300303
}
@@ -304,7 +307,10 @@ class new_delete_memory_resource {
304307
return this->allocate_async(bytes, alignment, stream);
305308
}
306309

307-
void deallocate(rmm::cuda_stream_view stream, void* ptr, std::size_t bytes, std::size_t alignment)
310+
void deallocate(rmm::cuda_stream_view stream,
311+
void* ptr,
312+
std::size_t bytes,
313+
std::size_t alignment) noexcept
308314
{
309315
return this->deallocate_async(ptr, bytes, alignment, stream);
310316
}

python/cudf/cudf/core/column/column.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -611,19 +611,21 @@ def from_pylibcudf(
611611
new_dtype = plc.DataType(plc.TypeId.INT8)
612612

613613
col = plc.column_factories.make_numeric_column(
614-
new_dtype, col.size(), plc.column_factories.MaskState.ALL_NULL
614+
new_dtype, col.size(), plc.types.MaskState.ALL_NULL
615615
)
616616

617617
dtype = dtype_from_pylibcudf_column(col)
618618

619+
data_view = col.data()
620+
mask_view = col.null_mask()
619621
return build_column( # type: ignore[return-value]
620-
data=as_buffer(col.data().obj, exposed=data_ptr_exposed)
621-
if col.data() is not None
622+
data=as_buffer(data_view.obj, exposed=data_ptr_exposed)
623+
if data_view is not None
622624
else None,
623625
dtype=dtype,
624626
size=col.size(),
625-
mask=as_buffer(col.null_mask().obj, exposed=data_ptr_exposed)
626-
if col.null_mask() is not None
627+
mask=as_buffer(mask_view.obj, exposed=data_ptr_exposed)
628+
if mask_view is not None
627629
else None,
628630
offset=col.offset(),
629631
null_count=col.null_count(),
@@ -981,7 +983,7 @@ def _fill(
981983
if not fill_value.is_valid() and not self.nullable:
982984
mask = as_buffer(
983985
plc.null_mask.create_null_mask(
984-
self.size, plc.null_mask.MaskState.ALL_VALID
986+
self.size, plc.types.MaskState.ALL_VALID
985987
)
986988
)
987989
self.set_base_mask(mask)
@@ -2436,7 +2438,7 @@ def column_empty(
24362438
if row_count == 0
24372439
else plc.gpumemoryview(
24382440
plc.null_mask.create_null_mask(
2439-
row_count, plc.null_mask.MaskState.ALL_NULL
2441+
row_count, plc.types.MaskState.ALL_NULL
24402442
)
24412443
)
24422444
)

python/cudf_polars/cudf_polars/containers/column.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def __init__(
8585

8686
@classmethod
8787
def deserialize(
88-
cls, header: ColumnHeader, frames: tuple[memoryview, plc.gpumemoryview]
88+
cls, header: ColumnHeader, frames: tuple[memoryview[bytes], plc.gpumemoryview]
8989
) -> Self:
9090
"""
9191
Create a Column from a serialized representation returned by `.serialize()`.
@@ -126,7 +126,7 @@ def deserialize_ctor_kwargs(
126126

127127
def serialize(
128128
self,
129-
) -> tuple[ColumnHeader, tuple[memoryview, plc.gpumemoryview]]:
129+
) -> tuple[ColumnHeader, tuple[memoryview[bytes], plc.gpumemoryview]]:
130130
"""
131131
Serialize the Column into header and frames.
132132
@@ -297,7 +297,7 @@ def astype(self, dtype: DataType) -> Column:
297297
self.obj.type()
298298
) and plc.traits.is_timestamp(plc_dtype):
299299
upcasted = plc.unary.cast(self.obj, plc.DataType(plc.TypeId.INT64))
300-
result = plc.column.Column(
300+
plc_col = plc.column.Column(
301301
plc_dtype,
302302
upcasted.size(),
303303
upcasted.data(),
@@ -306,11 +306,11 @@ def astype(self, dtype: DataType) -> Column:
306306
upcasted.offset(),
307307
upcasted.children(),
308308
)
309-
return Column(result, dtype=dtype).sorted_like(self)
309+
return Column(plc_col, dtype=dtype).sorted_like(self)
310310
elif plc.traits.is_integral_not_bool(plc_dtype) and plc.traits.is_timestamp(
311311
self.obj.type()
312312
):
313-
result = plc.column.Column(
313+
plc_col = plc.column.Column(
314314
plc.DataType(plc.TypeId.INT64),
315315
self.obj.size(),
316316
self.obj.data(),
@@ -319,7 +319,7 @@ def astype(self, dtype: DataType) -> Column:
319319
self.obj.offset(),
320320
self.obj.children(),
321321
)
322-
return Column(plc.unary.cast(result, plc_dtype), dtype=dtype).sorted_like(
322+
return Column(plc.unary.cast(plc_col, plc_dtype), dtype=dtype).sorted_like(
323323
self
324324
)
325325
else:
@@ -454,11 +454,12 @@ def mask_nans(self) -> Self:
454454
def nan_count(self) -> int:
455455
"""Return the number of NaN values in the column."""
456456
if self.size > 0 and plc.traits.is_floating_point(self.obj.type()):
457+
# See https://github.com/rapidsai/cudf/issues/20202 for why we type-ignore
457458
return plc.reduce.reduce(
458459
plc.unary.is_nan(self.obj),
459460
plc.aggregation.sum(),
460461
plc.types.SIZE_TYPE,
461-
).to_py()
462+
).to_py() # type: ignore[return-value]
462463
return 0
463464

464465
@property

python/cudf_polars/cudf_polars/containers/dataframe.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,9 @@ def from_table(
191191

192192
@classmethod
193193
def deserialize(
194-
cls, header: DataFrameHeader, frames: tuple[memoryview, plc.gpumemoryview]
194+
cls,
195+
header: DataFrameHeader,
196+
frames: tuple[memoryview[bytes], plc.gpumemoryview],
195197
) -> Self:
196198
"""
197199
Create a DataFrame from a serialized representation returned by `.serialize()`.
@@ -219,7 +221,7 @@ def deserialize(
219221

220222
def serialize(
221223
self,
222-
) -> tuple[DataFrameHeader, tuple[memoryview, plc.gpumemoryview]]:
224+
) -> tuple[DataFrameHeader, tuple[memoryview[bytes], plc.gpumemoryview]]:
223225
"""
224226
Serialize the table into header and frames.
225227

python/cudf_polars/cudf_polars/dsl/expressions/rolling.py

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ class FillNullWithStrategyOp(UnaryOp):
4848
policy: plc.replace.ReplacePolicy = plc.replace.ReplacePolicy.PRECEDING
4949

5050

51+
@dataclass(frozen=True)
52+
class CumSumOp(UnaryOp):
53+
pass
54+
55+
5156
def to_request(
5257
value: expr.Expr, orderby: Column, df: DataFrame
5358
) -> plc.rolling.RollingRequest:
@@ -241,7 +246,8 @@ def __init__(
241246
isinstance(named_expr.value, (expr.Len, expr.Agg))
242247
or (
243248
isinstance(named_expr.value, expr.UnaryFunction)
244-
and named_expr.value.name in {"rank", "fill_null_with_strategy"}
249+
and named_expr.value.name
250+
in {"rank", "fill_null_with_strategy", "cum_sum"}
245251
)
246252
)
247253
]
@@ -265,7 +271,7 @@ def __init__(
265271
if isinstance(v, expr.Agg)
266272
or (
267273
isinstance(v, expr.UnaryFunction)
268-
and v.name in {"rank", "fill_null_with_strategy"}
274+
and v.name in {"rank", "fill_null_with_strategy", "cum_sum"}
269275
)
270276
]
271277
self.by_count = len(by_expr)
@@ -393,6 +399,41 @@ def _(
393399
dtypes = [ne.value.dtype for ne in named_exprs]
394400
return names, dtypes, tables
395401

402+
@_apply_unary_op.register
403+
def _(
404+
self,
405+
op: CumSumOp,
406+
df: DataFrame,
407+
_: plc.groupby.GroupBy,
408+
) -> tuple[list[str], list[DataType], list[plc.Table]]:
409+
cum_named = op.named_exprs
410+
order_index = op.order_index
411+
412+
requests: list[plc.groupby.GroupByRequest] = []
413+
out_names: list[str] = []
414+
out_dtypes: list[DataType] = []
415+
416+
val_cols = self._gather_columns(
417+
[
418+
ne.value.children[0].evaluate(df, context=ExecutionContext.FRAME).obj
419+
for ne in cum_named
420+
],
421+
order_index,
422+
cudf_polars_column=False,
423+
)
424+
agg = plc.aggregation.sum()
425+
426+
for ne, val_col in zip(cum_named, val_cols, strict=True):
427+
requests.append(plc.groupby.GroupByRequest(val_col, [agg]))
428+
out_names.append(ne.name)
429+
out_dtypes.append(ne.value.dtype)
430+
431+
lg = op.local_grouper
432+
assert isinstance(lg, plc.groupby.GroupBy)
433+
_, tables = lg.scan(requests)
434+
435+
return out_names, out_dtypes, tables
436+
396437
def _reorder_to_input(
397438
self,
398439
row_id: plc.Column,
@@ -444,6 +485,7 @@ def _split_named_expr(
444485
unary_window_ops: dict[str, list[expr.NamedExpr]] = {
445486
"rank": [],
446487
"fill_null_with_strategy": [],
488+
"cum_sum": [],
447489
}
448490

449491
for ne in self.named_aggs:
@@ -733,6 +775,40 @@ def do_evaluate( # noqa: D102
733775
)
734776
)
735777

778+
if cum_named := unary_window_ops["cum_sum"]:
779+
order_index = self._build_window_order_index(
780+
by_cols,
781+
row_id=row_id,
782+
order_by_col=order_by_col if self._order_by_expr is not None else None,
783+
ob_desc=self.options[2] if self._order_by_expr is not None else False,
784+
ob_nulls_last=self.options[3]
785+
if self._order_by_expr is not None
786+
else False,
787+
)
788+
by_cols_for_scan = self._gather_columns(by_cols, order_index)
789+
local = self._sorted_grouper(by_cols_for_scan)
790+
names, dtypes, tables = self._apply_unary_op(
791+
CumSumOp(
792+
named_exprs=cum_named,
793+
order_index=order_index,
794+
by_cols_for_scan=by_cols_for_scan,
795+
local_grouper=local,
796+
),
797+
df,
798+
grouper,
799+
)
800+
broadcasted_cols.extend(
801+
self._reorder_to_input(
802+
row_id,
803+
by_cols,
804+
df.num_rows,
805+
tables,
806+
names,
807+
dtypes,
808+
order_index=order_index,
809+
)
810+
)
811+
736812
# Create a temporary DataFrame with the broadcasted columns named by their
737813
# placeholder names from agg decomposition, then evaluate the post-expression.
738814
df = DataFrame(broadcasted_cols)

0 commit comments

Comments
 (0)