
Unnest logical plan lacks decent projection push down #16623

Open
@bert-beyondloops

Description


Describe the bug

When an unnest logical plan is created directly (for example with LogicalPlanBuilder instead of from SQL), the projection pushdown optimiser does not eliminate unused columns from the table scan.

Example:

CREATE TABLE foo (list_column ARRAY<INTEGER>, int_a INTEGER, int_b INTEGER) AS VALUES (MAKE_ARRAY(1, 2, 3), 1, 1);

DESCRIBE SELECT unnest(list_column), int_a FROM foo;

When the plan is produced from SQL, projection pushdown works correctly and eliminates the "int_b" column from the table scan:

+---------------+---------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                |
+---------------+---------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: __unnest_placeholder(foo.list_column,depth=1) AS UNNEST(foo.list_column), foo.int_a                     |
|               |   Unnest: lists[__unnest_placeholder(foo.list_column)|depth=1] structs[]                                            |
|               |     Projection: foo.list_column AS __unnest_placeholder(foo.list_column), foo.int_a                                 |
|               |       TableScan: foo projection=[list_column, int_a]                                                                |
| physical_plan | ProjectionExec: expr=[__unnest_placeholder(foo.list_column,depth=1)@0 as UNNEST(foo.list_column), int_a@1 as int_a] |
|               |   UnnestExec                                                                                                        |
|               |     ProjectionExec: expr=[list_column@0 as __unnest_placeholder(foo.list_column), int_a@1 as int_a]                 |
|               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                             |
|               |                                                                                                                     |
+---------------+---------------------------------------------------------------------------------------------------------------------+
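The SQL path works because the planner places a Projection below the Unnest that keeps only the referenced columns (the third line of the logical plan above). The std-only Rust sketch below is a simplified, hypothetical model of how such an inner projection list could be assembled; inner_projection is an illustrative name, not a DataFusion function, and the real planner logic in select.rs is more involved.

```rust
/// Hypothetical, simplified model of the inner projection the SQL planner
/// places below the Unnest node. Unnest targets are aliased to a placeholder
/// name; other referenced columns are passed through unchanged. Columns that
/// are never referenced (int_b here) are simply not emitted.
fn inner_projection(unnest_cols: &[&str], passthrough_cols: &[&str]) -> Vec<String> {
    let mut exprs: Vec<String> = unnest_cols
        .iter()
        .map(|c| format!("{c} AS __unnest_placeholder({c})"))
        .collect();
    exprs.extend(passthrough_cols.iter().map(|c| c.to_string()));
    exprs
}

fn main() {
    let exprs = inner_projection(&["foo.list_column"], &["foo.int_a"]);
    // Prints the same projection list as the SQL-generated plan above.
    println!("Projection: {}", exprs.join(", "));
}
```

Because int_b never appears in this inner projection, the optimizer is free to drop it from the TableScan.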

Creating the logical plan directly, for example with the LogicalPlanBuilder:

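use std::error::Error;

use datafusion::arrow::util::pretty;
use datafusion::common::Column;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::{col, LogicalPlanBuilder};
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn test_unnest_push_down_projection_via_logical_plan() -> Result<(), Box<dyn Error>> {
    let ctx = SessionContext::new();

    // Create and populate the table through SQL.
    let df = ctx.sql("CREATE TABLE foo (list_column ARRAY<INTEGER>, int_a INTEGER, int_b INTEGER) AS VALUES (MAKE_ARRAY(1, 2, 3), 1, 1);").await?;
    let batches = df.collect().await?;
    pretty::print_batches(&batches)?;

    // Build scan -> unnest -> projection directly, bypassing the SQL planner.
    let table_source = provider_as_source(ctx.table_provider("foo").await?);
    let builder = LogicalPlanBuilder::scan("foo", table_source, None)?;
    let builder = builder.unnest_column(Column::from("list_column"))?;
    let builder = builder.project(vec![col("list_column"), col("int_a")])?;

    // Optimize and print the plan; the TableScan should not need to read int_b.
    let plan = builder.build()?;
    let plan = ctx.state().optimize(&plan)?;
    println!("{}", plan);

    Ok(())
}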

results in the following optimized logical plan:

Projection: list_column, foo.int_a
  Unnest: lists[foo.list_column|depth=1] structs[]
    TableScan: foo projection=[list_column, int_a, int_b]

=> Column "int_b" is not projected away from the table scan, even though neither the unnest nor the final projection uses it.
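To turn this observation into an automated check, the scan's column list can be extracted from the displayed plan text. The sketch below is a hypothetical helper (scan_projection is not part of DataFusion) that assumes the "TableScan: <table> projection=[...]" display format shown above.

```rust
/// Hypothetical helper: return the column list of the first `TableScan:` line
/// that carries a `projection=[...]` list, or None if there is no such line.
fn scan_projection(plan_text: &str) -> Option<Vec<String>> {
    plan_text.lines().find_map(|line| {
        let line = line.trim();
        if !line.starts_with("TableScan:") {
            return None;
        }
        let start = line.find("projection=[")? + "projection=[".len();
        let end = start + line[start..].find(']')?;
        Some(
            line[start..end]
                .split(',')
                .map(|c| c.trim().to_string())
                .filter(|c| !c.is_empty())
                .collect(),
        )
    })
}

fn main() {
    // Optimized plan from the report above.
    let plan = "Projection: list_column, foo.int_a\n  Unnest: lists[foo.list_column|depth=1] structs[]\n    TableScan: foo projection=[list_column, int_a, int_b]";
    let cols = scan_projection(plan).expect("plan should contain a TableScan with a projection");
    // With the current optimizer, int_b is still read by the scan.
    println!("scan reads int_b: {}", cols.iter().any(|c| c == "int_b"));
}
```

In the test above, plan.to_string() could be passed to such a helper and the result asserted not to contain "int_b".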

To Reproduce

Run the test_unnest_push_down_projection_via_logical_plan test from the description above and inspect the printed optimized plan: the TableScan still includes int_b in its projection.

Expected behavior

Even when an unnest logical plan is created directly, the optimizer should project away unneeded columns (here, "int_b" should not be read by the table scan).

Additional context

It seems the projection is currently a one-time effort, performed only during the SQL-to-logical-plan translation.

See select.rs:

let plan = LogicalPlanBuilder::from(intermediate_plan)
    .project(inner_projection_exprs)?
    .unnest_columns_with_options(unnest_col_vec, unnest_options)?
    .build()?;

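In the optimization rule itself, however, the unnest logical plan is not analysed any further: the required input indices are taken from dependency_indices as-is, without considering which unnest output columns the parent actually needs.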

optimize_projections/mod.rs:

LogicalPlan::Unnest(Unnest {
    dependency_indices, ..
}) => {
    vec![RequiredIndices::new_from_indices(
        dependency_indices.clone(),
    )]
}
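The difference between the current rule and a parent-aware alternative can be illustrated with a small std-only model. UnnestModel, required_input_current and required_input_pruned are hypothetical names; the model only tracks, for each unnest output column, which input column it depends on. It is a sketch of the index bookkeeping under those assumptions, not a patch for optimize_projections.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified stand-in for the Unnest node: for each output
/// column, the input column index it depends on, plus the input indices of
/// the columns being unnested.
struct UnnestModel {
    dependency_indices: Vec<usize>,
    unnested_input_indices: Vec<usize>,
}

/// Current behaviour (mirrors the snippet above): the parent's requirements
/// are ignored and every dependency index is required from the input.
fn required_input_current(node: &UnnestModel, _parent_required: &[usize]) -> Vec<usize> {
    node.dependency_indices.clone()
}

/// Possible alternative: map only the output columns the parent needs back to
/// input indices. Unnested columns are kept conservatively, since dropping an
/// unnested list column would change the number of output rows.
fn required_input_pruned(node: &UnnestModel, parent_required: &[usize]) -> Vec<usize> {
    let mut set: BTreeSet<usize> = node.unnested_input_indices.iter().copied().collect();
    set.extend(parent_required.iter().map(|&i| node.dependency_indices[i]));
    set.into_iter().collect() // sorted, deduplicated input indices
}

fn main() {
    // Input: [list_column, int_a, int_b]; output columns map 1:1 onto them.
    let node = UnnestModel { dependency_indices: vec![0, 1, 2], unnested_input_indices: vec![0] };
    // The parent projection needs list_column (0) and int_a (1).
    let parent = [0, 1];
    println!("current: {:?}", required_input_current(&node, &parent)); // [0, 1, 2]
    println!("pruned:  {:?}", required_input_pruned(&node, &parent)); // [0, 1]
}
```

A real fix would also have to rebuild the Unnest node, or insert a Projection below it as the SQL planner does, so that its output schema no longer contains the pruned columns; the sketch covers only which input columns are required.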

Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests