diff --git a/python/ray/data/llm.py b/python/ray/data/llm.py
index 096c768ec4f4c..eb6ab620484bd 100644
--- a/python/ray/data/llm.py
+++ b/python/ray/data/llm.py
@@ -19,6 +19,9 @@ class ProcessorConfig(_ProcessorConfig):
         On the other hand, small batch sizes are more fault-tolerant and could reduce
         bubbles in the data pipeline. You can tune the batch size to balance the
         throughput and fault-tolerance based on your use case.
+        resources_per_bundle: The resource bundle to use for each placement group.
+            You can specify a custom device label, e.g. {'NPU': 1}.
+            By default, the LLM stage requests one GPU per bundle, i.e. {'GPU': 1}.
         accelerator_type: The accelerator type used by the LLM stage in a processor.
             Default to None, meaning that only the CPU will be used.
         concurrency: The number of workers for data parallelism. Default to 1.
diff --git a/python/ray/llm/_internal/batch/processor/base.py b/python/ray/llm/_internal/batch/processor/base.py
index a05981c8d1859..20dd944d9b1e4 100644
--- a/python/ray/llm/_internal/batch/processor/base.py
+++ b/python/ray/llm/_internal/batch/processor/base.py
@@ -26,6 +26,12 @@ class ProcessorConfig(BaseModelExtended):
         "You can tune the batch size to balance the throughput and fault-tolerance "
         "based on your use case. Defaults to 64.",
     )
+    resources_per_bundle: Optional[Dict[str, float]] = Field(
+        default=None,
+        description="Overrides the default resource bundles for placement groups. "
+        "You can specify a custom device label, e.g. {'NPU': 1}. "
+        "By default, the LLM stage requests one GPU per bundle, i.e. {'GPU': 1}.",
+    )
     accelerator_type: Optional[str] = Field(
         default=None,
         description="The accelerator type used by the LLM stage in a processor. "
diff --git a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py
index 97dc83227cdf2..f0d1ac5270620 100644
--- a/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py
+++ b/python/ray/llm/_internal/batch/processor/vllm_engine_proc.py
@@ -190,6 +190,7 @@ def build_vllm_engine_processor(
             # This is used to make sure we overlap batches to avoid the tail
             # latency of each batch.
             max_concurrency=config.max_concurrent_batches,
+            resources=config.resources_per_bundle,
             accelerator_type=config.accelerator_type,
             runtime_env=config.runtime_env,
         ),
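Taken together, the batch-side changes above let a pipeline run on non-GPU devices end to end. A minimal sketch of the intended usage follows; the model name and NPU label are illustrative, and it assumes `vLLMEngineProcessorConfig` inherits the new `resources_per_bundle` field from `ProcessorConfig` (exact field names such as `model_source` follow the current `ray.data.llm` API and may differ by version):

from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig

config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen2.5-0.5B-Instruct",  # illustrative model
    engine_kwargs=dict(tensor_parallel_size=2),
    # Request one NPU per bundle instead of the default {"GPU": 1}.
    resources_per_bundle={"NPU": 1},
    batch_size=64,
    concurrency=1,
)
processor = build_llm_processor(config)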
""" def _get_bundle() -> Dict[str, float]: - bundle: Dict[str, float] = {"GPU": 1, "CPU": 1} + + bundle = {} + # Custom resources + if resources_per_bundle: + bundle = resources_per_bundle + else: + # GPU bundles + bundle = {"GPU": 1, "CPU": 1} + + # Accelerator type if accelerator_type: bundle[f"accelerator_type:{accelerator_type}"] = 0.001 return bundle pg = ray.util.placement_group( - [_get_bundle()] * num_gpus_per_instance, + [_get_bundle()] * num_bundles_per_replica, strategy="STRICT_PACK", ) return dict( @@ -629,29 +645,39 @@ def post_init(cls, values): if accelerator_type: ray_remote_args["accelerator_type"] = accelerator_type - # Setup num_gpus required per vLLM engine. + # Setup num_workers required per vLLM engine. tp_size = engine_kwargs.get("tensor_parallel_size", 1) pp_size = engine_kwargs.get("pipeline_parallel_size", 1) - num_gpus = tp_size * pp_size + num_bundles_per_replica = tp_size * pp_size # Use the MP backend by default. engine_kwargs.setdefault("distributed_executor_backend", "mp") executor_backend = engine_kwargs.get("distributed_executor_backend") - # When Ray is used in the vLLM engine, we set num_gpus to 0 so that + # When Ray is used in the vLLM engine, we set num_devices to 0 so that # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling # strategy in .map_batches() arguments and let vLLM Ray executor to # create placement groups for each TP/PP worker. - if executor_backend == "ray" and num_gpus > 1: + resources_per_bundle = map_batches_kwargs.pop("resources", None) + if executor_backend == "ray" and num_bundles_per_replica > 1: # Note that we have to use partial() to pass a function # instead of an object. map_batches_kwargs["ray_remote_args_fn"] = partial( _ray_scheduling_strategy_fn, - num_gpus, + num_bundles_per_replica, accelerator_type, + resources_per_bundle, ) - num_gpus = 0 + ray_remote_args["num_gpus"] = 0 + else: + if not resources_per_bundle: + # Default to GPUs per bundle if custom resources are not specified. + ray_remote_args["num_gpus"] = num_bundles_per_replica + else: + ray_remote_args["resources"] = { + resource_key: resource_count * num_bundles_per_replica + for resource_key, resource_count in resources_per_bundle.items() + } - map_batches_kwargs["num_gpus"] = num_gpus map_batches_kwargs.update(ray_remote_args) return values diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py index b47cba60c0d30..8a19aa849ca35 100644 --- a/python/ray/llm/_internal/serve/configs/server_models.py +++ b/python/ray/llm/_internal/serve/configs/server_models.py @@ -190,6 +190,13 @@ class LLMConfig(BaseModelExtended): ), ) + resources_per_bundle: Optional[Dict[str, float]] = Field( + default=None, + description="This will override the default resource bundles for placement groups. " + "You can specify a custom device label e.g. {'NPU': 1}. " + "The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.", + ) + accelerator_type: Optional[str] = Field( default=None, description=f"The type of accelerator runs the model on. 
diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py
index b47cba60c0d30..8a19aa849ca35 100644
--- a/python/ray/llm/_internal/serve/configs/server_models.py
+++ b/python/ray/llm/_internal/serve/configs/server_models.py
@@ -190,6 +190,13 @@ class LLMConfig(BaseModelExtended):
         ),
     )

+    resources_per_bundle: Optional[Dict[str, float]] = Field(
+        default=None,
+        description="Overrides the default resource bundles for placement groups. "
+        "You can specify a custom device label, e.g. {'NPU': 1}. "
+        "By default, each bundle requests one GPU, i.e. {'GPU': 1}.",
+    )
+
     accelerator_type: Optional[str] = Field(
         default=None,
         description=f"The type of accelerator runs the model on. "
         f"Only the following values are supported: {str([t.value for t in GPUType])}",
diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py
index 081bdbbefbc09..c5f50909d1faa 100644
--- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py
+++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py
@@ -51,6 +51,11 @@ class VLLMEngineConfig(BaseModelExtended):
         None,
         description="Configuration for cloud storage mirror. This is for where the weights are downloaded from.",
     )
+    resources_per_bundle: Optional[Dict[str, float]] = Field(
+        default=None,
+        description="Overrides the default resource bundle per vLLM engine worker, "
+        "i.e. the bundles returned by `placement_bundles`.",
+    )
     accelerator_type: Optional[GPUType] = Field(
         None,
         description="The type of accelerator to use. This is used to determine the placement group strategy.",
@@ -104,6 +109,7 @@ def from_llm_config(cls, llm_config: LLMConfig) -> "VLLMEngineConfig":
             model_id=llm_config.model_id,
             hf_model_id=hf_model_id,
             mirror_config=mirror_config,
+            resources_per_bundle=llm_config.resources_per_bundle,
             accelerator_type=llm_config.accelerator_type,
             engine_kwargs=llm_config.engine_kwargs,
             runtime_env=llm_config.runtime_env,
@@ -122,7 +128,7 @@ def pipeline_parallel_degree(self) -> int:
         return self.engine_kwargs.get("pipeline_parallel_size", 1)

     @property
-    def num_gpu_workers(self) -> int:
+    def num_devices(self) -> int:
         return self.tensor_parallel_degree * self.pipeline_parallel_degree

     @property
@@ -134,10 +140,13 @@ def placement_strategy(self) -> str:

     @property
     def placement_bundles(self) -> List[Dict[str, float]]:
-        bundle = {"GPU": 1}
+        if self.resources_per_bundle:
+            # Copy so the accelerator label below does not mutate the config.
+            bundle = dict(self.resources_per_bundle)
+        else:
+            bundle = {"GPU": 1}
         if self.accelerator_type:
             bundle[self.ray_accelerator_type()] = 0.001
-        bundles = [bundle for _ in range(self.num_gpu_workers)]
+        bundles = [bundle for _ in range(self.num_devices)]
         return bundles

diff --git a/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py b/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py
index fde8b8f09afd4..466d703dcdfd5 100644
--- a/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py
+++ b/python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py
@@ -266,6 +266,6 @@ def push_telemetry_report_for_all_models(
         max_replicas=max_replicas,
         tensor_parallel_degree=engine_config.tensor_parallel_degree,
         gpu_type=model.accelerator_type or DEFAULT_GPU_TYPE,
-        num_gpus=engine_config.num_gpu_workers,
+        num_gpus=engine_config.num_devices,
     )
     _push_telemetry_report(telemetry_model)
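On the serve side, the same override flows from `LLMConfig` through `VLLMEngineConfig.from_llm_config` into `placement_bundles`. A minimal sketch, with an illustrative model id and NPU label:

from ray.serve.llm import LLMConfig

llm_config = LLMConfig(
    model_loading_config=dict(model_id="qwen-0.5b"),
    engine_kwargs=dict(tensor_parallel_size=2),
    resources_per_bundle={"NPU": 1},
)
# The engine placement group then requests [{"NPU": 1}, {"NPU": 1}]
# instead of the default [{"GPU": 1}, {"GPU": 1}].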
diff --git a/python/ray/llm/tests/serve/configs/test_models.py b/python/ray/llm/tests/serve/configs/test_models.py
index 5312ed7821933..cd92a9aa8dfe0 100644
--- a/python/ray/llm/tests/serve/configs/test_models.py
+++ b/python/ray/llm/tests/serve/configs/test_models.py
@@ -212,6 +212,28 @@ def test_get_serve_options_without_accelerator_type(self):
         }
         assert serve_options == expected_options

+    def test_resources_per_bundle(self):
+        """Test that resources_per_bundle is correctly parsed."""
+
+        # Test the default resource bundle
+        serve_options = LLMConfig(
+            model_loading_config=dict(model_id="test_model"),
+            engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
+        ).get_serve_options(name_prefix="Test:")
+        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
+            {"GPU": 1} for _ in range(6)
+        ]
+
+        # Test the custom resource bundle
+        serve_options = LLMConfig(
+            model_loading_config=dict(model_id="test_model"),
+            engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
+            resources_per_bundle={"XPU": 1},
+        ).get_serve_options(name_prefix="Test:")
+        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
+            {"XPU": 1} for _ in range(6)
+        ]
+

 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/util/accelerators/accelerators.py b/python/ray/util/accelerators/accelerators.py
index c7099fdd109d1..63c4f2f10ada8 100644
--- a/python/ray/util/accelerators/accelerators.py
+++ b/python/ray/util/accelerators/accelerators.py
@@ -26,6 +26,8 @@
 GOOGLE_TPU_V5P = "TPU-V5P"
 GOOGLE_TPU_V5LITEPOD = "TPU-V5LITEPOD"
 GOOGLE_TPU_V6E = "TPU-V6E"
+HUAWEI_NPU_910B = "Ascend910B"
+HUAWEI_NPU_910B4 = "Ascend910B4"

 # Use these instead of NVIDIA_A100 if you need a specific accelerator size. Note that
 # these labels are not auto-added to nodes, you'll have to add them manually in
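The new constants plug into the existing accelerator_type mechanism. A sketch of pinning a task to Ascend 910B nodes, assuming the cluster actually reports an `NPU` resource and the matching `accelerator_type:Ascend910B` label:

import ray
from ray.util.accelerators import HUAWEI_NPU_910B

@ray.remote(resources={"NPU": 1}, accelerator_type=HUAWEI_NPU_910B)
def check_device():
    # Scheduled only on nodes that expose the Ascend910B label.
    return "ok"

ray.get(check_device.remote())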