Skip to content

[Data] Avoid merging map ops in cases when it leads to substantial parallelism reduction #52570

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 25 commits into
base: master
Choose a base branch
from

Conversation

alexeykudinkin
Copy link
Contributor

Why are these changes needed?

These changes are needed to prevent cases when Map Operator fusion could lead to substantial parallelism reduction.

Consider following scenario:

  1. Upstream MapOp specifies min_rows_per_input_bundle=1
  2. Downstreeam MapOp specifies min_rows_per_input_bundle=100

For a dataset of 100 rows and 10 blocks (10 rows / block) if we do fuse in this case, fused operator's parallelism will be just 1 task (determined by downstream) substantially reducing upstream's parallelism.

This is a big issue when we fuse Read ops with subsequent Map operations.

This change:

  1. Avoids fusion of Read ops with downstream Map ops that have batch_size specified
  2. Adjusts fusion sequence to avoid fusing operators with substantial reduction in estimated parallelism
  3. Adds telemetry

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@alexeykudinkin alexeykudinkin requested a review from a team as a code owner April 24, 2025 02:19
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Apr 24, 2025
@alexeykudinkin alexeykudinkin changed the base branch from ak/op-fus-fix to master April 24, 2025 02:23
@alexeykudinkin alexeykudinkin removed request for a team, simonsays1980 and sven1977 April 24, 2025 02:23
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…undle`

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…_rows_per_bundled_input` is not specified

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…never it has `min_num_rows_per_input_bundle` specified

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…uction (by more than > 4x)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

# Do not fuse read op with downstream map op in case when downstream has
# `min_rows_per_input_bundle` specified (to avoid reducing reading parallelism)
if upstream_op.is_read_op() and ds_bundle_min_rows_req is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to read ops, I think we should disable fusion as long as the previous map op doesn't preserve num rows (e.g., read, filter, map_batches, flat_map, etc)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for map_batches, typically it preserves num rows.
But today we don't enforce that.
related issue #36295
One option is to enforce that by default, and add a flag to allow violation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option is to enforce that by default, and add a flag to allow violation.

I don't think we can do that anymore with our public API -- i can totally see that being too limiting.

Regardless, though preserving num-rows for proper limit push-downs is an important topic but tangential to this change.

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants