Skip to content

Commit a6062ff

Browse files
authored
Merge pull request #9123 from youngsofun/stage_table
feat(stage): support `select from @a_stage`
2 parents e41890f + 4702db8 commit a6062ff

File tree

37 files changed

+590
-370
lines changed

37 files changed

+590
-370
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/ast/src/ast/format/ast_format.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2196,6 +2196,31 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
21962196
let node = FormatTreeNode::with_children(format_ctx, vec![child]);
21972197
self.children.push(node);
21982198
}
2199+
TableReference::Stage {
2200+
span: _,
2201+
location,
2202+
files,
2203+
alias,
2204+
} => {
2205+
let mut children = Vec::new();
2206+
if !files.is_empty() {
2207+
let files = files.join(",");
2208+
let files = format!("files = {}", files);
2209+
children.push(FormatTreeNode::new(AstFormatContext::new(files)))
2210+
}
2211+
let stage_name = format!("Stage {:?}", location);
2212+
let format_ctx = if let Some(alias) = alias {
2213+
AstFormatContext::with_children_alias(
2214+
stage_name,
2215+
children.len(),
2216+
Some(format!("{}", alias)),
2217+
)
2218+
} else {
2219+
AstFormatContext::with_children(stage_name, children.len())
2220+
};
2221+
let node = FormatTreeNode::with_children(format_ctx, children);
2222+
self.children.push(node)
2223+
}
21992224
}
22002225
}
22012226

src/query/ast/src/ast/format/syntax/query.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,24 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc {
314314
.append(RcDoc::text(")")),
315315
_ => RcDoc::nil(),
316316
}),
317+
TableReference::Stage {
318+
span: _,
319+
location,
320+
files,
321+
alias,
322+
} => RcDoc::text(location.to_string())
323+
.append(if files.is_empty() {
324+
RcDoc::nil()
325+
} else {
326+
let files = files.join(",");
327+
let files = format!("FILES {}", files);
328+
RcDoc::text(files)
329+
})
330+
.append(if let Some(a) = alias {
331+
RcDoc::text(format!(" AS {a}"))
332+
} else {
333+
RcDoc::nil()
334+
}),
317335
}
318336
}
319337

src/query/ast/src/ast/query.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::ast::write_comma_separated_list;
1919
use crate::ast::write_period_separated_list;
2020
use crate::ast::Expr;
2121
use crate::ast::Identifier;
22+
use crate::ast::StageLocation;
2223
use crate::parser::token::Token;
2324

2425
/// Root node of a query tree
@@ -175,6 +176,12 @@ pub enum TableReference<'a> {
175176
span: &'a [Token<'a>],
176177
join: Join<'a>,
177178
},
179+
Stage {
180+
span: &'a [Token<'a>],
181+
location: StageLocation,
182+
files: Vec<String>,
183+
alias: Option<TableAlias<'a>>,
184+
},
178185
}
179186

180187
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -345,6 +352,21 @@ impl<'a> Display for TableReference<'a> {
345352
_ => {}
346353
}
347354
}
355+
TableReference::Stage {
356+
span: _,
357+
location,
358+
files,
359+
alias,
360+
} => {
361+
write!(f, "({location})")?;
362+
if !files.is_empty() {
363+
let files = files.join(",");
364+
write!(f, " FILES {files}")?;
365+
}
366+
if let Some(alias) = alias {
367+
write!(f, " AS {alias}")?;
368+
}
369+
}
348370
}
349371
Ok(())
350372
}

src/query/ast/src/parser/query.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use pratt::Associativity;
2121
use pratt::PrattParser;
2222
use pratt::Precedence;
2323

