Skip to content

Commit b2b971b

Browse files
Omkar KulkarniOmkar Kulkarni
authored andcommitted
Merge branch 'SERVE-657-update-naming' of github.com:ray-project/ray into SERVE-657-update-naming
Signed-off-by: Omkar Kulkarni <omkar@omkar-JKJHCX74L6.local>
2 parents 71a57be + 263505d commit b2b971b

File tree

9 files changed

+664
-629
lines changed

9 files changed

+664
-629
lines changed

.editorconfig

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ indent_size = 4
99

1010
[*.{h,hpp,hxx,cpp,cxx,cc,c}]
1111
indent_size = 2
12-
wrap_width = 90
12+
max_line_length = 90
1313

1414
[*.html]
1515
indent_size = 2
@@ -22,7 +22,7 @@ indent_size = 2
2222

2323
[*.{py,pyx,pxd,pxi}]
2424
indent_size = 4
25-
wrap_width = 79
25+
max_line_length = 79
2626

2727
[*.sh]
2828
indent_size = 2

python/ray/data/BUILD

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,20 @@ py_test(
704704
],
705705
)
706706

707+
py_test(
708+
name = "test_operator_fusion",
709+
size = "medium",
710+
srcs = ["tests/test_operator_fusion.py"],
711+
tags = [
712+
"exclusive",
713+
"team:data",
714+
],
715+
deps = [
716+
":conftest",
717+
"//:ray_lib",
718+
],
719+
)
720+
707721
py_test(
708722
name = "test_execution_optimizer",
709723
size = "medium",

python/ray/data/_internal/logical/optimizers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from ray.data._internal.logical.rules.inherit_target_max_block_size import (
1414
InheritTargetMaxBlockSizeRule,
1515
)
16-
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
16+
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
1717
from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule
1818
from ray.data._internal.logical.rules.set_read_parallelism import SetReadParallelismRule
1919
from ray.data._internal.logical.rules.zero_copy_map_fusion import (
@@ -35,7 +35,7 @@
3535
[
3636
InheritTargetMaxBlockSizeRule,
3737
SetReadParallelismRule,
38-
OperatorFusionRule,
38+
FuseOperators,
3939
EliminateBuildOutputBlocks,
4040
ConfigureMapTaskMemoryUsingOutputSize,
4141
]
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
1+
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
22
from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule
33

4-
__all__ = ["ReorderRandomizeBlocksRule", "OperatorFusionRule"]
4+
__all__ = ["ReorderRandomizeBlocksRule", "FuseOperators"]

python/ray/data/_internal/logical/rules/operator_fusion.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]
3939

4040

41-
class OperatorFusionRule(Rule):
41+
class FuseOperators(Rule):
4242
"""Fuses linear chains of compatible physical operators."""
4343

4444
def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
@@ -54,21 +54,21 @@ def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
5454
# Update output dependencies after fusion.
5555
# TODO(hchen): Instead of updating the depdencies manually,
5656
# we need a better abstraction for manipulating the DAG.
57-
self._remove_output_depes(fused_dag)
58-
self._update_output_depes(fused_dag)
57+
self._remove_output_deps(fused_dag)
58+
self._update_output_deps(fused_dag)
5959

6060
new_plan = PhysicalPlan(fused_dag, self._op_map, plan.context)
6161
return new_plan
6262

63-
def _remove_output_depes(self, op: PhysicalOperator) -> None:
63+
def _remove_output_deps(self, op: PhysicalOperator) -> None:
6464
for input in op._input_dependencies:
6565
input._output_dependencies = []
66-
self._remove_output_depes(input)
66+
self._remove_output_deps(input)
6767

68-
def _update_output_depes(self, op: PhysicalOperator) -> None:
68+
def _update_output_deps(self, op: PhysicalOperator) -> None:
6969
for input in op._input_dependencies:
7070
input._output_dependencies.append(op)
71-
self._update_output_depes(input)
71+
self._update_output_deps(input)
7272

7373
def _fuse_map_operators_in_dag(self, dag: PhysicalOperator) -> MapOperator:
7474
"""Starting at the given operator, traverses up the DAG of operators
@@ -305,6 +305,7 @@ def _get_fused_map_operator(
305305
# Merge minimum block sizes.
306306
down_min_rows_per_bundled_input = down_logical_op._min_rows_per_bundled_input
307307
up_min_rows_per_bundled_input = up_logical_op._min_rows_per_bundled_input
308+
308309
if (
309310
down_min_rows_per_bundled_input is not None
310311
and up_min_rows_per_bundled_input is not None

python/ray/data/_internal/logical/rules/zero_copy_map_fusion.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
)
1111
from ray.data._internal.logical.interfaces.optimizer import Rule
1212
from ray.data._internal.logical.interfaces.physical_plan import PhysicalPlan
13-
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
13+
from ray.data._internal.logical.rules.operator_fusion import FuseOperators
1414

1515

1616
class ZeroCopyMapFusionRule(Rule):
@@ -27,7 +27,7 @@ class ZeroCopyMapFusionRule(Rule):
2727

2828
@classmethod
2929
def dependencies(cls) -> List[Type[Rule]]:
30-
return [OperatorFusionRule]
30+
return [FuseOperators]
3131

3232
def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
3333
self._traverse(plan.dag)

0 commit comments

Comments
 (0)