[llm] ray.llm support custom accelerators #51359

Merged
3 changes: 3 additions & 0 deletions python/ray/data/llm.py
@@ -19,6 +19,9 @@ class ProcessorConfig(_ProcessorConfig):
On the other hand, small batch sizes are more fault-tolerant and could
reduce bubbles in the data pipeline. You can tune the batch size to balance
the throughput and fault-tolerance based on your use case.
resources_per_bundle: The per-bundle resources for the placement group.
You can specify a custom device label, e.g. {'NPU': 1}.
The default bundle for the LLM stage is a single GPU, i.e. {'GPU': 1}.
accelerator_type: The accelerator type used by the LLM stage in a processor.
Defaults to None, meaning that only the CPU will be used.
concurrency: The number of workers for data parallelism. Defaults to 1.
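
To make the new option concrete, here is a hedged usage sketch built on the public `ray.data.llm` API; the model id, the `{'NPU': 1}` label, and field names such as `model_source` are illustrative assumptions (and may differ by Ray version), not values taken from this PR.

```python
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen2.5-0.5B-Instruct",  # placeholder model
    engine_kwargs=dict(
        tensor_parallel_size=2,
        distributed_executor_backend="ray",
    ),
    # Ask for one NPU per placement-group bundle instead of the default {'GPU': 1}.
    resources_per_bundle={"NPU": 1},
    batch_size=64,
    concurrency=1,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(max_tokens=32),
    ),
    postprocess=lambda row: dict(answer=row["generated_text"]),
)

ds = processor(ray.data.from_items([{"prompt": "What is Ray?"}]))
ds.show()
```
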
6 changes: 6 additions & 0 deletions python/ray/llm/_internal/batch/processor/base.py
@@ -26,6 +26,12 @@ class ProcessorConfig(BaseModelExtended):
"You can tune the batch size to balance the throughput and fault-tolerance "
"based on your use case. Defaults to 64.",
)
resources_per_bundle: Optional[Dict[str, float]] = Field(
default=None,
description="This will override the default resource bundles for placement groups. "
"You can specify a custom device label e.g. {'NPU': 1}. "
"The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.",
)
accelerator_type: Optional[str] = Field(
default=None,
description="The accelerator type used by the LLM stage in a processor. "
@@ -190,6 +190,7 @@ def build_vllm_engine_processor(
# This is used to make sure we overlap batches to avoid the tail
# latency of each batch.
max_concurrency=config.max_concurrent_batches,
resources=config.resources_per_bundle,
accelerator_type=config.accelerator_type,
runtime_env=config.runtime_env,
),
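
For context on the `resources=config.resources_per_bundle` argument above: Ray Data accepts custom resource requests as remote args on `map_batches`, which is the channel the processor uses to pass them to the stage. A minimal, hedged sketch with a placeholder UDF (the `NPU` label is an assumption):

```python
import ray

def fake_llm_stage(batch):
    # Trivial stand-in for the vLLM engine stage.
    return batch

ds = ray.data.range(8).map_batches(
    fake_llm_stage,
    batch_size=4,
    num_gpus=0,            # don't reserve GPUs for the stage...
    resources={"NPU": 1},  # ...reserve one (assumed) NPU per task instead
)
```
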
54 changes: 40 additions & 14 deletions python/ray/llm/_internal/batch/stages/vllm_engine_stage.py
@@ -573,26 +573,42 @@ def __del__(self):
self.llm.shutdown()


def _ray_scheduling_strategy_fn(num_gpus_per_instance: int, accelerator_type: str):
"""
Create a Ray scheduling strategy for vLLM engine.
def _ray_scheduling_strategy_fn(
num_bundles_per_replica: int,
accelerator_type: Optional[str] = None,
resources_per_bundle: Optional[Dict[str, float]] = None,
):
"""Create a Ray scheduling strategy for the engine.

Args:
num_gpus_per_instance: The number of GPUs per instance.
accelerator_type: The accelerator type.
num_bundles_per_replica: The number of device bundles per
engine replica.
accelerator_type: The accelerator type. If None, the
accelerator_type label will not be set.
resources_per_bundle: The custom resources per bundle.
If None, we default to 1xGPU + 1xCPU bundle.

Returns:
The Ray scheduling strategy.
"""

def _get_bundle() -> Dict[str, float]:
bundle: Dict[str, float] = {"GPU": 1, "CPU": 1}

bundle = {}
# Custom resources
if resources_per_bundle:
bundle = resources_per_bundle
else:
# GPU bundles
bundle = {"GPU": 1, "CPU": 1}

# Accelerator type
if accelerator_type:
bundle[f"accelerator_type:{accelerator_type}"] = 0.001
return bundle

pg = ray.util.placement_group(
[_get_bundle()] * num_gpus_per_instance,
[_get_bundle()] * num_bundles_per_replica,
strategy="STRICT_PACK",
)
return dict(
@@ -629,29 +645,39 @@ def post_init(cls, values):
if accelerator_type:
ray_remote_args["accelerator_type"] = accelerator_type

# Setup num_gpus required per vLLM engine.
# Setup num_workers required per vLLM engine.
tp_size = engine_kwargs.get("tensor_parallel_size", 1)
pp_size = engine_kwargs.get("pipeline_parallel_size", 1)
num_gpus = tp_size * pp_size
num_bundles_per_replica = tp_size * pp_size

# Use the MP backend by default.
engine_kwargs.setdefault("distributed_executor_backend", "mp")
executor_backend = engine_kwargs.get("distributed_executor_backend")

# When Ray is used in the vLLM engine, we set num_gpus to 0 so that
# When Ray is used in the vLLM engine, we set num_devices to 0 so that
# Ray Data won't reserve GPUs in advance. Instead, we specify scheduling
# strategy in .map_batches() arguments and let the vLLM Ray executor
# create placement groups for each TP/PP worker.
if executor_backend == "ray" and num_gpus > 1:
resources_per_bundle = map_batches_kwargs.pop("resources", None)
if executor_backend == "ray" and num_bundles_per_replica > 1:
# Note that we have to use partial() to pass a function
# instead of an object.
map_batches_kwargs["ray_remote_args_fn"] = partial(
_ray_scheduling_strategy_fn,
num_gpus,
num_bundles_per_replica,
accelerator_type,
resources_per_bundle,
)
num_gpus = 0
ray_remote_args["num_gpus"] = 0
else:
if not resources_per_bundle:
# Default to GPUs per bundle if custom resources are not specified.
ray_remote_args["num_gpus"] = num_bundles_per_replica
else:
ray_remote_args["resources"] = {
resource_key: resource_count * num_bundles_per_replica
for resource_key, resource_count in resources_per_bundle.items()
}

map_batches_kwargs["num_gpus"] = num_gpus
map_batches_kwargs.update(ray_remote_args)
return values
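
To summarize how the two code paths above pick resources, here is a small self-contained sketch. The function name and the simplified `placement_group_bundles` return key are assumptions for illustration (the real code builds a Ray scheduling strategy via `_ray_scheduling_strategy_fn`), and the example values are arbitrary.

```python
from typing import Dict, Optional

def sketch_remote_args(
    tp_size: int,
    pp_size: int,
    executor_backend: str,
    resources_per_bundle: Optional[Dict[str, float]] = None,
) -> Dict:
    num_bundles_per_replica = tp_size * pp_size
    if executor_backend == "ray" and num_bundles_per_replica > 1:
        # vLLM's Ray executor gets a STRICT_PACK placement group with one bundle
        # per TP/PP worker; Ray Data itself reserves no GPUs (num_gpus=0).
        bundle = resources_per_bundle or {"GPU": 1, "CPU": 1}
        return {
            "num_gpus": 0,
            "placement_group_bundles": [bundle] * num_bundles_per_replica,
        }
    if not resources_per_bundle:
        # MP backend, default devices: one GPU per worker in the replica.
        return {"num_gpus": num_bundles_per_replica}
    # MP backend, custom devices: multiply the per-bundle resources.
    return {
        "resources": {
            key: count * num_bundles_per_replica
            for key, count in resources_per_bundle.items()
        }
    }

assert sketch_remote_args(2, 2, "ray", {"NPU": 1}) == {
    "num_gpus": 0,
    "placement_group_bundles": [{"NPU": 1}] * 4,
}
assert sketch_remote_args(2, 1, "mp", {"NPU": 1}) == {"resources": {"NPU": 2}}
assert sketch_remote_args(2, 1, "mp") == {"num_gpus": 2}
```
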
7 changes: 7 additions & 0 deletions python/ray/llm/_internal/serve/configs/server_models.py
@@ -190,6 +190,13 @@ class LLMConfig(BaseModelExtended):
),
)

resources_per_bundle: Optional[Dict[str, float]] = Field(
default=None,
description="This will override the default resource bundles for placement groups. "
"You can specify a custom device label e.g. {'NPU': 1}. "
"The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.",
)

accelerator_type: Optional[str] = Field(
default=None,
description=f"The type of accelerator runs the model on. Only the following values are supported: {str([t.value for t in GPUType])}",
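
On the serving side, a hedged sketch of how the new `LLMConfig` field might be used via `ray.serve.llm`; the model id, the NPU label, and the autoscaling numbers are placeholders, not values from this PR.

```python
from ray import serve
from ray.serve.llm import LLMConfig, build_openai_app

llm_config = LLMConfig(
    model_loading_config=dict(model_id="qwen-0.5b"),
    engine_kwargs=dict(tensor_parallel_size=2),
    # Each placement-group bundle requests one NPU instead of the default {'GPU': 1}.
    resources_per_bundle={"NPU": 1},
    deployment_config=dict(
        autoscaling_config=dict(min_replicas=1, max_replicas=2),
    ),
)

serve.run(build_openai_app({"llm_configs": [llm_config]}))
```
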
@@ -51,6 +51,11 @@ class VLLMEngineConfig(BaseModelExtended):
None,
description="Configuration for cloud storage mirror. This is for where the weights are downloaded from.",
)
resources_per_bundle: Optional[Dict[str, float]] = Field(
default=None,
description="This overrides the vLLM engine worker's default resource configuration, "
"the number of resources returned by `placement_bundles`.",
)
accelerator_type: Optional[GPUType] = Field(
None,
description="The type of accelerator to use. This is used to determine the placement group strategy.",
@@ -104,6 +109,7 @@ def from_llm_config(cls, llm_config: LLMConfig) -> "VLLMEngineConfig":
model_id=llm_config.model_id,
hf_model_id=hf_model_id,
mirror_config=mirror_config,
resources_per_bundle=llm_config.resources_per_bundle,
accelerator_type=llm_config.accelerator_type,
engine_kwargs=llm_config.engine_kwargs,
runtime_env=llm_config.runtime_env,
@@ -122,7 +128,7 @@ def pipeline_parallel_degree(self) -> int:
return self.engine_kwargs.get("pipeline_parallel_size", 1)

@property
def num_gpu_workers(self) -> int:
def num_devices(self) -> int:
return self.tensor_parallel_degree * self.pipeline_parallel_degree

@property
@@ -134,10 +140,13 @@ def placement_strategy(self) -> str:

@property
def placement_bundles(self) -> List[Dict[str, float]]:
bundle = {"GPU": 1}
if self.resources_per_bundle:
bundle = self.resources_per_bundle
else:
bundle = {"GPU": 1}
if self.accelerator_type:
bundle[self.ray_accelerator_type()] = 0.001
bundles = [bundle for _ in range(self.num_gpu_workers)]
bundles = [bundle for _ in range(self.num_devices)]

return bundles

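
As a quick check of what `placement_bundles` yields, a standalone sketch with assumed values; unlike the property above, it copies the bundle per worker rather than reusing one dict.

```python
from typing import Dict, List, Optional

def sketch_placement_bundles(
    num_devices: int,
    resources_per_bundle: Optional[Dict[str, float]] = None,
    accelerator_label: Optional[str] = None,
) -> List[Dict[str, float]]:
    base = dict(resources_per_bundle) if resources_per_bundle else {"GPU": 1}
    if accelerator_label:
        # e.g. "accelerator_type:A10G" with a tiny fractional amount
        base[accelerator_label] = 0.001
    return [dict(base) for _ in range(num_devices)]

# tensor_parallel_size=2, pipeline_parallel_size=2 -> 4 devices per replica
assert sketch_placement_bundles(4, {"NPU": 1}) == [{"NPU": 1}] * 4
assert sketch_placement_bundles(2) == [{"GPU": 1}] * 2
```
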
@@ -266,6 +266,6 @@ def push_telemetry_report_for_all_models(
max_replicas=max_replicas,
tensor_parallel_degree=engine_config.tensor_parallel_degree,
gpu_type=model.accelerator_type or DEFAULT_GPU_TYPE,
num_gpus=engine_config.num_gpu_workers,
num_gpus=engine_config.num_devices,
)
_push_telemetry_report(telemetry_model)
22 changes: 22 additions & 0 deletions python/ray/llm/tests/serve/configs/test_models.py
@@ -212,6 +212,28 @@ def test_get_serve_options_without_accelerator_type(self):
}
assert serve_options == expected_options

def test_resources_per_bundle(self):
"""Test that resources_per_bundle is correctly parsed."""

# Test the default resource bundle
serve_options = LLMConfig(
model_loading_config=dict(model_id="test_model"),
engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
).get_serve_options(name_prefix="Test:")
assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
{"GPU": 1} for _ in range(6)
]

# Test the custom resource bundle
serve_options = LLMConfig(
model_loading_config=dict(model_id="test_model"),
engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
resources_per_bundle={"XPU": 1},
).get_serve_options(name_prefix="Test:")
assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
{"XPU": 1} for _ in range(6)
]


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
2 changes: 2 additions & 0 deletions python/ray/util/accelerators/accelerators.py
@@ -26,6 +26,8 @@
GOOGLE_TPU_V5P = "TPU-V5P"
GOOGLE_TPU_V5LITEPOD = "TPU-V5LITEPOD"
GOOGLE_TPU_V6E = "TPU-V6E"
HUAWEI_NPU_910B = "Ascend910B"
HUAWEI_NPU_910B4 = "Ascend910B4"

# Use these instead of NVIDIA_A100 if you need a specific accelerator size. Note that
# these labels are not auto-added to nodes, you'll have to add them manually in
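
A hedged example of combining the new Ascend constants with a custom `NPU` resource when scheduling work; it assumes the cluster actually advertises an `NPU` resource and the matching `accelerator_type:Ascend910B` label on its nodes.

```python
import ray
from ray.util.accelerators import HUAWEI_NPU_910B

@ray.remote(resources={"NPU": 1}, accelerator_type=HUAWEI_NPU_910B)
def npu_task() -> str:
    # Runs only on a node that exposes an NPU resource and is labeled as Ascend 910B.
    return "scheduled on an Ascend 910B node"

ray.init()
print(ray.get(npu_task.remote()))
```
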