
Added Prometheus client to get model server metrics #64


Merged
5 changes: 5 additions & 0 deletions config.yml
@@ -11,3 +11,8 @@ tokenizer:
pretrained_model_name_or_path: HuggingFaceTB/SmolLM2-135M-Instruct
data:
type: shareGPT
metrics_client:
type: prometheus
prometheus:
url: http://localhost:9090
scrape_interval: 15
88 changes: 83 additions & 5 deletions inference_perf/client/base.py
@@ -12,9 +12,82 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from typing import Tuple
from typing import List, Tuple, TypedDict

from pydantic import BaseModel
from inference_perf.datagen import InferenceData
from inference_perf.reportgen import ReportGenerator


class RequestMetric(BaseModel):
stage_id: int
prompt_tokens: int
output_tokens: int
time_per_request: float


class ModelServerPrometheusMetric:
def __init__(self, name: str, op: str, type: str, filters: str) -> None:
self.name = name
self.op = op
self.type = type
self.filters = filters


class ModelServerMetrics(BaseModel):
# Throughput
prompt_tokens_per_second: float = 0.0
output_tokens_per_second: float = 0.0
requests_per_second: float = 0.0

# Latency
avg_request_latency: float = 0.0
median_request_latency: float = 0.0
p90_request_latency: float = 0.0
p99_request_latency: float = 0.0
avg_time_to_first_token: float = 0.0
median_time_to_first_token: float = 0.0
p90_time_to_first_token: float = 0.0
p99_time_to_first_token: float = 0.0
avg_time_per_output_token: float = 0.0
median_time_per_output_token: float = 0.0
p90_time_per_output_token: float = 0.0
p99_time_per_output_token: float = 0.0

# Request
total_requests: int = 0
avg_prompt_tokens: int = 0
avg_output_tokens: int = 0
avg_queue_length: int = 0


# PrometheusMetricMetadata stores the mapping of metrics to their model server names and types
# and the filters to be applied to them.
# This is used to generate Prometheus queries for the metrics.
class PrometheusMetricMetadata(TypedDict):
# Throughput
prompt_tokens_per_second: ModelServerPrometheusMetric
output_tokens_per_second: ModelServerPrometheusMetric
requests_per_second: ModelServerPrometheusMetric

# Latency
avg_request_latency: ModelServerPrometheusMetric
median_request_latency: ModelServerPrometheusMetric
p90_request_latency: ModelServerPrometheusMetric
p99_request_latency: ModelServerPrometheusMetric
avg_time_to_first_token: ModelServerPrometheusMetric
median_time_to_first_token: ModelServerPrometheusMetric
p90_time_to_first_token: ModelServerPrometheusMetric
p99_time_to_first_token: ModelServerPrometheusMetric
avg_time_per_output_token: ModelServerPrometheusMetric
median_time_per_output_token: ModelServerPrometheusMetric
p90_time_per_output_token: ModelServerPrometheusMetric
p99_time_per_output_token: ModelServerPrometheusMetric

# Request
total_requests: ModelServerPrometheusMetric
avg_prompt_tokens: ModelServerPrometheusMetric
avg_output_tokens: ModelServerPrometheusMetric
avg_queue_length: ModelServerPrometheusMetric


class ModelServerClient(ABC):
@@ -23,9 +96,14 @@ def __init__(self, *args: Tuple[int, ...]) -> None:
pass

@abstractmethod
def set_report_generator(self, reportgen: ReportGenerator) -> None:
self.reportgen = reportgen
async def process_request(self, data: InferenceData, stage_id: int) -> None:
raise NotImplementedError

@abstractmethod
Contributor: methods repeated?
async def process_request(self, data: InferenceData, stage_id: int) -> None:
def get_request_metrics(self) -> List[RequestMetric]:
raise NotImplementedError

@abstractmethod
def get_prometheus_metric_metadata(self) -> PrometheusMetricMetadata:
# assumption: all metrics clients have metrics exported in Prometheus format
raise NotImplementedError
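
Since `PrometheusMetricMetadata` only records a metric name, aggregation op, metric type, and label filters, something downstream has to turn each entry into a PromQL expression. Below is a minimal sketch of how that translation could look; the `build_query` helper and the specific op/type-to-PromQL mapping are assumptions for illustration, not code from this PR.

```python
# Hypothetical sketch: turn one ModelServerPrometheusMetric entry into a PromQL
# query string over a benchmark window of `duration` seconds. The mapping of
# op/type to PromQL below is an assumption about how such a builder could work.
from inference_perf.client.base import ModelServerPrometheusMetric


def build_query(metric: ModelServerPrometheusMetric, duration: int) -> str:
    selector = f"{metric.name}{{{metric.filters}}}"
    window = f"[{duration}s]"
    if metric.type == "counter":
        if metric.op == "rate":
            return f"rate({selector}{window})"
        if metric.op == "increase":
            return f"increase({selector}{window})"
    if metric.type == "histogram":
        if metric.op == "mean":
            return (
                f"increase({metric.name}_sum{{{metric.filters}}}{window})"
                f" / increase({metric.name}_count{{{metric.filters}}}{window})"
            )
        quantile = {"median": 0.5, "p90": 0.9, "p99": 0.99}.get(metric.op)
        if quantile is not None:
            return (
                f"histogram_quantile({quantile}, "
                f"rate({metric.name}_bucket{{{metric.filters}}}{window}))"
            )
    # Fall back to the raw selector for ops this sketch does not cover.
    return selector
```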
16 changes: 8 additions & 8 deletions inference_perf/client/mock_client.py
@@ -11,27 +11,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from inference_perf.datagen import InferenceData
from inference_perf.reportgen import ReportGenerator, RequestMetric
from .base import ModelServerClient
import asyncio
from typing import List
from inference_perf.datagen import InferenceData
from .base import ModelServerClient, RequestMetric


class MockModelServerClient(ModelServerClient):
def __init__(self) -> None:
pass

def set_report_generator(self, reportgen: ReportGenerator) -> None:
self.reportgen = reportgen
self.request_metrics: List[RequestMetric] = list()

async def process_request(self, payload: InferenceData, stage_id: int) -> None:
print("Processing request - " + str(payload.data) + " for stage - " + str(stage_id))
await asyncio.sleep(3)
self.reportgen.collect_request_metrics(
self.request_metrics.append(
RequestMetric(
stage_id=stage_id,
prompt_tokens=0,
output_tokens=0,
time_per_request=3,
)
)

def get_request_metrics(self) -> List[RequestMetric]:
return self.request_metrics
75 changes: 69 additions & 6 deletions inference_perf/client/vllm_client.py
@@ -12,11 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from inference_perf.datagen import InferenceData
from inference_perf.reportgen import ReportGenerator, RequestMetric
from inference_perf.config import APIType, CustomTokenizerConfig
from inference_perf.utils import CustomTokenizer
from .base import ModelServerClient
from typing import Any, Optional
from .base import ModelServerClient, ModelServerPrometheusMetric, PrometheusMetricMetadata, RequestMetric
from typing import Any, Optional, List
import aiohttp
import json
import time
@@ -42,9 +41,67 @@ def __init__(self, uri: str, model_name: str, tokenizer: Optional[CustomTokenize
print("Falling back to usage metrics.")
else:
print("Tokenizer path is empty. Falling back to usage metrics.")
self.request_metrics: List[RequestMetric] = list()

def set_report_generator(self, reportgen: ReportGenerator) -> None:
self.reportgen = reportgen
self.prometheus_metric_metadata: PrometheusMetricMetadata = {
"avg_queue_length": ModelServerPrometheusMetric(
"vllm:num_requests_waiting", "mean", "gauge", "model_name='%s'" % self.model_name
),
"avg_time_to_first_token": ModelServerPrometheusMetric(
"vllm:time_to_first_token_seconds", "mean", "histogram", "model_name='%s'" % self.model_name
),
"median_time_to_first_token": ModelServerPrometheusMetric(
"vllm:time_to_first_token_seconds", "median", "histogram", "model_name='%s'" % self.model_name
),
"p90_time_to_first_token": ModelServerPrometheusMetric(
"vllm:time_to_first_token_seconds", "p90", "histogram", "model_name='%s'" % self.model_name
),
"p99_time_to_first_token": ModelServerPrometheusMetric(
"vllm:time_to_first_token_seconds", "p99", "histogram", "model_name='%s'" % self.model_name
),
"avg_time_per_output_token": ModelServerPrometheusMetric(
"vllm:time_per_output_token_seconds", "mean", "histogram", "model_name='%s'" % self.model_name
),
"median_time_per_output_token": ModelServerPrometheusMetric(
"vllm:time_per_output_token_seconds", "median", "histogram", "model_name='%s'" % self.model_name
),
"p90_time_per_output_token": ModelServerPrometheusMetric(
"vllm:time_per_output_token_seconds", "p90", "histogram", "model_name='%s'" % self.model_name
),
"p99_time_per_output_token": ModelServerPrometheusMetric(
"vllm:time_per_output_token_seconds", "p99", "histogram", "model_name='%s'" % self.model_name
),
"avg_prompt_tokens": ModelServerPrometheusMetric(
"vllm:prompt_tokens_total", "mean", "counter", "model_name='%s'" % self.model_name
),
"prompt_tokens_per_second": ModelServerPrometheusMetric(
"vllm:prompt_tokens_total", "rate", "counter", "model_name='%s'" % self.model_name
),
"avg_output_tokens": ModelServerPrometheusMetric(
"vllm:generation_tokens_total", "mean", "counter", "model_name='%s'" % self.model_name
),
"output_tokens_per_second": ModelServerPrometheusMetric(
"vllm:generation_tokens_total", "rate", "counter", "model_name='%s'" % self.model_name
),
"total_requests": ModelServerPrometheusMetric(
"vllm:e2e_request_latency_seconds_count", "increase", "counter", "model_name='%s'" % self.model_name
),
"requests_per_second": ModelServerPrometheusMetric(
"vllm:e2e_request_latency_seconds_count", "rate", "counter", "model_name='%s'" % self.model_name
),
"avg_request_latency": ModelServerPrometheusMetric(
"vllm:e2e_request_latency_seconds", "mean", "histogram", "model_name='%s'" % self.model_name
),
"median_request_latency": ModelServerPrometheusMetric(
"vllm:e2e_request_latency_seconds", "median", "histogram", "model_name='%s'" % self.model_name
),
"p90_request_latency": ModelServerPrometheusMetric(
"vllm:e2e_request_latency_seconds", "p90", "histogram", "model_name='%s'" % self.model_name
),
"p99_request_latency": ModelServerPrometheusMetric(
"vllm:e2e_request_latency_seconds", "p99", "histogram", "model_name='%s'" % self.model_name
),
}

def _create_payload(self, payload: InferenceData) -> dict[str, Any]:
if payload.type == APIType.Completion:
@@ -93,7 +150,7 @@ async def process_request(self, data: InferenceData, stage_id: int) -> None:
prompt_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)

self.reportgen.collect_request_metrics(
self.request_metrics.append(
RequestMetric(
stage_id=stage_id,
prompt_tokens=prompt_tokens,
@@ -105,3 +162,9 @@ async def process_request(self, data: InferenceData, stage_id: int) -> None:
print(await response.text())
except aiohttp.ClientConnectorError as e:
print("vLLM Server connection error:\n", str(e))

def get_request_metrics(self) -> List[RequestMetric]:
return self.request_metrics

def get_prometheus_metric_metadata(self) -> PrometheusMetricMetadata:
return self.prometheus_metric_metadata
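
To make the mapping above concrete, here is roughly what two of the entries could expand to once a query builder (such as the hypothetical `build_query` sketched earlier) fills in the label filter and window. The model name and window size are placeholders, and the exact query shapes are assumptions rather than output captured from the client.

```python
# Illustrative only; the exact PromQL generated by the client is an assumption.
MODEL = "your-model-name"  # placeholder
WINDOW = "300s"            # placeholder benchmark window

p90_ttft_query = (
    f"histogram_quantile(0.9, "
    f"rate(vllm:time_to_first_token_seconds_bucket{{model_name='{MODEL}'}}[{WINDOW}]))"
)
requests_per_second_query = (
    f"rate(vllm:e2e_request_latency_seconds_count{{model_name='{MODEL}'}}[{WINDOW}])"
)
```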
17 changes: 14 additions & 3 deletions inference_perf/config.py
@@ -38,6 +38,11 @@ class LoadType(Enum):
POISSON = "poisson"


class MetricsClientType(Enum):
PROMETHEUS = "prometheus"
DEFAULT = "default"


class LoadStage(BaseModel):
rate: int = 1
duration: int = 1
@@ -66,8 +71,14 @@ class ReportConfig(BaseModel):
pass


class MetricsConfig(BaseModel):
url: str
class PrometheusClientConfig(BaseModel):
scrape_interval: Optional[int] = 15
url: str = "http://localhost:9090"


class MetricsClientConfig(BaseModel):
type: MetricsClientType
prometheus: Optional[PrometheusClientConfig] = None


class VLLMConfig(BaseModel):
@@ -86,7 +97,7 @@ class Config(BaseModel):
data: Optional[DataConfig] = DataConfig()
load: Optional[LoadConfig] = LoadConfig(stages=[LoadStage()])
report: Optional[ReportConfig] = ReportConfig()
metrics: Optional[MetricsConfig] = MetricsConfig(url="")
metrics_client: Optional[MetricsClientConfig] = None
storage: Optional[StorageConfig] = StorageConfig()
vllm: Optional[VLLMConfig] = None
tokenizer: Optional[CustomTokenizerConfig] = None
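
For reference, the `metrics_client` block added to config.yml at the top of this PR maps directly onto these new Pydantic models. A minimal sketch of that parsing, using plain `yaml` here for illustration (the project's own `read_config()` presumably does the equivalent):

```python
# Minimal sketch, assuming PyYAML is available: parse the metrics_client block
# added in config.yml into the new Pydantic models from inference_perf.config.
import yaml

from inference_perf.config import MetricsClientConfig

raw = yaml.safe_load(
    """
metrics_client:
  type: prometheus
  prometheus:
    url: http://localhost:9090
    scrape_interval: 15
"""
)

metrics_client = MetricsClientConfig(**raw["metrics_client"])
assert metrics_client.prometheus is not None
print(metrics_client.type, metrics_client.prometheus.url, metrics_client.prometheus.scrape_interval)
```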
34 changes: 20 additions & 14 deletions inference_perf/main.py
@@ -11,14 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from typing import List
from inference_perf.loadgen import LoadGenerator
from inference_perf.config import DataGenType
from inference_perf.config import DataGenType, MetricsClientType
from inference_perf.datagen import DataGenerator, MockDataGenerator, HFShareGPTDataGenerator
from inference_perf.client import ModelServerClient, vLLMModelServerClient
from inference_perf.metrics.base import MetricsClient, PerfRuntimeParameters
from inference_perf.metrics.prometheus_client import PrometheusMetricsClient
from inference_perf.client.storage import StorageClient, GoogleCloudStorageClient
from inference_perf.reportgen import ReportGenerator, ReportFile
from inference_perf.metrics import MockMetricsClient
from inference_perf.config import read_config
import asyncio

@@ -35,13 +37,12 @@ def __init__(
self.loadgen = loadgen
self.reportgen = reportgen
self.storage_clients = storage_clients
self.client.set_report_generator(self.reportgen)

def run(self) -> None:
asyncio.run(self.loadgen.run(self.client))

def generate_reports(self) -> List[ReportFile]:
return asyncio.run(self.reportgen.generate_reports())
def generate_reports(self, runtime_parameters: PerfRuntimeParameters) -> List[ReportFile]:
return asyncio.run(self.reportgen.generate_reports(runtime_parameters=runtime_parameters))

def save_reports(self, reports: List[ReportFile]) -> None:
for storage_client in self.storage_clients:
@@ -53,7 +54,7 @@ def main_cli() -> None:

# Define Model Server Client
if config.vllm:
client = vLLMModelServerClient(
model_server_client = vLLMModelServerClient(
uri=config.vllm.url, model_name=config.vllm.model_name, tokenizer=config.tokenizer, api_type=config.vllm.api
)
else:
@@ -76,13 +77,13 @@ def main_cli() -> None:
raise Exception("load config missing")

# Define Metrics Client
if config.metrics:
metricsclient = MockMetricsClient(uri=config.metrics.url)
else:
raise Exception("metrics config missing")
metrics_client: MetricsClient | None = None
if config.metrics_client:
if config.metrics_client.type == MetricsClientType.PROMETHEUS and config.metrics_client.prometheus:
metrics_client = PrometheusMetricsClient(config=config.metrics_client.prometheus)

# Define Report Generator
reportgen = ReportGenerator(metricsclient)
reportgen = ReportGenerator(metrics_client)

# Define Storage Clients
storage_clients: List[StorageClient] = []
@@ -91,13 +92,18 @@ def main_cli() -> None:
storage_clients.append(GoogleCloudStorageClient(config=config.storage.google_cloud_storage))

# Setup Perf Test Runner
perfrunner = InferencePerfRunner(client, loadgen, reportgen, storage_clients)
perfrunner = InferencePerfRunner(model_server_client, loadgen, reportgen, storage_clients)

start_time = time.time()

# Run Perf Test
perfrunner.run()

# Generate Reports
reports = perfrunner.generate_reports()
end_time = time.time()
Contributor: We shouldn't include the metrics wait time above in the total test run time if all the requests have finished already, since that can skew the test results while we wait for the metrics scrape interval.

Contributor: Yes, the duration of the test should be calculated correctly. Also consider that this could be a multi-stage run, so durations need to be calculated for each stage.

Contributor (Author): Good point! To tackle that, I am now separately calculating the perf run duration here and have moved the wait logic to the Prometheus client. This duration will be used for computing the default request summary.

I now pass the benchmark start time to the metrics client in the perf runtime parameters. We call the wait function and then calculate the end time. This way, the Prometheus query runs with start timestamp = benchmark start time and end timestamp = benchmark end + wait(), ensuring we capture all the metrics.

duration = end_time - start_time # Calculate the duration of the test

# Generate Report after the tests
reports = perfrunner.generate_reports(PerfRuntimeParameters(start_time, duration, model_server_client))

# Save Reports
perfrunner.save_reports(reports=reports)
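
Based on the author's description in the review thread above, the Prometheus client's wait logic might look roughly like the sketch below: the benchmark duration excludes the scrape wait, while the query window extends to "benchmark end + wait" so the final scrape is included. The class, field, and method names here are assumptions, not code from this PR.

```python
# Hypothetical sketch of the wait/window behavior described in the review thread.
import time


class PrometheusMetricsClientSketch:
    def __init__(self, scrape_interval: int) -> None:
        self.scrape_interval = scrape_interval

    def wait(self) -> None:
        # Give Prometheus one more scrape cycle to pick up the final datapoints.
        time.sleep(self.scrape_interval)

    def query_window(self, start_time: float) -> tuple[float, float]:
        # start = benchmark start, end = benchmark end + wait
        self.wait()
        return start_time, time.time()
```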
5 changes: 2 additions & 3 deletions inference_perf/metrics/README.md
@@ -5,10 +5,9 @@ This repository provides clients to query performance metrics from various monit
## Supported Monitoring Platforms

**Available now**:
- None
- Self Deployed Prometheus

**Todo**:
- Google Cloud Monitoring
- AWS CloudWatch
- Azure Monitor
- Self Deployed Prometheus
- Azure Monitor
4 changes: 2 additions & 2 deletions inference_perf/metrics/__init__.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import MetricsClient, MetricsSummary
from .base import MetricsClient
from .mock_client import MockMetricsClient

__all__ = ["MetricsClient", "MetricsSummary", "MockMetricsClient"]
__all__ = ["MetricsClient", "MockMetricsClient"]