Merged
34 commits
2c75a01
Refactor of string length preprocessing that skips compute_page_sizes…
nvdbaranec Jul 25, 2025
b32b9b5
Move string length calculating code to happen after output chunk comp…
nvdbaranec Jul 25, 2025
8495eb5
Fixed an issue where page.str_bytes was being set at the wrong time i…
nvdbaranec Aug 15, 2025
ae246af
Fixed remaining bugs. Almost everything was the result of a large num…
nvdbaranec Aug 28, 2025
282d064
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Aug 29, 2025
18de816
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Aug 29, 2025
92714e4
Fix merge conflict issues.
nvdbaranec Sep 2, 2025
ed12844
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Sep 15, 2025
afc8006
PR cleanup.
nvdbaranec Sep 16, 2025
7d52a04
More PR cleanup
nvdbaranec Sep 16, 2025
a4f97b1
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Sep 18, 2025
313fbff
Formatting and a fix for hybrid scan string length computing.
nvdbaranec Sep 18, 2025
144aabd
PR review changes.
nvdbaranec Sep 22, 2025
84f280d
Merge branch 'branch-25.10' into pq_string_preprocess
nvdbaranec Sep 23, 2025
1b9c4dc
Reduce output buffer size for lists and list<list<..<str>>> and elimi…
mhaseeb123 Sep 23, 2025
8a64808
Minor comment update
mhaseeb123 Sep 23, 2025
f77e210
Style fix
mhaseeb123 Sep 23, 2025
f2739f5
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 23, 2025
9fb5597
Minor update
mhaseeb123 Sep 23, 2025
83962a4
Merge branch 'fea/reduce-output-buffer-sizes-for-pruned-pages' of htt…
mhaseeb123 Sep 23, 2025
7082e32
Gtest to materialize lists of structs
mhaseeb123 Sep 23, 2025
1053d06
Minor update
mhaseeb123 Sep 24, 2025
5839627
Minor gtest update
mhaseeb123 Sep 24, 2025
2aae6f4
Minor gtest update
mhaseeb123 Sep 24, 2025
da09e82
Minor updates
mhaseeb123 Sep 26, 2025
bfde9e7
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 29, 2025
1da789c
Merge changes
mhaseeb123 Sep 29, 2025
f58456b
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 29, 2025
d6c31f2
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Sep 30, 2025
737df82
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Oct 6, 2025
11b46af
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Oct 7, 2025
1467af3
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 8, 2025
b774c6a
Apply suggestion from @vuule
mhaseeb123 Oct 8, 2025
a471b6e
Merge branch 'branch-25.12' into fea/reduce-output-buffer-sizes-for-p…
mhaseeb123 Oct 8, 2025
176 changes: 7 additions & 169 deletions cpp/src/io/parquet/decode_preprocess.cu
@@ -45,152 +45,6 @@ constexpr int rolling_buf_size = LEVEL_DECODE_BUF_SIZE;

using unused_state_buf = page_state_buffers_s<0, 0, 0>;

/**
* @brief Calculate string bytes for DELTA_LENGTH_BYTE_ARRAY encoded pages
*
* Operates at thread block level. Result is valid only on thread 0.
*
* @param s The local page info
* @param block Thread block cooperative group
*/
__device__ size_type delta_length_page_string_size(page_state_s* s, cg::thread_block const& block)
{
if (block.thread_rank() == 0) {
// find the beginning of char data
delta_binary_decoder string_lengths;
auto const* string_start = string_lengths.find_end_of_block(s->data_start, s->data_end);
// distance is size of string data
return static_cast<size_type>(cuda::std::distance(string_start, s->data_end));
}
return 0;
}
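For context on why a single pointer distance suffices here: a DELTA_LENGTH_BYTE_ARRAY page stores every string length in one delta-binary block, followed by all character data back to back, so the character bytes are exactly what remains between the end of the length block and the end of the page. A minimal self-contained sketch of that arithmetic, with invented offsets (the kernel gets the real boundary from find_end_of_block and page_state_s):

#include <cstddef>

int main()
{
  unsigned char page_buffer[1000] = {};                    // hypothetical 1,000-byte page
  unsigned char const* data_end     = page_buffer + 1000;  // corresponds to s->data_end
  unsigned char const* string_start = page_buffer + 112;   // assumed end of the length block
  std::size_t const str_bytes = data_end - string_start;   // 888 bytes of character data
  return str_bytes == 888 ? 0 : 1;
}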

/**
* @brief Calculate string bytes for DELTA_BYTE_ARRAY encoded pages
*
* Operates at thread block level of size preprocess_block_size.
*
* @param s The local page info
* @param block Thread block cooperative group
*/
__device__ size_type delta_page_string_size(page_state_s* s, cg::thread_block const& block)
{
using WarpReduce = cub::WarpReduce<uleb128_t>;
__shared__ typename WarpReduce::TempStorage temp_storage[2];

__shared__ __align__(16) delta_binary_decoder prefixes;
__shared__ __align__(16) delta_binary_decoder suffixes;

auto const warp = cg::tiled_partition<cudf::detail::warp_size>(block);

if (block.thread_rank() == 0) {
auto const* suffix_start = prefixes.find_end_of_block(s->data_start, s->data_end);
suffixes.init_binary_block(suffix_start, s->data_end);
}
block.sync();

// two warps will traverse the prefixes and suffixes and sum them up
auto const db = warp.meta_group_rank() == 0 ? &prefixes
: warp.meta_group_rank() == 1 ? &suffixes
: nullptr;

size_t total_bytes = 0;
if (db != nullptr) {
// initialize with first value (which is stored in last_value)
if (warp.thread_rank() == 0) { total_bytes = db->last_value; }

uleb128_t lane_sum = 0;
while (db->current_value_idx < db->num_encoded_values(true)) {
// calculate values for current mini-block
db->calc_mini_block_values(warp.thread_rank());

// get per lane sum for mini-block
for (uint32_t i = 0; i < db->values_per_mb; i += warp.size()) {
uint32_t const idx = db->current_value_idx + i + warp.thread_rank();
if (idx < db->value_count) {
lane_sum += db->value[rolling_index<delta_rolling_buf_size>(idx)];
}
}

if (warp.thread_rank() == 0) { db->setup_next_mini_block(true); }
warp.sync();
}

// get sum for warp.
// note: warp_sum will only be valid on lane 0.
auto const warp_sum = WarpReduce(temp_storage[warp.meta_group_rank()]).Sum(lane_sum);

if (warp.thread_rank() == 0) { total_bytes += warp_sum; }
}
block.sync();

// now sum up total_bytes from the two warps. result is only valid on thread 0.
auto const final_bytes =
cudf::detail::single_lane_block_sum_reduce<preprocess_block_size, 0>(total_bytes);

return static_cast<size_type>(final_bytes);
}
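The quantity those two warps reduce: DELTA_BYTE_ARRAY front-codes its strings as a block of prefix lengths followed by a block of suffix lengths, and string i occupies prefix_len[i] + suffix_len[i] bytes, so the page's string size is the sum over both blocks. A serial sketch with invented lengths (the kernel decodes them from the prefixes/suffixes delta blocks in parallel instead):

#include <cstddef>
#include <vector>

int main()
{
  // Hypothetical decoded length blocks for four front-coded strings.
  std::vector<std::size_t> const prefix_len{0, 3, 3, 5};
  std::vector<std::size_t> const suffix_len{5, 2, 4, 1};
  std::size_t total_bytes = 0;
  for (std::size_t i = 0; i < prefix_len.size(); ++i) {
    total_bytes += prefix_len[i] + suffix_len[i];  // per-string sizes: 5, 5, 7, 6
  }
  return total_bytes == 23 ? 0 : 1;  // 23 total string bytes for the page
}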

/**
* @brief Calculate the number of string bytes in the page.
*
* This function expects the dictionary position to be at 0 and will traverse
* the entire thing (for plain and dictionary encoding).
*
* Operates at thread block level of size preprocess_block_size. Result is only
* valid on thread 0.
*
* @param s The local page info
* @param block Thread block cooperative group
*
* @return The number of string bytes in the page
*/
__device__ size_type decode_total_page_string_size(page_state_s* s, cg::thread_block const& block)
{
auto const warp = cg::tiled_partition<cudf::detail::warp_size>(block);
size_type target_pos = s->num_input_values;
size_type str_len = 0;
switch (s->page.encoding) {
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// Use warp 0 to decode dictionary indices and return the new target position
if (warp.meta_group_rank() == 0 && s->dict_base) {
auto const [new_target_pos, len] =
decode_dictionary_indices<is_calc_sizes_only::YES, unused_state_buf>(
s, nullptr, target_pos, warp);
target_pos = new_target_pos;
str_len = len;
}
break;

case Encoding::PLAIN:
// For V2 headers, we know how many values are present, so we can skip an expensive scan.
if ((s->page.flags & PAGEINFO_FLAGS_V2) != 0) {
auto const num_values = s->page.num_input_values - s->page.num_nulls;
str_len = s->dict_size - sizeof(int) * num_values;
}
// For V1, the choice is an overestimate (s->dict_size), or an exact number that's
// expensive to compute. For now we're going with the latter.
else {
str_len = initialize_string_descriptors<is_calc_sizes_only::YES, unused_state_buf>(
s, nullptr, target_pos, block);
}
break;

case Encoding::DELTA_LENGTH_BYTE_ARRAY:
str_len = delta_length_page_string_size(s, block);
break;

case Encoding::DELTA_BYTE_ARRAY: str_len = delta_page_string_size(s, block); break;

default:
// not a valid string encoding, so just return 0
break;
}
if (!block.thread_rank()) { s->dict_pos = target_pos; }
return str_len;
}
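To make the PLAIN/V2 branch above concrete: PLAIN encoding stores each non-null string as a 4-byte length prefix followed by its characters, and a V2 header records num_nulls, so the character total falls straight out of the page's data size with no scan. A worked example with assumed page statistics:

#include <cstdint>

int main()
{
  // Assumed page statistics, purely illustrative.
  int32_t const num_input_values = 1000;   // total values in the page
  int32_t const num_nulls        = 100;    // known only with a V2 header
  int32_t const dict_size        = 10000;  // bytes of PLAIN-encoded page data
  int32_t const num_values = num_input_values - num_nulls;  // 900 stored strings
  int32_t const str_len    = dict_size - 4 * num_values;    // 10000 - 3600 = 6400
  return str_len == 6400 ? 0 : 1;
}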

/**
* @brief Update output column sizes for every nesting level based on a batch
* of incoming decoded definition and repetition level values.
@@ -338,8 +192,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
bool is_base_pass,
bool compute_string_sizes)
bool is_base_pass)
{
__shared__ __align__(16) page_state_s state_g;

Expand Down Expand Up @@ -385,10 +238,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
if (!t) {
s->page.skipped_values = -1;
s->page.skipped_leaf_values = 0;
// str_bytes_from_index will be 0 if no page stats are present
s->page.str_bytes = s->page.str_bytes_from_index;
s->input_row_count = 0;
s->input_value_count = 0;

// in the base pass, we're computing the number of rows, make sure we visit absolutely
// everything
@@ -399,17 +250,12 @@
}
}

// we only need to preprocess hierarchies with repetition in them (ie, hierarchies
// containing lists anywhere within).
compute_string_sizes =
compute_string_sizes && s->col.physical_type == Type::BYTE_ARRAY && !s->col.is_strings_to_cat;

// early out optimizations:

// - if this is a flat hierarchy (no lists) and is not a string column. in this case we don't need
// - if this is a flat hierarchy (no lists), we don't need
// to do the expensive work of traversing the level data to determine sizes. we can just compute
// it directly.
if (!has_repetition && !compute_string_sizes) {
if (!has_repetition) {
int depth = 0;
while (depth < s->page.num_output_nesting_levels) {
auto const thread_depth = depth + t;
@@ -472,12 +318,6 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
block.sync();
}

// retrieve total string size.
if (compute_string_sizes && !pp->has_page_index) {
auto const str_bytes = decode_total_page_string_size(s, block);
if (t == 0) { s->page.str_bytes = str_bytes; }
}

// update output results:
// - real number of rows for the whole page
// - nesting sizes for the whole page
@@ -501,7 +341,6 @@ if (!t) {
if (!t) {
pp->skipped_values = s->page.skipped_values;
pp->skipped_leaf_values = s->page.skipped_leaf_values;
pp->str_bytes = s->page.str_bytes;
}
}

@@ -515,7 +354,6 @@ void compute_page_sizes(cudf::detail::hostdevice_span<PageInfo> pages,
size_t min_row,
size_t num_rows,
bool compute_num_rows,
bool compute_string_sizes,
int level_type_size,
rmm::cuda_stream_view stream)
{
@@ -531,10 +369,10 @@ void compute_page_sizes(cudf::detail::hostdevice_span<PageInfo> pages,
// the starting and ending read values to account for these bounds.
if (level_type_size == 1) {
compute_page_sizes_kernel<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, compute_num_rows, compute_string_sizes);
pages.device_ptr(), chunks, min_row, num_rows, compute_num_rows);
} else {
compute_page_sizes_kernel<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, compute_num_rows, compute_string_sizes);
pages.device_ptr(), chunks, min_row, num_rows, compute_num_rows);
}
}
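Taken together with the kernel changes above, compute_page_sizes now computes only row counts and nesting sizes; string sizes come from a separate pass that runs after output chunks are known. A hedged sketch of the call sequence implied by this diff (preprocess_chunk_strings appears in the hybrid scan changes below):

// before (sketch): one fused preprocessing pass
//   compute_page_sizes(pages, chunks, min_row, num_rows,
//                      compute_num_rows, compute_string_sizes, level_type_size, stream);
//
// after (sketch): rows and nesting sizes first; string sizes once chunks are known
//   compute_page_sizes(pages, chunks, min_row, num_rows,
//                      compute_num_rows, level_type_size, stream);
//   ... compute output chunks ...
//   preprocess_chunk_strings(read_info, page_mask);  // string lengths computed here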

24 changes: 23 additions & 1 deletion cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
@@ -714,11 +714,33 @@ table_with_metadata hybrid_scan_reader_impl::read_chunk_internal(
auto& subpass = *pass.subpass;
auto const& read_info = subpass.output_chunk_read_info[subpass.current_output_chunk];

CUDF_EXPECTS(_subpass_page_mask.size() == subpass.pages.size(),
"Page mask size must be equal to the number of pages in the subpass");
auto page_mask = cudf::detail::make_device_uvector_async(_subpass_page_mask, _stream, _mr);

// computes:
// PageNestingInfo::batch_size for each level of nesting, for each page, taking row bounds into
// account. PageInfo::skipped_values, which tells us where to start decoding in the input to
// respect the user bounds. It is only necessary to do this second pass if uses_custom_row_bounds
// is set (if the user has specified artificial bounds).
if (uses_custom_row_bounds(mode)) {
compute_page_sizes(subpass.pages,
pass.chunks,
read_info.skip_rows,
read_info.num_rows,
false, // num_rows is already computed
pass.level_type_size,
_stream);
}

// preprocess strings
preprocess_chunk_strings(read_info, page_mask);

// Allocate memory buffers for the output columns.
allocate_columns(mode, read_info.skip_rows, read_info.num_rows);

// Parse data into the output buffers.
decode_page_data(mode, read_info.skip_rows, read_info.num_rows);
decode_page_data(mode, read_info.skip_rows, read_info.num_rows, page_mask);

// Create the final output cudf columns.
for (size_t i = 0; i < _output_buffers.size(); ++i) {
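The CUDF_EXPECTS added above pins down the page-mask contract: exactly one flag per page in the subpass, with false marking pages that were pruned and should not be materialized. A self-contained host-side sketch of that invariant (values invented; the real mask is _subpass_page_mask, copied to the device before decoding):

#include <cassert>
#include <cstddef>
#include <vector>

int main()
{
  std::size_t const num_subpass_pages = 8;               // hypothetical subpass size
  std::vector<bool> page_mask(num_subpass_pages, true);  // true = decode this page
  page_mask[3] = false;                                  // hypothetical pruned page
  assert(page_mask.size() == num_subpass_pages);         // the invariant checked above
  return 0;
}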
6 changes: 4 additions & 2 deletions cpp/src/io/parquet/page_hdr.cu
@@ -505,8 +505,10 @@ void __launch_bounds__(decode_page_headers_block_size)
// they will be recomputed in the preprocess step by examining repetition and
// definition levels
bs->page.chunk_row += bs->page.num_rows;
bs->page.num_rows = 0;
bs->page.flags = 0;
bs->page.str_bytes = 0;
bs->page.str_bytes_all = 0;
// zero out V2 info
bs->page.num_nulls = 0;
bs->page.lvl_bytes[level_type::DEFINITION] = 0;