|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
15 | 15 | use std::any::Any;
|
| 16 | +use std::collections::VecDeque; |
16 | 17 | use std::ops::BitAnd;
|
17 | 18 | use std::sync::Arc;
|
18 | 19 | use std::time::Instant;
|
@@ -65,7 +66,7 @@ pub struct DeserializeDataTransform {
|
65 | 66 |
|
66 | 67 | input: Arc<InputPort>,
|
67 | 68 | output: Arc<OutputPort>,
|
68 |
| - output_data: Vec<DataBlock>, |
| 69 | + output_data: VecDeque<DataBlock>, |
69 | 70 | src_schema: DataSchema,
|
70 | 71 | output_schema: DataSchema,
|
71 | 72 | parts: Vec<PartInfoPtr>,
|
@@ -132,7 +133,7 @@ impl DeserializeDataTransform {
|
132 | 133 | block_reader,
|
133 | 134 | input,
|
134 | 135 | output,
|
135 |
| - output_data: vec![], |
| 136 | + output_data: VecDeque::new(), |
136 | 137 | src_schema,
|
137 | 138 | output_schema,
|
138 | 139 | parts: vec![],
|
@@ -231,7 +232,7 @@ impl Processor for DeserializeDataTransform {
|
231 | 232 | return Ok(Event::NeedConsume);
|
232 | 233 | }
|
233 | 234 |
|
234 |
| - if let Some(data_block) = self.output_data.pop() { |
| 235 | + if let Some(data_block) = self.output_data.pop_front() { |
235 | 236 | self.output.push_data(Ok(data_block));
|
236 | 237 | return Ok(Event::NeedConsume);
|
237 | 238 | }
|
@@ -281,7 +282,7 @@ impl Processor for DeserializeDataTransform {
|
281 | 282 |
|
282 | 283 | self.update_scan_metrics(blocks.as_slice());
|
283 | 284 |
|
284 |
| - self.output_data = blocks; |
| 285 | + self.output_data = blocks.into(); |
285 | 286 | }
|
286 | 287 |
|
287 | 288 | ParquetDataSource::Normal((data, virtual_data)) => {
|
@@ -316,7 +317,7 @@ impl Processor for DeserializeDataTransform {
|
316 | 317 |
|
317 | 318 | self.update_scan_metrics(data_blocks.as_slice());
|
318 | 319 |
|
319 |
| - let mut output_blocks = Vec::with_capacity(data_blocks.len()); |
| 320 | + let mut output_blocks = VecDeque::with_capacity(data_blocks.len()); |
320 | 321 | for mut data_block in data_blocks {
|
321 | 322 | let origin_num_rows = data_block.num_rows();
|
322 | 323 |
|
@@ -357,7 +358,7 @@ impl Processor for DeserializeDataTransform {
|
357 | 358 | self.block_reader.query_internal_columns(),
|
358 | 359 | self.need_reserve_block_info,
|
359 | 360 | )?;
|
360 |
| - output_blocks.push(data_block); |
| 361 | + output_blocks.push_back(data_block); |
361 | 362 | }
|
362 | 363 |
|
363 | 364 | self.output_data = output_blocks;
|
|
0 commit comments