Skip to content

Bodo vs Dask comparison #411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
jacobtomlinson opened this issue Jan 30, 2025 · 18 comments
Open

Bodo vs Dask comparison #411

jacobtomlinson opened this issue Jan 30, 2025 · 18 comments

Comments

@jacobtomlinson
Copy link
Member

jacobtomlinson commented Jan 30, 2025

I noticed that the folks over at Bodo published a blog post comparing the performance of Bodo, Dask, Spark and Modin + Ray.

Image

They published the benchmark code here, it would be interesting to try to reproduce and verify their results.

The Dask example uses dask-cloudprovider. I wonder what it would be like to use Coiled instead.

@phofl
Copy link

phofl commented Jan 30, 2025

Their instance type selection is our biggest foe here. Dask doesn't perform very well on these large instances. using more smaller instances with the same number of cores in aggregate would most likely perform a lot better

@phofl
Copy link

phofl commented Jan 30, 2025

So I ran this on Coiled and it's a lot faster with proper instances but the main problem is that their parquet files are not suited for distributed processing. They are huge and they don't really use row-groups. Splitting the row groups creates only 38 read tasks and some of them produce partitions with multiple GBs.

@jacobtomlinson
Copy link
Member Author

Thanks for running this @phofl, I was just doing the same but you got there first.

I also tried their standard pandas version, but I'm getting pyarrow.lib.ArrowNotImplementedError: Unsupported cast from string to null using function cast_null when trying to load the parquet files.

@phofl
Copy link

phofl commented Jan 30, 2025

Any chance you have an older arrow version installed?

@jacobtomlinson
Copy link
Member Author

jacobtomlinson commented Jan 30, 2025

I just created a fresh RAPIDS environment so I have arrow==1.3.0 and pyarrow==17.0.0.

@phofl
Copy link

phofl commented Jan 30, 2025

Ok, that's odd then...

I ran this with the dataset that we are hosting in a coiled s3 bucket, i.e.

dataset = "s3://coiled-datasets/uber-lyft-tlc/"

and that one finished the query in 27 seconds (minus the download of the computed results, I interrupted that because my wifi isn't that fast). They are running the query on a cloud machine, so it shouldn't matter much

@fjetter
Copy link
Member

fjetter commented Jan 30, 2025

parquet performance will also depend on the backend used. the pyarrow backend is / should be faster but it has still a lot of sharp edges and isn't the default

@phofl
Copy link

phofl commented Jan 30, 2025

parquet performance will also depend on the backend used. the pyarrow backend is / should be faster but it has still a lot of sharp edges and isn't the default

it technically is, but our fusing is pretty aggressive which means we only end up with 80 partitions since we only need 4 columns from the dataset when using the pyarrow backend but they are running on 128 cores... The cluster is overprovisioned by quite a bit

@fjetter
Copy link
Member

fjetter commented Jan 30, 2025

Well, they are running on a r6i.16xlarge machine which has 64 cores. This can't work 😂 The GIL will kill everything. We have moderate if not severe GIL issues already at 8 cores. 64 is impossible to run anything meaningfully. Even IO will be impossibly slow because it is using fastparquet by default, i.e. another python library that requires the GIL.

@fjetter
Copy link
Member

fjetter commented Jan 30, 2025

They are submitting a task that is internally calling compute. This will be using a worker client, i.e. the driver code is also a worker task. I'm not entirely sure how that'll impact performance but it is uncommon.

I just noticed that we do not have any good advice on https://docs.dask.org/en/stable/best-practices.html about instance sizes (see dask/dask#11705). There is a comment about avoiding thread counts above 10 but I think the recommendation should be lower. This docs section is not truly dealing with a distributed cluster.

@martindurant
Copy link
Member

fastparquet by default, i.e. another python library that requires the GIL.

fastparquet releases the GIL in its core decoding algorithms FWIW

I would suggest that the get_monthly_travels_weather function is pretty terribly written, you could do much better with some map_partitions. Yes, the operations should be fused, but there are LOADS of pandas temporaries in there. You could probably do it all in a little numba func, if anyone wants to try. I say this, because the main selling point on bodo is its JIT, apparently.

@scott-routledge2
Copy link

Hello! I wrote this code and wanted to provide some additional context that might be helpful to this discussion. First of all thank you for taking the time to check out our benchmark and reproduce the results! This investigation was really informative and I find myself learning a lot from reading it.

Firstly, the goal of this benchmark was to demonstrate how a typical Pandas user might write a simple workflow and then try to scale it (and doing so without drastically rewriting their code).

Regarding the parquet files, they were downloaded directly from here without any modification. It's important to note here that each system had to deal with the same large parquet files.

Regarding the instance size, this was primarily done to increase the amount of available memory since smaller instance sizes ran out of memory fairly quickly. I wasn't aware of Dask issues with large instances. Looking at the Dask distributed dashboard, looks like there is a task that requires all data on a single worker. Any ideas?

@hendrikmakait
Copy link
Member

Looking at the Dask distributed dashboard, looks like there is a task that requires all data on a single worker. Any ideas?

It looks like you call .compute() on the final result, whereas other libraries write the results to S3 in Parquet format. Calling .compute() has multiple issues that will affect runtime and the memory footprint:

  1. It will concatenate the entire result into a single partition, which might be what you see with the single task requiring all data.
  2. It will then transfer the entire result back to the client, which can take a while, depending on your connection and location.

I'd recommend writing to S3 to ensure this benchmark does not compare apples to oranges.

@martindurant
Copy link
Member

t will concatenate the entire result into a single partition

This is not generally true! The concatenation should happen in the client.

@hendrikmakait
Copy link
Member

This is not generally true! The concatenation should happen in the client.

It's true for dataframes, which is the API that is used here. See dask/dask-expr#1138 which introduces a flag to disable that behavior.

@martindurant
Copy link
Member

It's true for dataframes

I am truly astonished.

Unfortunately, the PR has no discussion or documentation on when the user should use the new optional flag.

@scott-routledge2
Copy link

Thanks. I opened a PR to update the code to write to S3 instead of compute here. I didn't notice a significant difference in this case, does the compute=True flag in to_parquet also transfer some result to the client?

@martindurant
Copy link
Member

does the compute=True flag in to_parquet also transfer some result to the client

No. You did the default and correct thing. The difference is, whether the operation waits for completion, or returns background-monitored futures.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants