-
Notifications
You must be signed in to change notification settings - Fork 6.2k
[Data] Avoid merging map ops in cases when it leads to substantial parallelism reduction #52570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[Data] Avoid merging map ops in cases when it leads to substantial parallelism reduction #52570
Conversation
f8ab19b
to
ab58245
Compare
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…undle` Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…_rows_per_bundled_input` is not specified Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…never it has `min_num_rows_per_input_bundle` specified Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…uction (by more than > 4x) Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
|
||
# Do not fuse read op with downstream map op in case when downstream has | ||
# `min_rows_per_input_bundle` specified (to avoid reducing reading parallelism) | ||
if upstream_op.is_read_op() and ds_bundle_min_rows_req is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to read ops, I think we should disable fusion as long as the previous map op doesn't preserve num rows (e.g., read, filter, map_batches, flat_map, etc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for map_batches, typically it preserves num rows.
But today we don't enforce that.
related issue #36295
One option is to enforce that by default, and add a flag to allow violation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One option is to enforce that by default, and add a flag to allow violation.
I don't think we can do that anymore with our public API -- i can totally see that being too limiting.
Regardless, though preserving num-rows for proper limit push-downs is an important topic but tangential to this change.
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
ab58245
to
7e6d122
Compare
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Why are these changes needed?
These changes are needed to prevent cases when Map Operator fusion could lead to substantial parallelism reduction.
Consider following scenario:
min_rows_per_input_bundle=1
min_rows_per_input_bundle=100
For a dataset of 100 rows and 10 blocks (10 rows / block) if we do fuse in this case, fused operator's parallelism will be just 1 task (determined by downstream) substantially reducing upstream's parallelism.
This is a big issue when we fuse Read ops with subsequent Map operations.
This change:
batch_size
specifiedRelated issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.