Skip to content
Merged
146 changes: 134 additions & 12 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,18 +512,85 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
};
}

// 3) Allocate exactly capacity for all non-inline data
let mut data_buf = Vec::with_capacity(total_large);
let (views_buf, data_blocks) = if total_large < i32::MAX as usize {
// fast path, the entire data fits in a single buffer
// 3) Allocate exactly capacity for all non-inline data
let mut data_buf = Vec::with_capacity(total_large);

// 4) Iterate over views and process each inline/non-inline view
let views_buf: Vec<u128> = (0..len)
.map(|i| unsafe { self.copy_view_to_buffer(i, 0, &mut data_buf) })
.collect();
let data_block = Buffer::from_vec(data_buf);
let data_blocks = vec![data_block];
(views_buf, data_blocks)
} else {
// slow path, need to split into multiple buffers
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better if I extracted this into a new function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so!


/// Bookkeeping for one output data buffer produced during gc:
/// the number of non-inline bytes destined for that buffer and the
/// number of consecutive views whose data lands in it.
struct GcCopyGroup {
    // Non-inline bytes this group's buffer will hold.
    total_buffer_bytes: usize,
    // Count of views assigned to this group's buffer.
    total_len: usize,
}

impl GcCopyGroup {
    /// Builds a group, widening the byte count to `usize` for use as a
    /// `Vec` capacity. NOTE(review): callers appear to cap a group at
    /// `i32::MAX` bytes, so the `u32` parameter is presumably lossless.
    fn new(total_buffer_bytes: u32, total_len: usize) -> Self {
        let total_buffer_bytes = total_buffer_bytes as usize;
        Self {
            total_buffer_bytes,
            total_len,
        }
    }
}

// 4) Iterate over views and process each inline/non-inline view
let views_buf: Vec<u128> = (0..len)
.map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) })
.collect();
let mut groups = Vec::new();
let mut current_length = 0;
let mut current_elements = 0;

for view in self.views() {
let len = *view as u32;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is slow, but it is correct. I can make it faster (by handling the views in groups or batches) if required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how you would make this much faster - I think the code needs to find the locations to split in any event

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even when the total buffer size exceeds i32::MAX, each individual input buffer may be much smaller than i32::MAX, so the split points could be found batch-by-batch rather than by accumulating each small buffer one at a time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- you are saying you could potentially optimize by examining each input buffer and gc'ing it individually.

That would be interesting, though it would probably take a lot of care to make it fast.

if len > MAX_INLINE_VIEW_LEN {
if current_length + len > i32::MAX as u32 {
// Start a new group
groups.push(GcCopyGroup::new(current_length, current_elements));
current_length = 0;
current_elements = 0;
}
current_length += len;
current_elements += 1;
}
}
if current_elements != 0 {
groups.push(GcCopyGroup::new(current_length, current_elements));
}
debug_assert!(groups.len() <= i32::MAX as usize);

// 3) Copy the buffers group by group
let mut views_buf = Vec::with_capacity(len);
let mut data_blocks = Vec::with_capacity(groups.len());

let mut current_view_idx = 0;

for (group_idx, gc_copy_group) in groups.iter().enumerate() {
let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);

// Directly push views to avoid intermediate Vec allocation
let new_views = (current_view_idx..current_view_idx + gc_copy_group.total_len).map(
|view_idx| {
// safety: the view index came from iterating over valid range
unsafe {
self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
}
},
);
views_buf.extend(new_views);

data_blocks.push(Buffer::from_vec(data_buf));
current_view_idx += gc_copy_group.total_len;
}
(views_buf, data_blocks)
};

// 5) Wrap up buffers
let data_block = Buffer::from_vec(data_buf);
// 5) Wrap up views buffer
let views_scalar = ScalarBuffer::from(views_buf);
let data_blocks = vec![data_block];

// SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized
unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) }
Expand All @@ -538,10 +605,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// inside one of `self.buffers`.
/// - `data_buf` must be ready to have additional bytes appended.
/// - After this call, the returned view will have its
/// `buffer_index` reset to `0` and its `offset` updated so that it points
/// `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points
/// into the bytes just appended at the end of `data_buf`.
#[inline(always)]
unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 {
unsafe fn copy_view_to_buffer(
&self,
i: usize,
buffer_idx: i32,
data_buf: &mut Vec<u8>,
) -> u128 {
// SAFETY: `i < self.len()` ensures this is in‑bounds.
let raw_view = unsafe { *self.views().get_unchecked(i) };
let mut bv = ByteView::from(raw_view);
Expand All @@ -561,7 +633,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
let new_offset = data_buf.len() as u32;
data_buf.extend_from_slice(slice);

bv.buffer_index = 0;
bv.buffer_index = buffer_idx as u32;
bv.offset = new_offset;
bv.into()
}
Expand Down Expand Up @@ -1404,6 +1476,56 @@ mod tests {
}
}

#[test]
#[cfg_attr(miri, ignore)] // Takes too long
fn test_gc_huge_array() {
    // Build enough 128 MiB BinaryView entries that the total non-inline
    // data exceeds 4 GiB, exercising gc's multi-buffer path.
    let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view
    let num_views: usize = 36;

    // Two distinct 128 MiB data blocks with recognizable byte patterns
    let buffer = Buffer::from_vec(vec![0xAB; block_len]);
    let buffer2 = Buffer::from_vec(vec![0xFF; block_len]);

    // Register each block and point half of the views at each one,
    // in the same order: all of block 1's views, then all of block 2's.
    let mut builder = BinaryViewBuilder::new();
    for buf in [buffer, buffer2] {
        let id = builder.append_block(buf);
        for _ in 0..num_views / 2 {
            builder
                .try_append_view(id, 0, block_len as u32)
                .expect("append view into 128MiB block");
        }
    }

    let array = builder.finish();
    let total = array.total_buffer_bytes_used();
    assert!(
        total > u32::MAX as usize,
        "Expected total non-inline bytes to exceed 4 GiB, got {}",
        total
    );

    // Run gc and verify correctness
    let gced = array.gc();
    assert_eq!(gced.len(), num_views, "Length mismatch after gc");
    assert_eq!(gced.null_count(), 0, "Null count mismatch after gc");
    assert_ne!(
        gced.data_buffers().len(),
        1,
        "gc with huge buffer should not consolidate data into a single buffer"
    );

    // Element-wise equality check across the entire array
    for (orig, got) in array.iter().zip(gced.iter()) {
        assert_eq!(orig, got, "Value mismatch after gc on huge array");
    }
}

#[test]
fn test_eq() {
let test_data = [
Expand Down
Loading