Commit 272e18a

feat: unload preserve order if the source is ordered. (#17864)
* feat: unload preserve order if the source is ordered.
* fix
1 parent f502c4b · commit 272e18a

File tree

10 files changed: +95 −9 lines
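In short: when the source query of a COPY INTO <stage> carries an ORDER BY, the unload now keeps that order in the output files, by detecting the ordering at bind time and forcing a single writer. A minimal user-level example, mirroring the new tests at the end of this commit (stage name illustrative):

```sql
CREATE OR REPLACE STAGE s1;

-- The source is ordered, so rows land in the unloaded files in sorted order,
-- even when MAX_FILE_SIZE splits the result across many small files.
COPY INTO @s1
FROM (SELECT * FROM numbers(10000) ORDER BY number)
FILE_FORMAT = (TYPE = CSV) MAX_FILE_SIZE = 100;
```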

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 9 additions & 3 deletions
@@ -30,10 +30,14 @@ use databend_common_storage::StageFilesInfo;

 #[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
 pub struct StageTableInfo {
+    // common
+    pub stage_root: String,
+    pub stage_info: StageInfo,
+
+    // copy into table only
     pub schema: TableSchemaRef,
     pub default_exprs: Option<Vec<RemoteDefaultExpr>>,
     pub files_info: StageFilesInfo,
-    pub stage_info: StageInfo,
     pub files_to_copy: Option<Vec<StageFileInfo>>,
     // files that
     // - are listed as candidates to be copied
@@ -42,9 +46,11 @@ pub struct StageTableInfo {
     // - may need to be purged as well (depends on the copy options)
     pub duplicated_files_detected: Vec<String>,
     pub is_select: bool,
-    pub copy_into_location_options: CopyIntoLocationOptions,
     pub copy_into_table_options: CopyIntoTableOptions,
-    pub stage_root: String,
+
+    // copy into location only
+    pub copy_into_location_ordered: bool,
+    pub copy_into_location_options: CopyIntoLocationOptions,
 }

 impl StageTableInfo {

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 1 addition & 0 deletions
@@ -114,6 +114,7 @@ impl CopyIntoLocationInterpreter {
                 copy_into_location_options: options.clone(),
                 copy_into_table_options: Default::default(),
                 stage_root: "".to_string(),
+                copy_into_location_ordered: self.plan.is_ordered,
             },
         }));

src/query/service/src/sessions/query_ctx.rs

Lines changed: 3 additions & 0 deletions
@@ -1636,6 +1636,7 @@ impl TableContext for QueryContext {
             copy_into_location_options: Default::default(),
             copy_into_table_options: Default::default(),
             stage_root,
+            copy_into_location_ordered: false,
         };
         OrcTable::try_create(info).await
     }
@@ -1655,6 +1656,7 @@ impl TableContext for QueryContext {
             copy_into_location_options: Default::default(),
             copy_into_table_options: Default::default(),
             stage_root,
+            copy_into_location_ordered: false,
         };
         StageTable::try_create(info)
     }
@@ -1692,6 +1694,7 @@ impl TableContext for QueryContext {
             copy_into_location_options: Default::default(),
             copy_into_table_options: Default::default(),
             stage_root,
+            copy_into_location_ordered: false,
         };
         StageTable::try_create(info)
     }

src/query/sql/src/planner/binder/copy_into_location.rs

Lines changed: 8 additions & 0 deletions
@@ -109,6 +109,13 @@ impl Binder {
                 .await
             }
         }?;
+        let mut is_ordered = false;
+        if let Plan::Query { s_expr, .. } = &query {
+            let p = s_expr.derive_relational_prop()?;
+            if !p.orderings.is_empty() {
+                is_ordered = true;
+            }
+        }

         let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), &stmt.dst).await?;

@@ -140,6 +147,7 @@ impl Binder {
             path,
             from: Box::new(query),
             options: stmt.options.clone(),
+            is_ordered,
         }))
     }
 }
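The flag set here travels binder → plan → interpreter → StageTableInfo, as the next few files show. A standalone sketch of that journey, with every type reduced to an illustrative stand-in (none of these are databend's real definitions):

```rust
#[derive(Default)]
struct RelationalProperty {
    orderings: Vec<String>, // sort columns derived for the query; empty = unordered
}

struct CopyIntoLocationPlan {
    is_ordered: bool,
}

struct StageTableInfo {
    copy_into_location_ordered: bool,
}

// Mirrors the binder change: the plan is ordered iff the derived
// relational property carries at least one ordering.
fn bind(prop: &RelationalProperty) -> CopyIntoLocationPlan {
    CopyIntoLocationPlan {
        is_ordered: !prop.orderings.is_empty(),
    }
}

// Mirrors the interpreter change: the flag is forwarded verbatim into the
// table info that the storage layer consults in do_append.rs.
fn interpret(plan: &CopyIntoLocationPlan) -> StageTableInfo {
    StageTableInfo {
        copy_into_location_ordered: plan.is_ordered,
    }
}

fn main() {
    let ordered = RelationalProperty { orderings: vec!["number".into()] };
    assert!(interpret(&bind(&ordered)).copy_into_location_ordered);

    let unordered = RelationalProperty::default();
    assert!(!interpret(&bind(&unordered)).copy_into_location_ordered);
}
```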

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 2 additions & 0 deletions
@@ -224,6 +224,7 @@ impl Binder {
                 copy_into_location_options: Default::default(),
                 copy_into_table_options: stmt.options.clone(),
                 stage_root: "".to_string(),
+                copy_into_location_ordered: false,
             },
             values_consts: vec![],
             required_source_schema: required_values_schema.clone(),
@@ -404,6 +405,7 @@ impl Binder {
                 copy_into_location_options: Default::default(),
                 copy_into_table_options: options,
                 stage_root: "".to_string(),
+                copy_into_location_ordered: false,
             },
             write_mode,
             query: None,

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 2 additions & 0 deletions
@@ -149,11 +149,13 @@ pub async fn optimize(opt_ctx: Arc<OptimizerContext>, plan: Plan) -> Result<Plan
             path,
             from,
             options,
+            is_ordered,
         }) => Ok(Plan::CopyIntoLocation(CopyIntoLocationPlan {
             stage,
             path,
             from: Box::new(Box::pin(optimize(opt_ctx, *from)).await?),
             options,
+            is_ordered,
         })),
         Plan::CopyIntoTable(mut plan) if !plan.no_file_to_copy => {
             plan.enable_distributed = opt_ctx.get_enable_distributed_optimization()

src/query/sql/src/planner/plans/copy_into_location.rs

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ pub struct CopyIntoLocationPlan {
     pub path: String,
     pub from: Box<Plan>,
     pub options: CopyIntoLocationOptions,
+    pub is_ordered: bool,
 }

 impl CopyIntoLocationPlan {

src/query/storages/stage/src/append/do_append.rs

Lines changed: 5 additions & 1 deletion
@@ -35,7 +35,11 @@ impl StageTable {

         let fmt = self.table_info.stage_info.file_format_params.clone();
         let mem_limit = settings.get_max_memory_usage()? as usize;
-        let max_threads = settings.get_max_threads()? as usize;
+        let mut max_threads = settings.get_max_threads()? as usize;
+        if self.table_info.copy_into_location_ordered {
+            max_threads = 1;
+            pipeline.try_resize(1)?;
+        }

         let op = StageTable::get_op(&self.table_info.stage_info)?;
         let query_id = ctx.get_id();
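Forcing max_threads to 1 and resizing the pipeline to a single writer is what actually preserves order: with several parallel sinks, whichever worker is free claims the next block, so completion order is scheduler-dependent. A toy model of that effect using only std threads (none of these names are databend APIs):

```rust
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Drain a shared queue of "blocks" with n workers and record completion order.
fn drain_with_workers(n: usize) -> Vec<u32> {
    let queue: Arc<Mutex<VecDeque<u32>>> = Arc::new(Mutex::new((0u32..8).collect()));
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for _ in 0..n {
        let queue = Arc::clone(&queue);
        let tx = tx.clone();
        handles.push(thread::spawn(move || loop {
            let block = queue.lock().unwrap().pop_front();
            match block {
                Some(b) => tx.send(b).unwrap(),
                None => break,
            }
        }));
    }
    drop(tx); // close the channel once only the workers hold senders
    let out: Vec<u32> = rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    out
}

fn main() {
    // One worker: output order always matches input order.
    assert_eq!(drain_with_workers(1), (0..8).collect::<Vec<u32>>());
    // Several workers: any interleaving is possible.
    println!("{:?}", drain_with_workers(3));
}
```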

src/query/storages/stage/src/append/parquet_file/writer_processor.rs

Lines changed: 5 additions & 5 deletions
@@ -52,7 +52,7 @@ pub struct ParquetFileWriter {
     arrow_schema: Arc<Schema>,
     compression: Compression,

-    input_data: Vec<DataBlock>,
+    input_data: VecDeque<DataBlock>,

     input_bytes: usize,
     row_counts: usize,
@@ -150,7 +150,7 @@ impl ParquetFileWriter {
             unload_output,
             unload_output_blocks: None,
             writer,
-            input_data: Vec::new(),
+            input_data: VecDeque::new(),
             input_bytes: 0,
             file_to_write: None,
             data_accessor,
@@ -231,11 +231,11 @@ impl Processor for ParquetFileWriter {
         } else if self.input.has_data() {
             let block = self.input.pull_data().unwrap()?;
             if self.targe_file_size.is_none() {
-                self.input_data.push(block);
+                self.input_data.push_back(block);
             } else {
                 let block_meta = block.get_owned_meta().unwrap();
                 let blocks = BlockBatch::downcast_from(block_meta).unwrap();
-                self.input_data.extend_from_slice(&blocks.blocks);
+                self.input_data.extend(blocks.blocks);
             }

             self.input.set_not_need_data();
@@ -247,7 +247,7 @@ impl Processor for ParquetFileWriter {
     }

     fn process(&mut self) -> Result<()> {
-        while let Some(b) = self.input_data.pop() {
+        while let Some(b) = self.input_data.pop_front() {
             self.input_bytes += b.memory_size();
             self.row_counts += b.num_rows();
             let batch = b.to_record_batch(&self.table_info.schema)?;
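The switch from Vec to VecDeque fixes a second, subtler ordering hazard: draining the buffer with Vec::pop takes blocks from the back, i.e. in reverse arrival order. A two-assert demonstration using only the standard library:

```rust
use std::collections::VecDeque;

fn main() {
    // Vec::pop drains from the back: blocks come out newest-first (LIFO).
    let mut v = vec!["b1", "b2", "b3"];
    let mut order = Vec::new();
    while let Some(b) = v.pop() {
        order.push(b);
    }
    assert_eq!(order, ["b3", "b2", "b1"]); // reversed!

    // VecDeque::pop_front drains from the front: arrival order (FIFO).
    let mut q: VecDeque<&str> = VecDeque::from(["b1", "b2", "b3"]);
    let mut order = Vec::new();
    while let Some(b) = q.pop_front() {
        order.push(b);
    }
    assert_eq!(order, ["b1", "b2", "b3"]); // preserved
}
```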
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+statement ok
+create or replace stage s1;
+
+# not ordered csv
+statement ok
+copy /*+ set_var(max_block_size=10) */ INTO @s1 from (select * from numbers(10000)) file_format=(type=csv) max_file_size=100;
+
+query
+SELECT COUNT(*)<10000 FROM (SELECT $1 AS a, rank() OVER (ORDER BY metadata$filename, metadata$file_row_number, $1) AS rank FROM '@s1' ( FILE_FORMAT => 'csv', )) WHERE a + 1 = rank
+----
+1
+
+statement ok
+remove @s1;
+
+# ordered csv
+
+statement ok
+copy /*+ set_var(max_block_size=10) */ INTO @s1 from (select * from numbers(10000) order by number) file_format=(type=csv) max_file_size=100;
+
+query
+SELECT COUNT(*) FROM (SELECT $1 AS a, rank() OVER (ORDER BY metadata$filename, metadata$file_row_number, $1) AS rank FROM '@s1' ( FILE_FORMAT => 'csv', )) WHERE a + 1 = rank
+----
+10000
+
+# data_af2ab6dc-8725-46e5-a601-3dad9c512769_0000_00000770.csv
+query
+SELECT * from list_stage(location => '@s1') where substr(name, 43, 4) != '0000'
+----
+
+statement ok
+remove @s1;
+
+
+# ordered parquet
+
+statement ok
+copy /*+ set_var(max_block_size=10) */ INTO @s1 from (select * from numbers(10000) order by number) file_format=(type=parquet) max_file_size=100;
+
+query
+SELECT COUNT(*) FROM (SELECT $1 AS a, rank() OVER (ORDER BY metadata$filename, metadata$file_row_number, $1) AS rank FROM '@s1' ( FILE_FORMAT => 'parquet', )) WHERE a + 1 = rank
+----
+10000
+
+statement ok
+remove @s1;
+
+# ordered ndjson
+
+statement ok
+copy /*+ set_var(max_block_size=10) */ INTO @s1 from (select * from numbers(10000) order by number) file_format=(type=ndjson) max_file_size=100;
+
+query
+SELECT COUNT(*) FROM (SELECT $1:number::int AS a, rank() OVER (ORDER BY metadata$filename, metadata$file_row_number, a) AS rank FROM '@s1' ( FILE_FORMAT => 'ndjson', )) WHERE a + 1 = rank
+----
+10000
+
+statement ok
+drop stage s1;
