From f91a33cffaf9b53704815b237401f8c495832e7a Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Mon, 27 Oct 2025 20:02:37 -0400 Subject: [PATCH 1/7] Return result for rle reload --- .../array_reader/byte_array_dictionary.rs | 2 +- parquet/src/encodings/decoding.rs | 4 +- parquet/src/encodings/rle.rs | 68 +++++++++---------- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 4afe4264cb41..09de37a80ed9 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -293,7 +293,7 @@ where Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { let bit_width = data[0]; let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data.slice(1..)); + decoder.set_data(data.slice(1..))?; MaybeDictionaryDecoder::Dict { decoder, max_remaining_values: num_values.unwrap_or(num_levels), diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index f5336ca7c09a..8201b38753c6 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -393,7 +393,7 @@ impl Decoder for DictDecoder { )); } let mut rle_decoder = RleDecoder::new(bit_width); - rle_decoder.set_data(data.slice(1..)); + rle_decoder.set_data(data.slice(1..))?; self.num_values = num_values; self.rle_decoder = Some(rle_decoder); Ok(()) @@ -473,7 +473,7 @@ impl Decoder for RleValueDecoder { self.decoder = RleDecoder::new(1); self.decoder - .set_data(data.slice(I32_SIZE..I32_SIZE + data_size)); + .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?; self.values_left = num_values; Ok(()) } diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 41c050132064..0ca0b6c4b200 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -321,14 +321,15 @@ impl RleDecoder { } #[inline] - pub fn set_data(&mut self, data: Bytes) { + 
pub fn set_data(&mut self, data: Bytes) -> Result<()> { if let Some(ref mut bit_reader) = self.bit_reader { bit_reader.reset(data); } else { self.bit_reader = Some(BitReader::new(data)); } - let _ = self.reload(); + self.reload()?; + Ok(()) } // These functions inline badly, they tend to inline and then create very large loop unrolls @@ -339,7 +340,7 @@ impl RleDecoder { assert!(size_of::() <= 8); while self.rle_left == 0 && self.bit_packed_left == 0 { - if !self.reload() { + if !self.reload()? { return Ok(None); } } @@ -396,7 +397,7 @@ impl RleDecoder { } self.bit_packed_left -= num_values as u32; values_read += num_values; - } else if !self.reload() { + } else if !self.reload()? { break; } } @@ -425,7 +426,7 @@ impl RleDecoder { } self.bit_packed_left -= num_values as u32; values_skipped += num_values; - } else if !self.reload() { + } else if !self.reload()? { break; } } @@ -488,7 +489,7 @@ impl RleDecoder { break; } } - } else if !self.reload() { + } else if !self.reload()? { break; } } @@ -497,7 +498,7 @@ impl RleDecoder { } #[inline] - fn reload(&mut self) -> bool { + fn reload(&mut self) -> Result { let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); if let Some(indicator_value) = bit_reader.get_vlq_int() { @@ -505,7 +506,7 @@ impl RleDecoder { // but is handled by the C++ implementation // if indicator_value == 0 { - return false; + return Ok(false); } if indicator_value & 1 == 1 { self.bit_packed_left = ((indicator_value >> 1) * 8) as u32; @@ -513,14 +514,13 @@ impl RleDecoder { self.rle_left = (indicator_value >> 1) as u32; let value_width = bit_util::ceil(self.bit_width as usize, 8); self.current_value = bit_reader.get_aligned::(value_width); - assert!( - self.current_value.is_some(), - "parquet_data_error: not enough data for RLE decoding" - ); + if self.current_value.is_none() { + return Err(ParquetError::General("parquet_data_error: not enough data for RLE decoding".into())) + } } - true + Ok(true) } else { - false + Ok(false) 
} } } @@ -540,7 +540,7 @@ mod tests { // 00000011 10001000 11000110 11111010 let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![0; 8]; let expected = vec![0, 1, 2, 3, 4, 5, 6, 7]; let result = decoder.get_batch::(&mut buffer); @@ -554,7 +554,7 @@ mod tests { // 00000011 10001000 11000110 11111010 let data = vec![0x03, 0x88, 0xC6, 0xFA]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let expected = vec![2, 3, 4, 5, 6, 7]; let skipped = decoder.skip(2).expect("skipping values"); assert_eq!(skipped, 2); @@ -595,7 +595,7 @@ mod tests { ]; let mut decoder: RleDecoder = RleDecoder::new(1); - decoder.set_data(data1.into()); + decoder.set_data(data1.into()).unwrap(); let mut buffer = vec![false; 100]; let mut expected = vec![]; for i in 0..100 { @@ -609,7 +609,7 @@ mod tests { assert!(result.is_ok()); assert_eq!(buffer, expected); - decoder.set_data(data2.into()); + decoder.set_data(data2.into()).unwrap(); let mut buffer = vec![false; 100]; let mut expected = vec![]; for i in 0..100 { @@ -638,7 +638,7 @@ mod tests { ]; let mut decoder: RleDecoder = RleDecoder::new(1); - decoder.set_data(data1.into()); + decoder.set_data(data1.into()).unwrap(); let mut buffer = vec![true; 50]; let expected = vec![false; 50]; @@ -650,7 +650,7 @@ mod tests { assert_eq!(remainder, 50); assert_eq!(buffer, expected); - decoder.set_data(data2.into()); + decoder.set_data(data2.into()).unwrap(); let mut buffer = vec![false; 50]; let mut expected = vec![]; for i in 0..50 { @@ -676,7 +676,7 @@ mod tests { let dict = vec![10, 20, 30]; let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![0; 12]; let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 
30, 30, 30, 30]; let result = decoder.get_batch_with_dict::(&dict, &mut buffer, 12); @@ -689,7 +689,7 @@ mod tests { let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![""; 12]; let expected = vec![ "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff", @@ -707,7 +707,7 @@ mod tests { let dict = vec![10, 20, 30]; let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![0; 10]; let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; let skipped = decoder.skip(2).expect("skipping two values"); @@ -724,7 +724,7 @@ mod tests { let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"]; let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B]; let mut decoder: RleDecoder = RleDecoder::new(3); - decoder.set_data(data.into()); + decoder.set_data(data.into()).unwrap(); let mut buffer = vec![""; 8]; let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"]; let skipped = decoder.skip(4).expect("skipping four values"); @@ -757,7 +757,7 @@ mod tests { // Verify read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.clone()); + decoder.set_data(buffer.clone()).unwrap(); for v in values { let val: i64 = decoder .get() @@ -767,7 +767,7 @@ mod tests { } // Verify batch read - decoder.set_data(buffer); + decoder.set_data(buffer).unwrap(); let mut values_read: Vec = vec![0; values.len()]; decoder .get_batch(&mut values_read[..]) @@ -872,7 +872,7 @@ mod tests { let data: Bytes = data.into(); let mut decoder = RleDecoder::new(8); - decoder.set_data(data.clone()); + decoder.set_data(data.clone()).unwrap(); let mut output = vec![0_u16; 100]; let read = 
decoder.get_batch(&mut output).unwrap(); @@ -881,7 +881,7 @@ mod tests { assert!(output.iter().take(20).all(|x| *x == 255)); // Reset decoder - decoder.set_data(data); + decoder.set_data(data).unwrap(); let dict: Vec = (0..256).collect(); let mut output = vec![0_u16; 100]; @@ -907,7 +907,7 @@ mod tests { buffer.push(0); let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.into()); + decoder.set_data(buffer.into()).unwrap(); // We don't always reliably know how many non-null values are contained in a page // and so the decoder must work correctly without a precise value count @@ -947,14 +947,14 @@ mod tests { let buffer: Bytes = writer.consume().into(); let mut decoder = RleDecoder::new(1); - decoder.set_data(buffer.clone()); + decoder.set_data(buffer.clone()).unwrap(); let mut decoded: Vec = vec![0; num_values]; let r = decoder.get_batch(&mut decoded).unwrap(); assert_eq!(r, num_values); assert_eq!(vec![1; num_values], decoded); - decoder.set_data(buffer); + decoder.set_data(buffer).unwrap(); let r = decoder .get_batch_with_dict(&[0, 23], &mut decoded, num_values) .unwrap(); @@ -973,7 +973,7 @@ mod tests { } let buffer = encoder.consume(); let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(Bytes::from(buffer)); + decoder.set_data(Bytes::from(buffer)).unwrap(); let mut actual_values: Vec = vec![0; values.len()]; decoder .get_batch(&mut actual_values) @@ -992,7 +992,7 @@ mod tests { // Verify read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer.clone()); + decoder.set_data(buffer.clone()).unwrap(); for v in values { let val = decoder .get::() @@ -1003,7 +1003,7 @@ mod tests { // Verify batch read let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(buffer); + decoder.set_data(buffer).unwrap(); let mut values_read: Vec = vec![0; values.len()]; decoder .get_batch(&mut values_read[..]) From 3b7ffe0dae4fd28a67dbcb42ab05080cb873cc0c Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Mon, 27 Oct 2025 20:10:10 
-0400 Subject: [PATCH 2/7] Return result for DictIndexDecoder::new --- parquet/src/arrow/array_reader/byte_array.rs | 10 +++++----- parquet/src/arrow/array_reader/byte_view_array.rs | 10 +++++----- parquet/src/arrow/array_reader/fixed_len_byte_array.rs | 2 +- parquet/src/arrow/decoder/dictionary_index.rs | 8 ++++---- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 5a495d2c6085..0acbe6501924 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -274,7 +274,7 @@ impl ByteArrayDecoder { validate_utf8, )), Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary( - ByteArrayDecoderDictionary::new(data, num_levels, num_values), + ByteArrayDecoderDictionary::new(data, num_levels, num_values)?, ), Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength( ByteArrayDecoderDeltaLength::new(data, validate_utf8)?, @@ -563,10 +563,10 @@ pub struct ByteArrayDecoderDictionary { } impl ByteArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } + fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> { + Ok(Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values)?, + }) } fn read( diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index cc71647f4bc9..f881690f805f 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -236,7 +236,7 @@ impl ByteViewArrayDecoder { Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( data, num_levels, num_values, - )) + )?) 
} Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, @@ -426,10 +426,10 @@ pub struct ByteViewArrayDecoderDictionary { } impl ByteViewArrayDecoderDictionary { - fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self { - Self { - decoder: DictIndexDecoder::new(data, num_levels, num_values), - } + fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> { + Ok(Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values)?, + }) } /// Reads the next indexes from self.decoder diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index a37bef568d1c..2297926add5f 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -381,7 +381,7 @@ impl ColumnValueDecoder for ValueDecoder { offset: 0, }, Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict { - decoder: DictIndexDecoder::new(data, num_levels, num_values), + decoder: DictIndexDecoder::new(data, num_levels, num_values)?, }, Encoding::DELTA_BYTE_ARRAY => Decoder::Delta { decoder: DeltaByteArrayDecoder::new(data)?, diff --git a/parquet/src/arrow/decoder/dictionary_index.rs b/parquet/src/arrow/decoder/dictionary_index.rs index 38f2b058360c..bb96f4bf98d6 100644 --- a/parquet/src/arrow/decoder/dictionary_index.rs +++ b/parquet/src/arrow/decoder/dictionary_index.rs @@ -42,18 +42,18 @@ pub struct DictIndexDecoder { impl DictIndexDecoder { /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels /// associated with this data page, and the number of non-null values (if known) - pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self { + pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> { let bit_width = data[0]; let mut decoder = RleDecoder::new(bit_width); - 
decoder.set_data(data.slice(1..)); + decoder.set_data(data.slice(1..))?; - Self { + Ok(Self { decoder, index_buf: Box::new([0; 1024]), index_buf_len: 0, index_offset: 0, max_remaining_values: num_values.unwrap_or(num_levels), - } + }) } /// Read up to `len` values, returning the number of values read From bcb5926eb287ee136faff6dfdd5f2d1f9c78f7b4 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Mon, 27 Oct 2025 20:18:41 -0400 Subject: [PATCH 3/7] Return result for LevelDecoder::new --- .../arrow/record_reader/definition_levels.rs | 7 +++-- parquet/src/column/reader.rs | 8 +++--- parquet/src/column/reader/decoder.rs | 26 ++++++++++--------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 34b728d6fa1e..6b0dc882e9ff 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -131,9 +131,12 @@ impl DefinitionLevelBufferDecoder { impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { type Buffer = DefinitionLevelBuffer; - fn set_data(&mut self, encoding: Encoding, data: Bytes) { + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { match &mut self.decoder { - MaybePacked::Packed(d) => d.set_data(encoding, data), + MaybePacked::Packed(d) => { + d.set_data(encoding, data); + Ok(()) + } MaybePacked::Fallback(d) => d.set_data(encoding, data), } } diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index ebde79e6a7f2..387a0602a60d 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -451,7 +451,7 @@ where self.rep_level_decoder .as_mut() .unwrap() - .set_data(rep_level_encoding, level_data); + .set_data(rep_level_encoding, level_data)?; } if max_def_level > 0 { @@ -466,7 +466,7 @@ where self.def_level_decoder .as_mut() .unwrap() - .set_data(def_level_encoding, level_data); + .set_data(def_level_encoding, 
level_data)?; } self.values_decoder.set_data( @@ -512,7 +512,7 @@ where self.rep_level_decoder.as_mut().unwrap().set_data( Encoding::RLE, buf.slice(..rep_levels_byte_len as usize), - ); + )?; } // DataPage v2 only supports RLE encoding for definition @@ -524,7 +524,7 @@ where rep_levels_byte_len as usize ..(rep_levels_byte_len + def_levels_byte_len) as usize, ), - ); + )?; } self.values_decoder.set_data( diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index e49906207577..053db813ce5d 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -32,7 +32,7 @@ pub trait ColumnLevelDecoder { type Buffer; /// Set data for this [`ColumnLevelDecoder`] - fn set_data(&mut self, encoding: Encoding, data: Bytes); + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>; } pub trait RepetitionLevelDecoder: ColumnLevelDecoder { @@ -266,15 +266,15 @@ enum LevelDecoder { } impl LevelDecoder { - fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self { + fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result { match encoding { Encoding::RLE => { let mut decoder = RleDecoder::new(bit_width); - decoder.set_data(data); - Self::Rle(decoder) + decoder.set_data(data)?; + Ok(Self::Rle(decoder)) } #[allow(deprecated)] - Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width), + Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data), bit_width)), _ => unreachable!("invalid level encoding: {}", encoding), } } @@ -310,8 +310,9 @@ impl DefinitionLevelDecoderImpl { impl ColumnLevelDecoder for DefinitionLevelDecoderImpl { type Buffer = Vec; - fn set_data(&mut self, encoding: Encoding, data: Bytes) { - self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)) + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { + self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?); + Ok(()) } } @@ -413,10 +414,11 @@ impl 
RepetitionLevelDecoderImpl { impl ColumnLevelDecoder for RepetitionLevelDecoderImpl { type Buffer = Vec; - fn set_data(&mut self, encoding: Encoding, data: Bytes) { - self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)); + fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { + self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?); self.buffer_len = 0; self.buffer_offset = 0; + Ok(()) } } @@ -499,14 +501,14 @@ mod tests { let data = Bytes::from(encoder.consume()); let mut decoder = RepetitionLevelDecoderImpl::new(1); - decoder.set_data(Encoding::RLE, data.clone()); + decoder.set_data(Encoding::RLE, data.clone()).unwrap(); let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap(); assert_eq!(levels, 4); // The length of the final bit packed run is ambiguous, so without the correct // levels limit, it will decode zero padding let mut decoder = RepetitionLevelDecoderImpl::new(1); - decoder.set_data(Encoding::RLE, data); + decoder.set_data(Encoding::RLE, data).unwrap(); let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap(); assert_eq!(levels, 6); } @@ -525,7 +527,7 @@ mod tests { let data = Bytes::from(encoder.consume()); let mut decoder = RepetitionLevelDecoderImpl::new(5); - decoder.set_data(Encoding::RLE, data); + decoder.set_data(Encoding::RLE, data).unwrap(); let total_records = encoded.iter().filter(|x| **x == 0).count(); let mut remaining_records = total_records; From 6a6fb8aba6b613af8447f731bb0adfb244eb9dd7 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Mon, 27 Oct 2025 20:32:31 -0400 Subject: [PATCH 4/7] Clean up bit_reader expect --- parquet/src/encodings/rle.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 0ca0b6c4b200..32c9519bc22e 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -384,7 +384,10 @@ impl RleDecoder { } else if self.bit_packed_left > 0 { let 
mut num_values = cmp::min(buffer.len() - values_read, self.bit_packed_left as usize); - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| ParquetError::General("bit_reader should be set".into()))?; num_values = bit_reader.get_batch::( &mut buffer[values_read..values_read + num_values], @@ -416,7 +419,10 @@ impl RleDecoder { } else if self.bit_packed_left > 0 { let mut num_values = cmp::min(num_values - values_skipped, self.bit_packed_left as usize); - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be set"))?; num_values = bit_reader.skip(num_values, self.bit_width as usize); if num_values == 0 { @@ -460,7 +466,10 @@ impl RleDecoder { self.rle_left -= num_values as u32; values_read += num_values; } else if self.bit_packed_left > 0 { - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be set"))?; loop { let to_read = index_buf @@ -499,7 +508,10 @@ impl RleDecoder { #[inline] fn reload(&mut self) -> Result<bool> { - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be set"))?; if let Some(indicator_value) = bit_reader.get_vlq_int() { // fastparquet adds padding to the end of pages. 
This is not spec-compliant @@ -514,9 +526,9 @@ impl RleDecoder { self.rle_left = (indicator_value >> 1) as u32; let value_width = bit_util::ceil(self.bit_width as usize, 8); self.current_value = bit_reader.get_aligned::(value_width); - if self.current_value.is_none() { - return Err(ParquetError::General("parquet_data_error: not enough data for RLE decoding".into())) - } + self.current_value.ok_or_else(|| { + general_err!("parquet_data_error: not enough data for RLE decoding") + })?; } Ok(true) } else { From 7e8f0e30a594bdcb3778e49dfdef492796786afd Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Mon, 27 Oct 2025 21:54:16 -0400 Subject: [PATCH 5/7] Remaining expect usages --- parquet/src/encodings/rle.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 32c9519bc22e..38c419532d96 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -350,14 +350,17 @@ impl RleDecoder { &self .current_value .as_mut() - .expect("current_value should be Some") + .ok_or_else(|| general_err!("current_value should be Some"))? 
.to_ne_bytes(), )?; self.rle_left -= 1; rle_value } else { // self.bit_packed_left > 0 - let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be Some"); + let bit_reader = self + .bit_reader + .as_mut() + .ok_or_else(|| general_err!("bit_reader should be Some"))?; let bit_packed_value = bit_reader .get_value(self.bit_width as usize) .ok_or_else(|| eof_err!("Not enough data for 'bit_packed_value'"))?; From 72a11744e83b64cce006d6ec383accd46b024ed5 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Wed, 29 Oct 2025 18:16:23 -0400 Subject: [PATCH 6/7] Move Ok out of match arms --- parquet/src/arrow/record_reader/definition_levels.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 6b0dc882e9ff..8fe26a9b5234 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -133,12 +133,10 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> { match &mut self.decoder { - MaybePacked::Packed(d) => { - d.set_data(encoding, data); - Ok(()) - } - MaybePacked::Fallback(d) => d.set_data(encoding, data), - } + MaybePacked::Packed(d) => d.set_data(encoding, data), + MaybePacked::Fallback(d) => d.set_data(encoding, data)?, + }; + Ok(()) } } From 87040212994489c199321aa184d9a3c6cec503b3 Mon Sep 17 00:00:00 2001 From: Liam Bao Date: Wed, 29 Oct 2025 18:53:52 -0400 Subject: [PATCH 7/7] Add comments --- parquet/src/encodings/rle.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 38c419532d96..c95a46c634d2 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -328,7 +328,10 @@ impl RleDecoder { self.bit_reader = Some(BitReader::new(data)); } - self.reload()?; + // Initialize decoder 
state. The boolean only reports whether the first run contained data, + // and `get`/`get_batch` already interpret that result to drive iteration. We only need + // errors propagated here, so the flag returned is intentionally ignored. + let _ = self.reload()?; Ok(()) }