Skip to content

Commit 7e54bb2

Browse files
authored
[Parquet] Return error from RleDecoder::reload rather than panic (#8729)
# Which issue does this PR close?

- Closes #8632.

# Rationale for this change

`RleDecoder::reload` previously panicked on failure; returning an error instead lets callers handle it.

# What changes are included in this PR?

- Updated `RleDecoder::reload` to return `Result` instead of panicking.
- Adjusted all callers to handle the new return type accordingly.

# Are these changes tested?

Covered by existing tests.

# Are there any user-facing changes?

No.
1 parent bac0cb5 commit 7e54bb2

File tree

10 files changed

+98
-77
lines changed

10 files changed

+98
-77
lines changed

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl ByteArrayDecoder {
274274
validate_utf8,
275275
)),
276276
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
277-
ByteArrayDecoderDictionary::new(data, num_levels, num_values),
277+
ByteArrayDecoderDictionary::new(data, num_levels, num_values)?,
278278
),
279279
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
280280
ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
@@ -563,10 +563,10 @@ pub struct ByteArrayDecoderDictionary {
563563
}
564564

565565
impl ByteArrayDecoderDictionary {
566-
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
567-
Self {
568-
decoder: DictIndexDecoder::new(data, num_levels, num_values),
569-
}
566+
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
567+
Ok(Self {
568+
decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
569+
})
570570
}
571571

572572
fn read<I: OffsetSizeTrait>(

parquet/src/arrow/array_reader/byte_array_dictionary.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ where
293293
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
294294
let bit_width = data[0];
295295
let mut decoder = RleDecoder::new(bit_width);
296-
decoder.set_data(data.slice(1..));
296+
decoder.set_data(data.slice(1..))?;
297297
MaybeDictionaryDecoder::Dict {
298298
decoder,
299299
max_remaining_values: num_values.unwrap_or(num_levels),

parquet/src/arrow/array_reader/byte_view_array.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl ByteViewArrayDecoder {
236236
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
237237
ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
238238
data, num_levels, num_values,
239-
))
239+
)?)
240240
}
241241
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
242242
ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
@@ -426,10 +426,10 @@ pub struct ByteViewArrayDecoderDictionary {
426426
}
427427

428428
impl ByteViewArrayDecoderDictionary {
429-
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
430-
Self {
431-
decoder: DictIndexDecoder::new(data, num_levels, num_values),
432-
}
429+
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
430+
Ok(Self {
431+
decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
432+
})
433433
}
434434

435435
/// Reads the next indexes from self.decoder

parquet/src/arrow/array_reader/fixed_len_byte_array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ impl ColumnValueDecoder for ValueDecoder {
381381
offset: 0,
382382
},
383383
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
384-
decoder: DictIndexDecoder::new(data, num_levels, num_values),
384+
decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
385385
},
386386
Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
387387
decoder: DeltaByteArrayDecoder::new(data)?,

parquet/src/arrow/decoder/dictionary_index.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,18 @@ pub struct DictIndexDecoder {
4242
impl DictIndexDecoder {
4343
/// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
4444
/// associated with this data page, and the number of non-null values (if known)
45-
pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
45+
pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
4646
let bit_width = data[0];
4747
let mut decoder = RleDecoder::new(bit_width);
48-
decoder.set_data(data.slice(1..));
48+
decoder.set_data(data.slice(1..))?;
4949

50-
Self {
50+
Ok(Self {
5151
decoder,
5252
index_buf: Box::new([0; 1024]),
5353
index_buf_len: 0,
5454
index_offset: 0,
5555
max_remaining_values: num_values.unwrap_or(num_levels),
56-
}
56+
})
5757
}
5858

5959
/// Read up to `len` values, returning the number of values read

parquet/src/arrow/record_reader/definition_levels.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,12 @@ impl DefinitionLevelBufferDecoder {
131131
impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
132132
type Buffer = DefinitionLevelBuffer;
133133

134-
fn set_data(&mut self, encoding: Encoding, data: Bytes) {
134+
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
135135
match &mut self.decoder {
136136
MaybePacked::Packed(d) => d.set_data(encoding, data),
137-
MaybePacked::Fallback(d) => d.set_data(encoding, data),
138-
}
137+
MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
138+
};
139+
Ok(())
139140
}
140141
}
141142

parquet/src/column/reader.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ where
451451
self.rep_level_decoder
452452
.as_mut()
453453
.unwrap()
454-
.set_data(rep_level_encoding, level_data);
454+
.set_data(rep_level_encoding, level_data)?;
455455
}
456456

