Skip to content

Snowflake: CREATE DYNAMIC TABLE #1960

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions src/ast/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use serde::{Deserialize, Serialize};
#[cfg(feature = "visitor")]
use sqlparser_derive::{Visit, VisitMut};

use crate::display_utils::{indented_list, DisplayCommaSeparated, Indent, NewLine, SpaceOrNewline};
use crate::{
ast::{InitializeKind, RefreshModeKind, TableVersion},
display_utils::{indented_list, DisplayCommaSeparated, Indent, NewLine, SpaceOrNewline},
};

pub use super::ddl::{ColumnDef, TableConstraint};

Expand Down Expand Up @@ -135,6 +138,7 @@ pub struct CreateTable {
pub or_replace: bool,
pub temporary: bool,
pub external: bool,
pub dynamic: bool,
pub global: Option<bool>,
pub if_not_exists: bool,
pub transient: bool,
Expand All @@ -155,6 +159,7 @@ pub struct CreateTable {
pub without_rowid: bool,
pub like: Option<ObjectName>,
pub clone: Option<ObjectName>,
pub version: Option<TableVersion>,
// For Hive dialect, the table comment is after the column definitions without `=`,
// so the `comment` field is optional and different than the comment field in the general options list.
// [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable)
Expand Down Expand Up @@ -232,6 +237,21 @@ pub struct CreateTable {
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
/// Snowflake "TARGET_LAG" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub target_lag: Option<String>,
/// Snowflake "WAREHOUSE" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub warehouse: Option<Ident>,
/// Snowflake "REFRESH_MODE" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub refresh_mode: Option<RefreshModeKind>,
/// Snowflake "INITIALIZE" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub initialize: Option<InitializeKind>,
/// Snowflake "REQUIRE USER" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub require_user: bool,
}

impl Display for CreateTable {
Expand All @@ -245,7 +265,7 @@ impl Display for CreateTable {
// `CREATE TABLE t (a INT) AS SELECT a from t2`
write!(
f,
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}TABLE {if_not_exists}{name}",
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{dynamic}{iceberg}TABLE {if_not_exists}{name}",
or_replace = if self.or_replace { "OR REPLACE " } else { "" },
external = if self.external { "EXTERNAL " } else { "" },
global = self.global
Expand All @@ -263,6 +283,7 @@ impl Display for CreateTable {
volatile = if self.volatile { "VOLATILE " } else { "" },
// Only for Snowflake
iceberg = if self.iceberg { "ICEBERG " } else { "" },
dynamic = if self.dynamic { "DYNAMIC " } else { "" },
name = self.name,
)?;
if let Some(on_cluster) = &self.on_cluster {
Expand Down Expand Up @@ -304,6 +325,10 @@ impl Display for CreateTable {
write!(f, " CLONE {c}")?;
}

if let Some(version) = &self.version {
write!(f, " {version}")?;
}

match &self.hive_distribution {
HiveDistributionStyle::PARTITIONED { columns } => {
write!(f, " PARTITIONED BY ({})", display_comma_separated(columns))?;
Expand Down Expand Up @@ -406,27 +431,27 @@ impl Display for CreateTable {
write!(f, " {options}")?;
}
if let Some(external_volume) = self.external_volume.as_ref() {
write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?;
write!(f, " EXTERNAL_VOLUME='{external_volume}'")?;
}

if let Some(catalog) = self.catalog.as_ref() {
write!(f, " CATALOG = '{catalog}'")?;
write!(f, " CATALOG='{catalog}'")?;
}

if self.iceberg {
if let Some(base_location) = self.base_location.as_ref() {
write!(f, " BASE_LOCATION = '{base_location}'")?;
write!(f, " BASE_LOCATION='{base_location}'")?;
}
}

if let Some(catalog_sync) = self.catalog_sync.as_ref() {
write!(f, " CATALOG_SYNC = '{catalog_sync}'")?;
write!(f, " CATALOG_SYNC='{catalog_sync}'")?;
}

if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
write!(
f,
" STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}"
" STORAGE_SERIALIZATION_POLICY={storage_serialization_policy}"
)?;
}

Expand Down Expand Up @@ -480,6 +505,26 @@ impl Display for CreateTable {
write!(f, " WITH TAG ({})", display_comma_separated(tag.as_slice()))?;
}

if let Some(target_lag) = &self.target_lag {
write!(f, " TARGET_LAG='{target_lag}'")?;
}

if let Some(warehouse) = &self.warehouse {
write!(f, " WAREHOUSE={warehouse}")?;
}

if let Some(refresh_mode) = &self.refresh_mode {
write!(f, " REFRESH_MODE={refresh_mode}")?;
}

if let Some(initialize) = &self.initialize {
write!(f, " INITIALIZE={initialize}")?;
}

if self.require_user {
write!(f, " REQUIRE USER")?;
}

if self.on_commit.is_some() {
let on_commit = match self.on_commit {
Some(OnCommit::DeleteRows) => "ON COMMIT DELETE ROWS",
Expand Down
90 changes: 70 additions & 20 deletions src/ast/helpers/stmt_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use sqlparser_derive::{Visit, VisitMut};
use super::super::dml::CreateTable;
use crate::ast::{
ClusteredBy, ColumnDef, CommentDef, CreateTableOptions, Expr, FileFormat,
HiveDistributionStyle, HiveFormat, Ident, ObjectName, OnCommit, OneOrManyWithParens, Query,
RowAccessPolicy, Statement, StorageSerializationPolicy, TableConstraint, Tag,
WrappedCollection,
HiveDistributionStyle, HiveFormat, Ident, InitializeKind, ObjectName, OnCommit,
OneOrManyWithParens, Query, RefreshModeKind, RowAccessPolicy, Statement,
StorageSerializationPolicy, TableConstraint, TableVersion, Tag, WrappedCollection,
};

use crate::parser::ParserError;
Expand Down Expand Up @@ -73,6 +73,7 @@ pub struct CreateTableBuilder {
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
pub dynamic: bool,
pub name: ObjectName,
pub columns: Vec<ColumnDef>,
pub constraints: Vec<TableConstraint>,
Expand All @@ -84,6 +85,7 @@ pub struct CreateTableBuilder {
pub without_rowid: bool,
pub like: Option<ObjectName>,
pub clone: Option<ObjectName>,
pub version: Option<TableVersion>,
pub comment: Option<CommentDef>,
pub on_commit: Option<OnCommit>,
pub on_cluster: Option<Ident>,
Expand All @@ -109,6 +111,11 @@ pub struct CreateTableBuilder {
pub catalog_sync: Option<String>,
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
pub table_options: CreateTableOptions,
pub target_lag: Option<String>,
pub warehouse: Option<Ident>,
pub refresh_mode: Option<RefreshModeKind>,
pub initialize: Option<InitializeKind>,
pub require_user: bool,
}

impl CreateTableBuilder {
Expand All @@ -122,6 +129,7 @@ impl CreateTableBuilder {
transient: false,
volatile: false,
iceberg: false,
dynamic: false,
name,
columns: vec![],
constraints: vec![],
Expand All @@ -133,6 +141,7 @@ impl CreateTableBuilder {
without_rowid: false,
like: None,
clone: None,
version: None,
comment: None,
on_commit: None,
on_cluster: None,
Expand All @@ -158,6 +167,11 @@ impl CreateTableBuilder {
catalog_sync: None,
storage_serialization_policy: None,
table_options: CreateTableOptions::None,
target_lag: None,
warehouse: None,
refresh_mode: None,
initialize: None,
require_user: false,
}
}
pub fn or_replace(mut self, or_replace: bool) -> Self {
Expand Down Expand Up @@ -200,6 +214,11 @@ impl CreateTableBuilder {
self
}

pub fn dynamic(mut self, dynamic: bool) -> Self {
self.dynamic = dynamic;
self
}

pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
self.columns = columns;
self
Expand Down Expand Up @@ -249,6 +268,11 @@ impl CreateTableBuilder {
self
}

pub fn version(mut self, version: Option<TableVersion>) -> Self {
self.version = version;
self
}

pub fn comment_after_column_def(mut self, comment: Option<CommentDef>) -> Self {
self.comment = comment;
self
Expand Down Expand Up @@ -383,24 +407,29 @@ impl CreateTableBuilder {
self
}

/// Returns true if the statement has exactly one source of info on the schema of the new table.
/// This is Snowflake-specific, some dialects allow more than one source.
pub(crate) fn validate_schema_info(&self) -> bool {
let mut sources = 0;
if !self.columns.is_empty() {
sources += 1;
}
if self.query.is_some() {
sources += 1;
}
if self.like.is_some() {
sources += 1;
}
if self.clone.is_some() {
sources += 1;
}
pub fn target_lag(mut self, target_lag: Option<String>) -> Self {
self.target_lag = target_lag;
self
}

pub fn warehouse(mut self, warehouse: Option<Ident>) -> Self {
self.warehouse = warehouse;
self
}

sources == 1
pub fn refresh_mode(mut self, refresh_mode: Option<RefreshModeKind>) -> Self {
self.refresh_mode = refresh_mode;
self
}

pub fn initialize(mut self, initialize: Option<InitializeKind>) -> Self {
self.initialize = initialize;
self
}

pub fn require_user(mut self, require_user: bool) -> Self {
self.require_user = require_user;
self
}

pub fn build(self) -> Statement {
Expand All @@ -413,6 +442,7 @@ impl CreateTableBuilder {
transient: self.transient,
volatile: self.volatile,
iceberg: self.iceberg,
dynamic: self.dynamic,
name: self.name,
columns: self.columns,
constraints: self.constraints,
Expand All @@ -424,6 +454,7 @@ impl CreateTableBuilder {
without_rowid: self.without_rowid,
like: self.like,
clone: self.clone,
version: self.version,
comment: self.comment,
on_commit: self.on_commit,
on_cluster: self.on_cluster,
Expand All @@ -449,6 +480,11 @@ impl CreateTableBuilder {
catalog_sync: self.catalog_sync,
storage_serialization_policy: self.storage_serialization_policy,
table_options: self.table_options,
target_lag: self.target_lag,
warehouse: self.warehouse,
refresh_mode: self.refresh_mode,
initialize: self.initialize,
require_user: self.require_user,
})
}
}
Expand All @@ -469,6 +505,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
transient,
volatile,
iceberg,
dynamic,
name,
columns,
constraints,
Expand All @@ -480,6 +517,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
without_rowid,
like,
clone,
version,
comment,
on_commit,
on_cluster,
Expand All @@ -505,13 +543,19 @@ impl TryFrom<Statement> for CreateTableBuilder {
catalog_sync,
storage_serialization_policy,
table_options,
target_lag,
warehouse,
refresh_mode,
initialize,
require_user,
}) => Ok(Self {
or_replace,
temporary,
external,
global,
if_not_exists,
transient,
dynamic,
name,
columns,
constraints,
Expand All @@ -523,6 +567,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
without_rowid,
like,
clone,
version,
comment,
on_commit,
on_cluster,
Expand Down Expand Up @@ -550,6 +595,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
catalog_sync,
storage_serialization_policy,
table_options,
target_lag,
warehouse,
refresh_mode,
initialize,
require_user,
}),
_ => Err(ParserError::ParserError(format!(
"Expected create table statement, but received: {stmt}"
Expand Down
Loading