diff --git a/config.yml b/config.yml index 847d94e..42b3b5a 100644 --- a/config.yml +++ b/config.yml @@ -11,3 +11,8 @@ tokenizer: pretrained_model_name_or_path: HuggingFaceTB/SmolLM2-135M-Instruct data: type: shareGPT +metrics_client: + type: prometheus + prometheus: + url: http://localhost:9090 + scrape_interval: 15 \ No newline at end of file diff --git a/inference_perf/client/base.py b/inference_perf/client/base.py index c63c2c1..ca40dba 100644 --- a/inference_perf/client/base.py +++ b/inference_perf/client/base.py @@ -12,9 +12,82 @@ # See the License for the specific language governing permissions and # limitations under the License. from abc import ABC, abstractmethod -from typing import Tuple +from typing import List, Tuple, TypedDict + +from pydantic import BaseModel from inference_perf.datagen import InferenceData -from inference_perf.reportgen import ReportGenerator + + +class RequestMetric(BaseModel): + stage_id: int + prompt_tokens: int + output_tokens: int + time_per_request: float + + +class ModelServerPrometheusMetric: + def __init__(self, name: str, op: str, type: str, filters: str) -> None: + self.name = name + self.op = op + self.type = type + self.filters = filters + + +class ModelServerMetrics(BaseModel): + # Throughput + prompt_tokens_per_second: float = 0.0 + output_tokens_per_second: float = 0.0 + requests_per_second: float = 0.0 + + # Latency + avg_request_latency: float = 0.0 + median_request_latency: float = 0.0 + p90_request_latency: float = 0.0 + p99_request_latency: float = 0.0 + avg_time_to_first_token: float = 0.0 + median_time_to_first_token: float = 0.0 + p90_time_to_first_token: float = 0.0 + p99_time_to_first_token: float = 0.0 + avg_time_per_output_token: float = 0.0 + median_time_per_output_token: float = 0.0 + p90_time_per_output_token: float = 0.0 + p99_time_per_output_token: float = 0.0 + + # Request + total_requests: int = 0 + avg_prompt_tokens: int = 0 + avg_output_tokens: int = 0 + avg_queue_length: int = 0 + + +# PrometheusMetricMetadata stores the mapping of metrics to their model server names and types +# and the filters to be applied to them. +# This is used to generate Prometheus query for the metrics. 
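+# For example, a model server client can map "p90_request_latency" to a
+# ModelServerPrometheusMetric("vllm:e2e_request_latency_seconds", "p90", "histogram", "model_name='my-model'")
+# (illustrative model name), which the Prometheus metrics client turns into a
+# histogram_quantile query over that metric's buckets.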
+class PrometheusMetricMetadata(TypedDict): + # Throughput + prompt_tokens_per_second: ModelServerPrometheusMetric + output_tokens_per_second: ModelServerPrometheusMetric + requests_per_second: ModelServerPrometheusMetric + + # Latency + avg_request_latency: ModelServerPrometheusMetric + median_request_latency: ModelServerPrometheusMetric + p90_request_latency: ModelServerPrometheusMetric + p99_request_latency: ModelServerPrometheusMetric + avg_time_to_first_token: ModelServerPrometheusMetric + median_time_to_first_token: ModelServerPrometheusMetric + p90_time_to_first_token: ModelServerPrometheusMetric + p99_time_to_first_token: ModelServerPrometheusMetric + avg_time_per_output_token: ModelServerPrometheusMetric + median_time_per_output_token: ModelServerPrometheusMetric + p90_time_per_output_token: ModelServerPrometheusMetric + p99_time_per_output_token: ModelServerPrometheusMetric + + # Request + total_requests: ModelServerPrometheusMetric + avg_prompt_tokens: ModelServerPrometheusMetric + avg_output_tokens: ModelServerPrometheusMetric + avg_queue_length: ModelServerPrometheusMetric class ModelServerClient(ABC): @@ -23,9 +96,14 @@ def __init__(self, *args: Tuple[int, ...]) -> None: pass @abstractmethod - def set_report_generator(self, reportgen: ReportGenerator) -> None: - self.reportgen = reportgen + async def process_request(self, data: InferenceData, stage_id: int) -> None: + raise NotImplementedError @abstractmethod - async def process_request(self, data: InferenceData, stage_id: int) -> None: + def get_request_metrics(self) -> List[RequestMetric]: + raise NotImplementedError + + @abstractmethod + def get_prometheus_metric_metadata(self) -> PrometheusMetricMetadata: + # assumption: all metrics clients have metrics exported in Prometheus format raise NotImplementedError diff --git a/inference_perf/client/mock_client.py b/inference_perf/client/mock_client.py index 6fd6497..5ef4396 100644 --- a/inference_perf/client/mock_client.py +++ b/inference_perf/client/mock_client.py @@ -11,23 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from inference_perf.datagen import InferenceData -from inference_perf.reportgen import ReportGenerator, RequestMetric -from .base import ModelServerClient import asyncio +from typing import List +from inference_perf.datagen import InferenceData +from .base import ModelServerClient, RequestMetric class MockModelServerClient(ModelServerClient): def __init__(self) -> None: - pass - - def set_report_generator(self, reportgen: ReportGenerator) -> None: - self.reportgen = reportgen + self.request_metrics: List[RequestMetric] = list() async def process_request(self, payload: InferenceData, stage_id: int) -> None: print("Processing request - " + str(payload.data) + " for stage - " + str(stage_id)) await asyncio.sleep(3) - self.reportgen.collect_request_metrics( + self.request_metrics.append( RequestMetric( stage_id=stage_id, prompt_tokens=0, @@ -35,3 +32,6 @@ async def process_request(self, payload: InferenceData, stage_id: int) -> None: time_per_request=3, ) ) + + def get_request_metrics(self) -> List[RequestMetric]: + return self.request_metrics diff --git a/inference_perf/client/vllm_client.py b/inference_perf/client/vllm_client.py index ed9fa3e..ae227e4 100644 --- a/inference_perf/client/vllm_client.py +++ b/inference_perf/client/vllm_client.py @@ -12,11 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. from inference_perf.datagen import InferenceData -from inference_perf.reportgen import ReportGenerator, RequestMetric from inference_perf.config import APIType, CustomTokenizerConfig from inference_perf.utils import CustomTokenizer -from .base import ModelServerClient -from typing import Any, Optional +from .base import ModelServerClient, ModelServerPrometheusMetric, PrometheusMetricMetadata, RequestMetric +from typing import Any, Optional, List import aiohttp import json import time @@ -42,9 +41,67 @@ def __init__(self, uri: str, model_name: str, tokenizer: Optional[CustomTokenize print("Falling back to usage metrics.") else: print("Tokenizer path is empty. 
Falling back to usage metrics.") + self.request_metrics: List[RequestMetric] = list() - def set_report_generator(self, reportgen: ReportGenerator) -> None: - self.reportgen = reportgen + self.prometheus_metric_metadata: PrometheusMetricMetadata = { + "avg_queue_length": ModelServerPrometheusMetric( + "vllm:num_requests_waiting", "mean", "gauge", "model_name='%s'" % self.model_name + ), + "avg_time_to_first_token": ModelServerPrometheusMetric( + "vllm:time_to_first_token_seconds", "mean", "histogram", "model_name='%s'" % self.model_name + ), + "median_time_to_first_token": ModelServerPrometheusMetric( + "vllm:time_to_first_token_seconds", "median", "histogram", "model_name='%s'" % self.model_name + ), + "p90_time_to_first_token": ModelServerPrometheusMetric( + "vllm:time_to_first_token_seconds", "p90", "histogram", "model_name='%s'" % self.model_name + ), + "p99_time_to_first_token": ModelServerPrometheusMetric( + "vllm:time_to_first_token_seconds", "p99", "histogram", "model_name='%s'" % self.model_name + ), + "avg_time_per_output_token": ModelServerPrometheusMetric( + "vllm:time_per_output_token_seconds", "mean", "histogram", "model_name='%s'" % self.model_name + ), + "median_time_per_output_token": ModelServerPrometheusMetric( + "vllm:time_per_output_token_seconds", "median", "histogram", "model_name='%s'" % self.model_name + ), + "p90_time_per_output_token": ModelServerPrometheusMetric( + "vllm:time_per_output_token_seconds", "p90", "histogram", "model_name='%s'" % self.model_name + ), + "p99_time_per_output_token": ModelServerPrometheusMetric( + "vllm:time_per_output_token_seconds", "p99", "histogram", "model_name='%s'" % self.model_name + ), + "avg_prompt_tokens": ModelServerPrometheusMetric( + "vllm:prompt_tokens_total", "mean", "counter", "model_name='%s'" % self.model_name + ), + "prompt_tokens_per_second": ModelServerPrometheusMetric( + "vllm:prompt_tokens_total", "rate", "counter", "model_name='%s'" % self.model_name + ), + "avg_output_tokens": ModelServerPrometheusMetric( + "vllm:generation_tokens_total", "mean", "counter", "model_name='%s'" % self.model_name + ), + "output_tokens_per_second": ModelServerPrometheusMetric( + "vllm:generation_tokens_total", "rate", "counter", "model_name='%s'" % self.model_name + ), + "total_requests": ModelServerPrometheusMetric( + "vllm:e2e_request_latency_seconds_count", "increase", "counter", "model_name='%s'" % self.model_name + ), + "requests_per_second": ModelServerPrometheusMetric( + "vllm:e2e_request_latency_seconds_count", "rate", "counter", "model_name='%s'" % self.model_name + ), + "avg_request_latency": ModelServerPrometheusMetric( + "vllm:e2e_request_latency_seconds", "mean", "histogram", "model_name='%s'" % self.model_name + ), + "median_request_latency": ModelServerPrometheusMetric( + "vllm:e2e_request_latency_seconds", "median", "histogram", "model_name='%s'" % self.model_name + ), + "p90_request_latency": ModelServerPrometheusMetric( + "vllm:e2e_request_latency_seconds", "p90", "histogram", "model_name='%s'" % self.model_name + ), + "p99_request_latency": ModelServerPrometheusMetric( + "vllm:e2e_request_latency_seconds", "p99", "histogram", "model_name='%s'" % self.model_name + ), + } def _create_payload(self, payload: InferenceData) -> dict[str, Any]: if payload.type == APIType.Completion: @@ -93,7 +150,7 @@ async def process_request(self, data: InferenceData, stage_id: int) -> None: prompt_tokens = usage.get("prompt_tokens", 0) output_tokens = usage.get("completion_tokens", 0) - self.reportgen.collect_request_metrics( + 
self.request_metrics.append( RequestMetric( stage_id=stage_id, prompt_tokens=prompt_tokens, @@ -105,3 +162,9 @@ async def process_request(self, data: InferenceData, stage_id: int) -> None: print(await response.text()) except aiohttp.ClientConnectorError as e: print("vLLM Server connection error:\n", str(e)) + + def get_request_metrics(self) -> List[RequestMetric]: + return self.request_metrics + + def get_prometheus_metric_metadata(self) -> PrometheusMetricMetadata: + return self.prometheus_metric_metadata diff --git a/inference_perf/config.py b/inference_perf/config.py index cf4769d..1fd68ea 100644 --- a/inference_perf/config.py +++ b/inference_perf/config.py @@ -38,6 +38,11 @@ class LoadType(Enum): POISSON = "poisson" +class MetricsClientType(Enum): + PROMETHEUS = "prometheus" + DEFAULT = "default" + + class LoadStage(BaseModel): rate: int = 1 duration: int = 1 @@ -66,8 +71,14 @@ class ReportConfig(BaseModel): pass -class MetricsConfig(BaseModel): - url: str +class PrometheusClientConfig(BaseModel): + scrape_interval: Optional[int] = 15 + url: str = "http://localhost:9090" + + +class MetricsClientConfig(BaseModel): + type: MetricsClientType + prometheus: Optional[PrometheusClientConfig] = None class VLLMConfig(BaseModel): @@ -86,7 +97,7 @@ class Config(BaseModel): data: Optional[DataConfig] = DataConfig() load: Optional[LoadConfig] = LoadConfig(stages=[LoadStage()]) report: Optional[ReportConfig] = ReportConfig() - metrics: Optional[MetricsConfig] = MetricsConfig(url="") + metrics_client: Optional[MetricsClientConfig] = None storage: Optional[StorageConfig] = StorageConfig() vllm: Optional[VLLMConfig] = None tokenizer: Optional[CustomTokenizerConfig] = None diff --git a/inference_perf/main.py b/inference_perf/main.py index eb64448..435ca50 100644 --- a/inference_perf/main.py +++ b/inference_perf/main.py @@ -11,14 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import time from typing import List from inference_perf.loadgen import LoadGenerator -from inference_perf.config import DataGenType +from inference_perf.config import DataGenType, MetricsClientType from inference_perf.datagen import DataGenerator, MockDataGenerator, HFShareGPTDataGenerator from inference_perf.client import ModelServerClient, vLLMModelServerClient +from inference_perf.metrics.base import MetricsClient, PerfRuntimeParameters +from inference_perf.metrics.prometheus_client import PrometheusMetricsClient from inference_perf.client.storage import StorageClient, GoogleCloudStorageClient from inference_perf.reportgen import ReportGenerator, ReportFile -from inference_perf.metrics import MockMetricsClient from inference_perf.config import read_config import asyncio @@ -35,13 +37,12 @@ def __init__( self.loadgen = loadgen self.reportgen = reportgen self.storage_clients = storage_clients - self.client.set_report_generator(self.reportgen) def run(self) -> None: asyncio.run(self.loadgen.run(self.client)) - def generate_reports(self) -> List[ReportFile]: - return asyncio.run(self.reportgen.generate_reports()) + def generate_reports(self, runtime_parameters: PerfRuntimeParameters) -> List[ReportFile]: + return asyncio.run(self.reportgen.generate_reports(runtime_parameters=runtime_parameters)) def save_reports(self, reports: List[ReportFile]) -> None: for storage_client in self.storage_clients: @@ -53,7 +54,7 @@ def main_cli() -> None: # Define Model Server Client if config.vllm: - client = vLLMModelServerClient( + model_server_client = vLLMModelServerClient( uri=config.vllm.url, model_name=config.vllm.model_name, tokenizer=config.tokenizer, api_type=config.vllm.api ) else: @@ -76,13 +77,13 @@ def main_cli() -> None: raise Exception("load config missing") # Define Metrics Client - if config.metrics: - metricsclient = MockMetricsClient(uri=config.metrics.url) - else: - raise Exception("metrics config missing") + metrics_client: MetricsClient | None = None + if config.metrics_client: + if config.metrics_client.type == MetricsClientType.PROMETHEUS and config.metrics_client.prometheus: + metrics_client = PrometheusMetricsClient(config=config.metrics_client.prometheus) # Define Report Generator - reportgen = ReportGenerator(metricsclient) + reportgen = ReportGenerator(metrics_client) # Define Storage Clients storage_clients: List[StorageClient] = [] @@ -91,13 +92,18 @@ def main_cli() -> None: storage_clients.append(GoogleCloudStorageClient(config=config.storage.google_cloud_storage)) # Setup Perf Test Runner - perfrunner = InferencePerfRunner(client, loadgen, reportgen, storage_clients) + perfrunner = InferencePerfRunner(model_server_client, loadgen, reportgen, storage_clients) + + start_time = time.time() # Run Perf Test perfrunner.run() - # Generate Reports - reports = perfrunner.generate_reports() + end_time = time.time() + duration = end_time - start_time # Calculate the duration of the test + + # Generate Report after the tests + reports = perfrunner.generate_reports(PerfRuntimeParameters(start_time, duration, model_server_client)) # Save Reports perfrunner.save_reports(reports=reports) diff --git a/inference_perf/metrics/README.md b/inference_perf/metrics/README.md index 9b85246..d512af5 100644 --- a/inference_perf/metrics/README.md +++ b/inference_perf/metrics/README.md @@ -5,10 +5,9 @@ This repository provides clients to query performance metrics from various monit ## Supported Monitoring Platforms **Available now**: -- None +- Self Deployed Prometheus **Todo**: - Google Cloud 
Monitoring - AWS CloudWatch -- Azure Monitor -- Self Deployed Prometheus \ No newline at end of file +- Azure Monitor \ No newline at end of file diff --git a/inference_perf/metrics/__init__.py b/inference_perf/metrics/__init__.py index 95153c3..c390753 100644 --- a/inference_perf/metrics/__init__.py +++ b/inference_perf/metrics/__init__.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .base import MetricsClient, MetricsSummary +from .base import MetricsClient from .mock_client import MockMetricsClient -__all__ = ["MetricsClient", "MetricsSummary", "MockMetricsClient"] +__all__ = ["MetricsClient", "MockMetricsClient"] diff --git a/inference_perf/metrics/base.py b/inference_perf/metrics/base.py index d475236..6bccf76 100644 --- a/inference_perf/metrics/base.py +++ b/inference_perf/metrics/base.py @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. from abc import ABC, abstractmethod -from pydantic import BaseModel +from inference_perf.client.base import ModelServerClient, ModelServerMetrics -class MetricsSummary(BaseModel): - total_requests: int - avg_prompt_tokens: float - avg_output_tokens: float - avg_time_per_request: float +class PerfRuntimeParameters: + def __init__(self, start_time: float, duration: float, model_server_client: ModelServerClient) -> None: + self.start_time = start_time + self.duration = duration + self.model_server_client = model_server_client class MetricsClient(ABC): @@ -28,5 +28,9 @@ def __init__(self) -> None: pass @abstractmethod - def collect_metrics_summary(self) -> MetricsSummary | None: + def collect_model_server_metrics(self, runtime_parameters: PerfRuntimeParameters) -> ModelServerMetrics | None: + raise NotImplementedError + + @abstractmethod + def wait(self) -> None: raise NotImplementedError diff --git a/inference_perf/metrics/mock_client.py b/inference_perf/metrics/mock_client.py index 45ad184..a5c94f9 100644 --- a/inference_perf/metrics/mock_client.py +++ b/inference_perf/metrics/mock_client.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .base import MetricsSummary, MetricsClient +from inference_perf.client.base import ModelServerMetrics +from .base import MetricsClient, PerfRuntimeParameters class MockMetricsClient(MetricsClient): - def __init__(self, uri: str) -> None: - self.uri = uri + def __init__(self) -> None: + pass - def collect_metrics_summary(self) -> MetricsSummary | None: + def collect_model_server_metrics(self, runtime_parameters: PerfRuntimeParameters) -> ModelServerMetrics | None: return None + + def wait(self) -> None: + pass diff --git a/inference_perf/metrics/prometheus_client.py b/inference_perf/metrics/prometheus_client.py new file mode 100644 index 0000000..4a05ce1 --- /dev/null +++ b/inference_perf/metrics/prometheus_client.py @@ -0,0 +1,237 @@ +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +from typing import cast +import requests +from inference_perf.client.base import ModelServerMetrics, ModelServerPrometheusMetric +from inference_perf.config import PrometheusClientConfig +from .base import MetricsClient, PerfRuntimeParameters + +PROMETHEUS_SCRAPE_BUFFER_SEC = 5 + + +class PrometheusQueryBuilder: + def __init__(self, model_server_metric: ModelServerPrometheusMetric, duration: float): + self.model_server_metric = model_server_metric + self.duration = duration + + def get_queries(self) -> dict[str, dict[str, str]]: + """ + Returns a dictionary of queries for each metric type. + """ + metric_name = self.model_server_metric.name + filter = self.model_server_metric.filters + return { + "gauge": { + "mean": "avg_over_time(%s{%s}[%.0fs])" % (metric_name, filter, self.duration), + "median": "quantile_over_time(0.5, %s{%s}[%.0fs])" % (metric_name, filter, self.duration), + "sd": "stddev_over_time(%s{%s}[%.0fs])" % (metric_name, filter, self.duration), + "min": "min_over_time(%s{%s}[%.0fs])" % (metric_name, filter, self.duration), + "max": "max_over_time(%s{%s}[%.0fs])" % (metric_name, filter, self.duration), + "p90": "quantile_over_time(0.9, %s{%s}[%.0fs])" % (metric_name, filter, self.duration), + "p99": "quantile_over_time(0.99, %s{%s}[%.0fs])" % (metric_name, filter, self.duration), + }, + "histogram": { + "mean": "sum(rate(%s_sum{%s}[%.0fs])) / (sum(rate(%s_count{%s}[%.0fs])) > 0)" + % (metric_name, filter, self.duration, metric_name, filter, self.duration), + "median": "histogram_quantile(0.5, sum(rate(%s_bucket{%s}[%.0fs])) by (le))" + % (metric_name, filter, self.duration), + "min": "histogram_quantile(0, sum(rate(%s_bucket{%s}[%.0fs])) by (le))" % (metric_name, filter, self.duration), + "max": "histogram_quantile(1, sum(rate(%s_bucket{%s}[%.0fs])) by (le))" % (metric_name, filter, self.duration), + "p90": "histogram_quantile(0.9, sum(rate(%s_bucket{%s}[%.0fs])) by (le))" + % (metric_name, filter, self.duration), + "p99": "histogram_quantile(0.99, sum(rate(%s_bucket{%s}[%.0fs])) by (le))" + % (metric_name, filter, self.duration), + }, + "counter": { + "rate": "sum(rate(%s{%s}[%.0fs]))" % (metric_name, filter, self.duration), + "increase": "sum(increase(%s{%s}[%.0fs]))" % (metric_name, filter, self.duration), + "mean": "avg_over_time(rate(%s{%s}[%.0fs])[%.0fs:%.0fs])" + % (metric_name, filter, self.duration, self.duration, self.duration), + "max": "max_over_time(rate(%s{%s}[%.0fs])[%.0fs:%.0fs])" + % (metric_name, filter, self.duration, self.duration, self.duration), + "min": "min_over_time(rate(%s{%s}[%.0fs])[%.0fs:%.0fs])" + % (metric_name, filter, self.duration, self.duration, self.duration), + "p90": "quantile_over_time(0.9, rate(%s{%s}[%.0fs])[%.0fs:%.0fs])" + % (metric_name, filter, self.duration, self.duration, self.duration), + "p99": "quantile_over_time(0.99, rate(%s{%s}[%.0fs])[%.0fs:%.0fs])" + % (metric_name, filter, self.duration, self.duration, self.duration), + }, + } + + def build_query(self) -> str: + """ + Builds the PromQL query for the given metric type and query operation. + + Returns: + The PromQL query. 
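+
+        Example (illustrative metric name, filter, and window):
+            a histogram metric "vllm:time_to_first_token_seconds" with op "p90" over a 600s
+            window builds:
+            histogram_quantile(0.9, sum(rate(vllm:time_to_first_token_seconds_bucket{model_name='my-model'}[600s])) by (le))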
+ """ + metric_type = self.model_server_metric.type + query_op = self.model_server_metric.op + + queries = self.get_queries() + if metric_type not in queries: + print("Invalid metric type: %s" % (metric_type)) + return "" + if query_op not in queries[metric_type]: + print("Invalid query operation: %s" % (query_op)) + return "" + return queries[metric_type][query_op] + + +class PrometheusMetricsClient(MetricsClient): + def __init__(self, config: PrometheusClientConfig) -> None: + if config: + self.url = config.url + if not self.url: + raise Exception("prometheus url missing") + self.scrape_interval = config.scrape_interval or 30 + else: + raise Exception("prometheus config missing") + + def wait(self) -> None: + """ + Waits for the Prometheus server to scrape the metrics. + We have added a buffer of 5 seconds to the scrape interval to ensure that metrics for even the last request are collected. + """ + wait_time = self.scrape_interval + PROMETHEUS_SCRAPE_BUFFER_SEC + print(f"Waiting for {wait_time} seconds for Prometheus to collect metrics...") + time.sleep(wait_time) + + def collect_model_server_metrics(self, runtime_parameters: PerfRuntimeParameters) -> ModelServerMetrics | None: + """ + Collects the summary metrics for the given Perf Benchmark Runtime Parameters. + + Args: + runtime_parameters: The runtime parameters containing details about the Perf Benchmark like the duration and model server client + + Returns: + A ModelServerMetrics object containing the summary metrics. + """ + metrics_summary: ModelServerMetrics = ModelServerMetrics() + if runtime_parameters is None: + print("Perf Runtime parameters are not set, skipping metrics collection") + return None + + # Wait for the Prometheus server to scrape the metrics + # We have added a buffer of 5 seconds to the scrape interval to ensure that metrics for even the last request are collected. + # This is to ensure that the metrics are collected before we query them + self.wait() + + # Get the duration and model server client from the runtime parameters + query_eval_time = time.time() + query_duration = query_eval_time - runtime_parameters.start_time + model_server_client = runtime_parameters.model_server_client + + # Get the engine and model from the model server client + if not model_server_client: + print("Model server client is not set") + return None + + metrics_metadata = model_server_client.get_prometheus_metric_metadata() + if not metrics_metadata: + print("Metrics metadata is not present for the runtime") + return None + for summary_metric_name in metrics_metadata: + summary_metric_metadata = metrics_metadata.get(summary_metric_name) + if summary_metric_metadata is None: + print("Metric metadata is not present for metric: %s. Skipping this metric." % (summary_metric_name)) + continue + summary_metric_metadata = cast(ModelServerPrometheusMetric, summary_metric_metadata) + if summary_metric_metadata is None: + print( + "Metric metadata for %s is missing or has an incorrect format. Skipping this metric." + % (summary_metric_name) + ) + continue + + query_builder = PrometheusQueryBuilder(summary_metric_metadata, query_duration) + query = query_builder.build_query() + if not query: + print("No query found for metric: %s. Skipping metric." 
% (summary_metric_name)) + continue + + # Execute the query and get the result + result = self.execute_query(query, str(query_eval_time)) + if result is None: + print("Error executing query: %s" % (query)) + continue + # Set the result in metrics summary + attr = getattr(metrics_summary, summary_metric_name) + if attr is not None: + target_type = type(attr) + setattr(metrics_summary, summary_metric_name, target_type(result)) + + return metrics_summary + + def execute_query(self, query: str, eval_time: str) -> float: + """ + Executes the given query on the Prometheus server and returns the result. + + Args: + query: the PromQL query to execute + + Returns: + The result of the query. + """ + query_result = 0.0 + try: + response = requests.get(f"{self.url}/api/v1/query", params={"query": query, "time": eval_time}) + if response is None: + print("Error executing query: %s" % (query)) + return query_result + + response.raise_for_status() + except Exception as e: + print("Error executing query: %s" % (e)) + return query_result + + # Check if the response is valid + # Sample response: + # { + # "status": "success", + # "data": { + # "resultType": "vector", + # "result": [ + # { + # "metric": {}, + # "value": [ + # 1632741820.781, + # "0.0000000000000000" + # ] + # } + # ] + # } + # } + response_obj = response.json() + if response_obj.get("status") != "success": + print("Error executing query: %s" % (response_obj)) + return query_result + + data = response_obj.get("data", {}) + result = data.get("result", []) + if len(result) > 0 and "value" in result[0]: + if isinstance(result[0]["value"], list) and len(result[0]["value"]) > 1: + # Return the value of the first result + # The value is in the second element of the list + # e.g. [1632741820.781, "0.0000000000000000"] + # We need to convert it to float + # and return it + # Convert the value to float + try: + query_result = float(result[0]["value"][1]) + except ValueError: + print("Error converting value to float: %s" % (result[0]["value"][1])) + return query_result + return query_result diff --git a/inference_perf/reportgen/__init__.py b/inference_perf/reportgen/__init__.py index 03a3e26..34dad04 100644 --- a/inference_perf/reportgen/__init__.py +++ b/inference_perf/reportgen/__init__.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from .base import ReportGenerator, RequestMetric, ReportFile
+from .base import ReportGenerator, ReportFile
 
-__all__ = ["ReportGenerator", "RequestMetric", "ReportGenerator", "ReportFile"]
+__all__ = ["ReportGenerator", "ReportFile"]
diff --git a/inference_perf/reportgen/base.py b/inference_perf/reportgen/base.py
index f936e8f..b2448a4 100644
--- a/inference_perf/reportgen/base.py
+++ b/inference_perf/reportgen/base.py
@@ -15,7 +15,9 @@
 import statistics
 from pydantic import BaseModel
 from typing import Any, List
-from inference_perf.metrics import MetricsClient, MetricsSummary
+from inference_perf.metrics import MetricsClient
+from inference_perf.client.base import ModelServerMetrics
+from inference_perf.metrics.base import PerfRuntimeParameters
 
 
 class ReportFile:
@@ -40,39 +42,86 @@ def get_contents(self) -> dict[str, Any]:
         return self.contents.model_dump()
 
 
-class RequestMetric(BaseModel):
-    stage_id: int
-    prompt_tokens: int
-    output_tokens: int
-    time_per_request: float
-
-
 class ReportGenerator:
-    def __init__(self, metrics_client: MetricsClient) -> None:
+    def __init__(self, metrics_client: MetricsClient | None) -> None:
         self.metrics_client = metrics_client
-        self.metrics: List[RequestMetric] = []
-
-    def collect_request_metrics(self, metric: RequestMetric) -> None:
-        self.metrics.append(metric)
 
-    async def generate_reports(self) -> List[ReportFile]:
+    async def generate_reports(self, runtime_parameters: PerfRuntimeParameters) -> List[ReportFile]:
         print("\n\nGenerating Report ..")
-        summary = self.metrics_client.collect_metrics_summary()
-        if summary is not None:
-            for field_name, value in summary:
-                print(f"{field_name}: {value}")
+        request_summary = self.report_request_summary(runtime_parameters)
+        metrics_client_summary = self.report_metrics_summary(runtime_parameters)
+
+        return [
+            ReportFile(name="request_summary_report", contents=request_summary),
+            ReportFile(name="metrics_client_report", contents=metrics_client_summary),
+        ]
+
+    def report_request_summary(self, runtime_parameters: PerfRuntimeParameters) -> ModelServerMetrics:
+        """
+        Report request metrics collected by the model server client during the run.
+        Args:
+            runtime_parameters (PerfRuntimeParameters): The runtime parameters containing the model server client, query eval time in the metrics db, duration.
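+
+        Returns:
+            A ModelServerMetrics summary aggregated from the per-request metrics, or an empty
+            summary if no request metrics were collected.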
+ """ + request_metrics = runtime_parameters.model_server_client.get_request_metrics() + request_summary = ModelServerMetrics() + if len(request_metrics) > 0: + total_prompt_tokens = sum([x.prompt_tokens for x in request_metrics]) + total_output_tokens = sum([x.output_tokens for x in request_metrics]) + if runtime_parameters.duration > 0: + prompt_tokens_per_second = total_prompt_tokens / runtime_parameters.duration + output_tokens_per_second = total_output_tokens / runtime_parameters.duration + requests_per_second = len(request_metrics) / runtime_parameters.duration + else: + prompt_tokens_per_second = 0.0 + output_tokens_per_second = 0.0 + requests_per_second = 0.0 - elif summary is None and len(self.metrics) > 0: - summary = MetricsSummary( - total_requests=len(self.metrics), - avg_prompt_tokens=statistics.mean([x.prompt_tokens for x in self.metrics]), - avg_output_tokens=statistics.mean([x.output_tokens for x in self.metrics]), - avg_time_per_request=statistics.mean([x.time_per_request for x in self.metrics]), + request_summary = ModelServerMetrics( + total_requests=len(request_metrics), + requests_per_second=requests_per_second, + prompt_tokens_per_second=prompt_tokens_per_second, + output_tokens_per_second=output_tokens_per_second, + avg_prompt_tokens=int(statistics.mean([x.prompt_tokens for x in request_metrics])), + avg_output_tokens=int(statistics.mean([x.output_tokens for x in request_metrics])), + avg_request_latency=statistics.mean([x.time_per_request for x in request_metrics]), + median_request_latency=statistics.median([x.time_per_request for x in request_metrics]), + p90_request_latency=statistics.quantiles([x.time_per_request for x in request_metrics], n=10)[8], + p99_request_latency=statistics.quantiles([x.time_per_request for x in request_metrics], n=100)[98], + avg_time_to_first_token=0.0, + median_time_to_first_token=0.0, + p90_time_to_first_token=0.0, + p99_time_to_first_token=0.0, + median_time_per_output_token=0.0, + p90_time_per_output_token=0.0, + p99_time_per_output_token=0.0, + avg_time_per_output_token=0.0, + avg_queue_length=0, ) - for field_name, value in summary: + print("-" * 50) + print("Request Summary") + print("-" * 50) + for field_name, value in request_summary: print(f"{field_name}: {value}") else: - print("Report generation failed - no metrics collected") - return [] + print("Report generation failed - no request metrics collected") + return request_summary - return [ReportFile(name="mock_report", contents=summary)] + def report_metrics_summary(self, runtime_parameters: PerfRuntimeParameters) -> ModelServerMetrics: + """ + Report summary of the metrics collected by the metrics client during the run. + Args: + runtime_parameters (PerfRuntimeParameters): The runtime parameters containing the model server client, query eval time in the metrics db, duration. 
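+
+        Returns:
+            A ModelServerMetrics summary collected by the configured metrics client, or an
+            empty summary if no metrics client is set or it returns no metrics.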
+ """ + metrics_client_summary = ModelServerMetrics() + if self.metrics_client is not None: + print("-" * 50) + print("Metrics Client Summary") + print("-" * 50) + collected_metrics = self.metrics_client.collect_model_server_metrics(runtime_parameters) + if collected_metrics is not None: + metrics_client_summary = collected_metrics + for field_name, value in metrics_client_summary: + print(f"{field_name}: {value}") + else: + print("Report generation failed - no metrics collected by metrics client") + return metrics_client_summary diff --git a/pyproject.toml b/pyproject.toml index dbf2711..39a26e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dev = [ "pytest>=8.3.4", "types-PyYAML>=6.0.12.20241230", "ipykernel>=6.29.5", + "types-requests>=2.32.0.20250328", ] [tool.ruff]
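
Below is a minimal end-to-end usage sketch of the metrics flow added in this change, assuming a vLLM server and a Prometheus instance scraping it are already running; the URLs, model name, and load-generation step are illustrative placeholders, not part of this diff.

    import time

    from inference_perf.client import vLLMModelServerClient
    from inference_perf.config import APIType, PrometheusClientConfig
    from inference_perf.metrics.base import PerfRuntimeParameters
    from inference_perf.metrics.prometheus_client import PrometheusMetricsClient

    # Model server client exposing per-request metrics and Prometheus metric metadata.
    client = vLLMModelServerClient(
        uri="http://localhost:8000", model_name="my-model", tokenizer=None, api_type=APIType.Completion
    )
    # Metrics client that queries the Prometheus instance scraping the model server.
    metrics_client = PrometheusMetricsClient(
        config=PrometheusClientConfig(url="http://localhost:9090", scrape_interval=15)
    )

    start_time = time.time()
    # ... drive load against the model server here (see LoadGenerator in main.py) ...
    duration = time.time() - start_time

    # Waits for the final scrape internally, then summarizes server-side metrics for the run window.
    summary = metrics_client.collect_model_server_metrics(PerfRuntimeParameters(start_time, duration, client))
    print(summary)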