From 6fdec5139278f34902794da72de2ce53597831be Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Tue, 22 Apr 2025 10:10:38 -0700
Subject: [PATCH 1/7] [data] support ray_remote_args and ray_remote_args_fn for preprocessor

Signed-off-by: Xingyu Long
---
 python/ray/data/preprocessor.py            | 25 +++++++++++++++++++++-
 python/ray/data/preprocessors/tokenizer.py | 15 ++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/python/ray/data/preprocessor.py b/python/ray/data/preprocessor.py
index b9182dea3b54..b4b9a2fa145a 100644
--- a/python/ray/data/preprocessor.py
+++ b/python/ray/data/preprocessor.py
@@ -4,7 +4,7 @@
 import pickle
 import warnings
 from enum import Enum
-from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional, Callable
 
 from ray.air.util.data_batch_conversion import BatchFormat
 from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -47,6 +47,24 @@ class Preprocessor(abc.ABC):
         implemented method.
     """
 
+    def __init__(
+        self,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
+    ):
+        """
+        Args:
+            ray_remote_args: Args to provide to :func:`ray.remote`.
+            ray_remote_args_fn: A function that returns a dictionary of remote args
+                passed to each map worker. The purpose of this argument is to generate
+                dynamic arguments for each actor/task, and will be called each time
+                prior to initializing the worker. Args returned from this dict will
+                always override the args in ``ray_remote_args``. Note: this is an
+                advanced, experimental feature.
+        """
+        self._ray_remote_args = ray_remote_args
+        self._ray_remote_args_fn = ray_remote_args_fn
+
     class FitStatus(str, Enum):
         """The fit status of preprocessor."""
 
@@ -225,6 +243,11 @@ def _transform(self, ds: "Dataset") -> "Dataset":
         # Our user-facing batch format should only be pandas or NumPy, other
         # formats {arrow, simple} are internal.
         kwargs = self._get_transform_config()
+        if self._ray_remote_args is not None:
+            kwargs = dict(kwargs, **self._ray_remote_args)
+        if self._ray_remote_args_fn is not None:
+            kwargs["ray_remote_args_fn"] = self._ray_remote_args_fn
+
         if transform_type == BatchFormat.PANDAS:
             return ds.map_batches(
                 self._transform_pandas, batch_format=BatchFormat.PANDAS, **kwargs
diff --git a/python/ray/data/preprocessors/tokenizer.py b/python/ray/data/preprocessors/tokenizer.py
index 8e44d3934579..151795666ec9 100644
--- a/python/ray/data/preprocessors/tokenizer.py
+++ b/python/ray/data/preprocessors/tokenizer.py
@@ -1,4 +1,4 @@
-from typing import Callable, List, Optional
+from typing import Callable, List, Optional, Dict, Any
 
 import pandas as pd
 
@@ -59,6 +59,13 @@ class Tokenizer(Preprocessor):
             columns will be the same as the input columns. If not None, the length of
             ``output_columns`` must match the length of ``columns``, othwerwise an
             error will be raised.
+        ray_remote_args: Args to provide to :func:`ray.remote`.
+        ray_remote_args_fn: A function that returns a dictionary of remote args
+            passed to each map worker. The purpose of this argument is to generate
+            dynamic arguments for each actor/task, and will be called each time
+            prior to initializing the worker. Args returned from this dict will
+            always override the args in ``ray_remote_args``. Note: this is an
+            advanced, experimental feature.
     """
 
     _is_fittable = False
@@ -68,7 +75,13 @@ def __init__(
         columns: List[str],
         tokenization_fn: Optional[Callable[[str], List[str]]] = None,
         output_columns: Optional[List[str]] = None,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
     ):
+        super().__init__(
+            ray_remote_args=ray_remote_args,
+            ray_remote_args_fn=ray_remote_args_fn,
+        )
         self.columns = columns
         # TODO(matt): Add a more robust default tokenizer.
         self.tokenization_fn = tokenization_fn or simple_split_tokenizer

From fe15bd7a07c29b5d71ea6ec53091f402896b6881 Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Wed, 23 Apr 2025 21:37:32 -0700
Subject: [PATCH 2/7] [data] add test for preprocessor

Signed-off-by: Xingyu Long
---
 .../tests/preprocessors/test_preprocessors.py | 39 +++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/python/ray/data/tests/preprocessors/test_preprocessors.py b/python/ray/data/tests/preprocessors/test_preprocessors.py
index 0adc3d8a8889..8cf93b2b6f1f 100644
--- a/python/ray/data/tests/preprocessors/test_preprocessors.py
+++ b/python/ray/data/tests/preprocessors/test_preprocessors.py
@@ -165,6 +165,45 @@ def test_fit_twice(mocked_warn):
     mocked_warn.assert_called_once_with(msg)
 
 
+def test_ray_remote_args_and_fn():
+    batch_size = 2
+
+    ray_remote_args = {"num_cpus": 2}
+
+    def func(df):
+        import os
+
+        df["value"][:] = int(os.environ["__MY_TEST__"])
+        return df
+
+    class DummyPreprocessor(Preprocessor):
+        _is_fittable = False
+
+        def _get_transform_config(self):
+            return {"batch_size": batch_size}
+
+        def _transform_numpy(self, data):
+            assert (
+                ray.get_runtime_context().get_assigned_resources()["CPU"]
+                == ray_remote_args["num_cpus"]
+            )
+            assert len(data["value"]) == batch_size
+            func(data)
+            return data
+
+        def _determine_transform_to_use(self):
+            return "numpy"
+
+    prep = DummyPreprocessor(
+        ray_remote_args=ray_remote_args,
+        ray_remote_args_fn=lambda: {"runtime_env": {"env_vars": {"__MY_TEST__": "69"}}},
+    )
+    ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(10))}))
+    ds = prep.transform(ds)
+
+    assert sorted([x["value"] for x in ds.take(5)]) == [69, 69, 69, 69, 69]
+
+
 def test_transform_config():
     """Tests that the transform_config of
     the Preprocessor is respected during transform."""
From 9d7f403f7175f4a0177be2718a364b9a2a84e138 Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Fri, 25 Apr 2025 20:52:20 -0700
Subject: [PATCH 3/7] [data] use nums_cpus, memory, batch_size, concurrency

Signed-off-by: Xingyu Long
---
 python/ray/data/preprocessor.py               | 43 ++++++++++++-------
 python/ray/data/preprocessors/tokenizer.py    | 26 +++++------
 .../tests/preprocessors/test_preprocessors.py | 20 +++------
 3 files changed, 47 insertions(+), 42 deletions(-)

diff --git a/python/ray/data/preprocessor.py b/python/ray/data/preprocessor.py
index b4b9a2fa145a..b4b7fe6ad532 100644
--- a/python/ray/data/preprocessor.py
+++ b/python/ray/data/preprocessor.py
@@ -4,7 +4,17 @@
 import pickle
 import warnings
 from enum import Enum
-from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional, Callable
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Union,
+    List,
+    Optional,
+    Callable,
+    Literal,
+    Tuple,
+)
 
 from ray.air.util.data_batch_conversion import BatchFormat
 from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -49,21 +59,22 @@ class Preprocessor(abc.ABC):
 
     def __init__(
         self,
-        ray_remote_args: Optional[Dict[str, Any]] = None,
-        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
+        num_cpus: Optional[float] = None,
+        memory: Optional[float] = None,
+        batch_size: Union[int, None, Literal["default"]] = None,
+        concurrency: Optional[int] = None,
     ):
         """
         Args:
-            ray_remote_args: Args to provide to :func:`ray.remote`.
-            ray_remote_args_fn: A function that returns a dictionary of remote args
-                passed to each map worker. The purpose of this argument is to generate
-                dynamic arguments for each actor/task, and will be called each time
-                prior to initializing the worker. Args returned from this dict will
-                always override the args in ``ray_remote_args``. Note: this is an
-                advanced, experimental feature.
+            num_cpus: The number of CPUs to reserve for each parallel map worker.
+            memory: The heap memory in bytes to reserve for each parallel map worker.
+            batch_size: The maximum number of rows to return.
+            concurrency: The maximum number of Ray workers to use concurrently.
         """
-        self._ray_remote_args = ray_remote_args
-        self._ray_remote_args_fn = ray_remote_args_fn
+        self._num_cpus = num_cpus
+        self._memory = memory
+        self._batch_size = batch_size
+        self._concurrency = concurrency
 
     class FitStatus(str, Enum):
         """The fit status of preprocessor."""
@@ -243,10 +254,10 @@ def _transform(self, ds: "Dataset") -> "Dataset":
         # Our user-facing batch format should only be pandas or NumPy, other
         # formats {arrow, simple} are internal.
         kwargs = self._get_transform_config()
-        if self._ray_remote_args is not None:
-            kwargs = dict(kwargs, **self._ray_remote_args)
-        if self._ray_remote_args_fn is not None:
-            kwargs["ray_remote_args_fn"] = self._ray_remote_args_fn
+        kwargs["num_cpus"] = self._num_cpus
+        kwargs["memory"] = self._memory
+        kwargs["batch_size"] = self._batch_size
+        kwargs["concurrency"] = self._concurrency
 
         if transform_type == BatchFormat.PANDAS:
             return ds.map_batches(
diff --git a/python/ray/data/preprocessors/tokenizer.py b/python/ray/data/preprocessors/tokenizer.py
index 151795666ec9..4750a6ad6f4e 100644
--- a/python/ray/data/preprocessors/tokenizer.py
+++ b/python/ray/data/preprocessors/tokenizer.py
@@ -1,4 +1,4 @@
-from typing import Callable, List, Optional, Dict, Any
+from typing import Callable, List, Optional, Literal, Union
 
 import pandas as pd
 
@@ -59,13 +59,10 @@ class Tokenizer(Preprocessor):
             columns will be the same as the input columns. If not None, the length of
             ``output_columns`` must match the length of ``columns``, othwerwise an
             error will be raised.
-        ray_remote_args: Args to provide to :func:`ray.remote`.
-        ray_remote_args_fn: A function that returns a dictionary of remote args
-            passed to each map worker. The purpose of this argument is to generate
-            dynamic arguments for each actor/task, and will be called each time
-            prior to initializing the worker. Args returned from this dict will
-            always override the args in ``ray_remote_args``. Note: this is an
-            advanced, experimental feature.
+        num_cpus: The number of CPUs to reserve for each parallel map worker.
+        memory: The heap memory in bytes to reserve for each parallel map worker.
+        batch_size: The maximum number of rows to return.
+        concurrency: The maximum number of Ray workers to use concurrently.
     """
 
     _is_fittable = False
@@ -75,12 +72,17 @@ def __init__(
         columns: List[str],
         tokenization_fn: Optional[Callable[[str], List[str]]] = None,
        output_columns: Optional[List[str]] = None,
-        ray_remote_args: Optional[Dict[str, Any]] = None,
-        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
+        *,
+        num_cpus: Optional[float] = None,
+        memory: Optional[float] = None,
+        batch_size: Union[int, None, Literal["default"]] = None,
+        concurrency: Optional[int] = None,
     ):
         super().__init__(
-            ray_remote_args=ray_remote_args,
-            ray_remote_args_fn=ray_remote_args_fn,
+            num_cpus=num_cpus,
+            memory=memory,
+            batch_size=batch_size,
+            concurrency=concurrency,
         )
         self.columns = columns
         # TODO(matt): Add a more robust default tokenizer.
diff --git a/python/ray/data/tests/preprocessors/test_preprocessors.py b/python/ray/data/tests/preprocessors/test_preprocessors.py
index 8cf93b2b6f1f..e5640bd18497 100644
--- a/python/ray/data/tests/preprocessors/test_preprocessors.py
+++ b/python/ray/data/tests/preprocessors/test_preprocessors.py
@@ -165,17 +165,9 @@ def test_fit_twice(mocked_warn):
     mocked_warn.assert_called_once_with(msg)
 
 
-def test_ray_remote_args_and_fn():
+def test_initialization_parameters():
     batch_size = 2
 
-    ray_remote_args = {"num_cpus": 2}
-
-    def func(df):
-        import os
-
-        df["value"][:] = int(os.environ["__MY_TEST__"])
-        return df
-
     class DummyPreprocessor(Preprocessor):
         _is_fittable = False
 
@@ -185,23 +177,23 @@ def _get_transform_config(self):
             return {"batch_size": batch_size}
 
         def _transform_numpy(self, data):
             assert (
                 ray.get_runtime_context().get_assigned_resources()["CPU"]
-                == ray_remote_args["num_cpus"]
+                == self._num_cpus
             )
             assert len(data["value"]) == batch_size
-            func(data)
             return data
 
         def _determine_transform_to_use(self):
             return "numpy"
 
     prep = DummyPreprocessor(
-        ray_remote_args=ray_remote_args,
-        ray_remote_args_fn=lambda: {"runtime_env": {"env_vars": {"__MY_TEST__": "69"}}},
+        num_cpus=2,
+        concurrency=2,
+        batch_size=batch_size,
     )
     ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(10))}))
     ds = prep.transform(ds)
 
-    assert sorted([x["value"] for x in ds.take(5)]) == [69, 69, 69, 69, 69]
+    assert [x["value"] for x in ds.take(5)] == [0, 1, 2, 3, 4]
 
 
From 06453911a6b11a30ad36ccd404ffe9df18f9ab89 Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Sat, 26 Apr 2025 15:05:28 -0700
Subject: [PATCH 4/7] [data] move parameters to transform, fit_transform func

Signed-off-by: Xingyu Long
---
 python/ray/data/preprocessor.py               | 82 +++++++++++--------
 python/ray/data/preprocessors/tokenizer.py    | 15 ----
 .../tests/preprocessors/test_preprocessors.py | 52 +++----------
 3 files changed, 63 insertions(+), 86 deletions(-)

diff --git a/python/ray/data/preprocessor.py b/python/ray/data/preprocessor.py
index b4b7fe6ad532..1a1567c5a2ff 100644
--- a/python/ray/data/preprocessor.py
+++ b/python/ray/data/preprocessor.py
@@ -11,9 +11,7 @@
     Union,
     List,
     Optional,
-    Callable,
     Literal,
-    Tuple,
 )
 
 from ray.air.util.data_batch_conversion import BatchFormat
@@ -57,25 +55,6 @@ class Preprocessor(abc.ABC):
         implemented method.
     """
 
-    def __init__(
-        self,
-        num_cpus: Optional[float] = None,
-        memory: Optional[float] = None,
-        batch_size: Union[int, None, Literal["default"]] = None,
-        concurrency: Optional[int] = None,
-    ):
-        """
-        Args:
-            num_cpus: The number of CPUs to reserve for each parallel map worker.
-            memory: The heap memory in bytes to reserve for each parallel map worker.
-            batch_size: The maximum number of rows to return.
-            concurrency: The maximum number of Ray workers to use concurrently.
-        """
-        self._num_cpus = num_cpus
-        self._memory = memory
-        self._batch_size = batch_size
-        self._concurrency = concurrency
-
     class FitStatus(str, Enum):
         """The fit status of preprocessor."""
 
@@ -147,7 +126,15 @@ def fit(self, ds: "Dataset") -> "Preprocessor":
         self._fitted = True
         return fitted_ds
 
-    def fit_transform(self, ds: "Dataset") -> "Dataset":
+    def fit_transform(
+        self,
+        ds: "Dataset",
+        *,
+        transform_num_cpus: Optional[float] = None,
+        transform_memory: Optional[float] = None,
+        transform_batch_size: Union[int, None, Literal["default"]] = None,
+        transform_concurrency: Optional[int] = None,
+    ) -> "Dataset":
         """Fit this Preprocessor to the Dataset and then transform the Dataset.
 
         Calling it more than once will overwrite all previously fitted state:
@@ -156,18 +143,40 @@ def fit_transform(self, ds: "Dataset") -> "Dataset":
 
         Args:
             ds: Input Dataset.
+            transform_num_cpus: The number of CPUs to reserve for each parallel map worker.
+            transform_memory: The heap memory in bytes to reserve for each parallel map worker.
+            transform_batch_size: The maximum number of rows to return.
+            transform_concurrency: The maximum number of Ray workers to use concurrently.
 
         Returns:
             ray.data.Dataset: The transformed Dataset.
         """
         self.fit(ds)
-        return self.transform(ds)
+        return self.transform(
+            ds,
+            num_cpus=transform_num_cpus,
+            memory=transform_memory,
+            batch_size=transform_batch_size,
+            concurrency=transform_concurrency,
+        )
 
-    def transform(self, ds: "Dataset") -> "Dataset":
+    def transform(
+        self,
+        ds: "Dataset",
+        *,
+        num_cpus: Optional[float] = None,
+        memory: Optional[float] = None,
+        batch_size: Union[int, None, Literal["default"]] = None,
+        concurrency: Optional[int] = None,
+    ) -> "Dataset":
         """Transform the given dataset.
 
         Args:
             ds: Input Dataset.
+            num_cpus: The number of CPUs to reserve for each parallel map worker.
+            memory: The heap memory in bytes to reserve for each parallel map worker.
+            batch_size: The maximum number of rows to return.
+            concurrency: The maximum number of Ray workers to use concurrently.
 
         Returns:
             ray.data.Dataset: The transformed Dataset.
@@ -184,7 +193,7 @@ def transform(self, ds: "Dataset") -> "Dataset":
                 "`fit` must be called before `transform`, "
                 "or simply use fit_transform() to run both steps"
             )
-        transformed_ds = self._transform(ds)
+        transformed_ds = self._transform(ds, num_cpus, memory, batch_size, concurrency)
         return transformed_ds
 
     def transform_batch(self, data: "DataBatchType") -> "DataBatchType":
@@ -246,18 +255,27 @@ def _determine_transform_to_use(self) -> BatchFormat:
             "for Preprocessor transforms."
         )
 
-    def _transform(self, ds: "Dataset") -> "Dataset":
-        # TODO(matt): Expose `batch_size` or similar configurability.
-        # The default may be too small for some datasets and too large for others.
+    def _transform(
+        self,
+        ds: "Dataset",
+        num_cpus: Optional[float] = None,
+        memory: Optional[float] = None,
+        batch_size: Union[int, None, Literal["default"]] = None,
+        concurrency: Optional[int] = None,
+    ) -> "Dataset":
         transform_type = self._determine_transform_to_use()
 
         # Our user-facing batch format should only be pandas or NumPy, other
         # formats {arrow, simple} are internal.
         kwargs = self._get_transform_config()
-        kwargs["num_cpus"] = self._num_cpus
-        kwargs["memory"] = self._memory
-        kwargs["batch_size"] = self._batch_size
-        kwargs["concurrency"] = self._concurrency
+        if num_cpus is not None:
+            kwargs["num_cpus"] = num_cpus
+        if memory is not None:
+            kwargs["memory"] = memory
+        if batch_size is not None:
+            kwargs["batch_size"] = batch_size
+        if concurrency is not None:
+            kwargs["concurrency"] = concurrency
 
         if transform_type == BatchFormat.PANDAS:
             return ds.map_batches(
diff --git a/python/ray/data/preprocessors/tokenizer.py b/python/ray/data/preprocessors/tokenizer.py
index 4750a6ad6f4e..80d93f22c43b 100644
--- a/python/ray/data/preprocessors/tokenizer.py
+++ b/python/ray/data/preprocessors/tokenizer.py
@@ -59,10 +59,6 @@ class Tokenizer(Preprocessor):
             columns will be the same as the input columns. If not None, the length of
             ``output_columns`` must match the length of ``columns``, othwerwise an
             error will be raised.
-        num_cpus: The number of CPUs to reserve for each parallel map worker.
-        memory: The heap memory in bytes to reserve for each parallel map worker.
-        batch_size: The maximum number of rows to return.
-        concurrency: The maximum number of Ray workers to use concurrently.
     """
 
     _is_fittable = False
@@ -72,18 +68,7 @@ def __init__(
         columns: List[str],
         tokenization_fn: Optional[Callable[[str], List[str]]] = None,
         output_columns: Optional[List[str]] = None,
-        *,
-        num_cpus: Optional[float] = None,
-        memory: Optional[float] = None,
-        batch_size: Union[int, None, Literal["default"]] = None,
-        concurrency: Optional[int] = None,
     ):
-        super().__init__(
-            num_cpus=num_cpus,
-            memory=memory,
-            batch_size=batch_size,
-            concurrency=concurrency,
-        )
         self.columns = columns
         # TODO(matt): Add a more robust default tokenizer.
         self.tokenization_fn = tokenization_fn or simple_split_tokenizer
diff --git a/python/ray/data/tests/preprocessors/test_preprocessors.py b/python/ray/data/tests/preprocessors/test_preprocessors.py
index e5640bd18497..1eea2073e97e 100644
--- a/python/ray/data/tests/preprocessors/test_preprocessors.py
+++ b/python/ray/data/tests/preprocessors/test_preprocessors.py
@@ -165,8 +165,11 @@ def test_fit_twice(mocked_warn):
     mocked_warn.assert_called_once_with(msg)
 
 
-def test_initialization_parameters():
+def test_transform_all_configs():
     batch_size = 2
+    num_cpus = 2
+    concurrency = 2
+    memory = 1024
 
     class DummyPreprocessor(Preprocessor):
         _is_fittable = False
@@ -175,9 +178,9 @@ def _get_transform_config(self):
             return {"batch_size": batch_size}
 
         def _transform_numpy(self, data):
+            assert ray.get_runtime_context().get_assigned_resources()["CPU"] == num_cpus
             assert (
-                ray.get_runtime_context().get_assigned_resources()["CPU"]
-                == self._num_cpus
+                ray.get_runtime_context().get_assigned_resources()["memory"] == memory
             )
             assert len(data["value"]) == batch_size
             return data
@@ -185,46 +188,17 @@ def _determine_transform_to_use(self):
             return "numpy"
 
-    prep = DummyPreprocessor(
-        num_cpus=2,
-        concurrency=2,
-        batch_size=batch_size,
-    )
+    prep = DummyPreprocessor()
     ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(10))}))
-    ds = prep.transform(ds)
-
+    ds = prep.transform(
+        ds,
+        num_cpus=num_cpus,
+        memory=memory,
+        concurrency=concurrency,
+    )
     assert [x["value"] for x in ds.take(5)] == [0, 1, 2, 3, 4]
 
 
-def test_transform_config():
-    """Tests that the transform_config of
-    the Preprocessor is respected during transform."""
-
-    batch_size = 2
-
-    class DummyPreprocessor(Preprocessor):
-        _is_fittable = False
-
-        def _transform_numpy(self, data):
-            assert len(data["value"]) == batch_size
-            return data
-
-        def _transform_pandas(self, data):
-            raise RuntimeError(
-                "Pandas transform should not be called with numpy batch format."
-            )
-
-        def _get_transform_config(self):
-            return {"batch_size": 2}
-
-        def _determine_transform_to_use(self):
-            return "numpy"
-
-    prep = DummyPreprocessor()
-    ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(4))}))
-    prep.transform(ds)
-
-
 @pytest.mark.parametrize("dataset_format", ["simple", "pandas", "arrow"])
 def test_transform_all_formats(create_dummy_preprocessors, dataset_format):
     (
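For illustration, a minimal sketch of how the relocated arguments could be used after this patch, assuming a fittable preprocessor; StandardScaler, the column name, and the resource values below are assumptions for the example, not part of the patch (and the later patches in this series adjust the argument order and docs slightly):

    import ray
    from ray.data.preprocessors import StandardScaler

    ds = ray.data.from_items([{"value": float(i)} for i in range(100)])
    scaler = StandardScaler(columns=["value"])

    # fit() runs with default resources; only the transform step picks up
    # the per-call overrides that this patch moves into fit_transform().
    out = scaler.fit_transform(
        ds,
        transform_num_cpus=2,
        transform_memory=1 * 1024**3,  # 1 GiB reserved per map worker (illustrative)
        transform_batch_size=1024,
        transform_concurrency=4,
    )
    print(out.take(3))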
From 895b97fe53808ac0a69ab4cd1ae211e7900e55df Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Tue, 29 Apr 2025 20:33:44 -0700
Subject: [PATCH 5/7] [data] update the arguments based on recent comment

Signed-off-by: Xingyu Long
---
 python/ray/data/preprocessor.py            | 40 ++++++++++------------
 python/ray/data/preprocessors/tokenizer.py |  2 +-
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/python/ray/data/preprocessor.py b/python/ray/data/preprocessor.py
index 1a1567c5a2ff..2e7d69ff4265 100644
--- a/python/ray/data/preprocessor.py
+++ b/python/ray/data/preprocessor.py
@@ -4,15 +4,7 @@
 import pickle
 import warnings
 from enum import Enum
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Dict,
-    Union,
-    List,
-    Optional,
-    Literal,
-)
+from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional
 
 from ray.air.util.data_batch_conversion import BatchFormat
 from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -132,7 +124,7 @@ def fit_transform(
         *,
         transform_num_cpus: Optional[float] = None,
         transform_memory: Optional[float] = None,
-        transform_batch_size: Union[int, None, Literal["default"]] = None,
+        transform_batch_size: Optional[int] = None,
         transform_concurrency: Optional[int] = None,
     ) -> "Dataset":
         """Fit this Preprocessor to the Dataset and then transform the Dataset.
@@ -143,10 +135,10 @@ def fit_transform(
 
         Args:
             ds: Input Dataset.
-            transform_num_cpus: The number of CPUs to reserve for each parallel map worker.
-            transform_memory: The heap memory in bytes to reserve for each parallel map worker.
-            transform_batch_size: The maximum number of rows to return.
-            transform_concurrency: The maximum number of Ray workers to use concurrently.
+            transform_num_cpus: [experimental] The number of CPUs to reserve for each parallel map worker.
+            transform_memory: [experimental] The heap memory in bytes to reserve for each parallel map worker.
+            transform_batch_size: [experimental] The maximum number of rows to return.
+            transform_concurrency: [experimental] The maximum number of Ray workers to use concurrently.
 
         Returns:
             ray.data.Dataset: The transformed Dataset.
@@ -164,19 +156,19 @@ def transform(
         self,
         ds: "Dataset",
         *,
+        batch_size: Optional[int] = None,
         num_cpus: Optional[float] = None,
         memory: Optional[float] = None,
-        batch_size: Union[int, None, Literal["default"]] = None,
         concurrency: Optional[int] = None,
     ) -> "Dataset":
         """Transform the given dataset.
 
         Args:
             ds: Input Dataset.
-            num_cpus: The number of CPUs to reserve for each parallel map worker.
-            memory: The heap memory in bytes to reserve for each parallel map worker.
-            batch_size: The maximum number of rows to return.
-            concurrency: The maximum number of Ray workers to use concurrently.
+            batch_size: [experimental] Advanced configuration for adjusting input size for each worker.
+            num_cpus: [experimental] The number of CPUs to reserve for each parallel map worker.
+            memory: [experimental] The heap memory in bytes to reserve for each parallel map worker.
+            concurrency: [experimental] The maximum number of Ray workers to use concurrently.
 
         Returns:
             ray.data.Dataset: The transformed Dataset.
@@ -193,7 +185,13 @@ def transform(
                 "`fit` must be called before `transform`, "
                 "or simply use fit_transform() to run both steps"
             )
-        transformed_ds = self._transform(ds, num_cpus, memory, batch_size, concurrency)
+        transformed_ds = self._transform(
+            ds,
+            batch_size=batch_size,
+            num_cpus=num_cpus,
+            memory=memory,
+            concurrency=concurrency,
+        )
         return transformed_ds
 
     def transform_batch(self, data: "DataBatchType") -> "DataBatchType":
@@ -258,9 +256,9 @@ def _determine_transform_to_use(self) -> BatchFormat:
     def _transform(
         self,
         ds: "Dataset",
+        batch_size: Optional[int],
         num_cpus: Optional[float] = None,
         memory: Optional[float] = None,
-        batch_size: Union[int, None, Literal["default"]] = None,
         concurrency: Optional[int] = None,
     ) -> "Dataset":
         transform_type = self._determine_transform_to_use()
diff --git a/python/ray/data/preprocessors/tokenizer.py b/python/ray/data/preprocessors/tokenizer.py
index 80d93f22c43b..8e44d3934579 100644
--- a/python/ray/data/preprocessors/tokenizer.py
+++ b/python/ray/data/preprocessors/tokenizer.py
@@ -1,4 +1,4 @@
-from typing import Callable, List, Optional, Literal, Union
+from typing import Callable, List, Optional
 
 import pandas as pd
 
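For illustration, a sketch of the per-call transform API as it stands after this patch; the preprocessor choice, column name, and values are assumptions for the example:

    import ray
    from ray.data.preprocessors import MinMaxScaler

    ds = ray.data.from_items([{"value": float(i)} for i in range(100)])
    scaler = MinMaxScaler(columns=["value"])
    scaler.fit(ds)

    # All four arguments are keyword-only and marked experimental; any that
    # are left unset fall back to the defaults used by Dataset.map_batches.
    out = scaler.transform(
        ds,
        batch_size=256,
        num_cpus=2,
        memory=1 * 1024**3,
        concurrency=4,
    )
    print(out.take(3))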
From 831466f2d11ce97a44b05a8fc19c225a98e2ad84 Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Tue, 29 Apr 2025 22:51:11 -0700
Subject: [PATCH 6/7] [data] fix the test, test_chain

Signed-off-by: Xingyu Long
---
 python/ray/data/preprocessors/chain.py | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/python/ray/data/preprocessors/chain.py b/python/ray/data/preprocessors/chain.py
index e608f8cf2f86..018612ab9abb 100644
--- a/python/ray/data/preprocessors/chain.py
+++ b/python/ray/data/preprocessors/chain.py
@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 from ray.air.util.data_batch_conversion import BatchFormat
 from ray.data import Dataset
@@ -79,9 +79,22 @@ def fit_transform(self, ds: Dataset) -> Dataset:
             ds = preprocessor.fit_transform(ds)
         return ds
 
-    def _transform(self, ds: Dataset) -> Dataset:
+    def _transform(
+        self,
+        ds: Dataset,
+        batch_size: Optional[int],
+        num_cpus: Optional[float] = None,
+        memory: Optional[float] = None,
+        concurrency: Optional[int] = None,
+    ) -> Dataset:
         for preprocessor in self.preprocessors:
-            ds = preprocessor.transform(ds)
+            ds = preprocessor.transform(
+                ds,
+                batch_size=batch_size,
+                num_cpus=num_cpus,
+                memory=memory,
+                concurrency=concurrency,
+            )
         return ds
 
     def _transform_batch(self, df: "DataBatchType") -> "DataBatchType":
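For illustration, a sketch of the forwarding this patch adds to Chain; the chained preprocessors, column name, and values are assumptions for the example:

    import ray
    from ray.data.preprocessors import Chain, MinMaxScaler, StandardScaler

    ds = ray.data.from_items([{"value": float(i)} for i in range(100)])
    chain = Chain(StandardScaler(columns=["value"]), MinMaxScaler(columns=["value"]))
    chain.fit(ds)

    # Chain._transform now passes the per-call arguments through, so every
    # preprocessor in the chain runs its map step with the same overrides.
    out = chain.transform(
        ds,
        batch_size=512,
        num_cpus=2,
        memory=2 * 1024**3,
        concurrency=4,
    )
    print(out.take(3))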
From 9f2becc108c7a40f78e14baf8ff414fbc28ddbf0 Mon Sep 17 00:00:00 2001
From: Xingyu Long
Date: Wed, 30 Apr 2025 20:46:14 -0700
Subject: [PATCH 7/7] [data] minor changes for the test

Signed-off-by: Xingyu Long
---
 python/ray/data/tests/preprocessors/test_preprocessors.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/python/ray/data/tests/preprocessors/test_preprocessors.py b/python/ray/data/tests/preprocessors/test_preprocessors.py
index 1eea2073e97e..48e2b1b25d75 100644
--- a/python/ray/data/tests/preprocessors/test_preprocessors.py
+++ b/python/ray/data/tests/preprocessors/test_preprocessors.py
@@ -185,6 +185,11 @@ def _transform_numpy(self, data):
             assert len(data["value"]) == batch_size
             return data
 
+        def _transform_pandas(self, data):
+            raise RuntimeError(
+                "Pandas transform should not be called with numpy batch format."
+            )
+
         def _determine_transform_to_use(self):
             return "numpy"