diff --git a/src/query/ast/src/ast/statements/call.rs b/src/query/ast/src/ast/statements/call.rs index 52ba987a10aeb..1d78018d13c8a 100644 --- a/src/query/ast/src/ast/statements/call.rs +++ b/src/query/ast/src/ast/statements/call.rs @@ -19,10 +19,11 @@ use derive_visitor::Drive; use derive_visitor::DriveMut; use crate::ast::write_comma_separated_string_list; +use crate::ast::Identifier; #[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] pub struct CallStmt { - pub name: String, + pub name: Identifier, pub args: Vec, } diff --git a/src/query/ast/src/ast/statements/procedure.rs b/src/query/ast/src/ast/statements/procedure.rs index ecb226a0d69f6..1931edc29cc02 100644 --- a/src/query/ast/src/ast/statements/procedure.rs +++ b/src/query/ast/src/ast/statements/procedure.rs @@ -21,16 +21,17 @@ use derive_visitor::DriveMut; use crate::ast::write_comma_separated_list; use crate::ast::CreateOption; use crate::ast::Expr; +use crate::ast::Identifier; use crate::ast::TypeName; -#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct ExecuteImmediateStmt { - pub script: String, + pub script: Expr, } impl Display for ExecuteImmediateStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "EXECUTE IMMEDIATE $$\n{}\n$$", self.script)?; + write!(f, "EXECUTE IMMEDIATE {}", self.script)?; Ok(()) } } @@ -170,7 +171,7 @@ impl Display for DescProcedureStmt { #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct CallProcedureStmt { - pub name: String, + pub name: Identifier, pub args: Vec, } diff --git a/src/query/ast/src/ast/statements/script.rs b/src/query/ast/src/ast/statements/script.rs index 9c599b1d9af36..219671233b0e7 100644 --- a/src/query/ast/src/ast/statements/script.rs +++ b/src/query/ast/src/ast/statements/script.rs @@ -115,6 +115,47 @@ impl Display for ReturnItem { } } +#[derive(Debug, Clone, PartialEq)] +pub struct DeclareCursor { + pub span: Span, + pub name: Identifier, + pub stmt: Option, + pub resultset: Option, +} + +impl Display for DeclareCursor { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let DeclareCursor { + name, + stmt, + resultset, + .. + } = self; + if let Some(stmt) = stmt { + write!(f, "{name} CURSOR FOR {stmt}") + } else if let Some(resultset) = resultset { + write!(f, "{name} CURSOR FOR {resultset}") + } else { + write!(f, "{name} CURSOR") + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum IterableItem { + Resultset(Identifier), + Cursor(Identifier), +} + +impl Display for IterableItem { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + IterableItem::Resultset(name) => write!(f, "{name}"), + IterableItem::Cursor(name) => write!(f, "{name}"), + } + } +} + #[derive(Debug, Clone, PartialEq)] pub enum ScriptStatement { LetVar { @@ -123,6 +164,9 @@ pub enum ScriptStatement { LetStatement { declare: DeclareSet, }, + LetCursor { + declare: DeclareCursor, + }, RunStatement { span: Span, stmt: Statement, @@ -132,6 +176,19 @@ pub enum ScriptStatement { name: Identifier, value: Expr, }, + OpenCursor { + span: Span, + cursor: Identifier, + }, + FetchCursor { + span: Span, + cursor: Identifier, + into_var: Identifier, + }, + CloseCursor { + span: Span, + cursor: Identifier, + }, Return { span: Span, value: Option, @@ -148,7 +205,7 @@ pub enum ScriptStatement { ForInSet { span: Span, variable: Identifier, - resultset: Identifier, + iterable: IterableItem, body: Vec, label: Option, }, @@ -204,8 +261,14 @@ impl Display for ScriptStatement { match self { ScriptStatement::LetVar { declare, .. } => write!(f, "LET {declare}"), ScriptStatement::LetStatement { declare, .. } => write!(f, "LET {declare}"), + ScriptStatement::LetCursor { declare, .. } => write!(f, "LET {declare}"), ScriptStatement::RunStatement { stmt, .. } => write!(f, "{stmt}"), ScriptStatement::Assign { name, value, .. } => write!(f, "{name} := {value}"), + ScriptStatement::OpenCursor { cursor, .. } => write!(f, "OPEN {cursor}"), + ScriptStatement::FetchCursor { + cursor, into_var, .. + } => write!(f, "FETCH {cursor} INTO {into_var}"), + ScriptStatement::CloseCursor { cursor, .. } => write!(f, "CLOSE {cursor}"), ScriptStatement::Return { value, .. } => { if let Some(value) = value { write!(f, "RETURN {value}") @@ -242,12 +305,12 @@ impl Display for ScriptStatement { } ScriptStatement::ForInSet { variable, - resultset, + iterable, body, label, .. } => { - writeln!(f, "FOR {variable} IN {resultset} DO")?; + writeln!(f, "FOR {variable} IN {iterable} DO")?; for stmt in body { writeln!( f, diff --git a/src/query/ast/src/ast/statements/task.rs b/src/query/ast/src/ast/statements/task.rs index 4ed43b4127a64..2dff48c74955b 100644 --- a/src/query/ast/src/ast/statements/task.rs +++ b/src/query/ast/src/ast/statements/task.rs @@ -24,6 +24,7 @@ use crate::ast::quote::QuotedString; use crate::ast::write_comma_separated_string_list; use crate::ast::write_comma_separated_string_map; use crate::ast::Expr; +use crate::ast::Identifier; use crate::ast::ShowLimit; #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] @@ -370,7 +371,7 @@ impl Display for ShowTasksStmt { #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct ExecuteTaskStmt { - pub name: String, + pub name: Identifier, } impl Display for ExecuteTaskStmt { @@ -381,7 +382,7 @@ impl Display for ExecuteTaskStmt { #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct DescribeTaskStmt { - pub name: String, + pub name: Identifier, } impl Display for DescribeTaskStmt { diff --git a/src/query/ast/src/parser/script.rs b/src/query/ast/src/parser/script.rs index ca77d3e783db7..928a9ced515e8 100644 --- a/src/query/ast/src/parser/script.rs +++ b/src/query/ast/src/parser/script.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use nom::branch::alt; use nom::combinator::consumed; use nom::combinator::map; use nom_rule::rule; @@ -23,6 +24,25 @@ use crate::parser::input::Input; use crate::parser::statement::*; use crate::parser::token::*; +#[allow(clippy::large_enum_variant)] +#[derive(Debug, Clone, PartialEq)] +pub enum ScriptBlockOrStmt { + ScriptBlock(ScriptBlock), + Statement(Statement), +} + +pub fn script_block_or_stmt(i: Input) -> IResult { + alt(( + map(script_block, ScriptBlockOrStmt::ScriptBlock), + map( + consumed(rule! { + #statement + }), + |(_, stmt)| ScriptBlockOrStmt::Statement(stmt.stmt), + ), + ))(i) +} + pub fn script_block(i: Input) -> IResult { map( consumed(rule! { @@ -79,6 +99,53 @@ pub fn declare_set(i: Input) -> IResult { )(i) } +pub fn declare_cursor(i: Input) -> IResult { + map( + consumed(rule! { + #ident ~ CURSOR ~ ^FOR ~ ^#cursor_target + }), + |(span, (name, _, _, target))| match target { + CursorTarget::Resultset(resultset) => DeclareCursor { + span: transform_span(span.tokens), + name, + stmt: None, + resultset: Some(resultset), + }, + CursorTarget::Statement(stmt) => DeclareCursor { + span: transform_span(span.tokens), + name, + stmt: Some(stmt), + resultset: None, + }, + }, + )(i) +} + +#[allow(clippy::large_enum_variant)] +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum CursorTarget { + Resultset(Identifier), + Statement(Statement), +} + +pub(crate) fn cursor_target(i: Input) -> IResult { + // Try identifier first, then statement + let resultset = map(ident, CursorTarget::Resultset); + let statement = map(statement_body, CursorTarget::Statement); + + rule!( + #resultset + | #statement + )(i) +} + +pub(crate) fn iterable_item(i: Input) -> IResult { + // For now, we'll treat all identifiers as potential iterables + // The compiler will determine if it's a cursor or resultset + // based on what was actually declared + map(ident, IterableItem::Resultset)(i) +} + pub fn script_stmts(i: Input) -> IResult> { semicolon_terminated_list1(script_stmt)(i) } @@ -96,6 +163,40 @@ pub fn script_stmt(i: Input) -> IResult { }, |(_, declare)| ScriptStatement::LetStatement { declare }, ); + let let_cursor_stmt = map( + rule! { + LET ~ #declare_cursor + }, + |(_, declare)| ScriptStatement::LetCursor { declare }, + ); + let open_cursor_stmt = map( + consumed(rule! { + OPEN ~ #ident + }), + |(span, (_, cursor))| ScriptStatement::OpenCursor { + span: transform_span(span.tokens), + cursor, + }, + ); + let fetch_cursor_stmt = map( + consumed(rule! { + FETCH ~ #ident ~ ^INTO ~ ^#ident + }), + |(span, (_, cursor, _, into_var))| ScriptStatement::FetchCursor { + span: transform_span(span.tokens), + cursor, + into_var, + }, + ); + let close_cursor_stmt = map( + consumed(rule! { + CLOSE ~ #ident + }), + |(span, (_, cursor))| ScriptStatement::CloseCursor { + span: transform_span(span.tokens), + cursor, + }, + ); let run_stmt = map( consumed(rule! { #statement_body @@ -173,14 +274,14 @@ pub fn script_stmt(i: Input) -> IResult { ); let for_in_set_stmt = map( consumed(rule! { - FOR ~ ^#ident ~ ^IN ~ #ident ~ ^DO + FOR ~ ^#ident ~ ^IN ~ #iterable_item ~ ^DO ~ ^#semicolon_terminated_list1(script_stmt) ~ ^END ~ ^FOR ~ #ident? }), - |(span, (_, variable, _, resultset, _, body, _, _, label))| ScriptStatement::ForInSet { + |(span, (_, variable, _, iterable, _, body, _, _, label))| ScriptStatement::ForInSet { span: transform_span(span.tokens), variable, - resultset, + iterable, body, label, }, @@ -299,24 +400,48 @@ pub fn script_stmt(i: Input) -> IResult { }, ); - rule!( + let cursor_stmts = rule!( + #let_cursor_stmt + | #open_cursor_stmt + | #fetch_cursor_stmt + | #close_cursor_stmt + ); + + let assignment_stmts = rule!( #let_stmt_stmt | #let_var_stmt - | #run_stmt | #assign_stmt - | #return_set_stmt + ); + + let control_flow_stmts = rule!( + #return_set_stmt | #return_stmt_stmt | #return_var_stmt | #return_stmt - | #for_loop_stmt + | #break_stmt + | #continue_stmt + ); + + let loop_stmts = rule!( + #for_loop_stmt | #for_in_set_stmt | #for_in_stmt_stmt | #while_loop_stmt | #repeat_loop_stmt | #loop_stmt - | #break_stmt - | #continue_stmt - | #case_stmt + ); + + let conditional_stmts = rule!( + #case_stmt | #if_stmt + ); + + rule!( + #assignment_stmts + | #cursor_stmts + | #control_flow_stmts + | #loop_stmts + | #conditional_stmts + | #run_stmt )(i) } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index b0f1f89dc90ca..6f9fd2c3e1400 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -226,22 +226,14 @@ pub fn statement_body(i: Input) -> IResult { rule! { EXECUTE ~ TASK ~ #ident }, - |(_, _, task)| { - Statement::ExecuteTask(ExecuteTaskStmt { - name: task.to_string(), - }) - }, + |(_, _, task)| Statement::ExecuteTask(ExecuteTaskStmt { name: task }), ); let desc_task = map( rule! { ( DESC | DESCRIBE ) ~ TASK ~ #ident }, - |(_, _, task)| { - Statement::DescribeTask(DescribeTaskStmt { - name: task.to_string(), - }) - }, + |(_, _, task)| Statement::DescribeTask(DescribeTaskStmt { name: task }), ); let merge = map( @@ -2005,12 +1997,7 @@ pub fn statement_body(i: Input) -> IResult { rule! { CALL ~ #ident ~ "(" ~ #comma_separated_list0(parameter_to_string) ~ ")" }, - |(_, name, _, args, _)| { - Statement::Call(CallStmt { - name: name.to_string(), - args, - }) - }, + |(_, name, _, args, _)| Statement::Call(CallStmt { name, args }), ); let vacuum_temporary_tables = map( @@ -2019,7 +2006,7 @@ pub fn statement_body(i: Input) -> IResult { }, |(_, _, _, opt_limit)| { Statement::Call(CallStmt { - name: "fuse_vacuum_temporary_table".to_string(), + name: Identifier::from_name(None, "fuse_vacuum_temporary_table"), args: opt_limit.map(|v| v.1.to_string()).into_iter().collect(), }) }, @@ -2418,7 +2405,7 @@ pub fn statement_body(i: Input) -> IResult { let execute_immediate = map( rule! { - EXECUTE ~ IMMEDIATE ~ #code_string + EXECUTE ~ IMMEDIATE ~ #expr }, |(_, _, script)| Statement::ExecuteImmediate(ExecuteImmediateStmt { script }), ); @@ -2556,7 +2543,7 @@ pub fn statement_body(i: Input) -> IResult { }, |(_, _, name, _, opt_args, _)| { Statement::CallProcedure(CallProcedureStmt { - name: name.to_string(), + name, args: opt_args.unwrap_or_default(), }) }, diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index c02cfc468fa77..91338814c8cd5 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -459,10 +459,14 @@ pub enum TokenKind { CONTENT_TYPE, #[token("CONTINUE", ignore(ascii_case))] CONTINUE, + #[token("CURSOR", ignore(ascii_case))] + CURSOR, #[token("CHAR", ignore(ascii_case))] CHAR, #[token("CHECK", ignore(ascii_case))] CHECK, + #[token("CLOSE", ignore(ascii_case))] + CLOSE, #[token("COLUMN", ignore(ascii_case))] COLUMN, #[token("CACHE", ignore(ascii_case))] @@ -633,6 +637,8 @@ pub enum TokenKind { ELSEIF, #[token("FALSE", ignore(ascii_case))] FALSE, + #[token("FETCH", ignore(ascii_case))] + FETCH, #[token("FIELDS", ignore(ascii_case))] FIELDS, #[token("FIELD_DELIMITER", ignore(ascii_case))] @@ -912,6 +918,8 @@ pub enum TokenKind { OFFSET, #[token("ON", ignore(ascii_case))] ON, + #[token("OPEN", ignore(ascii_case))] + OPEN, #[token("ON_CREATE", ignore(ascii_case))] ON_CREATE, #[token("ON_SCHEDULE", ignore(ascii_case))] @@ -1701,7 +1709,7 @@ impl TokenKind { | TokenKind::CREATE | TokenKind::ATTACH | TokenKind::EXCEPT - // | TokenKind::FETCH + | TokenKind::FETCH | TokenKind::FOR | TokenKind::FROM // | TokenKind::GRANT @@ -1833,7 +1841,7 @@ impl TokenKind { | TokenKind::CREATE | TokenKind::ATTACH | TokenKind::EXCEPT - // | TokenKind::FETCH + | TokenKind::FETCH | TokenKind::FROM | TokenKind::GRANT | TokenKind::GROUP diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 9567c4144026e..5c8f6bc437615 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -1581,6 +1581,10 @@ fn test_script() { r#"select :a + 1"#, r#"select IDENTIFIER(:b)"#, r#"select a.IDENTIFIER(:b).c + minus(:d)"#, + r#"EXECUTE TASK IDENTIFIER(:my_task)"#, + r#"DESC TASK IDENTIFIER(:my_task)"#, + r#"CALL IDENTIFIER(:test)(a)"#, + r#"call PROCEDURE IDENTIFIER(:proc_name)()"#, ]; for case in cases { diff --git a/src/query/ast/tests/it/testdata/script.txt b/src/query/ast/tests/it/testdata/script.txt index 489132d2fc24c..c0a7795ab6bbf 100644 --- a/src/query/ast/tests/it/testdata/script.txt +++ b/src/query/ast/tests/it/testdata/script.txt @@ -471,14 +471,16 @@ ForInSet { quote: None, ident_type: None, }, - resultset: Identifier { - span: Some( - 11..20, - ), - name: "resultset", - quote: None, - ident_type: None, - }, + iterable: Resultset( + Identifier { + span: Some( + 11..20, + ), + name: "resultset", + quote: None, + ident_type: None, + }, + ), body: [ Continue { span: Some( @@ -1917,6 +1919,106 @@ RunStatement { } +---------- Input ---------- +EXECUTE TASK IDENTIFIER(:my_task) +---------- Output --------- +EXECUTE TASK IDENTIFIER(:my_task) +---------- AST ------------ +RunStatement { + span: Some( + 0..33, + ), + stmt: ExecuteTask( + ExecuteTaskStmt { + name: Identifier { + span: Some( + 13..33, + ), + name: "my_task", + quote: None, + ident_type: Hole, + }, + }, + ), +} + + +---------- Input ---------- +DESC TASK IDENTIFIER(:my_task) +---------- Output --------- +DESCRIBE TASK IDENTIFIER(:my_task) +---------- AST ------------ +RunStatement { + span: Some( + 0..30, + ), + stmt: DescribeTask( + DescribeTaskStmt { + name: Identifier { + span: Some( + 10..30, + ), + name: "my_task", + quote: None, + ident_type: Hole, + }, + }, + ), +} + + +---------- Input ---------- +CALL IDENTIFIER(:test)(a) +---------- Output --------- +CALL IDENTIFIER(:test)('a') +---------- AST ------------ +RunStatement { + span: Some( + 0..25, + ), + stmt: Call( + CallStmt { + name: Identifier { + span: Some( + 5..22, + ), + name: "test", + quote: None, + ident_type: Hole, + }, + args: [ + "a", + ], + }, + ), +} + + +---------- Input ---------- +call PROCEDURE IDENTIFIER(:proc_name)() +---------- Output --------- +CALL PROCEDURE IDENTIFIER(:proc_name)() +---------- AST ------------ +RunStatement { + span: Some( + 0..39, + ), + stmt: CallProcedure( + CallProcedureStmt { + name: Identifier { + span: Some( + 15..37, + ), + name: "proc_name", + quote: None, + ident_type: Hole, + }, + args: [], + }, + ), +} + + ---------- Input ---------- BEGIN LOOP diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 485295cc1449e..5f2c805672f74 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -18382,7 +18382,14 @@ CALL system$test('a') ---------- AST ------------ Call( CallStmt { - name: "system$test", + name: Identifier { + span: Some( + 5..16, + ), + name: "system$test", + quote: None, + ident_type: None, + }, args: [ "a", ], @@ -18397,7 +18404,14 @@ CALL system$test('a') ---------- AST ------------ Call( CallStmt { - name: "system$test", + name: Identifier { + span: Some( + 5..16, + ), + name: "system$test", + quote: None, + ident_type: None, + }, args: [ "a", ], @@ -24293,7 +24307,14 @@ EXECUTE TASK MyTask ---------- AST ------------ ExecuteTask( ExecuteTaskStmt { - name: "MyTask", + name: Identifier { + span: Some( + 13..19, + ), + name: "MyTask", + quote: None, + ident_type: None, + }, }, ) @@ -24305,7 +24326,14 @@ DESCRIBE TASK MyTask ---------- AST ------------ DescribeTask( DescribeTaskStmt { - name: "MyTask", + name: Identifier { + span: Some( + 10..16, + ), + name: "MyTask", + quote: None, + ident_type: None, + }, }, ) @@ -27030,17 +27058,18 @@ BEGIN END; $$ ---------- Output --------- -EXECUTE IMMEDIATE $$ -BEGIN - LOOP - RETURN 1; - END LOOP; -END; -$$ +EXECUTE IMMEDIATE 'BEGIN\n LOOP\n RETURN 1;\n END LOOP;\nEND;' ---------- AST ------------ ExecuteImmediate( ExecuteImmediateStmt { - script: "BEGIN\n LOOP\n RETURN 1;\n END LOOP;\nEND;", + script: Literal { + span: Some( + 18..75, + ), + value: String( + "BEGIN\n LOOP\n RETURN 1;\n END LOOP;\nEND;", + ), + }, }, ) @@ -27733,7 +27762,14 @@ CALL PROCEDURE p1() ---------- AST ------------ CallProcedure( CallProcedureStmt { - name: "p1", + name: Identifier { + span: Some( + 15..17, + ), + name: "p1", + quote: None, + ident_type: None, + }, args: [], }, ) @@ -27746,7 +27782,14 @@ CALL PROCEDURE p1(1, 'x', '2022-02-02'::DATE) ---------- AST ------------ CallProcedure( CallProcedureStmt { - name: "p1", + name: Identifier { + span: Some( + 15..17, + ), + name: "p1", + quote: None, + ident_type: None, + }, args: [ Literal { span: Some( diff --git a/src/query/script/src/compiler.rs b/src/query/script/src/compiler.rs index 96df51cf964c9..83be81b8b5f59 100644 --- a/src/query/script/src/compiler.rs +++ b/src/query/script/src/compiler.rs @@ -25,6 +25,7 @@ use databend_common_ast::ast::FunctionCall; use databend_common_ast::ast::Identifier; use databend_common_ast::ast::IdentifierType; use databend_common_ast::ast::Indirection; +use databend_common_ast::ast::IterableItem; use databend_common_ast::ast::Literal; use databend_common_ast::ast::Query; use databend_common_ast::ast::ReturnItem; @@ -104,6 +105,56 @@ impl Compiler { )?); self.declare_ref(&declare.name, RefItem::Set(to_set))?; } + ScriptStatement::LetCursor { declare } => { + if let Some(stmt) = &declare.stmt { + // LET cursor CURSOR FOR statement + let to_set = SetRef::new( + declare.name.span, + &declare.name.name, + &mut self.ref_allocator, + ); + output.append(&mut self.compile_sql_statement( + declare.span, + stmt, + to_set.clone(), + )?); + self.declare_ref(&declare.name, RefItem::Cursor(to_set))?; + } else if let Some(resultset) = &declare.resultset { + // LET cursor CURSOR FOR resultset + let resultset_ref = self.lookup_set(resultset)?; + self.declare_ref(&declare.name, RefItem::Cursor(resultset_ref))?; + } + } + ScriptStatement::OpenCursor { cursor, .. } => { + let cursor_set = self.lookup_cursor(cursor)?; + let cursor_iter = + IterRef::new(cursor.span, &cursor.name, &mut self.ref_allocator); + output.push(ScriptIR::Iter { + set: cursor_set, + to_iter: cursor_iter.clone(), + }); + self.declare_ref(cursor, RefItem::Iter(cursor_iter))?; + } + ScriptStatement::FetchCursor { + cursor, into_var, .. + } => { + let cursor_iter = self.lookup_iter(cursor)?; + let to_var = + VarRef::new(into_var.span, &into_var.name, &mut self.ref_allocator); + output.push(ScriptIR::Read { + iter: cursor_iter.clone(), + column: ColumnAccess::Position(0), + to_var: to_var.clone(), + }); + output.push(ScriptIR::Next { iter: cursor_iter }); + self.declare_ref(into_var, RefItem::Var(to_var))?; + } + ScriptStatement::CloseCursor { cursor, .. } => { + // CLOSE cursor - verify the cursor has an active iterator and remove it + let _cursor_iter = self.lookup_iter(cursor)?; + // Remove the iterator from the current scope, keeping the cursor (SetRef) for re-opening + self.remove_cursor_iter(cursor)?; + } ScriptStatement::RunStatement { span, stmt } => { let to_set = SetRef::new_internal(*span, "unused_result", &mut self.ref_allocator); @@ -162,11 +213,30 @@ impl Compiler { ScriptStatement::ForInSet { span, variable, - resultset, + iterable, body, label, } => { - let set = self.lookup_set(resultset)?; + let set = match iterable { + IterableItem::Resultset(name) => { + // Try to look up as a resultset first, then as a cursor + if let Ok(set) = self.lookup_set(name) { + set + } else if let Ok(cursor_set) = self.lookup_cursor(name) { + cursor_set + } else { + return Err(ErrorCode::ScriptSemanticError(format!( + "`{}` is not a resultset or cursor", + name.name + )) + .set_span(name.span)); + } + } + IterableItem::Cursor(cursor) => { + // Explicitly specified as cursor + self.lookup_cursor(cursor)? + } + }; output.append(&mut self.compile_for_in(*span, variable, set, body, label)?); } ScriptStatement::ForInStatement { @@ -774,7 +844,7 @@ impl Compiler { let index = self.compiler.lookup_var(ident); match index { Ok(index) => { - *ident = Identifier::from_name(ident.span, index.to_string()); + *ident = Identifier::from_name(ident.span, index.index.to_string()); ident.ident_type = IdentifierType::Hole; } Err(e) => { @@ -937,6 +1007,34 @@ impl Compiler { Err(ErrorCode::ScriptSemanticError(format!("`{name}` is not defined")).set_span(ident.span)) } + fn lookup_cursor(&self, ident: &Identifier) -> Result { + let RefItem::Cursor(cursor) = self.lookup_ref(ident)? else { + let name = self.normalize_ident(ident); + return Err( + ErrorCode::ScriptSemanticError(format!("`{name}` is not a cursor")) + .set_span(ident.span), + ); + }; + Ok(cursor) + } + + fn remove_cursor_iter(&mut self, cursor_name: &Identifier) -> Result<()> { + let name = self.normalize_ident(cursor_name); + // Find and remove the iterator from the current scope + for scope in self.scopes.iter_mut().rev() { + if scope.items.contains_key(&name) + && matches!(scope.items.get(&name), Some(RefItem::Iter(_))) + { + scope.items.remove(&name); + return Ok(()); + } + } + Err( + ErrorCode::ScriptSemanticError(format!("cursor `{}` is not open", name)) + .set_span(cursor_name.span), + ) + } + fn current_loop(&self, span: Span) -> Result { for scope in self.scopes.iter().rev() { if let Some(loop_item) = &scope.loop_item { @@ -1080,6 +1178,7 @@ enum RefItem { Var(VarRef), Set(SetRef), Iter(IterRef), + Cursor(SetRef), } #[derive(Debug, Clone)] diff --git a/src/query/script/tests/it/testdata/script-error.txt b/src/query/script/tests/it/testdata/script-error.txt index 6bb52166823aa..c76af3a83d98d 100644 --- a/src/query/script/tests/it/testdata/script-error.txt +++ b/src/query/script/tests/it/testdata/script-error.txt @@ -103,7 +103,7 @@ error: | 1 | LET x := 1; 2 | FOR r IN x DO - | ^ `x` is not a set + | ^ `x` is not a resultset or cursor ---------- Input ---------- diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 08eeb507345c4..fbcb9ce85e30e 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -32,6 +32,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; +use databend_common_expression::DataSchemaRef; use databend_common_expression::SendableDataBlockStream; use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_pipeline_core::always_callback; @@ -96,6 +97,10 @@ pub trait Interpreter: Sync + Send { } } + async fn get_dynamic_schema(&self) -> Option { + None + } + async fn execute_inner(&self, ctx: Arc) -> Result { { let mutation_status = ctx.get_mutation_status(); diff --git a/src/query/service/src/interpreters/interpreter_execute_immediate.rs b/src/query/service/src/interpreters/interpreter_execute_immediate.rs index 94799c782d09c..a1ed0f02f9522 100644 --- a/src/query/service/src/interpreters/interpreter_execute_immediate.rs +++ b/src/query/service/src/interpreters/interpreter_execute_immediate.rs @@ -16,20 +16,20 @@ use std::sync::Arc; use databend_common_ast::ast::DeclareItem; use databend_common_ast::ast::ScriptStatement; -use databend_common_ast::parser::run_parser; -use databend_common_ast::parser::script::script_block; -use databend_common_ast::parser::tokenize_sql; -use databend_common_ast::parser::ParseMode; use databend_common_exception::Result; -use databend_common_expression::block_debug::box_render; -use databend_common_expression::types::StringType; +use databend_common_expression::types::DataType; +use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; -use databend_common_expression::FromData; +use databend_common_expression::DataField; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::Scalar; use databend_common_script::compile; use databend_common_script::Executor; use databend_common_script::ReturnValue; use databend_common_sql::plans::ExecuteImmediatePlan; use databend_common_storages_fuse::TableContext; +use tokio::sync::Mutex; use crate::interpreters::util::ScriptClient; use crate::interpreters::Interpreter; @@ -40,11 +40,68 @@ use crate::sessions::QueryContext; pub struct ExecuteImmediateInterpreter { ctx: Arc, plan: ExecuteImmediatePlan, + // schema is only known after execute + state: ProcedureState, +} + +#[derive(Debug)] +pub(crate) struct ProcedureState { + schema: Mutex>, +} + +impl ProcedureState { + pub fn new() -> Self { + Self { + schema: Mutex::new(None), + } + } + + pub async fn get_schema(&self) -> Option { + self.schema.lock().await.clone() + } + + pub async fn set_null_schema(&self) { + let mut w = self.schema.lock().await; + *w = None; + } + + pub fn null_result() -> DataBlock { + DataBlock::new( + vec![BlockEntry::new_const_column( + DataType::String.wrap_nullable(), + Scalar::Null, + 1, + )], + 1, + ) + } + + pub async fn set_scalar_schema(&self, scalar: &Scalar) { + let mut w = self.schema.lock().await; + *w = Some(DataSchemaRefExt::create(vec![DataField::new( + "Result", + scalar.as_ref().infer_data_type(), + )])); + } + + pub fn scalar_result(scalar: Scalar) -> DataBlock { + let typ = scalar.as_ref().infer_data_type(); + DataBlock::new(vec![BlockEntry::new_const_column(typ, scalar, 1)], 1) + } + + pub async fn set_schema(&self, schema: DataSchemaRef) { + let mut w = self.schema.lock().await; + *w = Some(schema); + } } impl ExecuteImmediateInterpreter { pub fn try_create(ctx: Arc, plan: ExecuteImmediatePlan) -> Result { - Ok(ExecuteImmediateInterpreter { ctx, plan }) + Ok(ExecuteImmediateInterpreter { + ctx, + plan, + state: ProcedureState::new(), + }) } } @@ -58,21 +115,15 @@ impl Interpreter for ExecuteImmediateInterpreter { false } + async fn get_dynamic_schema(&self) -> Option { + self.state.get_schema().await + } + #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { let res: Result<_> = try { - let settings = self.ctx.get_settings(); - let sql_dialect = settings.get_sql_dialect()?; - let tokens = tokenize_sql(&self.plan.script)?; - let mut ast = run_parser( - &tokens, - sql_dialect, - ParseMode::Template, - false, - script_block, - )?; - + let mut ast = self.plan.script_block.clone(); let mut src = vec![]; for declare in ast.declares { match declare { @@ -89,32 +140,24 @@ impl Interpreter for ExecuteImmediateInterpreter { ctx: self.ctx.clone(), }; let mut executor = Executor::load(ast.span, client, compiled); + let settings = self.ctx.get_settings(); let script_max_steps = settings.get_script_max_steps()?; let result = executor.run(script_max_steps as usize).await?; match result { Some(ReturnValue::Var(scalar)) => { - PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - StringType::from_data(vec![scalar.to_string()]), - ])])? + self.state.set_scalar_schema(&scalar).await; + let block = ProcedureState::scalar_result(scalar); + PipelineBuildResult::from_blocks(vec![block])? } Some(ReturnValue::Set(set)) => { - let rendered_table = box_render( - &set.schema, - &[set.block.clone()], - usize::MAX, - usize::MAX, - usize::MAX, - true, - )?; - let lines = rendered_table.lines().map(|x| x.to_string()).collect(); - PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - StringType::from_data(lines), - ])])? + self.state.set_schema(set.schema).await; + PipelineBuildResult::from_blocks(vec![set.block])? + } + None => { + self.state.set_null_schema().await; + PipelineBuildResult::from_blocks(vec![ProcedureState::null_result()])? } - None => PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - StringType::from_data(Vec::::new()), - ])])?, } }; diff --git a/src/query/service/src/interpreters/interpreter_procedure_call.rs b/src/query/service/src/interpreters/interpreter_procedure_call.rs index 8e264910c8eb2..fbc59c0b7aadb 100644 --- a/src/query/service/src/interpreters/interpreter_procedure_call.rs +++ b/src/query/service/src/interpreters/interpreter_procedure_call.rs @@ -23,16 +23,14 @@ use databend_common_ast::parser::script::script_block; use databend_common_ast::parser::tokenize_sql; use databend_common_ast::parser::ParseMode; use databend_common_exception::Result; -use databend_common_expression::block_debug::box_render; -use databend_common_expression::types::StringType; -use databend_common_expression::DataBlock; -use databend_common_expression::FromData; +use databend_common_expression::DataSchemaRef; use databend_common_script::compile; use databend_common_script::Executor; use databend_common_script::ReturnValue; use databend_common_sql::plans::CallProcedurePlan; use databend_common_storages_fuse::TableContext; +use crate::interpreters::interpreter_execute_immediate::ProcedureState; use crate::interpreters::util::ScriptClient; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -42,11 +40,16 @@ use crate::sessions::QueryContext; pub struct CallProcedureInterpreter { ctx: Arc, plan: CallProcedurePlan, + state: ProcedureState, } impl CallProcedureInterpreter { pub fn try_create(ctx: Arc, plan: CallProcedurePlan) -> Result { - Ok(CallProcedureInterpreter { ctx, plan }) + Ok(CallProcedureInterpreter { + ctx, + plan, + state: ProcedureState::new(), + }) } } @@ -60,6 +63,10 @@ impl Interpreter for CallProcedureInterpreter { false } + async fn get_dynamic_schema(&self) -> Option { + self.state.get_schema().await + } + #[fastrace::trace] #[async_backtrace::framed] async fn execute2(&self) -> Result { @@ -105,27 +112,17 @@ impl Interpreter for CallProcedureInterpreter { match result { Some(ReturnValue::Var(scalar)) => { - PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - StringType::from_data(vec![scalar.to_string()]), - ])])? + self.state.set_scalar_schema(&scalar).await; + PipelineBuildResult::from_blocks(vec![ProcedureState::scalar_result(scalar)])? } Some(ReturnValue::Set(set)) => { - let rendered_table = box_render( - &set.schema, - &[set.block.clone()], - usize::MAX, - usize::MAX, - usize::MAX, - true, - )?; - let lines = rendered_table.lines().map(|x| x.to_string()).collect(); - PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - StringType::from_data(lines), - ])])? + self.state.set_schema(set.schema).await; + PipelineBuildResult::from_blocks(vec![set.block])? + } + None => { + self.state.set_null_schema().await; + PipelineBuildResult::from_blocks(vec![ProcedureState::null_result()])? } - None => PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - StringType::from_data(Vec::::new()), - ])])?, } }; diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index d5747a750d8d4..488d54b0db5cd 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -251,6 +251,15 @@ impl Executor { } } + pub fn update_schema(this: &Arc>, schema: DataSchemaRef) { + let mut guard = this.lock(); + match &mut guard.state { + Starting(_) => {} + Running(r) => r.schema = QueryResponseField::from_schema(schema), + Stopped(f) => f.schema = QueryResponseField::from_schema(schema), + } + } + pub fn get_query_duration_ms(&self) -> i64 { match &self.state { Starting(ExecuteStarting { ctx, .. }) | Running(ExecuteRunning { ctx, .. }) => { @@ -375,7 +384,9 @@ impl ExecuteState { .await .with_context(make_error)?; let has_result_set = plan.has_result_set(); - let schema = if has_result_set { + // For dynamic schema, we just return empty schema and update it later. + let is_dynamic_schema = plan.is_dynamic_schema(); + let schema = if has_result_set && !is_dynamic_schema { // check has_result_set first for safety QueryResponseField::from_schema(plan.schema()) } else { @@ -397,6 +408,7 @@ impl ExecuteState { let res = Self::pull_and_send( interpreter, + is_dynamic_schema, plan.schema(), ctx_clone, block_sender, @@ -420,6 +432,7 @@ impl ExecuteState { #[fastrace::trace(name = "ExecuteState::pull_and_send")] async fn pull_and_send( interpreter: Arc, + is_dynamic_schema: bool, schema: DataSchemaRef, ctx: Arc, mut sender: Sender, @@ -444,6 +457,11 @@ impl ExecuteState { sender.abort(); } Some(Ok(block)) => { + if is_dynamic_schema { + if let Some(schema) = interpreter.get_dynamic_schema().await { + Executor::update_schema(&executor, schema); + } + } Self::send_data_block(&mut sender, &executor, block) .await .with_context(make_error)?; diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 83d0b85173aa7..a9661a4ed2572 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -426,7 +426,11 @@ impl InteractiveWorkerBase { let has_result_set = plan.has_result_set(); let (blocks, extra_info) = Self::exec_query(interpreter.clone(), &context).await?; - let schema = plan.schema(); + let mut schema = plan.schema(); + if let Some(real_schema) = interpreter.get_dynamic_schema().await { + schema = real_schema; + } + let format = context.get_format_settings()?; Ok(( QueryResult::create( diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 58033fe0b6415..07761bfa20bd5 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -818,8 +818,8 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=1)), }), ("enable_experimental_procedure", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Enables the experimental feature for 'PROCEDURE'. In default disable the experimental feature", + value: UserSettingValue::UInt64(1), + desc: "Enables the experimental feature for 'PROCEDURE'. In default enable the experimental feature", mode: SettingMode::Both, scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 7f9099690a036..a3e5f052d9cec 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -862,6 +862,25 @@ impl Binder { normalize_identifier(ident, &self.name_resolution_ctx) } + /// Bind an expr to a scalar expression. + /// Used to fold expr to a constant expr. + pub(crate) fn bind_expr( + &mut self, + expr: &databend_common_ast::ast::Expr, + ) -> Result { + let mut temp_ctx = BindContext::new(); + let mut type_checker = TypeChecker::try_create( + &mut temp_ctx, + self.ctx.clone(), + &self.name_resolution_ctx, + self.metadata.clone(), + &[], + false, + )?; + let (scalar, _) = *type_checker.resolve(expr)?; + Ok(scalar) + } + pub(crate) fn opt_hints_set_var( &mut self, bind_context: &mut BindContext, diff --git a/src/query/sql/src/planner/binder/call.rs b/src/query/sql/src/planner/binder/call.rs index 6725b15bb9a21..2064fc6eb79ab 100644 --- a/src/query/sql/src/planner/binder/call.rs +++ b/src/query/sql/src/planner/binder/call.rs @@ -28,13 +28,14 @@ impl Binder { bind_context: &mut BindContext, stmt: &CallStmt, ) -> Result { - let table_function_name = stmt.name.split('$').next_back().unwrap(); + let function_name = stmt.name.to_string(); + let table_function_name = function_name.split('$').next_back().unwrap(); let query = if table_function_name.eq_ignore_ascii_case("search_tables") { if stmt.args.len() != 1 { return Err(ErrorCode::NumberArgumentsNotMatch(format!( "Incorrect number of arguments to function {}. Expected 1, got {}", - stmt.name, + function_name, stmt.args.len() ))); } diff --git a/src/query/sql/src/planner/binder/ddl/procedure.rs b/src/query/sql/src/planner/binder/ddl/procedure.rs index abe35af36b9b5..acd65cad271af 100644 --- a/src/query/sql/src/planner/binder/ddl/procedure.rs +++ b/src/query/sql/src/planner/binder/ddl/procedure.rs @@ -22,9 +22,15 @@ use databend_common_ast::ast::ProcedureIdentity as AstProcedureIdentity; use databend_common_ast::ast::ProcedureLanguage; use databend_common_ast::ast::ProcedureType; use databend_common_ast::ast::ShowOptions; +use databend_common_ast::parser::run_parser; +use databend_common_ast::parser::script::script_block_or_stmt; +use databend_common_ast::parser::script::ScriptBlockOrStmt; +use databend_common_ast::parser::tokenize_sql; +use databend_common_ast::parser::ParseMode; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::DataType; +use databend_common_expression::Scalar; use databend_common_meta_app::principal::GetProcedureReq; use databend_common_meta_app::principal::ProcedureIdentity; use databend_common_meta_app::principal::ProcedureMeta; @@ -34,6 +40,7 @@ use databend_common_users::UserApiProvider; use crate::binder::show::get_show_options; use crate::plans::CallProcedurePlan; +use crate::plans::ConstantExpr; use crate::plans::CreateProcedurePlan; use crate::plans::DescProcedurePlan; use crate::plans::DropProcedurePlan; @@ -55,9 +62,42 @@ impl Binder { stmt: &ExecuteImmediateStmt, ) -> Result { let ExecuteImmediateStmt { script } = stmt; - Ok(Plan::ExecuteImmediate(Box::new(ExecuteImmediatePlan { - script: script.clone(), - }))) + let script = self.bind_expr(script)?; + let script = match script { + ScalarExpr::ConstantExpr(ConstantExpr { + value: Scalar::String(value), + .. + }) => value, + _ => { + return Err(ErrorCode::InvalidArgument( + "immediate script must be a string", + )) + } + }; + + let settings = self.ctx.get_settings(); + let sql_dialect = settings.get_sql_dialect()?; + let tokens = tokenize_sql(&script)?; + let ast = run_parser( + &tokens, + sql_dialect, + ParseMode::Template, + false, + script_block_or_stmt, + )?; + + match ast { + ScriptBlockOrStmt::ScriptBlock(script_block) => { + Ok(Plan::ExecuteImmediate(Box::new(ExecuteImmediatePlan { + script_block, + script, + }))) + } + ScriptBlockOrStmt::Statement(stmt) => { + let binder = self.clone(); + binder.bind(&stmt).await + } + } } pub async fn bind_create_procedure(&mut self, stmt: &CreateProcedureStmt) -> Result { @@ -151,6 +191,7 @@ impl Binder { } arg_types.push(arg_type.to_string()); } + let name = name.to_string(); let procedure_ident = ProcedureIdentity::new(name, arg_types.join(",")); let req = GetProcedureReq { inner: ProcedureNameIdent::new(tenant.clone(), procedure_ident.clone()), diff --git a/src/query/sql/src/planner/binder/ddl/task.rs b/src/query/sql/src/planner/binder/ddl/task.rs index 09680987bdcb8..55672b2a8fd03 100644 --- a/src/query/sql/src/planner/binder/ddl/task.rs +++ b/src/query/sql/src/planner/binder/ddl/task.rs @@ -136,7 +136,7 @@ impl Binder { let plan = CreateTaskPlan { create_option: create_option.clone().into(), tenant, - task_name: name.to_string(), + task_name: name.clone(), warehouse: warehouse.clone(), schedule_opts: schedule_opts.clone(), suspend_task_after_num_failures: *suspend_task_after_num_failures, @@ -195,7 +195,7 @@ impl Binder { let plan = AlterTaskPlan { if_exists: *if_exists, tenant, - task_name: name.to_string(), + task_name: name.clone(), alter_options: options.clone(), }; Ok(Plan::AlterTask(Box::new(plan))) @@ -213,7 +213,7 @@ impl Binder { let plan = DropTaskPlan { if_exists: *if_exists, tenant, - task_name: name.to_string(), + task_name: name.clone(), }; Ok(Plan::DropTask(Box::new(plan))) } @@ -223,13 +223,11 @@ impl Binder { &mut self, stmt: &DescribeTaskStmt, ) -> Result { - let DescribeTaskStmt { name } = stmt; - let tenant = self.ctx.get_tenant(); let plan = DescribeTaskPlan { tenant, - task_name: name.to_string(), + task_name: stmt.name.to_string(), }; Ok(Plan::DescribeTask(Box::new(plan))) } @@ -239,13 +237,11 @@ impl Binder { &mut self, stmt: &ExecuteTaskStmt, ) -> Result { - let ExecuteTaskStmt { name } = stmt; - let tenant = self.ctx.get_tenant(); let plan = ExecuteTaskPlan { tenant, - task_name: name.to_string(), + task_name: stmt.name.to_string(), }; Ok(Plan::ExecuteTask(Box::new(plan))) } diff --git a/src/query/sql/src/planner/plans/ddl/procedure.rs b/src/query/sql/src/planner/plans/ddl/procedure.rs index 48e1dd0f453ad..9bdf2b48b5f5a 100644 --- a/src/query/sql/src/planner/plans/ddl/procedure.rs +++ b/src/query/sql/src/planner/plans/ddl/procedure.rs @@ -13,6 +13,7 @@ // limitations under the License. use databend_common_ast::ast::Expr; +use databend_common_ast::ast::ScriptBlock; use databend_common_expression::types::DataType; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; @@ -27,12 +28,16 @@ use databend_common_meta_app::tenant::Tenant; #[derive(Clone, Debug, PartialEq)] pub struct ExecuteImmediatePlan { + pub script_block: ScriptBlock, pub script: String, } impl ExecuteImmediatePlan { pub fn schema(&self) -> DataSchemaRef { - DataSchemaRefExt::create(vec![DataField::new("Result", DataType::String)]) + DataSchemaRefExt::create(vec![DataField::new( + "Result", + DataType::String.wrap_nullable(), + )]) } } diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 9843190d20832..c22570ee71347 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -604,6 +604,13 @@ impl Plan { !self.schema().fields().is_empty() } + pub fn is_dynamic_schema(&self) -> bool { + matches!( + self, + Plan::ExecuteImmediate { .. } | Plan::CallProcedure { .. } + ) + } + pub fn remove_exchange_for_select(&self) -> Self { if let Plan::Query { s_expr, diff --git a/tests/sqllogictests/suites/base/15_procedure/15_0001_execute_immediate.test b/tests/sqllogictests/suites/base/15_procedure/15_0001_execute_immediate.test index 27ab5f52cb8bc..a82bfdba68942 100644 --- a/tests/sqllogictests/suites/base/15_procedure/15_0001_execute_immediate.test +++ b/tests/sqllogictests/suites/base/15_procedure/15_0001_execute_immediate.test @@ -11,6 +11,26 @@ BEGIN END; $$; ---- +NULL + +query I +EXECUTE IMMEDIATE $$ +BEGIN + LET RES RESULTSET := select number % 3 a, number % 4 b from numbers(10); + LET SUM := 0; + CREATE OR REPLACE TABLE abc(a int, b int); + for x in RES do + LET A := x.a; + LET B := x.b; + INSERT INTO abc VALUES (:A, :B); + SUM := SUM + A; + SUM := SUM + B; + end for; + return SUM; +END; +$$; +---- +22 query I EXECUTE IMMEDIATE $$ @@ -63,7 +83,7 @@ BEGIN END; $$; -query I +query IRT EXECUTE IMMEDIATE $$ BEGIN CREATE OR REPLACE TABLE t1 (a INT, b FLOAT, c STRING); @@ -72,12 +92,7 @@ BEGIN END; $$; ---- -┌─────────────────────────────────────────┐ -│ a │ b │ c │ -│ Int32 NULL │ Float32 NULL │ String NULL │ -├────────────┼──────────────┼─────────────┤ -│ 1 │ 2 │ '3' │ -└─────────────────────────────────────────┘ +1 2.0 3 query I EXECUTE IMMEDIATE $$ @@ -89,12 +104,7 @@ BEGIN END; $$; ---- -┌───────┐ -│ 2 + 1 │ -│ UInt8 │ -├───────┤ -│ 3 │ -└───────┘ +3 query I EXECUTE IMMEDIATE $$ @@ -104,12 +114,7 @@ BEGIN END; $$; ---- -┌───────┐ -│ 1 + 1 │ -│ UInt8 │ -├───────┤ -│ 2 │ -└───────┘ +2 query error divided by zero EXECUTE IMMEDIATE $$ @@ -156,6 +161,62 @@ $$; ---- 100 +query I +EXECUTE IMMEDIATE $$ +BEGIN + let x RESULTSET := EXECUTE IMMEDIATE 'select 333'; + RETURN TABLE(x); +END; +$$; +---- +333 + +## cursors + +query I +EXECUTE IMMEDIATE $$ +BEGIN + LET c1 CURSOR FOR SELECT number + 10 FROM numbers(10); + OPEN c1; + FETCH c1 INTO var_for_column_value; + FETCH c1 INTO var_for_column_value; + CLOSE c1; + RETURN var_for_column_value; +END; +$$; +---- +11 + + +query I +EXECUTE IMMEDIATE $$ +BEGIN + LET S RESULTSET := (select number + 10 a, number + 20 b FROM numbers(10)); + LET c1 CURSOR FOR S; + OPEN c1; + FETCH c1 INTO var_for_column_value; + FETCH c1 INTO var_for_column_value; + CLOSE c1; + RETURN var_for_column_value; +END; +$$; +---- +11 + +query I +EXECUTE IMMEDIATE $$ +BEGIN + LET S RESULTSET := (select number + 10 a, number + 20 b FROM numbers(10)); + LET c1 CURSOR FOR S; + LET sum := 0; + for c in c1 do + sum := sum + c.a + c.b; + end for; + RETURN sum; +END; +$$; +---- +390 statement ok drop database test_procedure; diff --git a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test index 2550e58b9483e..368608eb2c587 100644 --- a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test +++ b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test @@ -135,10 +135,6 @@ drop procedure p1(UInt8, UInt8); statement ok drop procedure p1(int); -query T -select count(name) from system.procedures ----- -0 statement ok drop procedure if exists not_exists_p(); @@ -146,11 +142,9 @@ drop procedure if exists not_exists_p(); statement error 3130 drop procedure not_exists_p(); -statement ok -drop procedure if exists sum_even_numbers(Int, Int); statement ok -CREATE PROCEDURE sum_even_numbers(start_val Int, end_val Int) +CREATE OR REPLACE PROCEDURE sum_even_numbers(start_val Int, end_val Int) RETURNS UInt8 NOT NULL LANGUAGE SQL COMMENT='Calculate the sum of all even numbers' @@ -187,8 +181,30 @@ $$; query T call procedure p2('x'); ---- -'x' +x statement ok drop procedure p2(string); + +statement ok +CREATE PROCEDURE if not exists p3(x STRING) RETURNS Int32 NOT NULL LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET y RESULTSET := (select number, number + 3, :x from numbers(5) order by 1, 2); + RETURN TABLE(y); +END; +$$; + +query T +call procedure p3('x'); +---- +0 3 x +1 4 x +2 5 x +3 6 x +4 7 x + +statement ok +drop procedure p3(string); + + diff --git a/tests/sqllogictests/suites/base/15_procedure/15_0009_procedure_call.test b/tests/sqllogictests/suites/base/15_procedure/15_0009_procedure_call.test new file mode 100644 index 0000000000000..7f3a8d08be075 --- /dev/null +++ b/tests/sqllogictests/suites/base/15_procedure/15_0009_procedure_call.test @@ -0,0 +1,58 @@ +statement ok +set global enable_experimental_procedure=1; + +statement ok +CREATE OR REPLACE PROCEDURE p1() RETURNS int not null LANGUAGE SQL AS $$ +BEGIN + RETURN 1; +END; +$$; + +statement ok +CREATE OR REPLACE PROCEDURE p2() RETURNS int not null LANGUAGE SQL AS $$ +BEGIN + LET p := 'p1'; + call procedure IDENTIFIER(:p)(); + RETURN 1; +END; +$$; + +query T +EXECUTE IMMEDIATE $$ +BEGIN + LET p := 'p1'; + LET s RESULTSET := call procedure IDENTIFIER(:p)(); + RETURN TABLE(s); +END; +$$; +---- +1 + +query I +CALL PROCEDURE p2(); +---- +1 + +## returns null if no return stmt is defined +query T +EXECUTE IMMEDIATE $$ +BEGIN + select 11; +END; +$$; +---- +NULL + +query TT +EXECUTE IMMEDIATE $$ +BEGIN + let x RESULTSET := (select number , number + 1 from numbers(5) order by 1, 2); + RETURN TABLE(x); +END; +$$; +---- +0 1 +1 2 +2 3 +3 4 +4 5 diff --git a/tests/suites/0_stateless/18_rbac/18_0017_procedure_rbac.result b/tests/suites/0_stateless/18_rbac/18_0017_procedure_rbac.result index f4d67943b8527..d16e5b87f765e 100644 --- a/tests/suites/0_stateless/18_rbac/18_0017_procedure_rbac.result +++ b/tests/suites/0_stateless/18_rbac/18_0017_procedure_rbac.result @@ -65,7 +65,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege [AccessProcedur 27. Creating greet procedure... 28. Granting dev_role access to greet... 29. dev_user calls greet (should succeed)... -'Hello, Databend!' +Hello, Databend! 30. test_user calls greet (should fail)... Error: APIError: QueryFailed: [1063]Permission denied: privilege [AccessProcedure] is required on PROCEDURE for user 'test_user'@'%' with roles [public,test_role,test_with_access]. Note: Please ensure that your current role have the appropriate permissions to create a new Object === Cleaning up test environment ===