Merged

34 commits
2c75a01
Refactor of string length preprocessing that skips compute_page_sizes…
nvdbaranec Jul 25, 2025
b32b9b5
Move string length calculating code to happen after output chunk comp…
nvdbaranec Jul 25, 2025
8495eb5
Fixed an issue where page.str_bytes was being set at the wrong time i…
nvdbaranec Aug 15, 2025
ae246af
Fixed remaining bugs. Almost everything was the result of a large num…
nvdbaranec Aug 28, 2025
282d064
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Aug 29, 2025
18de816
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Aug 29, 2025
92714e4
Fix merge conflict issues.
nvdbaranec Sep 2, 2025
ed12844
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Sep 15, 2025
afc8006
PR cleanup.
nvdbaranec Sep 16, 2025
7d52a04
More PR cleanup
nvdbaranec Sep 16, 2025
a4f97b1
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Sep 18, 2025
313fbff
Formatting and a fix for hybrid scan string length computing.
nvdbaranec Sep 18, 2025
144aabd
PR review changes.
nvdbaranec Sep 22, 2025
84f280d
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Sep 23, 2025
1b9c4dc
Reduce output buffer size for lists and list<list<..<str>>> and elimi…
mhaseeb123 Sep 23, 2025
8a64808
Minor comment update
mhaseeb123 Sep 23, 2025
f77e210
Style fix
mhaseeb123 Sep 23, 2025
f2739f5
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 23, 2025
9fb5597
Minor update
mhaseeb123 Sep 23, 2025
83962a4
Merge branch 'fea/reduce-output-buffer-sizes-for-pruned-pages' of htt…
mhaseeb123 Sep 23, 2025
7082e32
Gtest to materialize lists of structs
mhaseeb123 Sep 23, 2025
1053d06
Minor update
mhaseeb123 Sep 24, 2025
5839627
Minor gtest update
mhaseeb123 Sep 24, 2025
2aae6f4
Minor gtest update
mhaseeb123 Sep 24, 2025
da09e82
Minor updates
mhaseeb123 Sep 26, 2025
bfde9e7
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 29, 2025
1da789c
Merge changes
mhaseeb123 Sep 29, 2025
f58456b
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 29, 2025
d6c31f2
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 30, 2025
737df82
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Oct 6, 2025
11b46af
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Oct 7, 2025
1467af3
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 8, 2025
b774c6a
Apply suggestion from @vuule
mhaseeb123 Oct 8, 2025
a471b6e
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Oct 8, 2025
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
@@ -737,7 +737,7 @@ table_with_metadata hybrid_scan_reader_impl::read_chunk_internal(
   preprocess_chunk_strings(mode, read_info, page_mask);

   // Allocate memory buffers for the output columns.
-  allocate_columns(mode, read_info.skip_rows, read_info.num_rows);
+  allocate_columns(mode, read_info.skip_rows, read_info.num_rows, page_mask);

   // Parse data into the output buffers.
   decode_page_data(mode, read_info.skip_rows, read_info.num_rows, page_mask);
6 changes: 2 additions & 4 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -608,8 +608,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   if (t == 0) {
     // don't clobber these if they're already computed from the index
     if (!pp->has_page_index) {
-      s->page.num_nulls  = 0;
-      s->page.num_valids = 0;
+      pp->num_nulls  = 0;
+      pp->num_valids = 0;
     }
     // reset str_bytes to 0 in case it's already been calculated (esp needed for chunked reads).
     pp->str_bytes = 0;

Review comment (Member, Author): s is not initialized at this point, so pp (PageInfo) is updated directly; s will store a reference to it later, in the setup_local_page_info function.
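The ordering subtlety behind that comment may be easier to see in isolation. Below is a minimal CUDA sketch (hypothetical, simplified types and names — not the actual cuDF structures or kernel) of why the reset must go through pp: the block-local state only aliases the page after setup_local_page_info runs.

struct PageInfo {
  int num_nulls;
  int num_valids;
};

struct page_state_s {
  PageInfo page;  // uninitialized until setup_local_page_info() runs
};

__device__ void setup_local_page_info(page_state_s* s, PageInfo const* pp)
{
  s->page = *pp;  // s->page mirrors the (already reset) global page from here on
}

__global__ void preprocess_kernel(PageInfo* pages)
{
  __shared__ page_state_s state;
  PageInfo* pp = &pages[blockIdx.x];
  if (threadIdx.x == 0) {
    pp->num_nulls  = 0;  // safe: pp points at valid global memory
    pp->num_valids = 0;  // writing state.page.* here would touch uninitialized shared memory
  }
  __syncthreads();
  if (threadIdx.x == 0) { setup_local_page_info(&state, pp); }
  __syncthreads();
}

int main()
{
  PageInfo* d_pages;
  cudaMalloc(&d_pages, sizeof(PageInfo));
  preprocess_kernel<<<1, 32>>>(d_pages);
  cudaDeviceSynchronize();
  cudaFree(d_pages);
  return 0;
}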
@@ -676,7 +676,6 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
  * @param page_mask Page mask indicating if this column needs to be decoded
  * @param min_rows crop all rows below min_row
  * @param num_rows Maximum number of rows to read
- * other settings and records the result in the PageInfo::str_bytes_all field
  */
 CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size)
   compute_delta_page_string_sizes_kernel(PageInfo* pages,

Review comment (Member, Author): stale comments.
@@ -770,7 +769,6 @@ CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size)
  * @param page_mask Page mask indicating if this column needs to be decoded
  * @param min_rows crop all rows below min_row
  * @param num_rows Maximum number of rows to read
- * other settings
  */
 CUDF_KERNEL void __launch_bounds__(delta_length_block_size)
   compute_delta_length_page_string_sizes_kernel(PageInfo* pages,
12 changes: 5 additions & 7 deletions cpp/src/io/parquet/page_string_utils.cuh
@@ -170,9 +170,11 @@ __device__ void update_string_offsets_for_pruned_pages(

   // Initial string offset
   auto const initial_value = page.str_offset;
-  // We must use the batch size from the nesting info (the size of the page for this batch)
-  auto value_count = page.nesting[state->col.max_nesting_depth - 1].batch_size;
-  auto const tid   = cg::this_thread_block().thread_rank();
+  // The value count is either the leaf-level batch size in case of lists or the number of
+  // effective rows being read by this page
+  auto const value_count =
+    has_lists ? page.nesting[state->col.max_nesting_depth - 1].batch_size : state->num_rows;
+  auto const tid = cg::this_thread_block().thread_rank();

   // Offsets pointer contains string sizes in case of large strings and actual offsets
   // otherwise
@@ -190,10 +192,6 @@
     auto const input_col_idx = page.chunk_idx % chunks_per_rowgroup;
     compute_initial_large_strings_offset<has_lists>(state, initial_str_offsets[input_col_idx]);
   } else {
-    // if no repetition we haven't calculated start/end bounds and instead just skipped
-    // values until we reach first_row. account for that here.
-    if constexpr (not has_lists) { value_count -= state->first_row; }
     // Write the initial offset at all positions to indicate zero sized strings
     for (int idx = tid; idx < value_count; idx += block_size) {
       offptr[idx] = initial_value;

Review comment (Member, Author): For the not has_lists case, the value count is now state->num_rows, which already accounts for state->first_row (see setup_local_page_info, where these fields are initialized).
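As a side note on the "write the initial offset at all positions" logic: a minimal host-side sketch (hypothetical, heavily simplified from update_string_offsets_for_pruned_pages) showing why repeating the page's initial offset makes every pruned entry decode as a zero-length string — each string's length is offsets[i + 1] - offsets[i].

#include <cassert>
#include <vector>

int main()
{
  int const initial_value = 128;  // page.str_offset: where this page's chars would begin
  int const value_count   = 4;    // rows (or leaf values, for lists) covered by the pruned page
  std::vector<int> offsets(value_count + 1, 0);

  // Block-strided in the kernel; a plain loop here
  for (int idx = 0; idx < value_count; ++idx) { offsets[idx] = initial_value; }
  offsets[value_count] = initial_value;  // terminating offset, written by the next page's data

  // Equal adjacent offsets => every entry in the pruned range is an empty string
  for (int i = 0; i < value_count; ++i) { assert(offsets[i + 1] - offsets[i] == 0); }
  return 0;
}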
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl.cpp
@@ -709,7 +709,7 @@ table_with_metadata reader_impl::read_chunk_internal(read_mode mode)
   preprocess_chunk_strings(mode, read_info, page_mask);

   // Allocate memory buffers for the output columns.
-  allocate_columns(mode, read_info.skip_rows, read_info.num_rows);
+  allocate_columns(mode, read_info.skip_rows, read_info.num_rows, page_mask);

   // Parse data into the output buffers.
   decode_page_data(mode, read_info.skip_rows, read_info.num_rows, page_mask);
8 changes: 6 additions & 2 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -303,8 +303,12 @@ class reader_impl {
    * @param read_mode Value indicating if the data sources are read all at once or chunk by chunk
    * @param skip_rows Crop all rows below skip_rows
    * @param num_rows Number of rows to read
+   * @param page_mask Boolean device span indicating if a page needs to be decoded or is pruned
    */
-  void allocate_columns(read_mode mode, size_t skip_rows, size_t num_rows);
+  void allocate_columns(read_mode mode,
+                        size_t skip_rows,
+                        size_t num_rows,
+                        cudf::device_span<bool const> page_mask);

   /**
    * @brief Calculate per-page offsets for string data

@@ -319,7 +323,7 @@
    * @param read_mode Value indicating if the data sources are read all at once or chunk by chunk
    * @param skip_rows Number of rows to skip from the start
    * @param num_rows Number of rows to decode
-   * @param page_mask Boolean vector indicating if a page needs to be decoded or is pruned
+   * @param page_mask Boolean device span indicating if a page needs to be decoded or is pruned
   */
  void decode_page_data(read_mode mode,
                        size_t skip_rows,
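For context on the new parameter, here is a hedged sketch of a hypothetical call site (not code from this PR; it assumes access to the reader internals): it builds a device-side page mask, one bool per subpass page with false marking a page pruned by filter pushdown, and passes it through the new allocate_columns signature.

#include <cudf/utilities/span.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/fill.h>

void allocate_with_mask(reader_impl& reader,  // hypothetical wiring
                        read_mode mode,
                        size_t skip_rows,
                        size_t num_rows,
                        size_t num_pages,
                        rmm::cuda_stream_view stream)
{
  rmm::device_uvector<bool> mask(num_pages, stream);
  // Start with every page selected; a filter step would flip pruned pages to false.
  thrust::fill(rmm::exec_policy(stream), mask.begin(), mask.end(), true);

  reader.allocate_columns(mode, skip_rows, num_rows,
                          cudf::device_span<bool const>{mask.data(), mask.size()});
}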
22 changes: 14 additions & 8 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -687,7 +687,10 @@ void reader_impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_lim
   compute_output_chunks_for_subpass();
 }

-void reader_impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num_rows)
+void reader_impl::allocate_columns(read_mode mode,
+                                   size_t skip_rows,
+                                   size_t num_rows,
+                                   cudf::device_span<bool const> page_mask)
 {
   CUDF_FUNC_RANGE();

@@ -779,17 +782,20 @@ void reader_impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num_

   // To keep track of the starting key of an iteration
   size_t key_start = 0;

   // Loop until all keys are processed
   while (key_start < num_keys) {
     // Number of keys processed in this iteration
     auto const num_keys_this_iter = std::min<size_t>(num_keys_per_iter, num_keys - key_start);
-    thrust::transform(
-      rmm::exec_policy_nosync(_stream),
-      thrust::make_counting_iterator<size_t>(key_start),
-      thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter),
-      size_input.begin(),
-      get_page_nesting_size{
-        d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
+    thrust::transform(rmm::exec_policy_nosync(_stream),
+                      thrust::make_counting_iterator<size_t>(key_start),
+                      thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter),
+                      size_input.begin(),
+                      get_page_nesting_size{d_cols_info.data(),
+                                            max_depth,
+                                            subpass.pages.size(),
+                                            subpass.pages.device_begin(),
+                                            page_mask.data()});

     // Manually create a size_t `key_start` compatible counting_transform_iterator.
     auto const reduction_keys =
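A standalone sketch of the sizing pattern used above (hypothetical names; keying is simplified to one key per (page, depth) pair, and the real functor's list-ancestor check is reduced to a depth test): counting iterators enumerate the keys for one iteration, and the functor maps each key to its output size, zeroing nested depths of pruned pages.

#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cstddef>
#include <vector>

struct nesting_size_of {
  int const* batch_sizes;  // flattened [page][depth] batch sizes
  bool const* page_mask;   // false marks a pruned page
  std::size_t max_depth;

  int operator()(std::size_t key) const
  {
    auto const page_idx  = key / max_depth;
    auto const depth_idx = key % max_depth;
    // simplified stand-in for the real functor's list-ancestor check
    if (!page_mask[page_idx] && depth_idx > 0) { return 0; }
    return batch_sizes[key];
  }
};

int main()
{
  std::size_t const max_depth = 2;
  std::vector<int> const batch_sizes{4, 4, 8, 8};  // two pages, two depths each
  bool const page_mask[]{true, false};             // page 1 is pruned
  std::vector<int> sizes(batch_sizes.size());

  thrust::transform(thrust::host,
                    thrust::counting_iterator<std::size_t>(0),
                    thrust::counting_iterator<std::size_t>(batch_sizes.size()),
                    sizes.begin(),
                    nesting_size_of{batch_sizes.data(), page_mask, max_depth});
  // sizes == {4, 4, 8, 0}: the pruned page's nested depth contributes nothing
  return 0;
}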
13 changes: 13 additions & 0 deletions cpp/src/io/parquet/reader_impl_preprocess_utils.cuh
@@ -23,6 +23,8 @@
 #include <rmm/cuda_stream_view.hpp>

 #include <cuda/functional>
+#include <thrust/iterator/counting_iterator.h>
+#include <thrust/logical.h>

 #include <future>
 #include <vector>

@@ -345,6 +347,7 @@ struct get_page_nesting_size {
   size_type const max_depth;
   size_t const num_pages;
   PageInfo const* const pages;
+  bool const* const page_mask;

   __device__ inline size_type operator()(size_t index) const
   {

@@ -357,6 +360,16 @@
       return 0;
     }

+    // If this page is pruned and has a list parent, set the batch size for this depth to 0 to
+    // reduce the required output buffer size and eliminate any non-empty nulls
+    if (not page_mask[indices.page_idx] and indices.depth_idx > 0 and
+        thrust::any_of(thrust::seq,
+                       thrust::counting_iterator(0),
+                       thrust::counting_iterator(indices.depth_idx),
+                       [&](auto depth) { return page.nesting[depth].type == type_id::LIST; })) {
+      page.nesting[indices.depth_idx].batch_size = 0;
+    }
+
     return page.nesting[indices.depth_idx].batch_size;
   }
 };
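A minimal host-side analog (hypothetical, simplified types) of the branch added to get_page_nesting_size: a pruned page contributes a zero batch size at depth d only when some ancestor depth before d is a LIST, since only a list parent can otherwise force non-empty nulls into the output buffer.

#include <algorithm>
#include <cassert>
#include <vector>

enum class type_id { LIST, STRUCT, STRING };
struct nesting_info {
  type_id type;
  int batch_size;
};

// Mirrors the new branch: zero the batch size at `depth` for a pruned page
// only if some shallower depth is a LIST.
int effective_batch_size(std::vector<nesting_info>& nesting, int depth, bool page_kept)
{
  bool const has_list_ancestor =
    std::any_of(nesting.begin(), nesting.begin() + depth,
                [](nesting_info const& n) { return n.type == type_id::LIST; });
  if (!page_kept && depth > 0 && has_list_ancestor) { nesting[depth].batch_size = 0; }
  return nesting[depth].batch_size;
}

int main()
{
  // list<string>: depth 0 is the list level, depth 1 the string leaf
  std::vector<nesting_info> nesting{{type_id::LIST, 100}, {type_id::STRING, 250}};
  assert(effective_batch_size(nesting, 1, /*page_kept=*/false) == 0);    // pruned leaf under a list
  assert(effective_batch_size(nesting, 0, /*page_kept=*/false) == 100);  // depth 0 is untouched
  return 0;
}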