
Adapt Dask on Ray to the new Dask Task class #52589


Open
wants to merge 3 commits into master

Conversation


@HiromuHota HiromuHota commented Apr 24, 2025

Why are these changes needed?

"Dask on Ray" (DoR) is broken in dask==2024.11.0 or later as reported in #48689 because Dask removed a private function in dask/dask#11378 that DoR has been relying on. Not only dask/dask#11378, Dask has migrated their task data structure to a new format (the high-level motivation is described in dask/dask#9969). Since this migration spans across a series of PRs between 2024.11.0 and 2025.1.0, it's not realistic to copy what's been removed and paste them in Ray.

This PR adapts Dask on Ray to the change so that it keeps working.
The current PR as of 7c9def3 is compatible only with dask>=2024.11.0,<2025.1.0, because Dask made another major change in 2025.1.0 that breaks the shuffle optimization introduced in #13951.
I can make further changes in this PR if we want to support dask>=2025.1.0 and drop support for dask<2025.1.0. Supporting both would technically be possible, but it would complicate the codebase. Let me know how Ray would like to approach this.
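
For reference, the user-facing functionality being preserved is roughly the following (a sketch based on the documented ray.util.dask entry point):

import dask
import dask.array as da
import ray
from ray.util.dask import ray_dask_get

ray.init()
# Route Dask graph execution through Ray tasks instead of Dask's built-in schedulers.
dask.config.set(scheduler=ray_dask_get)

x = da.ones((1000, 1000), chunks=(100, 100))
print(x.sum().compute())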

Related issue number

Closes #48689

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Hiromu Hota <hiromu.hota@gmail.com>
@HiromuHota HiromuHota marked this pull request as ready for review April 24, 2025 22:11
@HiromuHota
Author

Since CI doesn't run the unit tests with a newer version of dask, here are the results of the unit tests I ran locally.

(ray) ubuntu@ip-10-1-24-215:~/workspace/ray/python$ python -m pytest ray/util/dask/tests
Test session starts (platform: linux, Python 3.10.12, pytest 7.4.4, pytest-sugar 0.9.5)
rootdir: /home/ubuntu/workspace/ray
configfile: pytest.ini
plugins: aiohttp-0.3.0, shutil-1.7.0, virtualenv-1.7.0, fugue-0.8.7, timeout-2.1.0, typeguard-2.13.3, forked-1.4.0, httpserver-1.0.6, sphinx-0.5.1.dev0, lazy-fixture-0.6.3, docker-tools-3.1.3, sugar-0.9.5, repeat-0.9.3, asyncio-0.17.0, mock-3.14.0, rerunfailures-11.1.2, remotedata-0.3.2, anyio-3.7.1
timeout: 180.0s
timeout method: signal
timeout func_only: False
collecting ... 
 python/ray/util/dask/tests/test_dask_callback.py ✓✓✓✓✓✓✓✓                                                                                                            57% █████▊    
 python/ray/util/dask/tests/test_dask_optimization.py ✓✓                                                                                                              71% ███████▎  
 python/ray/util/dask/tests/test_dask_scheduler.py ✓✓✓✓                                                                                                              100% ██████████

Results (60.70s):
      14 passed
(ray) ubuntu@ip-10-1-24-215:~/workspace/ray/python$ pip list | grep dask
dask                                   2024.12.0

@hainesmichaelc added the community-contribution (Contributed by the community) label Apr 28, 2025
@masoudcharkhabi added the data (Ray Data-related issues) and stability labels Apr 28, 2025
@robertnishihara
Collaborator

I can make further changes in this PR if we want to support dask>=2025.1.0 and drop support dask<2025.1.0. Supporting both would technically be possible but that would make codebase complicated. Let me know how Ray would like to approach this.

What do you think is the right approach here? At first glance, it seems to make sense to optimize for the latest version.

@HiromuHota
Author

@robertnishihara

I think it'd be safer for Dask on Ray users to phase the migration because of the changes on the Dask side:

  1. Dask introduced another major change in 2025.1.0 that removed the legacy Dask DataFrame implementation and fully switched to the new one (release note). This major change carries inherent risk.
  2. The shuffle optimization introduced in [dask-on-ray] Add multiple return DataFrame shuffle optimization. #13951, which I mentioned in the description, was broken by this removal. I'm not sure it is worth fixing: #13951 was done in 2021 to solve Dask task based shuffle inefficient to execute in Ray due to dummy map split tasks #13959 by reducing the shuffle's task complexity from N^2 to N, while Dask introduced a better shuffling method, "p2p", in 2023 (blog) that addresses the same problem. This suggests that the custom optimization for Dask on Ray in #13951 is no longer relevant and that Dask on Ray users can benefit from Dask's own optimization for free.

So, I'd suggest splitting the migration into two: ship this PR as is, wait for Ray to be released at least once (e.g., 2.44.2), and then do another migration without #13951 in a later version of Ray. This way, it'd be easier to identify the root cause if an issue comes up.

@rbavery

rbavery commented May 6, 2025

Just my own two cents, but Dask-on-Ray has been in a quasi-maintained state long enough that I don't think many (or any?) are using it in production. So I think it is safe to optimize for the latest version. Selfishly, we are keen to use the latest version of Dask and try Dask on Ray with it. There have been some crucial optimizations made after 2025.1.0:

https://docs.dask.org/en/stable/changelog.html#reducing-memory-pressure-for-xarray-workloads
https://docs.dask.org/en/stable/changelog.html#automatically-adjust-chunksizes-in-xarray-apply-ufunc

In any case, I'm just happy that this is seeing some activity again and am looking forward to trying it whenever it is available.

Signed-off-by: Hiromu Hota <hiromu.hota@gmail.com>
@HiromuHota
Author

I made a further change to support dask>=2025.1.0.
As of f8fe686, the support matrix looks like this:

dask                   | dask_expr     | Dask on Ray
<2024.11.0             | N/A           | No
>=2024.11.0,<2025.1.0  | Not installed | Yes
>=2024.11.0,<2025.1.0  | Installed     | Yes, w/o custom optimization
>=2025.1.0             | N/A           | Yes, w/o custom optimization
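
A hypothetical sketch of how the scheduler could gate its code paths to match this matrix (the names below are illustrative, not the actual code in this PR):

from packaging.version import Version

import dask

DASK_VERSION = Version(dask.__version__)

# Dask on Ray no longer targets the pre-migration task format.
DASK_SUPPORTED = DASK_VERSION >= Version("2024.11.0")

try:
    import dask_expr  # noqa: F401
    HAS_DASK_EXPR = True
except ImportError:
    HAS_DASK_EXPR = False

# Per the matrix above, the custom shuffle optimization from #13951 applies only
# on the legacy DataFrame path: dask<2025.1.0 with dask_expr not installed.
ENABLE_CUSTOM_SHUFFLE_OPT = (
    DASK_SUPPORTED and DASK_VERSION < Version("2025.1.0") and not HAS_DASK_EXPR
)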

Labels
community-contribution (Contributed by the community), data (Ray Data-related issues), stability
Development

Successfully merging this pull request may close these issues.

[Util] Cannot import Dask scheduler
5 participants