24+
use super::statement::stage_location;
2425
use crate::ast::*;
2526
use crate::input::Input;
2627
use crate::input::WithSpan;
@@ -257,6 +258,11 @@ pub enum TableReferenceElement<'a> {
257258
// ON expr | USING (ident, ...)
258259
JoinCondition(JoinCondition<'a>),
259260
Group(TableReference<'a>),
261+
Stage {
262+
location: StageLocation,
263+
files: Vec<String>,
264+
alias: Option<TableAlias<'a>>,
265+
},
260266
}
261267

262268
pub fn table_reference_element(i: Input) -> IResult<WithSpan<TableReferenceElement>> {
@@ -319,8 +325,20 @@ pub fn table_reference_element(i: Input) -> IResult<WithSpan<TableReferenceEleme
319325
|(_, table_ref, _)| TableReferenceElement::Group(table_ref),
320326
);
321327

328+
let aliased_stage = map(
329+
rule! {
330+
#stage_location ~ ( FILES ~ "=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")")? ~ #table_alias?
331+
},
332+
|(location, files, alias)| TableReferenceElement::Stage {
333+
location,
334+
alias,
335+
files: files.map(|v| v.3).unwrap_or_default(),
336+
},
337+
);
338+
322339
let (rest, (span, elem)) = consumed(rule! {
323340
#subquery
341+
| #aliased_stage
324342
| #table_function
325343
| #aliased_table
326344
| #group
@@ -381,6 +399,16 @@ impl<'a, I: Iterator<Item = WithSpan<'a, TableReferenceElement<'a>>>> PrattParse
381399
subquery,
382400
alias,
383401
},
402+
TableReferenceElement::Stage {
403+
location,
404+
files,
405+
alias,
406+
} => TableReference::Stage {
407+
span: input.span.0,
408+
location,
409+
files,
410+
alias,
411+
},
384412
_ => unreachable!(),
385413
};
386414
Ok(table_ref)

src/query/ast/src/visitors/walk.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ pub fn walk_table_reference<'a, V: Visitor<'a>>(
262262
TableReference::Join { join, .. } => {
263263
visitor.visit_join(join);
264264
}
265+
TableReference::Stage { .. } => {}
265266
}
266267
}
267268

src/query/ast/src/visitors/walk_mut.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ pub fn walk_table_reference_mut<'a, V: VisitorMut>(
262262
TableReference::Join { join, .. } => {
263263
visitor.visit_join(join);
264264
}
265+
TableReference::Stage { .. } => {}
265266
}
266267
}
267268

src/query/catalog/src/plan/datasource.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ use crate::plan::PartStatistics;
2828
use crate::plan::Partitions;
2929
use crate::plan::Projection;
3030
use crate::plan::PushDownInfo;
31+
use crate::plan::StageFileInfo;
3132

3233
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
3334
pub struct StageTableInfo {
3435
pub schema: DataSchemaRef,
3536
pub path: String,
3637
pub files: Vec<String>,
38+
pub pattern: String,
3739
pub user_stage_info: UserStageInfo,
40+
pub files_to_copy: Option<Vec<StageFileInfo>>,
3841
}
3942

4043
impl StageTableInfo {

src/query/catalog/src/plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod partition;
1919
mod partition_statistics;
2020
mod projection;
2121
mod pushdown;
22+
mod stage_file_info;
2223

2324
pub use datasource::*;
2425
pub use expression::Expression;
@@ -27,3 +28,5 @@ pub use partition::*;
2728
pub use partition_statistics::PartStatistics;
2829
pub use projection::Projection;
2930
pub use pushdown::*;
31+
pub use stage_file_info::StageFileInfo;
32+
pub use stage_file_info::StageFileStatus;

src/query/catalog/src/plan/partition_statistics.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
use std::fmt::Debug;
1616

17-
use common_meta_app::schema::TableInfo;
18-
1917
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug, Default)]
2018
pub struct PartStatistics {
2119
/// Total rows of the query read.
@@ -72,11 +70,11 @@ impl PartStatistics {
7270
*self = Self::default();
7371
}
7472

75-
pub fn get_description(&self, table_info: &TableInfo) -> String {
73+
pub fn get_description(&self, table_desc: &str) -> String {
7674
if self.read_rows > 0 {
7775
format!(
7876
"(Read from {} table, {} Read Rows:{}, Read Bytes:{}, Partitions Scanned:{}, Partitions Total:{})",
79-
table_info.desc,
77+
table_desc,
8078
if self.is_exact {
8179
"Exactly"
8280
} else {
@@ -88,7 +86,7 @@ impl PartStatistics {
8886
self.partitions_total,
8987
)
9088
} else {
91-
format!("(Read from {} table)", table_info.desc)
89+
format!("(Read from {} table)", table_desc)
9290
}
9391
}
9492
}

0 commit comments

Comments
 (0)