Skip to content

Commit 2b779b1

Browse files
authored
feat: OTLP Metrics Support (#239)
* feat: add support for receving metrics * rm * fix docs * refactor common fixtures * consolidate fixtures * Update README.md * clean up * add rn
1 parent c82c0e0 commit 2b779b1

File tree

9 files changed

+888
-148
lines changed

9 files changed

+888
-148
lines changed

README.md

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,15 @@ Return OpenTelemetry logs that have been received by the agent for the given ses
391391

392392
Logs are returned as a JSON list of the OTLP logs payloads received. The logs are in the standard OpenTelemetry Protocol (OTLP) v1.7.0 format, decoded from protobuf into JSON.
393393

394+
### /test/session/metrics
395+
396+
Return OpenTelemetry metrics that have been received by the agent for the given session token.
397+
398+
#### [optional] `?test_session_token=`
399+
#### [optional] `X-Datadog-Test-Session-Token`
400+
401+
Metrics are returned as a JSON list of the OTLP metrics payloads received. The metrics are in the standard OpenTelemetry Protocol (OTLP) v1.7.0 format, decoded from protobuf into JSON.
402+
394403
### /test/session/responses/config (POST)
395404
Create a Remote Config payload to retrieve in endpoint `/v0.7/config`
396405

@@ -495,24 +504,30 @@ Mimics the pipeline_stats endpoint of the agent, but always returns OK, and logs
495504

496505
Accepts OpenTelemetry Protocol (OTLP) v1.7.0 logs in protobuf format via HTTP. This endpoint validates and decodes OTLP logs payloads for testing OpenTelemetry logs exporters and libraries.
497506

498-
The HTTP endpoint accepts `POST` requests with `Content-Type: application/x-protobuf` and stores the decoded logs for retrieval via the `/test/session/logs` endpoint.
507+
The HTTP endpoint accepts `POST` requests with `Content-Type: application/x-protobuf` and `Content-Type: application/json` and stores the decoded logs for retrieval via the `/test/session/logs` endpoint.
508+
509+
### /v1/metrics (HTTP)
510+
511+
Accepts OpenTelemetry Protocol (OTLP) v1.7.0 metrics in protobuf format via HTTP. This endpoint validates and decodes OTLP metrics payloads for testing OpenTelemetry metrics exporters and libraries.
512+
513+
The HTTP endpoint accepts `POST` requests with `Content-Type: application/x-protobuf` and `Content-Type: application/json` and stores the decoded metrics for retrieval via the `/test/session/metrics` endpoint.
499514

500-
### OTLP Logs via GRPC
515+
### OTLP Logs and Metrics via GRPC
501516

502-
OTLP logs can also be sent via GRPC using the OpenTelemetry `LogsService.Export` method. The GRPC server implements the standard OTLP logs service interface and forwards all requests to the HTTP server, ensuring consistent processing and session management.
517+
OTLP logs and metrics can also be sent via GRPC using the OpenTelemetry `LogsService.Export` and `MetricsService.Export` methods respectively. The GRPC server implements the standard OTLP service interfaces and forwards all requests to the HTTP server, ensuring consistent processing and session management.
503518

504-
**Note:** OTLP logs are served on separate ports from the main APM endpoints (default: 8126):
519+
**Note:** OTLP endpoints are served on separate ports from the main APM endpoints (default: 8126):
505520
- **HTTP**: Port 4318 (default) - Use `--otlp-http-port` to configure
506521
- **GRPC**: Port 4317 (default) - Use `--otlp-grpc-port` to configure
507522

508-
Both protocols store decoded logs for retrieval via the `/test/session/logs` HTTP endpoint.
523+
Both protocols store decoded data for retrieval via the `/test/session/logs` and `/test/session/metrics` HTTP endpoints respectively.
509524

510525
GRPC Client → GRPC Server → HTTP POST → HTTP Server → Agent Storage
511526
↓ ↓
512527
(forwards protobuf) (session management)
513528
↓ ↓
514529
HTTP Retrievable via
515-
Response /test/session/logs
530+
Response /test/session/{logs,metrics}
516531

517532
### /tracer_flare/v1
518533

ddapm_test_agent/agent.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from msgpack.exceptions import ExtraData as MsgPackExtraDataException
3939
from multidict import CIMultiDict
4040
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import add_LogsServiceServicer_to_server
41+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import add_MetricsServiceServicer_to_server
4142

4243
from . import _get_version
4344
from . import trace_snapshot
@@ -51,6 +52,9 @@
5152
from .logs import LOGS_ENDPOINT
5253
from .logs import OTLPLogsGRPCServicer
5354
from .logs import decode_logs_request
55+
from .metrics import METRICS_ENDPOINT
56+
from .metrics import OTLPMetricsGRPCServicer
57+
from .metrics import decode_metrics_request
5458
from .remoteconfig import RemoteConfigServer
5559
from .trace import Span
5660
from .trace import Trace
@@ -513,6 +517,20 @@ async def _logs_by_session(self, token: Optional[str]) -> List[Dict[str, Any]]:
513517
logs.append(logs_data)
514518
return logs
515519

520+
async def _metrics_by_session(self, token: Optional[str]) -> List[Dict[str, Any]]:
521+
"""Return the metrics that belong to the given session token.
522+
523+
If token is None or if the token was used to manually start a session
524+
with /session-start then return all metrics that were sent since the last
525+
/session-start request was made.
526+
"""
527+
metrics: List[Dict[str, Any]] = []
528+
for req in self._requests_by_session(token):
529+
if req.match_info.handler == self.handle_v1_metrics:
530+
metrics_data = self._decode_v1_metrics(req)
531+
metrics.append(metrics_data)
532+
return metrics
533+
516534
async def _integration_requests_by_session(
517535
self,
518536
token: Optional[str],
@@ -593,6 +611,14 @@ def _decode_v1_logs(self, request: Request) -> Dict[str, Any]:
593611
except Exception as e:
594612
raise web.HTTPBadRequest(text=str(e))
595613

614+
def _decode_v1_metrics(self, request: Request) -> Dict[str, Any]:
615+
raw_data = self._request_data(request)
616+
content_type = request.headers.get("Content-Type", "").lower().strip()
617+
try:
618+
return decode_metrics_request(raw_data, content_type)
619+
except Exception as e:
620+
raise web.HTTPBadRequest(text=str(e))
621+
596622
async def handle_v04_traces(self, request: Request) -> web.Response:
597623
return await self._handle_traces(request, version="v0.4")
598624

@@ -631,6 +657,21 @@ async def handle_v1_logs(self, request: Request) -> web.Response:
631657
)
632658
return web.HTTPOk()
633659

660+
async def handle_v1_metrics(self, request: Request) -> web.Response:
661+
metrics_data = self._decode_v1_metrics(request)
662+
num_resource_metrics = len(metrics_data.get("resource_metrics", []))
663+
total_metrics = sum(
664+
len(scope_metric.get("metrics", []))
665+
for resource_metric in metrics_data.get("resource_metrics", [])
666+
for scope_metric in resource_metric.get("scope_metrics", [])
667+
)
668+
log.info(
669+
"received /v1/metrics payload with %r resource metric(s) containing %r metric(s)",
670+
num_resource_metrics,
671+
total_metrics,
672+
)
673+
return web.HTTPOk()
674+
634675
async def handle_v07_remoteconfig(self, request: Request) -> web.Response:
635676
"""Emulates Remote Config endpoint: /v0.7/config"""
636677
token = _session_token(request)
@@ -996,6 +1037,11 @@ async def handle_session_logs(self, request: Request) -> web.Response:
9961037
logs = await self._logs_by_session(token)
9971038
return web.json_response(logs)
9981039

1040+
async def handle_session_metrics(self, request: Request) -> web.Response:
1041+
token = request["session_token"]
1042+
metrics = await self._metrics_by_session(token)
1043+
return web.json_response(metrics)
1044+
9991045
async def handle_session_requests(self, request: Request) -> web.Response:
10001046
token = request["session_token"]
10011047
resp = []
@@ -1013,6 +1059,7 @@ async def handle_session_requests(self, request: Request) -> web.Response:
10131059
self.handle_evp_proxy_v2_api_v2_llmobs,
10141060
self.handle_evp_proxy_v2_llmobs_eval_metric,
10151061
self.handle_v1_logs,
1062+
self.handle_v1_metrics,
10161063
):
10171064
continue
10181065
resp.append(
@@ -1250,8 +1297,10 @@ async def otlp_store_request_middleware(request: Request, handler: _Handler) ->
12501297
app.add_routes(
12511298
[
12521299
web.post(LOGS_ENDPOINT, agent.handle_v1_logs),
1300+
web.post(METRICS_ENDPOINT, agent.handle_v1_metrics),
12531301
web.get("/test/session/requests", agent.handle_session_requests),
12541302
web.get("/test/session/logs", agent.handle_session_logs),
1303+
web.get("/test/session/metrics", agent.handle_session_metrics),
12551304
web.get("/test/session/clear", agent.handle_session_clear),
12561305
web.get("/test/session/start", agent.handle_session_start),
12571306
]
@@ -1269,6 +1318,10 @@ async def make_otlp_grpc_server_async(agent: Agent, http_port: int, grpc_port: i
12691318
logs_servicer = OTLPLogsGRPCServicer(http_port)
12701319
add_LogsServiceServicer_to_server(logs_servicer, server)
12711320

1321+
# Add the OTLP metrics servicer
1322+
metrics_servicer = OTLPMetricsGRPCServicer(http_port)
1323+
add_MetricsServiceServicer_to_server(metrics_servicer, server)
1324+
12721325
# Setup and start the server
12731326
listen_addr = f"[::]:{grpc_port}"
12741327
server.add_insecure_port(listen_addr)

ddapm_test_agent/client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,20 @@ def wait_for_num_logs(self, num: int, clear: bool = False, wait_loops: int = 30)
163163
return logs
164164
time.sleep(0.1)
165165
raise ValueError("Number (%r) of logs not available from test agent, got %r" % (num, len(logs)))
166+
167+
def metrics(self, clear: bool = False, **kwargs: Any) -> List[Any]:
168+
resp = self._session.get(self._url("/test/session/metrics"), **kwargs)
169+
if clear:
170+
self.clear()
171+
return cast(List[Any], resp.json())
172+
173+
def wait_for_num_metrics(self, num: int, clear: bool = False, wait_loops: int = 30) -> List[Any]:
174+
"""Wait for `num` metrics to be received from the test agent."""
175+
for _ in range(wait_loops):
176+
metrics = self.metrics(clear=False)
177+
if len(metrics) == num:
178+
if clear:
179+
self.clear()
180+
return metrics
181+
time.sleep(0.1)
182+
raise ValueError("Number (%r) of metrics not available from test agent, got %r" % (num, len(metrics)))

ddapm_test_agent/logs.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121

2222
def decode_logs_request(request_body: bytes, content_type: str) -> Dict[str, Any]:
23-
"""Decode the protobuf request body into an ExportLogsServiceRequest object."""
2423
if content_type == "application/json":
2524
parsed_json = json.loads(request_body)
2625
if not isinstance(parsed_json, dict):
@@ -35,27 +34,23 @@ def decode_logs_request(request_body: bytes, content_type: str) -> Dict[str, Any
3534

3635

3736
def protobuf_to_dict(pb_obj: Any) -> Dict[str, Any]:
38-
"""Convert a protobuf object to a dictionary."""
3937
return MessageToDict(pb_obj, preserving_proto_field_name=True)
4038

4139

4240
class OTLPLogsGRPCServicer(LogsServiceServicer):
43-
"""GRPC servicer that forwards OTLP logs to HTTP server."""
4441

4542
def __init__(self, http_port: int):
4643
self.http_url = f"http://127.0.0.1:{http_port}"
4744

4845
async def Export(
4946
self, request: ExportLogsServiceRequest, context: grpc_aio.ServicerContext
5047
) -> ExportLogsServiceResponse:
51-
"""Export logs by forwarding to HTTP server."""
5248
try:
5349
protobuf_data = request.SerializeToString()
5450
headers = {"Content-Type": "application/x-protobuf"}
5551
metadata = dict(context.invocation_metadata())
5652
if "session-token" in metadata:
5753
headers["Session-Token"] = metadata["session-token"]
58-
# Forward to OTLP HTTP server
5954
async with ClientSession(self.http_url) as session:
6055
async with session.post(LOGS_ENDPOINT, headers=headers, data=protobuf_data) as resp:
6156
context.set_trailing_metadata([("http-status", str(resp.status))])

ddapm_test_agent/metrics.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""OTLP Metrics handling for the test agent."""
2+
3+
import json
4+
import logging
5+
from typing import Any
6+
from typing import Dict
7+
8+
from aiohttp import ClientSession
9+
from google.protobuf.json_format import MessageToDict
10+
from grpc import aio as grpc_aio
11+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceRequest
12+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceResponse
13+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import MetricsServiceServicer
14+
15+
16+
METRICS_ENDPOINT = "/v1/metrics"
17+
18+
19+
log = logging.getLogger(__name__)
20+
21+
22+
def decode_metrics_request(request_body: bytes, content_type: str) -> Dict[str, Any]:
23+
if content_type == "application/json":
24+
parsed_json = json.loads(request_body)
25+
if not isinstance(parsed_json, dict):
26+
raise Exception("JSON payload must be an object")
27+
return parsed_json
28+
elif content_type == "application/x-protobuf":
29+
export_request = ExportMetricsServiceRequest()
30+
export_request.ParseFromString(request_body)
31+
return protobuf_to_dict(export_request)
32+
else:
33+
raise ValueError(f"Content-Type must be application/x-protobuf or application/json, got {content_type}")
34+
35+
36+
def protobuf_to_dict(pb_obj: Any) -> Dict[str, Any]:
37+
return MessageToDict(pb_obj, preserving_proto_field_name=True)
38+
39+
40+
class OTLPMetricsGRPCServicer(MetricsServiceServicer):
41+
42+
def __init__(self, http_port: int):
43+
self.http_url = f"http://127.0.0.1:{http_port}"
44+
45+
def _count_data_points(self, request: ExportMetricsServiceRequest) -> int:
46+
return len(
47+
[
48+
dp
49+
for rm in request.resource_metrics
50+
for sm in rm.scope_metrics
51+
for m in sm.metrics
52+
for dp in (
53+
m.gauge.data_points
54+
if m.HasField("gauge")
55+
else (
56+
m.sum.data_points
57+
if m.HasField("sum")
58+
else (
59+
m.histogram.data_points
60+
if m.HasField("histogram")
61+
else (
62+
m.exponential_histogram.data_points
63+
if m.HasField("exponential_histogram")
64+
else m.summary.data_points if m.HasField("summary") else []
65+
)
66+
)
67+
)
68+
)
69+
]
70+
)
71+
72+
async def Export(
73+
self, request: ExportMetricsServiceRequest, context: grpc_aio.ServicerContext
74+
) -> ExportMetricsServiceResponse:
75+
try:
76+
protobuf_data = request.SerializeToString()
77+
headers = {"Content-Type": "application/x-protobuf"}
78+
metadata = dict(context.invocation_metadata())
79+
if "session-token" in metadata:
80+
headers["Session-Token"] = metadata["session-token"]
81+
async with ClientSession(self.http_url) as session:
82+
async with session.post(METRICS_ENDPOINT, headers=headers, data=protobuf_data) as resp:
83+
context.set_trailing_metadata([("http-status", str(resp.status))])
84+
response = ExportMetricsServiceResponse()
85+
if resp.status >= 400:
86+
response.partial_success.rejected_data_points = self._count_data_points(request)
87+
response.partial_success.error_message = f"HTTP {resp.status}: {await resp.text()}"
88+
return response
89+
except Exception as e:
90+
context.set_trailing_metadata([("http-status", "500"), ("error", str(e))])
91+
response = ExportMetricsServiceResponse()
92+
response.partial_success.rejected_data_points = self._count_data_points(request)
93+
response.partial_success.error_message = f"Forward failed: {str(e)}"
94+
return response
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
features:
3+
- |
4+
OTLP: Added OpenTelemetry Protocol (OTLP) metrics support via HTTP endpoint ``/v1/metrics`` on port 4318 and GRPC server on port 4317.
5+
Supports all OTLP metric types including Gauge, Sum, Histogram, ExponentialHistogram, and Summary.
6+
The GRPC server forwards requests to the OTLP HTTP server for validation and storage, enabling testing of applications that send metrics via either protocol.
7+
Session endpoint ``/test/session/metrics`` allows retrieval of captured metrics for test validation.

0 commit comments

Comments
 (0)