Skip to content

Commit a776505

Browse files
feat(ci_visibility): xdist distributed tracing support (#13509)
Co-authored-by: Vítor De Araújo <vitor.dearaujo@datadoghq.com>
1 parent 4d20f16 commit a776505

13 files changed

+999
-322
lines changed

ddtrace/contrib/internal/pytest/_plugin_v2.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@
102102
DISABLED_BY_TEST_MANAGEMENT_REASON = "Flaky test is disabled by Datadog"
103103

104104

105+
class XdistHooks:
106+
@pytest.hookimpl
107+
def pytest_configure_node(self, node):
108+
main_session_span = InternalTestSession.get_span()
109+
if main_session_span:
110+
root_span = main_session_span.span_id
111+
else:
112+
root_span = 0
113+
114+
node.workerinput["root_span"] = root_span
115+
116+
105117
def _handle_itr_should_skip(item, test_id) -> bool:
106118
"""Checks whether a test should be skipped
107119
@@ -252,6 +264,9 @@ def pytest_configure(config: pytest_Config) -> None:
252264
from ddtrace.contrib.internal.pytest._pytest_bdd_subplugin import _PytestBddSubPlugin
253265

254266
config.pluginmanager.register(_PytestBddSubPlugin(), "_datadog-pytest-bdd")
267+
268+
if config.pluginmanager.hasplugin("xdist"):
269+
config.pluginmanager.register(XdistHooks())
255270
else:
256271
# If the pytest ddtrace plugin is not enabled, we should disable CI Visibility, as it was enabled during
257272
# pytest_load_initial_conftests
@@ -299,7 +314,27 @@ def pytest_sessionstart(session: pytest.Session) -> None:
299314

300315
InternalTestSession.set_library_capabilities(library_capabilities)
301316

302-
InternalTestSession.start()
317+
extracted_context = None
318+
if hasattr(session.config, "workerinput"):
319+
from ddtrace._trace.context import Context
320+
from ddtrace.constants import USER_KEEP
321+
322+
received_root_span = session.config.workerinput.get("root_span", "MISSING_SPAN")
323+
try:
324+
root_span = int(received_root_span)
325+
extracted_context = Context(
326+
trace_id=1,
327+
span_id=root_span, # This span_id here becomes context.span_id for the parent context
328+
sampling_priority=USER_KEEP,
329+
)
330+
except ValueError:
331+
log.debug(
332+
"pytest_sessionstart: Could not convert root_span %s to int",
333+
received_root_span,
334+
)
335+
336+
InternalTestSession.start(extracted_context)
337+
303338
if InternalTestSession.efd_enabled() and not _pytest_version_supports_efd():
304339
log.warning("Early Flake Detection disabled: pytest version is not supported")
305340

@@ -732,7 +767,10 @@ def _pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
732767
if ModuleCodeCollector.is_installed():
733768
ModuleCodeCollector.uninstall()
734769

735-
InternalTestSession.finish(force_finish_children=True)
770+
InternalTestSession.finish(
771+
force_finish_children=True,
772+
override_status=TestStatus.FAIL if session.exitstatus == pytest.ExitCode.TESTS_FAILED else None,
773+
)
736774

737775

738776
@pytest.hookimpl(hookwrapper=True, tryfirst=True)

ddtrace/ext/test_visibility/api.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from typing import Optional
2424
from typing import Type
2525

26+
from ddtrace._trace.context import Context
2627
from ddtrace.ext.test import Status as _TestStatus
2728
from ddtrace.ext.test_visibility._item_ids import TestId
2829
from ddtrace.ext.test_visibility._item_ids import TestModuleId
@@ -179,9 +180,9 @@ def discover(
179180

180181
@staticmethod
181182
@_catch_and_log_exceptions
182-
def start():
183+
def start(context: Optional[Context] = None):
183184
log.debug("Starting session")
184-
core.dispatch("test_visibility.session.start")
185+
core.dispatch("test_visibility.session.start", (context,))
185186

186187
class FinishArgs(NamedTuple):
187188
force_finish_children: bool

ddtrace/internal/ci_visibility/api/_base.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from typing import TypeVar
1414
from typing import Union
1515

16+
from ddtrace._trace.context import Context
1617
from ddtrace.constants import SPAN_KIND
1718
from ddtrace.ext import SpanTypes
1819
from ddtrace.ext import test
@@ -177,9 +178,14 @@ def _add_all_tags_to_span(self) -> None:
177178
except Exception as e:
178179
log.debug("Error setting tag %s: %s", tag, e)
179180

180-
def _start_span(self) -> None:
181+
def _start_span(self, context: Optional[Context] = None) -> None:
181182
# Test items do not use a parent, and are instead their own trace's root span
182-
parent_span = self.get_parent_span() if isinstance(self, TestVisibilityParentItem) else None
183+
# except if context is passed (for xdist support)
184+
parent_span: Optional[Union[Span, Context]] = None
185+
if context is not None:
186+
parent_span = context
187+
elif isinstance(self, TestVisibilityParentItem):
188+
parent_span = self.get_parent_span()
183189

184190
self._span = self._tracer._start_span(
185191
self._operation_name,
@@ -367,7 +373,7 @@ def _telemetry_record_event_finished(self):
367373
# Telemetry for events created has specific tags for item types
368374
raise NotImplementedError("This method must be implemented by the subclass")
369375

370-
def start(self) -> None:
376+
def start(self, context: Optional[Context] = None) -> None:
371377
log.debug("Test Visibility: starting %s", self)
372378

373379
if self.is_started():
@@ -376,8 +382,9 @@ def start(self) -> None:
376382
log.warning(error_msg)
377383
raise CIVisibilityDataError(error_msg)
378384
return
385+
379386
self._telemetry_record_event_created()
380-
self._start_span()
387+
self._start_span(context)
381388

382389
def is_started(self) -> bool:
383390
return self._span is not None
@@ -578,7 +585,8 @@ def get_status(self) -> Union[TestStatus, SPECIAL_STATUS]:
578585

579586
if children_status_counts[TestStatus.FAIL.value] > 0:
580587
return TestStatus.FAIL
581-
if children_status_counts[TestStatus.SKIP.value] == len(self._children):
588+
len_children = len(self._children)
589+
if len_children > 0 and children_status_counts[TestStatus.SKIP.value] == len_children:
582590
return TestStatus.SKIP
583591
# We can assume the current item passes if not all children are skipped, and there were no failures
584592
if children_status_counts[TestStatus.FAIL.value] == 0:

ddtrace/internal/ci_visibility/encoder.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import os
23
import threading
34
from typing import TYPE_CHECKING # noqa:F401
45
from uuid import uuid4
@@ -78,8 +79,22 @@ def encode(self):
7879
self._init_buffer()
7980
return payload, buffer_size
8081

82+
def _get_parent_session(self, traces):
83+
for trace in traces:
84+
for span in trace:
85+
if span.get_tag(EVENT_TYPE) == SESSION_TYPE and span.parent_id is not None and span.parent_id != 0:
86+
return span.parent_id
87+
return 0
88+
8189
def _build_payload(self, traces):
82-
normalized_spans = [self._convert_span(span, trace[0].context.dd_origin) for trace in traces for span in trace]
90+
new_parent_session_span_id = self._get_parent_session(traces)
91+
is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
92+
normalized_spans = [
93+
self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id)
94+
for trace in traces
95+
for span in trace
96+
if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE)
97+
]
8398
if not normalized_spans:
8499
return None
85100
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))
@@ -93,8 +108,8 @@ def _build_payload(self, traces):
93108
def _pack_payload(payload):
94109
return msgpack_packb(payload)
95110

96-
def _convert_span(self, span, dd_origin):
97-
# type: (Span, str) -> Dict[str, Any]
111+
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
112+
# type: (Span, str, Optional[int]) -> Dict[str, Any]
98113
sp = JSONEncoderV2._span_to_dict(span)
99114
sp = JSONEncoderV2._normalize_span(sp)
100115
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
@@ -103,7 +118,7 @@ def _convert_span(self, span, dd_origin):
103118
sp["metrics"] = dict(sorted(span._metrics.items()))
104119
if dd_origin is not None:
105120
sp["meta"].update({"_dd.origin": dd_origin})
106-
sp = CIVisibilityEncoderV01._filter_ids(sp)
121+
sp = CIVisibilityEncoderV01._filter_ids(sp, new_parent_session_span_id)
107122

108123
version = CIVisibilityEncoderV01.TEST_SUITE_EVENT_VERSION
109124
if span.get_tag(EVENT_TYPE) == "test":
@@ -117,7 +132,7 @@ def _convert_span(self, span, dd_origin):
117132
return {"version": version, "type": event_type, "content": sp}
118133

119134
@staticmethod
120-
def _filter_ids(sp):
135+
def _filter_ids(sp, new_parent_session_span_id=0):
121136
"""
122137
Remove trace/span/parent IDs if non-test event, move session/module/suite IDs from meta to outer content layer.
123138
"""
@@ -130,7 +145,7 @@ def _filter_ids(sp):
130145
sp["parent_id"] = int(sp.get("parent_id") or "1")
131146
sp["span_id"] = int(sp.get("span_id") or "1")
132147
if sp["meta"].get(EVENT_TYPE) in [SESSION_TYPE, MODULE_TYPE, SUITE_TYPE, SpanTypes.TEST]:
133-
test_session_id = sp["meta"].get(SESSION_ID)
148+
test_session_id = new_parent_session_span_id or sp["meta"].get(SESSION_ID)
134149
if test_session_id:
135150
sp[SESSION_ID] = int(test_session_id)
136151
del sp["meta"][SESSION_ID]
@@ -221,8 +236,9 @@ def _build_payload(self, traces):
221236
return None
222237
return b"\r\n".join(self._build_body(data))
223238

224-
def _convert_span(self, span, dd_origin):
225-
# type: (Span, str) -> Dict[str, Any]
239+
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
240+
# type: (Span, str, Optional[int]) -> Dict[str, Any]
241+
# DEV: new_parent_session_span_id is unused here, but it is used in super class
226242
files: Dict[str, Any] = {}
227243

228244
files_struct_tag_value = span.get_struct_tag(COVERAGE_TAG_NAME)

ddtrace/internal/ci_visibility/recorder.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import ddtrace
1616
from ddtrace import config as ddconfig
17+
from ddtrace._trace.context import Context
1718
from ddtrace.contrib import trace_utils
1819
from ddtrace.ext import ci
1920
from ddtrace.ext import test
@@ -1102,10 +1103,10 @@ def _on_discover_session(discover_args: TestSession.DiscoverArgs) -> None:
11021103

11031104

11041105
@_requires_civisibility_enabled
1105-
def _on_start_session() -> None:
1106+
def _on_start_session(context: Optional[Context] = None) -> None:
11061107
log.debug("Handling start session")
11071108
session = CIVisibility.get_session()
1108-
session.start()
1109+
session.start(context)
11091110

11101111

11111112
@_requires_civisibility_enabled
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
CI Visibility: This introduces preliminary support to link children pytest-xdist tests (and test suites and test modules) to their parent sessions, instead of being sent as independent sessions.

0 commit comments

Comments
 (0)