457457
if max_def_level > 0 {
@@ -466,7 +466,7 @@ where
466466
self.def_level_decoder
467467
.as_mut()
468468
.unwrap()
469-
.set_data(def_level_encoding, level_data);
469+
.set_data(def_level_encoding, level_data)?;
470470
}
471471

472472
self.values_decoder.set_data(
@@ -512,7 +512,7 @@ where
512512
self.rep_level_decoder.as_mut().unwrap().set_data(
513513
Encoding::RLE,
514514
buf.slice(..rep_levels_byte_len as usize),
515-
);
515+
)?;
516516
}
517517

518518
// DataPage v2 only supports RLE encoding for definition
@@ -524,7 +524,7 @@ where
524524
rep_levels_byte_len as usize
525525
..(rep_levels_byte_len + def_levels_byte_len) as usize,
526526
),
527-
);
527+
)?;
528528
}
529529

530530
self.values_decoder.set_data(

parquet/src/column/reader/decoder.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub trait ColumnLevelDecoder {
3232
type Buffer;
3333

3434
/// Set data for this [`ColumnLevelDecoder`]
35-
fn set_data(&mut self, encoding: Encoding, data: Bytes);
35+
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>;
3636
}
3737

3838
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
@@ -266,15 +266,15 @@ enum LevelDecoder {
266266
}
267267

268268
impl LevelDecoder {
269-
fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
269+
fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result<Self> {
270270
match encoding {
271271
Encoding::RLE => {
272272
let mut decoder = RleDecoder::new(bit_width);
273-
decoder.set_data(data);
274-
Self::Rle(decoder)
273+
decoder.set_data(data)?;
274+
Ok(Self::Rle(decoder))
275275
}
276276
#[allow(deprecated)]
277-
Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
277+
Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data), bit_width)),
278278
_ => unreachable!("invalid level encoding: {}", encoding),
279279
}
280280
}
@@ -310,8 +310,9 @@ impl DefinitionLevelDecoderImpl {
310310
impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
311311
type Buffer = Vec<i16>;
312312

313-
fn set_data(&mut self, encoding: Encoding, data: Bytes) {
314-
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
313+
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
314+
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
315+
Ok(())
315316
}
316317
}
317318

@@ -413,10 +414,11 @@ impl RepetitionLevelDecoderImpl {
413414
impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
414415
type Buffer = Vec<i16>;
415416

416-
fn set_data(&mut self, encoding: Encoding, data: Bytes) {
417-
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
417+
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
418+
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
418419
self.buffer_len = 0;
419420
self.buffer_offset = 0;
421+
Ok(())
420422
}
421423
}
422424

@@ -499,14 +501,14 @@ mod tests {
499501
let data = Bytes::from(encoder.consume());
500502

501503
let mut decoder = RepetitionLevelDecoderImpl::new(1);
502-
decoder.set_data(Encoding::RLE, data.clone());
504+
decoder.set_data(Encoding::RLE, data.clone()).unwrap();
503505
let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
504506
assert_eq!(levels, 4);
505507

506508
// The length of the final bit packed run is ambiguous, so without the correct
507509
// levels limit, it will decode zero padding
508510
let mut decoder = RepetitionLevelDecoderImpl::new(1);
509-
decoder.set_data(Encoding::RLE, data);
511+
decoder.set_data(Encoding::RLE, data).unwrap();
510512
let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
511513
assert_eq!(levels, 6);
512514
}
@@ -525,7 +527,7 @@ mod tests {
525527
let data = Bytes::from(encoder.consume());
526528

527529
let mut decoder = RepetitionLevelDecoderImpl::new(5);
528-
decoder.set_data(Encoding::RLE, data);
530+
decoder.set_data(Encoding::RLE, data).unwrap();
529531

530532
let total_records = encoded.iter().filter(|x| **x == 0).count();
531533
let mut remaining_records = total_records;

parquet/src/encodings/decoding.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ impl<T: DataType> Decoder<T> for DictDecoder<T> {
393393
));
394394
}
395395
let mut rle_decoder = RleDecoder::new(bit_width);
396-
rle_decoder.set_data(data.slice(1..));
396+
rle_decoder.set_data(data.slice(1..))?;
397397
self.num_values = num_values;
398398
self.rle_decoder = Some(rle_decoder);
399399
Ok(())
@@ -473,7 +473,7 @@ impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
473473

474474
self.decoder = RleDecoder::new(1);
475475
self.decoder
476-
.set_data(data.slice(I32_SIZE..I32_SIZE + data_size));
476+
.set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
477477
self.values_left = num_values;
478478
Ok(())
479479
}

0 commit comments

Comments
 (0)