Description
Describe the bug
When the Unnest logical plan is created directly (rather than through SQL), the projection pushdown optimiser does not eliminate unused columns.
Example query:
CREATE TABLE foo (list_column ARRAY<INTEGER>, int_a INTEGER, int_b INTEGER) AS VALUES (MAKE_ARRAY(1, 2, 3), 1,1);
DESCRIBE SELECT unnest(list_column), int_a FROM foo;
From within a SQL context, projection pushdown works correctly and eliminates the "int_b" column from the table scan:
+---------------+---------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                |
+---------------+---------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: __unnest_placeholder(foo.list_column,depth=1) AS UNNEST(foo.list_column), foo.int_a                     |
|               |   Unnest: lists[__unnest_placeholder(foo.list_column)|depth=1] structs[]                                            |
|               |     Projection: foo.list_column AS __unnest_placeholder(foo.list_column), foo.int_a                                 |
|               |       TableScan: foo projection=[list_column, int_a]                                                                |
| physical_plan | ProjectionExec: expr=[__unnest_placeholder(foo.list_column,depth=1)@0 as UNNEST(foo.list_column), int_a@1 as int_a] |
|               |   UnnestExec                                                                                                        |
|               |     ProjectionExec: expr=[list_column@0 as __unnest_placeholder(foo.list_column), int_a@1 as int_a]                 |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                             |
|               |                                                                                                                     |
+---------------+---------------------------------------------------------------------------------------------------------------------+
Creating the logical plan directly, for example via the LogicalPlanBuilder:
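use std::error::Error;

use datafusion::arrow::util::pretty;
use datafusion::common::Column;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::prelude::{col, SessionContext};

#[tokio::test]
async fn test_unnest_push_down_projection_via_logical_plan() -> Result<(), Box<dyn Error>> {
    let ctx = SessionContext::new();
    // Create and populate table `foo`; int_b is never referenced by the plan below.
    let df = ctx.sql("CREATE TABLE foo (list_column ARRAY<INTEGER>, int_a INTEGER, int_b INTEGER) AS VALUES (MAKE_ARRAY(1, 2, 3), 1,1);").await?;
    let batches = df.collect().await?;
    pretty::print_batches(&batches)?;

    // Build Scan -> Unnest -> Projection directly, bypassing the SQL planner.
    let table_source = provider_as_source(ctx.table_provider("foo").await?);
    let builder = LogicalPlanBuilder::scan("foo", table_source, None)?;
    let builder = builder.unnest_column(Column::from("list_column"))?;
    let builder = builder.project(vec![col("list_column"), col("int_a")])?;
    let plan = builder.build()?;

    // Run the logical optimizer and print the optimized plan.
    let plan = ctx.state().optimize(&plan)?;
    println!("{}", plan);
    Ok(())
}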
produces the following optimized logical plan:
Projection: list_column, foo.int_a
  Unnest: lists[foo.list_column|depth=1] structs[]
    TableScan: foo projection=[list_column, int_a, int_b]
=> Column "int_b" is not projected away at the table scan, even though no operator above the Unnest references it.
To Reproduce
Run the test_unnest_push_down_projection_via_logical_plan test from the description above and check the TableScan projection in the printed plan.
Expected behavior
Even when the Unnest logical plan is created directly, the optimizer should project away unneeded columns, so that the table scan in the example above only reads list_column and int_a.
Additional context
It seems the projection is currently a one-time step performed during the SQL-to-logical-plan translation, which inserts an explicit projection below the Unnest.
See select.rs:
let plan = LogicalPlanBuilder::from(intermediate_plan)
    .project(inner_projection_exprs)?
    .unnest_columns_with_options(unnest_col_vec, unnest_options)?
    .build()?;
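The optimization rule itself, on the other hand, does not analyze the Unnest node any further: it requires every input column listed in dependency_indices, regardless of which output columns the parent actually uses.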
optimize_projections/mod.rs:
LogicalPlan::Unnest(Unnest {
    dependency_indices, ..
}) => {
    vec![RequiredIndices::new_from_indices(
        dependency_indices.clone(),
    )]
}
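To illustrate the missing step, here is a minimal, std-only sketch of the index bookkeeping the Unnest arm could perform instead. The helper required_unnest_input_indices is hypothetical, not a DataFusion API. It assumes dependency_indices[i] is the input column index that output column i of the Unnest derives from, and that the input indices of the unnested columns are available (presumably from the node's list_type_columns / struct_type_columns). It only covers the index mapping; an actual fix would also have to rebuild the Unnest node over the pruned input.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper (not part of DataFusion).
/// - `dependency_indices[i]`: input column that Unnest output column `i` derives from.
/// - `parent_required`: Unnest output columns the parent actually uses.
/// - `unnested_input_indices`: input columns being unnested.
/// Returns the sorted, deduplicated input columns the Unnest needs from its child.
fn required_unnest_input_indices(
    dependency_indices: &[usize],
    parent_required: &[usize],
    unnested_input_indices: &[usize],
) -> Vec<usize> {
    let mut required = BTreeSet::new();
    // Map each output column used by the parent back to its input column.
    // Panics if an index in `parent_required` is out of range.
    for &out_idx in parent_required {
        required.insert(dependency_indices[out_idx]);
    }
    // Unnested columns determine the output row count, so keep them even
    // when the parent does not reference their values.
    required.extend(unnested_input_indices.iter().copied());
    // BTreeSet iterates in ascending order without duplicates.
    required.into_iter().collect()
}
```

For the foo example, the input columns are [list_column, int_a, int_b], dependency_indices would be [0, 1, 2], the parent projection needs outputs [0, 1] and the unnested input is [0], so the helper returns [0, 1] and int_b is dropped. Keeping the unnested columns unconditionally is the conservative choice, since unnesting changes the number of rows even when the unnested values themselves are unused.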