diff --git a/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts b/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts index 330ad15ea1b1b..df71822d98cc0 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts @@ -367,7 +367,6 @@ export class BigqueryQuery extends BaseQuery { templates.types.decimal = 'BIGDECIMAL({{ precision }},{{ scale }})'; templates.types.binary = 'BYTES'; templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; - templates.join_types.full = 'FULL'; templates.statements.time_series_select = 'SELECT DATETIME(TIMESTAMP(f)) date_from, DATETIME(TIMESTAMP(t)) date_to \n' + 'FROM (\n' + '{% for time_item in seria %}' + diff --git a/packages/cubejs-testing-drivers/fixtures/athena.json b/packages/cubejs-testing-drivers/fixtures/athena.json index 8d1305f70af30..988029d607cbd 100644 --- a/packages/cubejs-testing-drivers/fixtures/athena.json +++ b/packages/cubejs-testing-drivers/fixtures/athena.json @@ -163,6 +163,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "Custom Granularities ", @@ -198,6 +199,7 @@ "Tesseract: querying BigECommerce with Retail Calendar: totalCountRetailWeekAgo", "SQL API: Timeshift measure from cube", "querying BigECommerce: multi-stage group by time dimension", + "querying BigECommerce: SeveralMultiStageMeasures", "querying BigECommerce: rolling window by 2 week", "querying custom granularities ECommerce: count by three_months_by_march + no dimension", diff --git a/packages/cubejs-testing-drivers/fixtures/bigquery.json b/packages/cubejs-testing-drivers/fixtures/bigquery.json index 18b0c0c541776..3f628e10bdda0 100644 --- a/packages/cubejs-testing-drivers/fixtures/bigquery.json +++ b/packages/cubejs-testing-drivers/fixtures/bigquery.json @@ -177,6 +177,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "SKIPPED SQL API (Need work)", @@ -220,6 +221,7 @@ "querying BigECommerce: rolling count_distinct_approx window by 2 week", "querying BigECommerce: rolling count_distinct_approx window by 2 month", "querying BigECommerce: totalProfitYearAgo", + "querying BigECommerce: SeveralMultiStageMeasures", "SQL API: post-aggregate percentage of total", "SQL API: Simple Rollup", "SQL API: Rollup over exprs", diff --git a/packages/cubejs-testing-drivers/fixtures/clickhouse.json b/packages/cubejs-testing-drivers/fixtures/clickhouse.json index 00dad1947ed5c..7649744f2d5e8 100644 --- a/packages/cubejs-testing-drivers/fixtures/clickhouse.json +++ b/packages/cubejs-testing-drivers/fixtures/clickhouse.json @@ -214,6 +214,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "Custom Granularities ", diff --git a/packages/cubejs-testing-drivers/fixtures/databricks-jdbc.json b/packages/cubejs-testing-drivers/fixtures/databricks-jdbc.json index 9bab4cf3deabf..891f07703c780 100644 --- a/packages/cubejs-testing-drivers/fixtures/databricks-jdbc.json +++ b/packages/cubejs-testing-drivers/fixtures/databricks-jdbc.json @@ -230,6 +230,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "Custom Granularities ", @@ -274,6 +275,7 @@ "Tesseract: querying BigECommerce with Retail Calendar: totalCountRetailMonthAgo", "Tesseract: querying BigECommerce with Retail Calendar: totalCountRetailWeekAgo", "Tesseract: SQL API: Timeshift measure from cube", + "querying BigECommerce: SeveralMultiStageMeasures", "---- Different results comparing to baseQuery version. Need to investigate ----", "querying ECommerce: dimensions", diff --git a/packages/cubejs-testing-drivers/fixtures/mssql.json b/packages/cubejs-testing-drivers/fixtures/mssql.json index 8e5d8505cfe88..337e725466bb1 100644 --- a/packages/cubejs-testing-drivers/fixtures/mssql.json +++ b/packages/cubejs-testing-drivers/fixtures/mssql.json @@ -156,6 +156,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "SKIPPED SQL API (Need work)", diff --git a/packages/cubejs-testing-drivers/fixtures/mysql.json b/packages/cubejs-testing-drivers/fixtures/mysql.json index 6d441b63b8f1a..6cf977f9c3424 100644 --- a/packages/cubejs-testing-drivers/fixtures/mysql.json +++ b/packages/cubejs-testing-drivers/fixtures/mysql.json @@ -152,6 +152,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "Custom Granularities ", diff --git a/packages/cubejs-testing-drivers/fixtures/postgres.json b/packages/cubejs-testing-drivers/fixtures/postgres.json index d8128ccbf18c4..50d09f2052adc 100644 --- a/packages/cubejs-testing-drivers/fixtures/postgres.json +++ b/packages/cubejs-testing-drivers/fixtures/postgres.json @@ -176,7 +176,8 @@ "querying BigECommerce: multi-stage group by time dimension", "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", - "querying SwitchSourceTest: filter by switch dimensions" + "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures" ], "tesseractSkip": [ "querying Products: dimensions -- doesn't work wo ordering", diff --git a/packages/cubejs-testing-drivers/fixtures/redshift.json b/packages/cubejs-testing-drivers/fixtures/redshift.json index 8ee72cac43de2..0bdfbe1db1136 100644 --- a/packages/cubejs-testing-drivers/fixtures/redshift.json +++ b/packages/cubejs-testing-drivers/fixtures/redshift.json @@ -188,6 +188,7 @@ "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures", "---------------------------------------", "SKIPPED SQL API (Need work) ", @@ -210,6 +211,7 @@ "querying BigECommerce: rolling window by 2 month without date range", "querying BigECommerce: rolling window YTD without date range", "querying custom granularities ECommerce: count by two_mo_by_feb + no dimension + rollingCountByLeading without date range", + "querying BigECommerce: SeveralMultiStageMeasures", "SQL API: Simple Rollup", "SQL API: Complex Rollup", diff --git a/packages/cubejs-testing-drivers/fixtures/snowflake.json b/packages/cubejs-testing-drivers/fixtures/snowflake.json index 4fc0568576afd..3ce4909af95c0 100644 --- a/packages/cubejs-testing-drivers/fixtures/snowflake.json +++ b/packages/cubejs-testing-drivers/fixtures/snowflake.json @@ -267,7 +267,8 @@ "querying BigECommerce: multi-stage group by time dimension", "querying SwitchSourceTest: simple cross join", "querying SwitchSourceTest: full cross join", - "querying SwitchSourceTest: filter by switch dimensions" + "querying SwitchSourceTest: filter by switch dimensions", + "querying BigECommerce: SeveralMultiStageMeasures" ], "tesseractSkip": [ "for the Customers.RollingExternal", diff --git a/packages/cubejs-testing-drivers/src/tests/testQueries.ts b/packages/cubejs-testing-drivers/src/tests/testQueries.ts index fa41cbd5d4a7f..1c09b2f53178d 100644 --- a/packages/cubejs-testing-drivers/src/tests/testQueries.ts +++ b/packages/cubejs-testing-drivers/src/tests/testQueries.ts @@ -1826,6 +1826,23 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten expect(response.rawData()).toMatchSnapshot(); }); + execute('querying BigECommerce: SeveralMultiStageMeasures', async () => { + const response = await client.load({ + measures: [ + 'BigECommerce.totalProfitYearAgo', + 'BigECommerce.percentageOfTotalForStatus', + 'BigECommerce.totalCountRetailMonthAgo', + 'BigECommerce.count', + ], + timeDimensions: [{ + dimension: 'BigECommerce.orderDate', + granularity: 'month', + dateRange: ['2020-01-01', '2020-12-31'], + }], + }); + expect(response.rawData()).toMatchSnapshot(); + }); + execute('querying BigECommerce: filtering with possible casts', async () => { const response = await client.load({ measures: [ diff --git a/packages/cubejs-testing-drivers/test/__snapshots__/postgres-full.test.ts.snap b/packages/cubejs-testing-drivers/test/__snapshots__/postgres-full.test.ts.snap index 3520b1f90d585..7adfa7770f980 100644 --- a/packages/cubejs-testing-drivers/test/__snapshots__/postgres-full.test.ts.snap +++ b/packages/cubejs-testing-drivers/test/__snapshots__/postgres-full.test.ts.snap @@ -15545,6 +15545,99 @@ Array [ ] `; +exports[`Queries with the @cubejs-backend/postgres-driver querying BigECommerce: SeveralMultiStageMeasures 1`] = ` +Array [ + Object { + "BigECommerce.count": "2", + "BigECommerce.orderDate": "2020-01-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-01-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": null, + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "1", + "BigECommerce.orderDate": "2020-02-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-02-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "2", + "BigECommerce.orderDate": "2020-03-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-03-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "1", + "BigECommerce.orderDate": "2020-04-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-04-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "5", + "BigECommerce.orderDate": "2020-05-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-05-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "7", + "BigECommerce.orderDate": "2020-06-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-06-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "5", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": null, + "BigECommerce.orderDate": "2020-07-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-07-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": null, + "BigECommerce.totalCountRetailMonthAgo": "7", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "6", + "BigECommerce.orderDate": "2020-09-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-09-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": null, + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "4", + "BigECommerce.orderDate": "2020-10-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-10-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "6", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "9", + "BigECommerce.orderDate": "2020-11-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-11-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "4", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "7", + "BigECommerce.orderDate": "2020-12-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-12-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "9", + "BigECommerce.totalProfitYearAgo": null, + }, +] +`; + exports[`Queries with the @cubejs-backend/postgres-driver querying BigECommerce: filtering with possible casts 1`] = ` Array [ Object { diff --git a/packages/cubejs-testing-drivers/test/__snapshots__/snowflake-full.test.ts.snap b/packages/cubejs-testing-drivers/test/__snapshots__/snowflake-full.test.ts.snap index c9de48eb41de1..f970e917cab33 100644 --- a/packages/cubejs-testing-drivers/test/__snapshots__/snowflake-full.test.ts.snap +++ b/packages/cubejs-testing-drivers/test/__snapshots__/snowflake-full.test.ts.snap @@ -15659,6 +15659,99 @@ Array [ ] `; +exports[`Queries with the @cubejs-backend/snowflake-driver querying BigECommerce: SeveralMultiStageMeasures 1`] = ` +Array [ + Object { + "BigECommerce.count": "2", + "BigECommerce.orderDate": "2020-01-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-01-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": null, + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "1", + "BigECommerce.orderDate": "2020-02-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-02-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "2", + "BigECommerce.orderDate": "2020-03-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-03-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "1", + "BigECommerce.orderDate": "2020-04-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-04-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "5", + "BigECommerce.orderDate": "2020-05-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-05-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "7", + "BigECommerce.orderDate": "2020-06-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-06-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "5", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": null, + "BigECommerce.orderDate": "2020-07-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-07-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": null, + "BigECommerce.totalCountRetailMonthAgo": "7", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "6", + "BigECommerce.orderDate": "2020-09-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-09-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": null, + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "4", + "BigECommerce.orderDate": "2020-10-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-10-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "6", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "9", + "BigECommerce.orderDate": "2020-11-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-11-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "4", + "BigECommerce.totalProfitYearAgo": null, + }, + Object { + "BigECommerce.count": "7", + "BigECommerce.orderDate": "2020-12-01T00:00:00.000", + "BigECommerce.orderDate.month": "2020-12-01T00:00:00.000", + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "9", + "BigECommerce.totalProfitYearAgo": null, + }, +] +`; + exports[`Queries with the @cubejs-backend/snowflake-driver querying BigECommerce: filtering with possible casts 1`] = ` Array [ Object { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs index 42b5117e80995..ae0aa72f637c9 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs @@ -2,6 +2,7 @@ use super::context::PushDownBuilderContext; use super::{LogicalNodeProcessor, ProcessableNode}; use crate::logical_plan::*; use crate::physical_plan_builder::context::MultiStageDimensionContext; +use crate::plan::join::JoinType; use crate::plan::schema::QualifiedColumnName; use crate::plan::*; use crate::planner::query_properties::OrderByItem; @@ -39,6 +40,10 @@ impl PhysicalPlanBuilder { (&self.query_tools, &self.plan_sql_templates) } + pub(super) fn templates(&self) -> &PlanSqlTemplates { + &self.plan_sql_templates + } + pub(super) fn process_node( &self, logical_node: &T, @@ -214,4 +219,59 @@ impl PhysicalPlanBuilder { } Ok(result) } + + pub(super) fn process_query_dimension( + &self, + dimension: &Rc, + references_builder: &ReferencesBuilder, + select_builder: &mut SelectBuilder, + context_factory: &mut SqlNodesFactory, + context: &PushDownBuilderContext, + ) -> Result<(), CubeError> { + if let Some(coalesce_ref) = self.dimension_coalesce_refs(dimension, select_builder.from()) { + select_builder.add_projection_coalesce_member(dimension, coalesce_ref, None)?; + } else { + references_builder.resolve_references_for_member( + dimension.clone(), + &None, + context_factory.render_references_mut(), + )?; + if context.measure_subquery { + select_builder.add_projection_member_without_schema(dimension, None); + } else { + select_builder.add_projection_member(dimension, None); + } + } + Ok(()) + } + + fn dimension_coalesce_refs( + &self, + dimension: &Rc, + from: &Rc, + ) -> Option> { + match &from.source { + FromSource::Join(join) => { + if join.joins.iter().any(|i| i.join_type == JoinType::Full) { + let mut result = vec![]; + let dim_alias = join.root.source.schema().resolve_member_alias(dimension); + result.push(QualifiedColumnName::new( + Some(join.root.alias.clone()), + dim_alias, + )); + for item in join.joins.iter() { + let dim_alias = item.from.source.schema().resolve_member_alias(dimension); + result.push(QualifiedColumnName::new( + Some(item.from.alias.clone()), + dim_alias, + )); + } + Some(result) + } else { + None + } + } + _ => None, + } + } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/full_join_aggregate_strategy.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/full_join_aggregate_strategy.rs new file mode 100644 index 0000000000000..30a512428c88a --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/full_join_aggregate_strategy.rs @@ -0,0 +1,191 @@ +use super::FullKeyAggregateStrategy; +use crate::logical_plan::{FullKeyAggregate, LogicalJoin, ResolvedMultipliedMeasures}; +use crate::physical_plan_builder::PhysicalPlanBuilder; +use crate::physical_plan_builder::PushDownBuilderContext; +use crate::plan::{ + Expr, From, FromSource, JoinBuilder, JoinCondition, QualifiedColumnName, Select, SelectBuilder, + SingleAliasedSource, +}; +use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; +use crate::planner::sql_evaluator::{MemberSymbol, ReferencesBuilder}; +use cubenativeutils::CubeError; +use itertools::Itertools; +use std::rc::Rc; + +pub(super) struct FullJoinFullKeyAggregateStrategy<'a> { + builder: &'a PhysicalPlanBuilder, +} + +impl<'a> FullJoinFullKeyAggregateStrategy<'a> { + pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc { + Rc::new(Self { builder }) + } + + fn full_join( + &self, + left_source: Rc, + dimensions: &Vec>, + ) -> Result, CubeError> { + let left_alias = "q_l".to_string(); + let right_alias = "q_r".to_string(); + let mut join_builder = + JoinBuilder::new_from_subselect(left_source.clone(), left_alias.clone()); + + let conditions = dimensions + .iter() + .map(|dim| -> Result<_, CubeError> { + let alias_in_left_query = left_source.schema().resolve_member_alias(dim); + let left_query_ref = Expr::Reference(QualifiedColumnName::new( + Some(left_alias.clone()), + alias_in_left_query, + )); + let alias_in_right_query = right_source.schema().resolve_member_alias(dim); + let right_query_ref = Expr::Reference(QualifiedColumnName::new( + Some(right_alias.clone()), + alias_in_right_query, + )); + + Ok(vec![(left_query_ref, right_query_ref)]) + }) + .collect::, _>>()?; + + join_builder.full_join_subselect( + right_source.clone(), + right_alias.clone(), + JoinCondition::new_dimension_join(conditions, true), + ); + let result = join_builder.build(); + Ok(From::new_from_join(result)) + } + + fn select_over_join_pair( + &self, + from: Rc, + dimensions: &Vec>, + measures: &Vec>, + context: &PushDownBuilderContext, + ) -> Result, CubeError> { + let query_tools = self.builder.query_tools(); + let mut context_factory = SqlNodesFactory::new(); + let references_builder = ReferencesBuilder::new(from.clone()); + let mut select_builder = SelectBuilder::new(from); + for dimension in dimensions.iter() { + self.builder.process_query_dimension( + dimension, + &references_builder, + &mut select_builder, + &mut context_factory, + &context, + )?; + } + + for measure in measures { + references_builder.resolve_references_for_member( + measure.clone(), + &None, + context_factory.render_references_mut(), + )?; + select_builder.add_projection_member(&measure, None); + } + let res = Rc::new(select_builder.build(query_tools.clone(), context_factory)); + Ok(res) + } +} + +impl FullKeyAggregateStrategy for FullJoinFullKeyAggregateStrategy<'_> { + fn process( + &self, + full_key_aggregate: &FullKeyAggregate, + context: &PushDownBuilderContext, + ) -> Result, CubeError> { + let query_tools = self.builder.query_tools(); + let mut data_queries = vec![]; + if let Some(resolved_multiplied_measures) = + full_key_aggregate.multiplied_measures_resolver() + { + match resolved_multiplied_measures { + ResolvedMultipliedMeasures::ResolveMultipliedMeasures( + resolve_multiplied_measures, + ) => { + for regular_measure_query in resolve_multiplied_measures + .regular_measure_subqueries + .iter() + { + let query = self + .builder + .process_node(regular_measure_query.as_ref(), &context)?; + data_queries.push((query, regular_measure_query.schema().measures.clone())); + } + for multiplied_measure_query in resolve_multiplied_measures + .aggregate_multiplied_subqueries + .iter() + { + let query = self + .builder + .process_node(multiplied_measure_query.as_ref(), &context)?; + data_queries + .push((query, multiplied_measure_query.schema.measures.clone())); + } + } + ResolvedMultipliedMeasures::PreAggregation(pre_agg_query) => { + let query = self + .builder + .process_node(pre_agg_query.as_ref(), &context)?; + data_queries.push((query, pre_agg_query.schema().measures.clone())); + } + } + } + + for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() { + let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?; + let multi_stage_source = SingleAliasedSource::new_from_table_reference( + multi_stage_ref.name().clone(), + multi_stage_schema.clone(), + None, + ); + let sql_context = SqlNodesFactory::new(); + + let data_select_builder = + SelectBuilder::new(From::new(FromSource::Single(multi_stage_source))); + let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context)); + data_queries.push((data_select, multi_stage_ref.symbols().clone())); + } + + if data_queries.is_empty() { + let empty_join = LogicalJoin::builder().build(); + return self.builder.process_node(&empty_join, context); + } + + if data_queries.len() == 1 { + let (select, _) = data_queries[0].clone(); + let result = From::new_from_subselect(select, "fk_aggregate".to_string()); + return Ok(result); + } + + let dimensions = full_key_aggregate + .schema() + .all_dimensions() + .cloned() + .collect_vec(); + let mut measures = vec![]; + + let mut queries_iter = data_queries.into_iter(); + let (left_query, mut query_measures) = queries_iter.next().unwrap(); + measures.append(&mut query_measures); + let (right_query, mut query_measures) = queries_iter.next().unwrap(); + measures.append(&mut query_measures); + let mut result = self.full_join(left_query, right_query, &dimensions)?; + for (query, mut query_measures) in queries_iter { + let left_query = self.select_over_join_pair(result, &dimensions, &measures, context)?; + result = self.full_join(left_query, query, &dimensions)?; + measures.append(&mut query_measures); + } + let result_query = self.select_over_join_pair(result, &dimensions, &measures, context)?; + + Ok(From::new_from_subselect( + result_query, + "full_aggregate".to_string(), + )) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/inner_join_aggregate_strategy.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/inner_join_aggregate_strategy.rs new file mode 100644 index 0000000000000..24129e2b15fcc --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/inner_join_aggregate_strategy.rs @@ -0,0 +1,127 @@ +use super::FullKeyAggregateStrategy; +use crate::logical_plan::{FullKeyAggregate, LogicalJoin, ResolvedMultipliedMeasures}; +use crate::physical_plan_builder::PhysicalPlanBuilder; +use crate::physical_plan_builder::PushDownBuilderContext; +use crate::plan::{ + Expr, From, FromSource, JoinBuilder, JoinCondition, QualifiedColumnName, SelectBuilder, + SingleAliasedSource, +}; +use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; +use cubenativeutils::CubeError; +use std::rc::Rc; + +pub(super) struct InnerJoinFullKeyAggregateStrategy<'a> { + builder: &'a PhysicalPlanBuilder, +} + +impl<'a> InnerJoinFullKeyAggregateStrategy<'a> { + pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc { + Rc::new(Self { builder }) + } +} + +impl FullKeyAggregateStrategy for InnerJoinFullKeyAggregateStrategy<'_> { + fn process( + &self, + full_key_aggregate: &FullKeyAggregate, + context: &PushDownBuilderContext, + ) -> Result, CubeError> { + let query_tools = self.builder.query_tools(); + let mut data_queries = vec![]; + if let Some(resolved_multiplied_measures) = + full_key_aggregate.multiplied_measures_resolver() + { + match resolved_multiplied_measures { + ResolvedMultipliedMeasures::ResolveMultipliedMeasures( + resolve_multiplied_measures, + ) => { + for regular_measure_query in resolve_multiplied_measures + .regular_measure_subqueries + .iter() + { + let query = self + .builder + .process_node(regular_measure_query.as_ref(), &context)?; + data_queries.push(query); + } + for multiplied_measure_query in resolve_multiplied_measures + .aggregate_multiplied_subqueries + .iter() + { + let query = self + .builder + .process_node(multiplied_measure_query.as_ref(), &context)?; + data_queries.push(query); + } + } + ResolvedMultipliedMeasures::PreAggregation(pre_agg_query) => { + let query = self + .builder + .process_node(pre_agg_query.as_ref(), &context)?; + data_queries.push(query); + } + } + } + + for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() { + let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?; + let multi_stage_source = SingleAliasedSource::new_from_table_reference( + multi_stage_ref.name().clone(), + multi_stage_schema.clone(), + None, + ); + let sql_context = SqlNodesFactory::new(); + + let data_select_builder = + SelectBuilder::new(From::new(FromSource::Single(multi_stage_source))); + let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context)); + data_queries.push(data_select); + } + + if data_queries.is_empty() { + let empty_join = LogicalJoin::builder().build(); + return self.builder.process_node(&empty_join, context); + } + + if data_queries.len() == 1 { + let select = data_queries[0].clone(); + let result = From::new_from_subselect(select, "fk_aggregate".to_string()); + return Ok(result); + } + + let mut join_builder = + JoinBuilder::new_from_subselect(data_queries[0].clone(), "q_0".to_string()); + + for (i, query) in data_queries.iter().skip(1).enumerate() { + let prev_alias = format!("q_{}", i); + let query_alias = format!("q_{}", i + 1); + let conditions = full_key_aggregate + .schema() + .all_dimensions() + .map(|dim| -> Result<_, CubeError> { + let alias_in_prev_query = data_queries[i].schema().resolve_member_alias(dim); + let prev_query_ref = Expr::Reference(QualifiedColumnName::new( + Some(prev_alias.clone()), + alias_in_prev_query, + )); + let alias_in_data_query = query.schema().resolve_member_alias(dim); + let data_query_ref = Expr::Reference(QualifiedColumnName::new( + Some(query_alias.clone()), + alias_in_data_query, + )); + + Ok(vec![(prev_query_ref, data_query_ref)]) + }) + .collect::, _>>()?; + + join_builder.inner_join_subselect( + query.clone(), + query_alias.clone(), + JoinCondition::new_dimension_join(conditions, true), + ); + } + + let result = join_builder.build(); + Ok(From::new_from_join(result)) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs similarity index 56% rename from rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate.rs rename to rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs index 38c240365260d..b893fae36e6c4 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/keys_aggregate_strategy.rs @@ -1,6 +1,7 @@ -use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; +use super::FullKeyAggregateStrategy; use crate::logical_plan::{FullKeyAggregate, LogicalJoin, ResolvedMultipliedMeasures}; use crate::physical_plan_builder::PhysicalPlanBuilder; +use crate::physical_plan_builder::PushDownBuilderContext; use crate::plan::{ Expr, From, FromSource, JoinBuilder, JoinCondition, QualifiedColumnName, SelectBuilder, SingleAliasedSource, Union, @@ -10,15 +11,7 @@ use crate::planner::sql_evaluator::ReferencesBuilder; use cubenativeutils::CubeError; use std::rc::Rc; -trait FullKeyAggregateStrategy { - fn process( - &self, - full_key_aggregate: &FullKeyAggregate, - context: &PushDownBuilderContext, - ) -> Result, CubeError>; -} - -struct KeysFullKeyAggregateStrategy<'a> { +pub(super) struct KeysFullKeyAggregateStrategy<'a> { builder: &'a PhysicalPlanBuilder, } @@ -181,148 +174,3 @@ impl FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'_> { Ok(From::new_from_join(result)) } } - -struct InnerJoinFullKeyAggregateStrategy<'a> { - builder: &'a PhysicalPlanBuilder, -} - -impl<'a> InnerJoinFullKeyAggregateStrategy<'a> { - pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc { - Rc::new(Self { builder }) - } -} - -impl FullKeyAggregateStrategy for InnerJoinFullKeyAggregateStrategy<'_> { - fn process( - &self, - full_key_aggregate: &FullKeyAggregate, - context: &PushDownBuilderContext, - ) -> Result, CubeError> { - let query_tools = self.builder.query_tools(); - let mut data_queries = vec![]; - if let Some(resolved_multiplied_measures) = - full_key_aggregate.multiplied_measures_resolver() - { - match resolved_multiplied_measures { - ResolvedMultipliedMeasures::ResolveMultipliedMeasures( - resolve_multiplied_measures, - ) => { - for regular_measure_query in resolve_multiplied_measures - .regular_measure_subqueries - .iter() - { - let query = self - .builder - .process_node(regular_measure_query.as_ref(), &context)?; - data_queries.push(query); - } - for multiplied_measure_query in resolve_multiplied_measures - .aggregate_multiplied_subqueries - .iter() - { - let query = self - .builder - .process_node(multiplied_measure_query.as_ref(), &context)?; - data_queries.push(query); - } - } - ResolvedMultipliedMeasures::PreAggregation(pre_agg_query) => { - let query = self - .builder - .process_node(pre_agg_query.as_ref(), &context)?; - data_queries.push(query); - } - } - } - - for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() { - let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?; - let multi_stage_source = SingleAliasedSource::new_from_table_reference( - multi_stage_ref.name().clone(), - multi_stage_schema.clone(), - None, - ); - let sql_context = SqlNodesFactory::new(); - - let data_select_builder = - SelectBuilder::new(From::new(FromSource::Single(multi_stage_source))); - let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context)); - data_queries.push(data_select); - } - - if data_queries.is_empty() { - let empty_join = LogicalJoin::builder().build(); - return self.builder.process_node(&empty_join, context); - } - - if data_queries.len() == 1 { - let select = data_queries[0].clone(); - let result = From::new_from_subselect(select, "fk_aggregate".to_string()); - return Ok(result); - } - - let mut join_builder = - JoinBuilder::new_from_subselect(data_queries[0].clone(), "q_0".to_string()); - - for (i, query) in data_queries.iter().skip(1).enumerate() { - let prev_alias = format!("q_{}", i); - let query_alias = format!("q_{}", i + 1); - let conditions = full_key_aggregate - .schema() - .all_dimensions() - .map(|dim| -> Result<_, CubeError> { - let alias_in_prev_query = data_queries[i].schema().resolve_member_alias(dim); - let prev_query_ref = Expr::Reference(QualifiedColumnName::new( - Some(prev_alias.clone()), - alias_in_prev_query, - )); - let alias_in_data_query = query.schema().resolve_member_alias(dim); - let data_query_ref = Expr::Reference(QualifiedColumnName::new( - Some(query_alias.clone()), - alias_in_data_query, - )); - - Ok(vec![(prev_query_ref, data_query_ref)]) - }) - .collect::, _>>()?; - - join_builder.inner_join_subselect( - query.clone(), - query_alias.clone(), - JoinCondition::new_dimension_join(conditions, true), - ); - } - - let result = join_builder.build(); - Ok(From::new_from_join(result)) - } -} - -pub struct FullKeyAggregateProcessor<'a> { - builder: &'a PhysicalPlanBuilder, -} - -impl<'a> LogicalNodeProcessor<'a, FullKeyAggregate> for FullKeyAggregateProcessor<'a> { - type PhysycalNode = Rc; - fn new(builder: &'a PhysicalPlanBuilder) -> Self { - Self { builder } - } - - fn process( - &self, - full_key_aggregate: &FullKeyAggregate, - context: &PushDownBuilderContext, - ) -> Result { - let strategy: Rc = - if full_key_aggregate.schema().has_dimensions() { - KeysFullKeyAggregateStrategy::new(self.builder) - } else { - InnerJoinFullKeyAggregateStrategy::new(self.builder) - }; - strategy.process(full_key_aggregate, context) - } -} - -impl ProcessableNode for FullKeyAggregate { - type ProcessorType<'a> = FullKeyAggregateProcessor<'a>; -} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs new file mode 100644 index 0000000000000..70cd25a3943ca --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/full_key_aggregate/mod.rs @@ -0,0 +1,52 @@ +mod full_join_aggregate_strategy; +mod inner_join_aggregate_strategy; +mod keys_aggregate_strategy; + +use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; +use crate::logical_plan::FullKeyAggregate; +use crate::physical_plan_builder::PhysicalPlanBuilder; +use crate::plan::From; +use cubenativeutils::CubeError; +use full_join_aggregate_strategy::FullJoinFullKeyAggregateStrategy; +use inner_join_aggregate_strategy::InnerJoinFullKeyAggregateStrategy; +use keys_aggregate_strategy::KeysFullKeyAggregateStrategy; +use std::rc::Rc; + +trait FullKeyAggregateStrategy { + fn process( + &self, + full_key_aggregate: &FullKeyAggregate, + context: &PushDownBuilderContext, + ) -> Result, CubeError>; +} + +pub struct FullKeyAggregateProcessor<'a> { + builder: &'a PhysicalPlanBuilder, +} + +impl<'a> LogicalNodeProcessor<'a, FullKeyAggregate> for FullKeyAggregateProcessor<'a> { + type PhysycalNode = Rc; + fn new(builder: &'a PhysicalPlanBuilder) -> Self { + Self { builder } + } + + fn process( + &self, + full_key_aggregate: &FullKeyAggregate, + context: &PushDownBuilderContext, + ) -> Result { + let strategy: Rc = + if !full_key_aggregate.schema().has_dimensions() { + InnerJoinFullKeyAggregateStrategy::new(self.builder) + } else if self.builder.templates().supports_full_join() { + FullJoinFullKeyAggregateStrategy::new(self.builder) + } else { + KeysFullKeyAggregateStrategy::new(self.builder) + }; + strategy.process(full_key_aggregate, context) + } +} + +impl ProcessableNode for FullKeyAggregate { + type ProcessorType<'a> = FullKeyAggregateProcessor<'a>; +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs index 53937c47c8f5d..fef3535028474 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/query.rs @@ -137,17 +137,14 @@ impl<'a> LogicalNodeProcessor<'a, Query> for QueryProcessor<'a> { select_builder.set_ctes(ctes); context_factory.set_ungrouped(logical_plan.modifers().ungrouped); - for member in logical_plan.schema().all_dimensions() { - references_builder.resolve_references_for_member( - member.clone(), - &None, - context_factory.render_references_mut(), + for dimension in logical_plan.schema().all_dimensions() { + self.builder.process_query_dimension( + dimension, + &references_builder, + &mut select_builder, + &mut context_factory, + &context, )?; - if context.measure_subquery { - select_builder.add_projection_member_without_schema(member, None); - } else { - select_builder.add_projection_member(member, None); - } } for (measure, exists) in self diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs index b86688efd9d63..aa8d38b6a9753 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs @@ -279,6 +279,10 @@ impl SelectBuilder { self.ctes = ctes; } + pub fn from(&self) -> &Rc { + &self.from + } + pub fn make_cube_references(from: Rc) -> HashMap { let mut refs = HashMap::new(); match &from.source { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs index e3c64d746d3cd..7be8764434f4c 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs @@ -296,6 +296,7 @@ pub struct Join { pub joins: Vec, } +#[derive(Clone, PartialEq, Eq)] pub enum JoinType { Inner, Left,