
Slow aggregate query with array_agg: Polars is 4 times faster on an equivalent query #17446

@valkum

Describe the bug

I encountered a slow query, which I already mentioned on Discord without getting an answer. I have since created a reproduction case, with data I can share, that demonstrates the slowdown.

Plans:

DataFrame()
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: df.name, df.group, CASE WHEN array_element(markets, Int64(1)) IS NOT NULL THEN markets ELSE List() END AS CASE WHEN markets[Int64(1)] IS NOT NULL THEN markets ELSE NULL END                                                          |
|               |   Aggregate: groupBy=[[df.name, df.group]], aggr=[[array_agg(market) AS markets]]                                                                                                                                                                 |
|               |     Projection: df.name, df.group, CASE WHEN df.market IS NOT NULL AND df.price IS NOT NULL THEN named_struct(Utf8("market"), CAST(df.market AS Dictionary(UInt16, Utf8)), Utf8("price"), df.price) ELSE Struct({market:,price:}) END AS market   |
|               |       TableScan: df projection=[name, group, market, price]                                                                                                                                                                                       |
| physical_plan | ProjectionExec: expr=[name@0 as name, group@1 as group, CASE WHEN array_element(markets@2, 1) IS NOT NULL THEN markets@2 END as CASE WHEN markets[Int64(1)] IS NOT NULL THEN markets ELSE NULL END]                                               |
|               |   AggregateExec: mode=FinalPartitioned, gby=[name@0 as name, group@1 as group], aggr=[markets], ordering_mode=Sorted                                                                                                                              |
|               |     SortExec: expr=[name@0 ASC NULLS LAST, group@1 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                  |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                 |
|               |         RepartitionExec: partitioning=Hash([name@0, group@1], 10), input_partitions=10                                                                                                                                                            |
|               |           AggregateExec: mode=Partial, gby=[name@0 as name, group@1 as group], aggr=[markets], ordering_mode=Sorted                                                                                                                               |
|               |             ProjectionExec: expr=[name@0 as name, group@1 as group, CASE WHEN market@2 IS NOT NULL AND price@3 IS NOT NULL THEN named_struct(market, CAST(market@2 AS Dictionary(UInt16, Utf8)), price, price@3) END as market]                   |
|               |               DataSourceExec: file_groups={10 groups: [, ...]}, projection=[name, group, market, price], output_ordering=[name@0 ASC NULLS LAST, group@1 ASC NULLS LAST], file_type=parquet                                                       |
|               |                                                                                                                                                                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
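To make the plan's semantics concrete, the following is a pure-Python model of what each physical operator above computes for this query. It is a sketch under stated assumptions, not DataFusion's implementation: the helper names (`project_market`, `partial_agg`, `repartition`, `final_agg`, `final_projection`, `run_query`) are hypothetical, rows are assumed to be `(name, group, market, price)` tuples with non-null keys, the `Dictionary(UInt16, Utf8)` cast is ignored, Python's `hash()` stands in for the real hash partitioning, and the element order inside each aggregated list is assumed to follow input partition order.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Hypothetical row layout: (name, group, market, price); keys assumed non-null.
Row = Tuple[str, str, Optional[str], Optional[float]]
Key = Tuple[str, str]
Market = Optional[Dict[str, object]]


def project_market(market: Optional[str], price: Optional[float]) -> Market:
    # ProjectionExec: CASE WHEN market IS NOT NULL AND price IS NOT NULL
    #   THEN named_struct('market', market, 'price', price) END (else NULL).
    # The Dictionary(UInt16, Utf8) cast is ignored in this sketch.
    if market is not None and price is not None:
        return {"market": market, "price": price}
    return None


def partial_agg(partition: List[Row]) -> Dict[Key, List[Market]]:
    # AggregateExec mode=Partial: array_agg within a single input partition.
    acc: Dict[Key, List[Market]] = defaultdict(list)
    for name, group, market, price in partition:
        acc[(name, group)].append(project_market(market, price))
    return acc


def repartition(partials: List[Dict[Key, List[Market]]], n: int) -> List[List[Tuple[Key, List[Market]]]]:
    # RepartitionExec Hash([name, group], n): route each partial state by key,
    # so all states for one key land in the same output partition.
    # Python's hash() is an assumed stand-in for DataFusion's hash function.
    out: List[List[Tuple[Key, List[Market]]]] = [[] for _ in range(n)]
    for partial in partials:
        for key, values in partial.items():
            out[hash(key) % n].append((key, values))
    return out


def final_agg(states: List[Tuple[Key, List[Market]]]) -> Dict[Key, List[Market]]:
    # SortExec + AggregateExec mode=FinalPartitioned: sort states by key
    # (stable sort, so partition order is kept per key), then concatenate.
    merged: Dict[Key, List[Market]] = defaultdict(list)
    for key, values in sorted(states, key=lambda kv: kv[0]):
        merged[key].extend(values)
    return merged


def final_projection(merged: Dict[Key, List[Market]]) -> List[Tuple[str, str, Optional[List[Market]]]]:
    # ProjectionExec: CASE WHEN markets[1] IS NOT NULL THEN markets END.
    # SQL arrays are 1-based, so markets[1] corresponds to Python's markets[0].
    return [
        (name, group, markets if markets[0] is not None else None)
        for (name, group), markets in merged.items()
    ]


def run_query(partitions: List[List[Row]], n_out: int = 2):
    partials = [partial_agg(p) for p in partitions]
    rows = []
    for states in repartition(partials, n_out):
        rows.extend(final_projection(final_agg(states)))
    # Sort by (name, group) only, so the output order is deterministic.
    return sorted(rows, key=lambda r: (r[0], r[1]))


partitions = [
    [("a", "g1", "m1", 1.0), ("a", "g1", None, 2.0)],
    [("a", "g1", "m2", 3.0), ("b", "g1", None, None)],
]
print(run_query(partitions))
```

For `("a", "g1")` the first collected struct is non-null, so the whole list is kept, including the `None` produced by the row with a missing market. For `("b", "g1")` the only element is `None`, so the final `CASE` yields NULL. The outer `CASE` inspects only the first element, so a group whose first row has a null market or price is nulled out even if later rows are complete.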

Hyperfine reports:

Benchmark 1: uv run test_polars.py sample-1m.parquet
  Time (mean ± σ):     300.2 ms ± 104.0 ms    [User: 538.6 ms, System: 140.1 ms]
  Range (min … max):   249.2 ms … 585.3 ms    10 runs
 
  Warning: The first benchmarking run for this command was significantly slower than the rest (585.3 ms). This could be caused by (filesystem) caches that were not filled until after the first run. You should consider using the '--warmup' option to fill those caches before the actual benchmark. Alternatively, use the '--prepare' option to clear the caches before each timing run.
 
Benchmark 2: uv run test_datafusion.py sample-1m.parquet
  Time (mean ± σ):      1.320 s ±  0.062 s    [User: 4.919 s, System: 0.258 s]
  Range (min … max):    1.264 s …  1.444 s    10 runs
 
Summary
  uv run test_polars.py sample-1m.parquet ran
    4.40 ± 1.54 times faster than uv run test_datafusion.py sample-1m.parquet
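The summary's speedup and its ± can be re-derived from the two means and standard deviations reported above. This is a minimal sketch, assuming the two benchmarks are independent and the uncertainty is propagated to first order (relative standard deviations added in quadrature). With the rounded values printed by hyperfine it reproduces 4.40 ± 1.54, but whether hyperfine uses exactly this formula internally is an assumption here, and `relative_speed` is a hypothetical helper name.

```python
import math


def relative_speed(fast_mean, fast_sd, slow_mean, slow_sd):
    """Return (slow/fast ratio, its standard deviation).

    Assumes independent timings and first-order error propagation:
    sd_ratio = ratio * sqrt((fast_sd/fast_mean)**2 + (slow_sd/slow_mean)**2).
    """
    ratio = slow_mean / fast_mean
    rel_err = math.sqrt((fast_sd / fast_mean) ** 2 + (slow_sd / slow_mean) ** 2)
    return ratio, ratio * rel_err


# Means and standard deviations in seconds, taken from the hyperfine output above.
ratio, err = relative_speed(0.3002, 0.1040, 1.320, 0.062)
print(f"{ratio:.2f} ± {err:.2f}")  # 4.40 ± 1.54
```

Almost all of the ± comes from the Polars run: its relative standard deviation is about 35% (104.0 ms on a 300.2 ms mean) versus under 5% for DataFusion, and hyperfine attributes the spread to the slow first run. Re-running with `--warmup`, as the warning suggests, would likely narrow the interval.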

To Reproduce

See https://github.com/valkum/polars-datafusion-comparison/tree/main for the full reproduction case.

Expected behavior

No response

Additional context

I am not sure whether a similar file and task are already part of the benchmarks. The 1m sample file is 14 MB in size, so it makes no sense to include it in the git repo. With a smaller sample file (e.g. 10k), Polars is only 1.49x faster. Feel free to store a generated file to include this case in the benchmarks.

Metadata

Assignees: No one assigned
Labels: bug (Something isn't working), help wanted (Extra attention is needed), performance (Make DataFusion faster)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
