-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fix: ViewType gc on huge batch would produce bad output #8694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0482a20
7034580
464db03
142284c
3f7c50d
2110c46
aa511d7
fdefa5f
a962382
7bab606
c5146eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -512,18 +512,85 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |
| }; | ||
| } | ||
|
|
||
| // 3) Allocate exactly capacity for all non-inline data | ||
| let mut data_buf = Vec::with_capacity(total_large); | ||
| let (views_buf, data_blocks) = if total_large < i32::MAX as usize { | ||
| // fast path, the entire data fits in a single buffer | ||
| // 3) Allocate exactly capacity for all non-inline data | ||
| let mut data_buf = Vec::with_capacity(total_large); | ||
|
|
||
| // 4) Iterate over views and process each inline/non-inline view | ||
| let views_buf: Vec<u128> = (0..len) | ||
| .map(|i| unsafe { self.copy_view_to_buffer(i, 0, &mut data_buf) }) | ||
| .collect(); | ||
| let data_block = Buffer::from_vec(data_buf); | ||
| let data_blocks = vec![data_block]; | ||
| (views_buf, data_blocks) | ||
| } else { | ||
| // slow path, need to split into multiple buffers | ||
|
|
||
| struct GcCopyGroup { | ||
| total_buffer_bytes: usize, | ||
| total_len: usize, | ||
| } | ||
|
|
||
| impl GcCopyGroup { | ||
| fn new(total_buffer_bytes: u32, total_len: usize) -> Self { | ||
| Self { | ||
| total_buffer_bytes: total_buffer_bytes as usize, | ||
| total_len, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // 4) Iterate over views and process each inline/non-inline view | ||
| let views_buf: Vec<u128> = (0..len) | ||
| .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) }) | ||
| .collect(); | ||
| let mut groups = Vec::new(); | ||
| let mut current_length = 0; | ||
| let mut current_elements = 0; | ||
|
|
||
| for view in self.views() { | ||
| let len = *view as u32; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part is so slow, but it's right, I can make it faster(by handling the numbers via grouping or batching) if required
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure how you would make this much faster - I think the code needs to find the locations to split in any event
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if the buffer size is greater than i32::MAX, it's possible that a single buffer is much smaller than i32::MAX, so this can find batch-by-batch, rather than just adding small buffer one-by-one?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see -- you are saying you could potentially optimize the input by looking at each input buffer or something and gcing it individually or something. That would be interesting, though it would probably take a lot of care to make it fast. |
||
| if len > MAX_INLINE_VIEW_LEN { | ||
| if current_length + len > i32::MAX as u32 { | ||
| // Start a new group | ||
| groups.push(GcCopyGroup::new(current_length, current_elements)); | ||
| current_length = 0; | ||
| current_elements = 0; | ||
| } | ||
| current_length += len; | ||
| current_elements += 1; | ||
| } | ||
| } | ||
| if current_elements != 0 { | ||
| groups.push(GcCopyGroup::new(current_length, current_elements)); | ||
| } | ||
| debug_assert!(groups.len() <= i32::MAX as usize); | ||
|
|
||
| // 3) Copy the buffers group by group | ||
| let mut views_buf = Vec::with_capacity(len); | ||
| let mut data_blocks = Vec::with_capacity(groups.len()); | ||
|
|
||
| let mut current_view_idx = 0; | ||
|
|
||
| for (group_idx, gc_copy_group) in groups.iter().enumerate() { | ||
| let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes); | ||
|
|
||
| // Directly push views to avoid intermediate Vec allocation | ||
| let new_views = (current_view_idx..current_view_idx + gc_copy_group.total_len).map( | ||
| |view_idx| { | ||
| // safety: the view index came from iterating over valid range | ||
| unsafe { | ||
| self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf) | ||
| } | ||
| }, | ||
| ); | ||
| views_buf.extend(new_views); | ||
|
|
||
| data_blocks.push(Buffer::from_vec(data_buf)); | ||
| current_view_idx += gc_copy_group.total_len; | ||
| } | ||
| (views_buf, data_blocks) | ||
| }; | ||
|
|
||
| // 5) Wrap up buffers | ||
| let data_block = Buffer::from_vec(data_buf); | ||
| // 5) Wrap up views buffer | ||
| let views_scalar = ScalarBuffer::from(views_buf); | ||
| let data_blocks = vec![data_block]; | ||
|
|
||
| // SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized | ||
| unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) } | ||
|
|
@@ -538,10 +605,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |
| /// inside one of `self.buffers`. | ||
| /// - `data_buf` must be ready to have additional bytes appended. | ||
| /// - After this call, the returned view will have its | ||
| /// `buffer_index` reset to `0` and its `offset` updated so that it points | ||
| /// `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points | ||
| /// into the bytes just appended at the end of `data_buf`. | ||
| #[inline(always)] | ||
| unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 { | ||
| unsafe fn copy_view_to_buffer( | ||
| &self, | ||
| i: usize, | ||
| buffer_idx: i32, | ||
| data_buf: &mut Vec<u8>, | ||
| ) -> u128 { | ||
| // SAFETY: `i < self.len()` ensures this is in‑bounds. | ||
| let raw_view = unsafe { *self.views().get_unchecked(i) }; | ||
| let mut bv = ByteView::from(raw_view); | ||
|
|
@@ -561,7 +633,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |
| let new_offset = data_buf.len() as u32; | ||
| data_buf.extend_from_slice(slice); | ||
|
|
||
| bv.buffer_index = 0; | ||
| bv.buffer_index = buffer_idx as u32; | ||
| bv.offset = new_offset; | ||
| bv.into() | ||
| } | ||
|
|
@@ -1404,6 +1476,56 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| #[test] | ||
| #[cfg_attr(miri, ignore)] // Takes too long | ||
| fn test_gc_huge_array() { | ||
mapleFU marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Construct multiple 128 MiB BinaryView entries so total > 4 GiB | ||
| let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view | ||
| let num_views: usize = 36; | ||
|
|
||
| // Create a single 128 MiB data block with a simple byte pattern | ||
| let buffer = Buffer::from_vec(vec![0xAB; block_len]); | ||
| let buffer2 = Buffer::from_vec(vec![0xFF; block_len]); | ||
|
|
||
| // Append this block and then add many views pointing to it | ||
| let mut builder = BinaryViewBuilder::new(); | ||
| let block_id = builder.append_block(buffer); | ||
| for _ in 0..num_views / 2 { | ||
| builder | ||
| .try_append_view(block_id, 0, block_len as u32) | ||
| .expect("append view into 128MiB block"); | ||
| } | ||
| let block_id2 = builder.append_block(buffer2); | ||
| for _ in 0..num_views / 2 { | ||
| builder | ||
| .try_append_view(block_id2, 0, block_len as u32) | ||
| .expect("append view into 128MiB block"); | ||
| } | ||
|
|
||
| let array = builder.finish(); | ||
| let total = array.total_buffer_bytes_used(); | ||
| assert!( | ||
| total > u32::MAX as usize, | ||
| "Expected total non-inline bytes to exceed 4 GiB, got {}", | ||
| total | ||
| ); | ||
|
|
||
| // Run gc and verify correctness | ||
| let gced = array.gc(); | ||
| assert_eq!(gced.len(), num_views, "Length mismatch after gc"); | ||
| assert_eq!(gced.null_count(), 0, "Null count mismatch after gc"); | ||
| assert_ne!( | ||
| gced.data_buffers().len(), | ||
| 1, | ||
| "gc with huge buffer should not consolidate data into a single buffer" | ||
| ); | ||
|
|
||
| // Element-wise equality check across the entire array | ||
| array.iter().zip(gced.iter()).for_each(|(orig, got)| { | ||
| assert_eq!(orig, got, "Value mismatch after gc on huge array"); | ||
| }); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_eq() { | ||
| let test_data = [ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it better if I extract this into a new function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so!