From 6e1a87a2584323a55e8fc6dbeb21d7c325ffd507 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 18 Mar 2025 10:24:18 +0800 Subject: [PATCH 01/12] Support llm on custom resources beyond GPU. Signed-off-by: liuxsh9 --- .../ray/llm/_internal/batch/processor/base.py | 5 +++ .../batch/processor/vllm_engine_proc.py | 1 + .../batch/stages/vllm_engine_stage.py | 32 ++++++++++++------- .../_internal/serve/configs/server_models.py | 6 ++++ .../serve/deployments/llm/vllm/vllm_models.py | 11 ++++++- python/ray/util/accelerators/accelerators.py | 2 ++ 6 files changed, 45 insertions(+), 12 deletions(-) diff --git a/python/ray/llm/_internal/batch/processor/base.py b/python/ray/llm/_internal/batch/processor/base.py index a05981c8d185..4ff6d9de4540 100644 --- a/python/ray/llm/_internal/batch/processor/base.py +++ b/python/ray/llm/_internal/batch/processor/base.py @@ -26,6 +26,11 @@ class ProcessorConfig(BaseModelExtended): "You can tune the batch size to balance the throughput and fault-tolerance " "based on your use case. Defaults to 64.", ) + resources_per_worker: Optional[Dict[str, float]] = Field( + default=None, + description="This will override the default resources config for actors/workers, " + "the default resource config for LLM Stage may be something like {'GPU': 1}." + ) accelerator_type: Optional[str] = Field( default=None, description="The accelerator type used by the LLM stage in a processor. " diff --git a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py index 97dc83227cdf..1a356b00b6f6 100644 --- a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py +++ b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py @@ -190,6 +190,7 @@ def build_vllm_engine_processor( # This is used to make sure we overlap batches to avoid the tail # latency of each batch. max_concurrency=config.max_concurrent_batches, + resources=config.resources_per_worker, accelerator_type=config.accelerator_type, runtime_env=config.runtime_env, ), diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index c5099347858d..dcd6533e2ab3 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -573,12 +573,14 @@ def __del__(self): self.llm.shutdown() -def _ray_scheduling_strategy_fn(num_gpus_per_instance: int, accelerator_type: str): +def _ray_scheduling_strategy_fn( + num_workers_per_instance: int, accelerator_type: str, resources: Optional[Dict[str, float]] = None +): """ Create a Ray scheduling strategy for vLLM engine. Args: - num_gpus_per_instance: The number of GPUs per instance. + num_workers_per_instance: The number of workers per instance. accelerator_type: The accelerator type. Returns: @@ -586,13 +588,13 @@ def _ray_scheduling_strategy_fn(num_gpus_per_instance: int, accelerator_type: st """ def _get_bundle() -> Dict[str, float]: - bundle: Dict[str, float] = {"GPU": 1, "CPU": 1} + bundle: Dict[str, float] = resources if resources else {"GPU": 1, "CPU": 1} if accelerator_type: bundle[f"accelerator_type:{accelerator_type}"] = 0.001 return bundle pg = ray.util.placement_group( - [_get_bundle()] * num_gpus_per_instance, + [_get_bundle()] * num_workers_per_instance, strategy="STRICT_PACK", ) return dict( @@ -621,6 +623,7 @@ def post_init(cls, values): The updated values. """ map_batches_kwargs = values["map_batches_kwargs"] + resources_per_worker = map_batches_kwargs.get("resources") accelerator_type = map_batches_kwargs.get("accelerator_type", "") fn_constructor_kwargs = values["fn_constructor_kwargs"] engine_kwargs = fn_constructor_kwargs.get("engine_kwargs", {}) @@ -629,29 +632,36 @@ def post_init(cls, values): if accelerator_type: ray_remote_args["accelerator_type"] = accelerator_type - # Setup num_gpus required per vLLM engine. + # Setup num_workers required per vLLM engine. tp_size = engine_kwargs.get("tensor_parallel_size", 1) pp_size = engine_kwargs.get("pipeline_parallel_size", 1) - num_gpus = tp_size * pp_size + num_workers = tp_size * pp_size # Use the MP backend by default. engine_kwargs.setdefault("distributed_executor_backend", "mp") executor_backend = engine_kwargs.get("distributed_executor_backend") - # When Ray is used in the vLLM engine, we set num_gpus to 0 so that + # When Ray is used in the vLLM engine, we set num_devices to 0 so that # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling # strategy in .map_batches() arguments and let vLLM Ray executor to # create placement groups for each TP/PP worker. - if executor_backend == "ray" and num_gpus > 1: + num_mp_workers = num_workers + if executor_backend == "ray" and num_workers > 1: # Note that we have to use partial() to pass a function # instead of an object. map_batches_kwargs["ray_remote_args_fn"] = partial( _ray_scheduling_strategy_fn, - num_gpus, + num_workers, accelerator_type, ) - num_gpus = 0 + num_mp_workers = 0 + + if not resources_per_worker: + map_batches_kwargs["num_gpus"] = num_mp_workers + else: + ray_remote_args["resources"] = { + key: value * num_mp_workers for key, value in resources_per_worker.items() + } - map_batches_kwargs["num_gpus"] = num_gpus map_batches_kwargs.update(ray_remote_args) return values diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py index b47cba60c0d3..5fb90fafa0b3 100644 --- a/python/ray/llm/_internal/serve/configs/server_models.py +++ b/python/ray/llm/_internal/serve/configs/server_models.py @@ -190,6 +190,12 @@ class LLMConfig(BaseModelExtended): ), ) + resources_per_worker: Optional[Dict[str, float]] = Field( + default=None, + description="This will pass to config like `VLLMEngineConfig` and override " + "the resources config for the workers in vLLM engine." + ) + accelerator_type: Optional[str] = Field( default=None, description=f"The type of accelerator runs the model on. Only the following values are supported: {str([t.value for t in GPUType])}", diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index 081bdbbefbc0..0d996f26653d 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -51,6 +51,11 @@ class VLLMEngineConfig(BaseModelExtended): None, description="Configuration for cloud storage mirror. This is for where the weights are downloaded from.", ) + resources_per_worker: Optional[Dict[str, float]] = Field( + default=None, + description="This overrides the vLLM engine worker's default resource configuration, " + "the number of resources returned by `placement_bundles`." + ) accelerator_type: Optional[GPUType] = Field( None, description="The type of accelerator to use. This is used to determine the placement group strategy.", @@ -104,6 +109,7 @@ def from_llm_config(cls, llm_config: LLMConfig) -> "VLLMEngineConfig": model_id=llm_config.model_id, hf_model_id=hf_model_id, mirror_config=mirror_config, + accelerator_name=llm_config.resources_per_worker, accelerator_type=llm_config.accelerator_type, engine_kwargs=llm_config.engine_kwargs, runtime_env=llm_config.runtime_env, @@ -134,7 +140,10 @@ def placement_strategy(self) -> str: @property def placement_bundles(self) -> List[Dict[str, float]]: - bundle = {"GPU": 1} + if not self.resources_per_worker: + bundle = {"GPU": 1} + else: + bundle = self.resources_per_worker if self.accelerator_type: bundle[self.ray_accelerator_type()] = 0.001 bundles = [bundle for _ in range(self.num_gpu_workers)] diff --git a/python/ray/util/accelerators/accelerators.py b/python/ray/util/accelerators/accelerators.py index c7099fdd109d..63c4f2f10ada 100644 --- a/python/ray/util/accelerators/accelerators.py +++ b/python/ray/util/accelerators/accelerators.py @@ -26,6 +26,8 @@ GOOGLE_TPU_V5P = "TPU-V5P" GOOGLE_TPU_V5LITEPOD = "TPU-V5LITEPOD" GOOGLE_TPU_V6E = "TPU-V6E" +HUAWEI_NPU_910B = "Ascend910B" +HUAWEI_NPU_910B4 = "Ascend910B4" # Use these instead of NVIDIA_A100 if you need a specific accelerator size. Note that # these labels are not auto-added to nodes, you'll have to add them manually in From d4b72cec1d31fed0dec6cd0feba175fdaa965b8a Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 18 Mar 2025 10:32:54 +0800 Subject: [PATCH 02/12] fix lint Signed-off-by: liuxsh9 --- python/ray/llm/_internal/batch/processor/base.py | 2 +- python/ray/llm/_internal/batch/stages/vllm_engine_stage.py | 7 +++++-- python/ray/llm/_internal/serve/configs/server_models.py | 2 +- .../_internal/serve/deployments/llm/vllm/vllm_models.py | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/python/ray/llm/_internal/batch/processor/base.py b/python/ray/llm/_internal/batch/processor/base.py index 4ff6d9de4540..cbcfb379c900 100644 --- a/python/ray/llm/_internal/batch/processor/base.py +++ b/python/ray/llm/_internal/batch/processor/base.py @@ -29,7 +29,7 @@ class ProcessorConfig(BaseModelExtended): resources_per_worker: Optional[Dict[str, float]] = Field( default=None, description="This will override the default resources config for actors/workers, " - "the default resource config for LLM Stage may be something like {'GPU': 1}." + "the default resource config for LLM Stage may be something like {'GPU': 1}.", ) accelerator_type: Optional[str] = Field( default=None, diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index dcd6533e2ab3..3825e3e38ed2 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -574,7 +574,9 @@ def __del__(self): def _ray_scheduling_strategy_fn( - num_workers_per_instance: int, accelerator_type: str, resources: Optional[Dict[str, float]] = None + num_workers_per_instance: int, + accelerator_type: str, + resources: Optional[Dict[str, float]] = None, ): """ Create a Ray scheduling strategy for vLLM engine. @@ -660,7 +662,8 @@ def post_init(cls, values): map_batches_kwargs["num_gpus"] = num_mp_workers else: ray_remote_args["resources"] = { - key: value * num_mp_workers for key, value in resources_per_worker.items() + key: value * num_mp_workers + for key, value in resources_per_worker.items() } map_batches_kwargs.update(ray_remote_args) diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py index 5fb90fafa0b3..8adb58c4a608 100644 --- a/python/ray/llm/_internal/serve/configs/server_models.py +++ b/python/ray/llm/_internal/serve/configs/server_models.py @@ -193,7 +193,7 @@ class LLMConfig(BaseModelExtended): resources_per_worker: Optional[Dict[str, float]] = Field( default=None, description="This will pass to config like `VLLMEngineConfig` and override " - "the resources config for the workers in vLLM engine." + "the resources config for the workers in vLLM engine.", ) accelerator_type: Optional[str] = Field( diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index 0d996f26653d..5903fb304609 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -54,7 +54,7 @@ class VLLMEngineConfig(BaseModelExtended): resources_per_worker: Optional[Dict[str, float]] = Field( default=None, description="This overrides the vLLM engine worker's default resource configuration, " - "the number of resources returned by `placement_bundles`." + "the number of resources returned by `placement_bundles`.", ) accelerator_type: Optional[GPUType] = Field( None, From 87c5e9da55183c44accfcd88fe06f09db72abab4 Mon Sep 17 00:00:00 2001 From: liuxsh9 Date: Tue, 18 Mar 2025 11:03:46 +0800 Subject: [PATCH 03/12] fix typo and ray backend executor resources settitng. Signed-off-by: liuxsh9 --- python/ray/llm/_internal/batch/stages/vllm_engine_stage.py | 1 + .../ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index 3825e3e38ed2..2fc0d8425622 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -655,6 +655,7 @@ def post_init(cls, values): _ray_scheduling_strategy_fn, num_workers, accelerator_type, + resources_per_worker, ) num_mp_workers = 0 diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index 5903fb304609..10a9c527e3f1 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -109,7 +109,7 @@ def from_llm_config(cls, llm_config: LLMConfig) -> "VLLMEngineConfig": model_id=llm_config.model_id, hf_model_id=hf_model_id, mirror_config=mirror_config, - accelerator_name=llm_config.resources_per_worker, + resources_per_worker=llm_config.resources_per_worker, accelerator_type=llm_config.accelerator_type, engine_kwargs=llm_config.engine_kwargs, runtime_env=llm_config.runtime_env, From 3035c7f23dc663c20546bae8eeb06261e0712dd0 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 11:04:15 -0700 Subject: [PATCH 04/12] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/data/llm.py | 3 ++ .../ray/llm/_internal/batch/processor/base.py | 7 +-- .../batch/processor/vllm_engine_proc.py | 2 +- .../batch/stages/vllm_engine_stage.py | 51 +++++++++++-------- .../_internal/serve/configs/server_models.py | 7 +-- .../serve/deployments/llm/vllm/vllm_models.py | 14 ++--- .../observability/usage_telemetry/usage.py | 2 +- 7 files changed, 51 insertions(+), 35 deletions(-) diff --git a/python/ray/data/llm.py b/python/ray/data/llm.py index 096c768ec4f4..eb6ab620484b 100644 --- a/python/ray/data/llm.py +++ b/python/ray/data/llm.py @@ -19,6 +19,9 @@ class ProcessorConfig(_ProcessorConfig): On the other hand, small batch sizes are more fault-tolerant and could reduce bubbles in the data pipeline. You can tune the batch size to balance the throughput and fault-tolerance based on your use case. + resources_per_bundle: The resource bundles for placement groups. + You can specify a custom device label e.g. {'NPU': 1}. + The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}. accelerator_type: The accelerator type used by the LLM stage in a processor. Default to None, meaning that only the CPU will be used. concurrency: The number of workers for data parallelism. Default to 1. diff --git a/python/ray/llm/_internal/batch/processor/base.py b/python/ray/llm/_internal/batch/processor/base.py index cbcfb379c900..20dd944d9b1e 100644 --- a/python/ray/llm/_internal/batch/processor/base.py +++ b/python/ray/llm/_internal/batch/processor/base.py @@ -26,10 +26,11 @@ class ProcessorConfig(BaseModelExtended): "You can tune the batch size to balance the throughput and fault-tolerance " "based on your use case. Defaults to 64.", ) - resources_per_worker: Optional[Dict[str, float]] = Field( + resources_per_bundle: Optional[Dict[str, float]] = Field( default=None, - description="This will override the default resources config for actors/workers, " - "the default resource config for LLM Stage may be something like {'GPU': 1}.", + description="This will override the default resource bundles for placement groups. " + "You can specify a custom device label e.g. {'NPU': 1}. " + "The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.", ) accelerator_type: Optional[str] = Field( default=None, diff --git a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py index 1a356b00b6f6..cb36d67ded11 100644 --- a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py +++ b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py @@ -190,7 +190,7 @@ def build_vllm_engine_processor( # This is used to make sure we overlap batches to avoid the tail # latency of each batch. max_concurrency=config.max_concurrent_batches, - resources=config.resources_per_worker, + resources_per_bundle=config.resources_per_bundle, accelerator_type=config.accelerator_type, runtime_env=config.runtime_env, ), diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index 2fc0d8425622..c9bef7e18033 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -574,29 +574,41 @@ def __del__(self): def _ray_scheduling_strategy_fn( - num_workers_per_instance: int, - accelerator_type: str, - resources: Optional[Dict[str, float]] = None, + num_bundles_per_replica: int, + accelerator_type: Optional[str] = None, + resources_per_bundle: Optional[Dict[str, float]] = None, ): - """ - Create a Ray scheduling strategy for vLLM engine. + """Create a Ray scheduling strategy for the engine. Args: - num_workers_per_instance: The number of workers per instance. - accelerator_type: The accelerator type. + num_bundles_per_replica: The number of device bundles per + engine replica. + accelerator_type: The accelerator type. If None, the + accelerator_type label will not be set. + resources_per_bundle: The custom resources per bundle. + If None, we default to 1xGPU + 1xCPU bundle. Returns: The Ray scheduling strategy. """ def _get_bundle() -> Dict[str, float]: - bundle: Dict[str, float] = resources if resources else {"GPU": 1, "CPU": 1} + + bundle = {} + # Custom resources + if resources_per_bundle: + bundle = resources_per_bundle + else: + # GPU bundles + bundle = {"GPU": 1, "CPU": 1} + + # Accelerator type if accelerator_type: bundle[f"accelerator_type:{accelerator_type}"] = 0.001 return bundle pg = ray.util.placement_group( - [_get_bundle()] * num_workers_per_instance, + [_get_bundle()] * num_bundles_per_replica, strategy="STRICT_PACK", ) return dict( @@ -625,7 +637,7 @@ def post_init(cls, values): The updated values. """ map_batches_kwargs = values["map_batches_kwargs"] - resources_per_worker = map_batches_kwargs.get("resources") + resources_per_bundle = map_batches_kwargs.get("resources_per_bundle") accelerator_type = map_batches_kwargs.get("accelerator_type", "") fn_constructor_kwargs = values["fn_constructor_kwargs"] engine_kwargs = fn_constructor_kwargs.get("engine_kwargs", {}) @@ -637,7 +649,7 @@ def post_init(cls, values): # Setup num_workers required per vLLM engine. tp_size = engine_kwargs.get("tensor_parallel_size", 1) pp_size = engine_kwargs.get("pipeline_parallel_size", 1) - num_workers = tp_size * pp_size + num_bundles_per_replica = tp_size * pp_size # Use the MP backend by default. engine_kwargs.setdefault("distributed_executor_backend", "mp") @@ -647,24 +659,23 @@ def post_init(cls, values): # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling # strategy in .map_batches() arguments and let vLLM Ray executor to # create placement groups for each TP/PP worker. - num_mp_workers = num_workers - if executor_backend == "ray" and num_workers > 1: + if executor_backend == "ray" and num_bundles_per_replica > 1: # Note that we have to use partial() to pass a function # instead of an object. map_batches_kwargs["ray_remote_args_fn"] = partial( _ray_scheduling_strategy_fn, - num_workers, + num_bundles_per_replica, accelerator_type, - resources_per_worker, + resources_per_bundle, ) - num_mp_workers = 0 - if not resources_per_worker: - map_batches_kwargs["num_gpus"] = num_mp_workers + if not resources_per_bundle: + # Default to GPUs per bundle if custom resources are not specified. + ray_remote_args["num_gpus"] = num_bundles_per_replica else: ray_remote_args["resources"] = { - key: value * num_mp_workers - for key, value in resources_per_worker.items() + resource_key: resource_count * num_bundles_per_replica + for resource_key, resource_count in resources_per_bundle.items() } map_batches_kwargs.update(ray_remote_args) diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py index 8adb58c4a608..8a19aa849ca3 100644 --- a/python/ray/llm/_internal/serve/configs/server_models.py +++ b/python/ray/llm/_internal/serve/configs/server_models.py @@ -190,10 +190,11 @@ class LLMConfig(BaseModelExtended): ), ) - resources_per_worker: Optional[Dict[str, float]] = Field( + resources_per_bundle: Optional[Dict[str, float]] = Field( default=None, - description="This will pass to config like `VLLMEngineConfig` and override " - "the resources config for the workers in vLLM engine.", + description="This will override the default resource bundles for placement groups. " + "You can specify a custom device label e.g. {'NPU': 1}. " + "The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.", ) accelerator_type: Optional[str] = Field( diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index 10a9c527e3f1..ae99c857eb00 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -51,7 +51,7 @@ class VLLMEngineConfig(BaseModelExtended): None, description="Configuration for cloud storage mirror. This is for where the weights are downloaded from.", ) - resources_per_worker: Optional[Dict[str, float]] = Field( + resources_per_bundle: Optional[Dict[str, float]] = Field( default=None, description="This overrides the vLLM engine worker's default resource configuration, " "the number of resources returned by `placement_bundles`.", @@ -109,7 +109,7 @@ def from_llm_config(cls, llm_config: LLMConfig) -> "VLLMEngineConfig": model_id=llm_config.model_id, hf_model_id=hf_model_id, mirror_config=mirror_config, - resources_per_worker=llm_config.resources_per_worker, + resources_per_bundle=llm_config.resources_per_bundle, accelerator_type=llm_config.accelerator_type, engine_kwargs=llm_config.engine_kwargs, runtime_env=llm_config.runtime_env, @@ -128,7 +128,7 @@ def pipeline_parallel_degree(self) -> int: return self.engine_kwargs.get("pipeline_parallel_size", 1) @property - def num_gpu_workers(self) -> int: + def num_devices(self) -> int: return self.tensor_parallel_degree * self.pipeline_parallel_degree @property @@ -140,13 +140,13 @@ def placement_strategy(self) -> str: @property def placement_bundles(self) -> List[Dict[str, float]]: - if not self.resources_per_worker: - bundle = {"GPU": 1} + if self.resources_per_bundle: + bundle = self.resources_per_bundle else: - bundle = self.resources_per_worker + bundle = {"GPU": 1, "CPU": 1} if self.accelerator_type: bundle[self.ray_accelerator_type()] = 0.001 - bundles = [bundle for _ in range(self.num_gpu_workers)] + bundles = [bundle for _ in range(self.num_devices)] return bundles diff --git a/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py b/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py index fde8b8f09afd..466d703dcdfd 100644 --- a/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py +++ b/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py @@ -266,6 +266,6 @@ def push_telemetry_report_for_all_models( max_replicas=max_replicas, tensor_parallel_degree=engine_config.tensor_parallel_degree, gpu_type=model.accelerator_type or DEFAULT_GPU_TYPE, - num_gpus=engine_config.num_gpu_workers, + num_gpus=engine_config.num_devices, ) _push_telemetry_report(telemetry_model) From a9d48f3a1ace0dbb748b80c593d0ebb86d8509d3 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 11:46:32 -0700 Subject: [PATCH 05/12] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/_internal/batch/processor/vllm_engine_proc.py | 2 +- python/ray/llm/_internal/batch/stages/vllm_engine_stage.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py index cb36d67ded11..f0d1ac527062 100644 --- a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py +++ b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py @@ -190,7 +190,7 @@ def build_vllm_engine_processor( # This is used to make sure we overlap batches to avoid the tail # latency of each batch. max_concurrency=config.max_concurrent_batches, - resources_per_bundle=config.resources_per_bundle, + resources=config.resources_per_bundle, accelerator_type=config.accelerator_type, runtime_env=config.runtime_env, ), diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index c9bef7e18033..be119b107ea8 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -637,7 +637,7 @@ def post_init(cls, values): The updated values. """ map_batches_kwargs = values["map_batches_kwargs"] - resources_per_bundle = map_batches_kwargs.get("resources_per_bundle") + resources_per_bundle = map_batches_kwargs.get("resources") accelerator_type = map_batches_kwargs.get("accelerator_type", "") fn_constructor_kwargs = values["fn_constructor_kwargs"] engine_kwargs = fn_constructor_kwargs.get("engine_kwargs", {}) From a845100927d0e94a1e5b0cc98793adf683b8f5ed Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 18:29:17 -0700 Subject: [PATCH 06/12] wip Signed-off-by: Kourosh Hakhamaneshi --- .../serve/deployments/llm/vllm/vllm_models.py | 2 +- .../llm/tests/serve/configs/test_models.py | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index ae99c857eb00..c5f50909d1fa 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -143,7 +143,7 @@ def placement_bundles(self) -> List[Dict[str, float]]: if self.resources_per_bundle: bundle = self.resources_per_bundle else: - bundle = {"GPU": 1, "CPU": 1} + bundle = {"GPU": 1} if self.accelerator_type: bundle[self.ray_accelerator_type()] = 0.001 bundles = [bundle for _ in range(self.num_devices)] diff --git a/python/ray/llm/tests/serve/configs/test_models.py b/python/ray/llm/tests/serve/configs/test_models.py index 5312ed782193..6925f5c3af1c 100644 --- a/python/ray/llm/tests/serve/configs/test_models.py +++ b/python/ray/llm/tests/serve/configs/test_models.py @@ -211,6 +211,29 @@ def test_get_serve_options_without_accelerator_type(self): "name": "Test:test_model", } assert serve_options == expected_options + + + def test_resources_per_bundle(self): + """Test that resources_per_bundle is correctly parsed.""" + + # Test the default resource bundle + serve_options = LLMConfig( + model_loading_config=dict(model_id="test_model"), + engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), + ).get_serve_options(name_prefix="Test:") + assert serve_options["placement_group_bundles"] == [ + {"GPU": 1} for _ in range(6) + ] + + # Test the custom resource bundle + serve_options = LLMConfig( + model_loading_config=dict(model_id="test_model"), + engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), + resources_per_bundle={"XPU": 1}, + ).get_serve_options(name_prefix="Test:") + assert serve_options["placement_group_bundles"] == [ + {"XPU": 1} for _ in range(6) + ] if __name__ == "__main__": From fd051be93ea40fab3610bb311af2d63237759201 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 18:30:16 -0700 Subject: [PATCH 07/12] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/tests/serve/configs/test_models.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/ray/llm/tests/serve/configs/test_models.py b/python/ray/llm/tests/serve/configs/test_models.py index 6925f5c3af1c..7a898500a7d8 100644 --- a/python/ray/llm/tests/serve/configs/test_models.py +++ b/python/ray/llm/tests/serve/configs/test_models.py @@ -211,11 +211,10 @@ def test_get_serve_options_without_accelerator_type(self): "name": "Test:test_model", } assert serve_options == expected_options - - + def test_resources_per_bundle(self): """Test that resources_per_bundle is correctly parsed.""" - + # Test the default resource bundle serve_options = LLMConfig( model_loading_config=dict(model_id="test_model"), From 93fadc1536745f61c1d6ccdf0296e63a732e3f2b Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 18:41:55 -0700 Subject: [PATCH 08/12] fixed test Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/tests/serve/configs/test_models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/llm/tests/serve/configs/test_models.py b/python/ray/llm/tests/serve/configs/test_models.py index 7a898500a7d8..88d5faba19a9 100644 --- a/python/ray/llm/tests/serve/configs/test_models.py +++ b/python/ray/llm/tests/serve/configs/test_models.py @@ -220,7 +220,7 @@ def test_resources_per_bundle(self): model_loading_config=dict(model_id="test_model"), engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), ).get_serve_options(name_prefix="Test:") - assert serve_options["placement_group_bundles"] == [ + assert serve_options["placement_group_bundles"] == [{'CPU': 1, 'GPU': 0}] + [ {"GPU": 1} for _ in range(6) ] @@ -230,7 +230,7 @@ def test_resources_per_bundle(self): engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), resources_per_bundle={"XPU": 1}, ).get_serve_options(name_prefix="Test:") - assert serve_options["placement_group_bundles"] == [ + assert serve_options["placement_group_bundles"] == [{'CPU': 1, 'GPU': 0}] + [ {"XPU": 1} for _ in range(6) ] From 4892ea2799ed04de76bf792ae5298906ab25fc5f Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 18:42:54 -0700 Subject: [PATCH 09/12] fixed the test Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/tests/serve/configs/test_models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/llm/tests/serve/configs/test_models.py b/python/ray/llm/tests/serve/configs/test_models.py index 88d5faba19a9..cd92a9aa8dfe 100644 --- a/python/ray/llm/tests/serve/configs/test_models.py +++ b/python/ray/llm/tests/serve/configs/test_models.py @@ -220,7 +220,7 @@ def test_resources_per_bundle(self): model_loading_config=dict(model_id="test_model"), engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), ).get_serve_options(name_prefix="Test:") - assert serve_options["placement_group_bundles"] == [{'CPU': 1, 'GPU': 0}] + [ + assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [ {"GPU": 1} for _ in range(6) ] @@ -230,7 +230,7 @@ def test_resources_per_bundle(self): engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), resources_per_bundle={"XPU": 1}, ).get_serve_options(name_prefix="Test:") - assert serve_options["placement_group_bundles"] == [{'CPU': 1, 'GPU': 0}] + [ + assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [ {"XPU": 1} for _ in range(6) ] From a024fadd5a3931e058f8f3c21bde960e08d8ab48 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 22:13:47 -0700 Subject: [PATCH 10/12] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py b/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py index bcdb5c6a74a3..4de75f4d3ca3 100644 --- a/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py +++ b/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py @@ -91,7 +91,7 @@ def test_vllm_engine_stage_post_init(gpu_type, model_llama_3_2_216M): "concurrency": 1, "max_concurrency": 4, "accelerator_type": gpu_type, - "num_gpus": 0, + "num_gpus": 8, } scheduling_strategy = ray_remote_args_fn()["scheduling_strategy"] assert isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy) From a4eee08f61b1a9cb2d5d99502cc2086d8bb4de5f Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 20 Mar 2025 22:29:40 -0700 Subject: [PATCH 11/12] wip Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/batch/stages/vllm_engine_stage.py | 17 +++++++++-------- .../batch/gpu/stages/test_vllm_engine_stage.py | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index be119b107ea8..34fd7f188235 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -668,15 +668,16 @@ def post_init(cls, values): accelerator_type, resources_per_bundle, ) - - if not resources_per_bundle: - # Default to GPUs per bundle if custom resources are not specified. - ray_remote_args["num_gpus"] = num_bundles_per_replica + ray_remote_args["num_gpus"] = 0 else: - ray_remote_args["resources"] = { - resource_key: resource_count * num_bundles_per_replica - for resource_key, resource_count in resources_per_bundle.items() - } + if not resources_per_bundle: + # Default to GPUs per bundle if custom resources are not specified. + ray_remote_args["num_gpus"] = num_bundles_per_replica + else: + ray_remote_args["resources"] = { + resource_key: resource_count * num_bundles_per_replica + for resource_key, resource_count in resources_per_bundle.items() + } map_batches_kwargs.update(ray_remote_args) return values diff --git a/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py b/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py index 4de75f4d3ca3..bcdb5c6a74a3 100644 --- a/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py +++ b/python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py @@ -91,7 +91,7 @@ def test_vllm_engine_stage_post_init(gpu_type, model_llama_3_2_216M): "concurrency": 1, "max_concurrency": 4, "accelerator_type": gpu_type, - "num_gpus": 8, + "num_gpus": 0, } scheduling_strategy = ray_remote_args_fn()["scheduling_strategy"] assert isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy) From 8ae9b33a8b17faadcb1163681012b4de850d0051 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Fri, 21 Mar 2025 08:44:28 -0700 Subject: [PATCH 12/12] fix tests Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/_internal/batch/stages/vllm_engine_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py index 34fd7f188235..013e361b9f6c 100644 --- a/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py +++ b/python/ray/llm/_internal/batch/stages/vllm_engine_stage.py @@ -637,7 +637,6 @@ def post_init(cls, values): The updated values. """ map_batches_kwargs = values["map_batches_kwargs"] - resources_per_bundle = map_batches_kwargs.get("resources") accelerator_type = map_batches_kwargs.get("accelerator_type", "") fn_constructor_kwargs = values["fn_constructor_kwargs"] engine_kwargs = fn_constructor_kwargs.get("engine_kwargs", {}) @@ -659,6 +658,7 @@ def post_init(cls, values): # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling # strategy in .map_batches() arguments and let vLLM Ray executor to # create placement groups for each TP/PP worker. + resources_per_bundle = map_batches_kwargs.pop("resources", None) if executor_backend == "ray" and num_bundles_per_replica > 1: # Note that we have to use partial() to pass a function # instead of an object.