diff --git a/ddtrace/contrib/internal/pytest/_plugin_v1.py b/ddtrace/contrib/internal/pytest/_plugin_v1.py index ffc291afb80..e3865acc628 100644 --- a/ddtrace/contrib/internal/pytest/_plugin_v1.py +++ b/ddtrace/contrib/internal/pytest/_plugin_v1.py @@ -472,7 +472,7 @@ def pytest_sessionstart(session): global _global_skipped_elements _global_skipped_elements = 0 - workspace_path = _CIVisibility.get_workspace_path() + workspace_path = _CIVisibility._instance.get_workspace_path() if workspace_path is None: workspace_path = session.config.rootdir @@ -492,7 +492,7 @@ def pytest_sessionstart(session): test_session_span.set_tag_str(test.COMMAND, test_command) test_session_span.set_tag_str(_SESSION_ID, str(test_session_span.span_id)) - _CIVisibility.set_test_session_name(test_command=test_command) + _CIVisibility._instance.set_test_session_name(test_command=test_command) if _CIVisibility.test_skipping_enabled(): test_session_span.set_tag_str(test.ITR_TEST_SKIPPING_ENABLED, "true") diff --git a/ddtrace/contrib/internal/pytest/_plugin_v2.py b/ddtrace/contrib/internal/pytest/_plugin_v2.py index afdd2b9c168..dc684ff57a5 100644 --- a/ddtrace/contrib/internal/pytest/_plugin_v2.py +++ b/ddtrace/contrib/internal/pytest/_plugin_v2.py @@ -441,7 +441,7 @@ def _pytest_runtest_protocol_pre_yield(item) -> t.Optional[ModuleCodeCollector.C _handle_test_management(item, test_id) _handle_itr_should_skip(item, test_id) - item_will_skip = _pytest_marked_to_skip(item) or InternalTest.was_skipped_by_itr(test_id) + item_will_skip = _pytest_marked_to_skip(item) or InternalTest.was_itr_skipped(test_id) collect_test_coverage = InternalTestSession.should_collect_coverage() and not item_will_skip @@ -470,7 +470,7 @@ def _pytest_runtest_protocol_post_yield(item, nextitem, coverage_collector): # - we trust that the next item is in the same module if it is in the same suite next_test_id = _get_test_id_from_item(nextitem) if nextitem else None if next_test_id is None or next_test_id.parent_id != suite_id: - if InternalTestSuite.is_itr_skippable(suite_id) and not InternalTestSuite.was_forced_run(suite_id): + if InternalTestSuite.is_itr_skippable(suite_id) and not InternalTestSuite.was_itr_forced_run(suite_id): InternalTestSuite.mark_itr_skipped(suite_id) else: _handle_coverage_dependencies(suite_id) @@ -611,7 +611,7 @@ def _process_result(item, result) -> _TestOutcome: # If run with --runxfail flag, tests behave as if they were not marked with xfail, # that's why no XFAIL_REASON or test.RESULT tags will be added. if result.skipped: - if InternalTest.was_skipped_by_itr(test_id): + if InternalTest.was_itr_skipped(test_id): # Items that were skipped by ITR already have their status and reason set return _TestOutcome() @@ -794,7 +794,7 @@ def _pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: if skipped_count > 0: # Update the session's internal _itr_skipped_count so that when _set_itr_tags() is called # during session finishing, it will use the correct worker-aggregated count - InternalTestSession.set_itr_tags(skipped_count) + InternalTestSession.set_itr_skipped_count(skipped_count) InternalTestSession.finish( force_finish_children=True, diff --git a/ddtrace/contrib/internal/pytest/_report_links.py b/ddtrace/contrib/internal/pytest/_report_links.py index 3f5b76bc08e..d57b0526c54 100644 --- a/ddtrace/contrib/internal/pytest/_report_links.py +++ b/ddtrace/contrib/internal/pytest/_report_links.py @@ -3,7 +3,7 @@ from urllib.parse import quote from ddtrace.ext import ci -from ddtrace.internal.ci_visibility import CIVisibility +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service DEFAULT_DATADOG_SITE = "datadoghq.com" @@ -16,10 +16,11 @@ def print_test_report_links(terminalreporter): base_url = _get_base_url( dd_site=os.getenv("DD_SITE", DEFAULT_DATADOG_SITE), dd_subdomain=os.getenv("DD_SUBDOMAIN", "") ) - ci_tags = CIVisibility.get_ci_tags() - settings = CIVisibility.get_session_settings() + ci_visibility_instance = require_ci_visibility_service() + ci_tags = ci_visibility_instance.get_ci_tags() + settings = ci_visibility_instance.get_session_settings() service = settings.test_service - env = CIVisibility.get_dd_env() + env = ci_visibility_instance.get_dd_env() redirect_test_commit_url = _build_test_commit_redirect_url(base_url, ci_tags, service, env) test_runs_url = _build_test_runs_url(base_url, ci_tags) diff --git a/ddtrace/contrib/internal/unittest/patch.py b/ddtrace/contrib/internal/unittest/patch.py index 484c94fa552..bf80fadc985 100644 --- a/ddtrace/contrib/internal/unittest/patch.py +++ b/ddtrace/contrib/internal/unittest/patch.py @@ -659,7 +659,7 @@ def _start_test_session_span(instance) -> ddtrace.trace.Span: "true" if _CIVisibility._instance._collect_coverage_enabled else "false", ) - _CIVisibility.set_test_session_name(test_command=test_command) + _CIVisibility._instance.set_test_session_name(test_command=test_command) if _CIVisibility.test_skipping_enabled(): _set_test_skipping_tags_to_span(test_session_span) diff --git a/ddtrace/ext/test_visibility/_item_ids.py b/ddtrace/ext/test_visibility/_item_ids.py deleted file mode 100644 index aec7bd21ab6..00000000000 --- a/ddtrace/ext/test_visibility/_item_ids.py +++ /dev/null @@ -1,38 +0,0 @@ -"""Provides identifier classes for items used in the Test Visibility API - -NOTE: BETA - this API is currently in development and is subject to change. -""" -import dataclasses -from typing import Optional - -from ddtrace.ext.test_visibility._test_visibility_base import _TestVisibilityChildItemIdBase -from ddtrace.ext.test_visibility._test_visibility_base import _TestVisibilityRootItemIdBase - - -@dataclasses.dataclass(frozen=True) -class TestModuleId(_TestVisibilityRootItemIdBase): - name: str - - def __repr__(self): - return "TestModuleId(module={})".format( - self.name, - ) - - -@dataclasses.dataclass(frozen=True) -class TestSuiteId(_TestVisibilityChildItemIdBase[TestModuleId]): - def __repr__(self): - return "TestSuiteId(module={}, suite={})".format(self.parent_id.name, self.name) - - -@dataclasses.dataclass(frozen=True) -class TestId(_TestVisibilityChildItemIdBase[TestSuiteId]): - parameters: Optional[str] = None # For hashability, a JSON string of a dictionary of parameters - - def __repr__(self): - return "TestId(module={}, suite={}, test={}, parameters={})".format( - self.parent_id.parent_id.name, - self.parent_id.name, - self.name, - self.parameters, - ) diff --git a/ddtrace/ext/test_visibility/_test_visibility_base.py b/ddtrace/ext/test_visibility/_test_visibility_base.py index 883d7da410a..04517bbfabe 100644 --- a/ddtrace/ext/test_visibility/_test_visibility_base.py +++ b/ddtrace/ext/test_visibility/_test_visibility_base.py @@ -2,11 +2,7 @@ import dataclasses from enum import Enum from pathlib import Path -from typing import Any -from typing import Dict from typing import Generic -from typing import List -from typing import NamedTuple from typing import Optional from typing import TypeVar from typing import Union @@ -61,46 +57,57 @@ def get_parent_id(self) -> PT: return self.parent_id -TestVisibilityItemId = TypeVar( - "TestVisibilityItemId", bound=Union[_TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId] -) +@dataclasses.dataclass(frozen=True) +class TestModuleId(_TestVisibilityRootItemIdBase): + name: str + def __repr__(self): + return "TestModuleId(module={})".format( + self.name, + ) -class _TestVisibilityAPIBase(abc.ABC): - __test__ = False - class GetTagArgs(NamedTuple): - item_id: Union[_TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId] - name: str +@dataclasses.dataclass(frozen=True) +class TestSuiteId(_TestVisibilityChildItemIdBase[TestModuleId]): + def __repr__(self): + return "TestSuiteId(module={}, suite={})".format(self.parent_id.name, self.name) - class SetTagArgs(NamedTuple): - item_id: Union[_TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId] - name: str - value: Any - class DeleteTagArgs(NamedTuple): - item_id: Union[_TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId] - name: str +@dataclasses.dataclass(frozen=True) +class TestId(_TestVisibilityChildItemIdBase[TestSuiteId]): + parameters: Optional[str] = None # For hashability, a JSON string of a dictionary of parameters - class SetTagsArgs(NamedTuple): - item_id: Union[_TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId] - tags: Dict[str, Any] + def __repr__(self): + return "TestId(module={}, suite={}, test={}, parameters={})".format( + self.parent_id.parent_id.name, + self.parent_id.name, + self.name, + self.parameters, + ) - class DeleteTagsArgs(NamedTuple): - item_id: Union[_TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId] - names: List[str] + +TestVisibilityItemId = TypeVar( + "TestVisibilityItemId", + bound=Union[ + _TestVisibilityChildItemIdBase, _TestVisibilityRootItemIdBase, TestSessionId, TestModuleId, TestSuiteId, TestId + ], +) + + +class _TestVisibilityAPIBase(abc.ABC): + __test__ = False def __init__(self): raise NotImplementedError("This class is not meant to be instantiated") @staticmethod @abc.abstractmethod - def discover(item_id: TestVisibilityItemId, *args, **kwargs): + def discover(*args, **kwargs): pass @staticmethod @abc.abstractmethod - def start(item_id: TestVisibilityItemId, *args, **kwargs): + def start(*args, **kwargs): pass @staticmethod diff --git a/ddtrace/ext/test_visibility/_utils.py b/ddtrace/ext/test_visibility/_utils.py index e5ea3ac5fc8..f7f9a70be9c 100644 --- a/ddtrace/ext/test_visibility/_utils.py +++ b/ddtrace/ext/test_visibility/_utils.py @@ -1,22 +1,19 @@ from functools import wraps -from typing import Any -from typing import Dict -from typing import List -from ddtrace.ext.test_visibility._test_visibility_base import TestVisibilityItemId -from ddtrace.ext.test_visibility._test_visibility_base import _TestVisibilityAPIBase -from ddtrace.internal import core +from ddtrace import config as ddconfig from ddtrace.internal.logger import get_logger log = get_logger(__name__) +def _noop_decorator(func): + return func + + def _catch_and_log_exceptions(func): """This decorator is meant to be used around all methods of the Test Visibility classes. - It accepts an optional parameter to allow it to be used on functions and methods. - No uncaught errors should ever reach the integration-side, and potentially cause crashes. """ @@ -30,36 +27,5 @@ def wrapper(*args, **kwargs): return wrapper -def _get_item_tag(item_id: TestVisibilityItemId, tag_name: str) -> Any: - log.debug("Getting tag for item %s: %s", item_id, tag_name) - tag_value = core.dispatch_with_results( - "test_visibility.item.get_tag", (_TestVisibilityAPIBase.GetTagArgs(item_id, tag_name),) - ).tag_value.value - return tag_value - - -def _set_item_tag(item_id: TestVisibilityItemId, tag_name: str, tag_value: Any, recurse: bool = False): - log.debug("Setting tag for item %s: %s=%s", item_id, tag_name, tag_value) - core.dispatch("test_visibility.item.set_tag", (_TestVisibilityAPIBase.SetTagArgs(item_id, tag_name, tag_value),)) - - -def _set_item_tags(item_id: TestVisibilityItemId, tags: Dict[str, Any], recurse: bool = False): - log.debug("Setting tags for item %s: %s", item_id, tags) - core.dispatch("test_visibility.item.set_tags", (_TestVisibilityAPIBase.SetTagsArgs(item_id, tags),)) - - -def _delete_item_tag(item_id: TestVisibilityItemId, tag_name: str, recurse: bool = False): - log.debug("Deleting tag for item %s: %s", item_id, tag_name) - core.dispatch("test_visibility.item.delete_tag", (_TestVisibilityAPIBase.DeleteTagArgs(item_id, tag_name),)) - - -def _delete_item_tags(item_id: TestVisibilityItemId, tag_names: List[str], recurse: bool = False): - log.debug("Deleting tags for item %s: %s", item_id, tag_names) - core.dispatch("test_visibility.item.delete_tags", (_TestVisibilityAPIBase.DeleteTagsArgs(item_id, tag_names),)) - - -def _is_item_finished(item_id: TestVisibilityItemId) -> bool: - log.debug("Checking if item %s is finished", item_id) - _is_finished = bool(core.dispatch_with_results("test_visibility.item.is_finished", (item_id,)).is_finished.value) - log.debug("Item %s is finished: %s", item_id, _is_finished) - return _is_finished +if ddconfig._raise: + _catch_and_log_decorator = _noop_decorator diff --git a/ddtrace/ext/test_visibility/api.py b/ddtrace/ext/test_visibility/api.py index 32db2dfedfb..93fbfbe19d3 100644 --- a/ddtrace/ext/test_visibility/api.py +++ b/ddtrace/ext/test_visibility/api.py @@ -12,52 +12,58 @@ All types and methods for interacting with the API are provided and documented in this file. """ -import dataclasses from enum import Enum from pathlib import Path -from types import TracebackType from typing import Any from typing import Dict from typing import List -from typing import NamedTuple from typing import Optional -from typing import Type from ddtrace._trace.context import Context -from ddtrace.ext.test import Status as _TestStatus -from ddtrace.ext.test_visibility._item_ids import TestId -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId from ddtrace.ext.test_visibility._test_visibility_base import TestSessionId -from ddtrace.ext.test_visibility._test_visibility_base import TestSourceFileInfoBase +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.ext.test_visibility._test_visibility_base import TestVisibilityItemId from ddtrace.ext.test_visibility._test_visibility_base import _TestVisibilityAPIBase from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -from ddtrace.ext.test_visibility._utils import _delete_item_tag -from ddtrace.ext.test_visibility._utils import _delete_item_tags -from ddtrace.ext.test_visibility._utils import _get_item_tag -from ddtrace.ext.test_visibility._utils import _is_item_finished -from ddtrace.ext.test_visibility._utils import _set_item_tag -from ddtrace.ext.test_visibility._utils import _set_item_tags -from ddtrace.internal import core +from ddtrace.ext.test_visibility.status import TestExcInfo +from ddtrace.ext.test_visibility.status import TestSourceFileInfo +from ddtrace.ext.test_visibility.status import TestStatus +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.logger import get_logger as _get_logger +def _get_item_tag(item_id: TestVisibilityItemId, tag_name: str) -> Any: + return require_ci_visibility_service().get_item_by_id(item_id).get_tag(tag_name) + + +def _set_item_tag(item_id: TestVisibilityItemId, tag_name: str, tag_value: Any) -> None: + require_ci_visibility_service().get_item_by_id(item_id).set_tag(tag_name, tag_value) + + +def _set_item_tags(item_id: TestVisibilityItemId, tags: Dict[str, Any]) -> None: + require_ci_visibility_service().get_item_by_id(item_id).set_tags(tags) + + +def _delete_item_tag(item_id: TestVisibilityItemId, tag_name: str) -> None: + require_ci_visibility_service().get_item_by_id(item_id).delete_tag(tag_name) + + +def _delete_item_tags(item_id: TestVisibilityItemId, tag_names: List[str]) -> None: + require_ci_visibility_service().get_item_by_id(item_id).delete_tags(tag_names) + + +def _is_item_finished(item_id: TestVisibilityItemId) -> bool: + return require_ci_visibility_service().get_item_by_id(item_id).is_finished() + + log = _get_logger(__name__) # this triggers the registration of trace handlers after civis startup import ddtrace._trace.trace_handlers # noqa: F401, E402 -class TestStatus(Enum): - __test__ = False - PASS = _TestStatus.PASS.value - FAIL = _TestStatus.FAIL.value - SKIP = _TestStatus.SKIP.value - XFAIL = _TestStatus.XFAIL.value - XPASS = _TestStatus.XPASS.value - - DEFAULT_SESSION_NAME = "test_visibility_session" @@ -68,25 +74,12 @@ class DEFAULT_OPERATION_NAMES(Enum): TEST = "test_visibility.test" -@dataclasses.dataclass(frozen=True) -class TestSourceFileInfo(TestSourceFileInfoBase): - path: Path - start_line: Optional[int] = None - end_line: Optional[int] = None - - -@dataclasses.dataclass(frozen=True) -class TestExcInfo: - __test__ = False - exc_type: Type[BaseException] - exc_value: BaseException - exc_traceback: TracebackType - - @_catch_and_log_exceptions def enable_test_visibility(config: Optional[Any] = None): log.debug("Enabling Test Visibility with config: %s", config) - core.dispatch("test_visibility.enable", (config,)) + from ddtrace.internal.ci_visibility.recorder import CIVisibility + + CIVisibility.enable(config=config) if not is_test_visibility_enabled(): log.warning("Failed to enable Test Visibility") @@ -94,14 +87,18 @@ def enable_test_visibility(config: Optional[Any] = None): @_catch_and_log_exceptions def is_test_visibility_enabled(): - return core.dispatch_with_results("test_visibility.is_enabled").is_enabled.value + try: + return require_ci_visibility_service().enabled + except RuntimeError: + return False @_catch_and_log_exceptions def disable_test_visibility(): log.debug("Disabling Test Visibility") - core.dispatch("test_visibility.disable") - if is_test_visibility_enabled(): + ci_visibility_instance = require_ci_visibility_service() + ci_visibility_instance.disable() + if ci_visibility_instance.enabled: log.warning("Failed to disable Test Visibility") @@ -111,20 +108,20 @@ def get_tag(item_id: TestVisibilityItemId, tag_name: str) -> Any: return _get_item_tag(item_id, tag_name) @staticmethod - def set_tag(item_id: TestVisibilityItemId, tag_name: str, tag_value: Any, recurse: bool = False): - _set_item_tag(item_id, tag_name, tag_value, recurse) + def set_tag(item_id: TestVisibilityItemId, tag_name: str, tag_value: Any): + _set_item_tag(item_id, tag_name, tag_value) @staticmethod - def set_tags(item_id: TestVisibilityItemId, tags: Dict[str, Any], recurse: bool = False): - _set_item_tags(item_id, tags, recurse) + def set_tags(item_id: TestVisibilityItemId, tags: Dict[str, Any]): + _set_item_tags(item_id, tags) @staticmethod - def delete_tag(item_id: TestVisibilityItemId, tag_name: str, recurse: bool = False): - _delete_item_tag(item_id, tag_name, recurse) + def delete_tag(item_id: TestVisibilityItemId, tag_name: str): + _delete_item_tag(item_id, tag_name) @staticmethod - def delete_tags(item_id: TestVisibilityItemId, tag_names: List[str], recurse: bool = False): - _delete_item_tags(item_id, tag_names, recurse) + def delete_tags(item_id: TestVisibilityItemId, tag_names: List[str]): + _delete_item_tags(item_id, tag_names) @staticmethod def is_finished(item_id: TestVisibilityItemId) -> bool: @@ -132,19 +129,7 @@ def is_finished(item_id: TestVisibilityItemId) -> bool: class TestSession(_TestVisibilityAPIBase): - class DiscoverArgs(NamedTuple): - test_command: str - reject_duplicates: bool - test_framework: str - test_framework_version: str - session_operation_name: str - module_operation_name: str - suite_operation_name: str - test_operation_name: str - root_dir: Optional[Path] = None - @staticmethod - @_catch_and_log_exceptions def discover( test_command: str, test_framework: str, @@ -161,32 +146,28 @@ def discover( log.debug("Test Visibility is not enabled, session not registered.") return - core.dispatch( - "test_visibility.session.discover", - ( - TestSession.DiscoverArgs( - test_command, - reject_duplicates, - test_framework, - test_framework_version, - session_operation_name, - module_operation_name, - suite_operation_name, - test_operation_name, - root_dir, - ), - ), + from ddtrace.internal.ci_visibility.recorder import on_discover_session + + on_discover_session( + test_command=test_command, + reject_duplicates=reject_duplicates, + test_framework=test_framework, + test_framework_version=test_framework_version, + session_operation_name=session_operation_name, + module_operation_name=module_operation_name, + suite_operation_name=suite_operation_name, + test_operation_name=test_operation_name, + root_dir=root_dir, ) @staticmethod @_catch_and_log_exceptions def start(distributed_children: bool = False, context: Optional[Context] = None): log.debug("Starting session") - core.dispatch("test_visibility.session.start", (distributed_children, context)) - - class FinishArgs(NamedTuple): - force_finish_children: bool - override_status: Optional[TestStatus] + session = require_ci_visibility_service().get_session() + session.start(context) + if distributed_children: + session.set_distributed_children() @staticmethod @_catch_and_log_exceptions @@ -196,52 +177,54 @@ def finish( ): log.debug("Finishing session, force_finish_session_modules: %s", force_finish_children) - core.dispatch( - "test_visibility.session.finish", (TestSession.FinishArgs(force_finish_children, override_status),) - ) + session = require_ci_visibility_service().get_session() + session.finish(force=force_finish_children, override_status=override_status) @staticmethod def get_tag(tag_name: str) -> Any: return _get_item_tag(TestSessionId(), tag_name) @staticmethod - def set_tag(tag_name: str, tag_value: Any, recurse: bool = False): - _set_item_tag(TestSessionId(), tag_name, tag_value, recurse) + def set_tag(tag_name: str, tag_value: Any): + _set_item_tag(TestSessionId(), tag_name, tag_value) @staticmethod - def set_tags(tags: Dict[str, Any], recurse: bool = False): - _set_item_tags(TestSessionId(), tags, recurse) + def set_tags(tags: Dict[str, Any]): + _set_item_tags(TestSessionId(), tags) @staticmethod - def delete_tag(tag_name: str, recurse: bool = False): - _delete_item_tag(TestSessionId(), tag_name, recurse) + def delete_tag(tag_name: str): + _delete_item_tag(TestSessionId(), tag_name) @staticmethod - def delete_tags(tag_names: List[str], recurse: bool = False): - _delete_item_tags(TestSessionId(), tag_names, recurse) + def delete_tags(tag_names: List[str]): + _delete_item_tags(TestSessionId(), tag_names) class TestModule(TestBase): - class DiscoverArgs(NamedTuple): - module_id: TestModuleId - module_path: Optional[Path] = None - - class FinishArgs(NamedTuple): - module_id: TestModuleId - override_status: Optional[TestStatus] = None - force_finish_children: bool = False - @staticmethod @_catch_and_log_exceptions def discover(item_id: TestModuleId, module_path: Optional[Path] = None): + from ddtrace.internal.ci_visibility.api._module import TestVisibilityModule + log.debug("Registered module %s", item_id) - core.dispatch("test_visibility.module.discover", (TestModule.DiscoverArgs(item_id, module_path),)) + ci_visibility_instance = require_ci_visibility_service() + session = ci_visibility_instance.get_session() + + session.add_child( + item_id, + TestVisibilityModule( + item_id.name, + ci_visibility_instance.get_session_settings(), + module_path, + ), + ) @staticmethod @_catch_and_log_exceptions - def start(item_id: TestModuleId): + def start(item_id: TestModuleId, *args, **kwargs): log.debug("Starting module %s", item_id) - core.dispatch("test_visibility.module.start", (item_id,)) + require_ci_visibility_service().get_module_by_id(item_id).start() @staticmethod @_catch_and_log_exceptions @@ -256,17 +239,13 @@ def finish( override_status, force_finish_children, ) - core.dispatch( - "test_visibility.module.finish", (TestModule.FinishArgs(item_id, override_status, force_finish_children),) + + require_ci_visibility_service().get_module_by_id(item_id).finish( + force=force_finish_children, override_status=override_status ) class TestSuite(TestBase): - class DiscoverArgs(NamedTuple): - suite_id: TestSuiteId - codeowners: Optional[List[str]] = None - source_file_info: Optional[TestSourceFileInfo] = None - @staticmethod @_catch_and_log_exceptions def discover( @@ -276,20 +255,26 @@ def discover( ): """Registers a test suite with the Test Visibility service.""" log.debug("Registering suite %s, source: %s", item_id, source_file_info) - core.dispatch( - "test_visibility.suite.discover", (TestSuite.DiscoverArgs(item_id, codeowners, source_file_info),) + from ddtrace.internal.ci_visibility.api._suite import TestVisibilitySuite + + ci_visibility_instance = require_ci_visibility_service() + module = ci_visibility_instance.get_module_by_id(item_id.parent_id) + + module.add_child( + item_id, + TestVisibilitySuite( + item_id.name, + ci_visibility_instance.get_session_settings(), + codeowners, + source_file_info, + ), ) @staticmethod @_catch_and_log_exceptions def start(item_id: TestSuiteId): log.debug("Starting suite %s", item_id) - core.dispatch("test_visibility.suite.start", (item_id,)) - - class FinishArgs(NamedTuple): - suite_id: TestSuiteId - force_finish_children: bool = False - override_status: Optional[TestStatus] = None + require_ci_visibility_service().get_suite_by_id(item_id).start() @staticmethod @_catch_and_log_exceptions @@ -304,19 +289,13 @@ def finish( force_finish_children, override_status, ) - core.dispatch( - "test_visibility.suite.finish", - (TestSuite.FinishArgs(item_id, force_finish_children, override_status),), + + require_ci_visibility_service().get_suite_by_id(item_id).finish( + force=force_finish_children, override_status=override_status ) class Test(TestBase): - class DiscoverArgs(NamedTuple): - test_id: TestId - codeowners: Optional[List[str]] = None - source_file_info: Optional[TestSourceFileInfo] = None - resource: Optional[str] = None - @staticmethod @_catch_and_log_exceptions def discover( @@ -326,6 +305,9 @@ def discover( resource: Optional[str] = None, ): """Registers a test with the Test Visibility service.""" + from ddtrace.internal.ci_visibility._api_client import TestProperties + from ddtrace.internal.ci_visibility.api._test import TestVisibilityTest + log.debug( "Discovering test %s, codeowners: %s, source file: %s, resource: %s", item_id, @@ -333,21 +315,48 @@ def discover( source_file_info, resource, ) - core.dispatch( - "test_visibility.test.discover", (Test.DiscoverArgs(item_id, codeowners, source_file_info, resource),) + + log.debug("Handling discovery for test %s", item_id) + ci_visibility_instance = require_ci_visibility_service() + suite = ci_visibility_instance.get_suite_by_id(item_id.parent_id) + + # New tests are currently only considered for EFD: + # - if known tests were fetched properly (enforced by is_known_test) + # - if they have no parameters + if ci_visibility_instance.is_known_tests_enabled() and item_id.parameters is None: + is_new = not ci_visibility_instance.is_known_test(item_id) + else: + is_new = False + + test_properties = None + if ci_visibility_instance.is_test_management_enabled(): + test_properties = ci_visibility_instance.get_test_properties(item_id) + + if not test_properties: + test_properties = TestProperties() + + suite.add_child( + item_id, + TestVisibilityTest( + item_id.name, + ci_visibility_instance.get_session_settings(), + parameters=item_id.parameters, + codeowners=codeowners, + source_file_info=source_file_info, + resource=resource, + is_new=is_new, + is_quarantined=test_properties.quarantined, + is_disabled=test_properties.disabled, + is_attempt_to_fix=test_properties.attempt_to_fix, + ), ) @staticmethod @_catch_and_log_exceptions def start(item_id: TestId): log.debug("Starting test %s", item_id) - core.dispatch("test_visibility.test.start", (item_id,)) - class FinishArgs(NamedTuple): - test_id: TestId - status: TestStatus - skip_reason: Optional[str] = None - exc_info: Optional[TestExcInfo] = None + require_ci_visibility_service().get_test_by_id(item_id).start() @staticmethod @_catch_and_log_exceptions @@ -364,16 +373,17 @@ def finish( skip_reason, exc_info, ) - core.dispatch( - "test_visibility.test.finish", - (Test.FinishArgs(item_id, status, skip_reason=skip_reason, exc_info=exc_info),), + + require_ci_visibility_service().get_test_by_id(item_id).finish_test( + status=status, skip_reason=skip_reason, exc_info=exc_info ) @staticmethod @_catch_and_log_exceptions def set_parameters(item_id: TestId, params: str): log.debug("Setting test %s parameters to %s", item_id, params) - core.dispatch("test_visibility.test.set_parameters", (item_id, params)) + + require_ci_visibility_service().get_test_by_id(item_id).set_parameters(parameters=params) @staticmethod @_catch_and_log_exceptions diff --git a/ddtrace/ext/test_visibility/status.py b/ddtrace/ext/test_visibility/status.py new file mode 100644 index 00000000000..0b9ffaf4240 --- /dev/null +++ b/ddtrace/ext/test_visibility/status.py @@ -0,0 +1,33 @@ +import dataclasses +from enum import Enum +from pathlib import Path +from types import TracebackType +from typing import Optional +from typing import Type + +from ddtrace.ext.test import Status as _TestStatus +from ddtrace.ext.test_visibility._test_visibility_base import TestSourceFileInfoBase + + +class TestStatus(Enum): + __test__ = False + PASS = _TestStatus.PASS.value + FAIL = _TestStatus.FAIL.value + SKIP = _TestStatus.SKIP.value + XFAIL = _TestStatus.XFAIL.value + XPASS = _TestStatus.XPASS.value + + +@dataclasses.dataclass(frozen=True) +class TestExcInfo: + __test__ = False + exc_type: Type[BaseException] + exc_value: BaseException + exc_traceback: TracebackType + + +@dataclasses.dataclass(frozen=True) +class TestSourceFileInfo(TestSourceFileInfoBase): + path: Path + start_line: Optional[int] = None + end_line: Optional[int] = None diff --git a/ddtrace/internal/ci_visibility/_api_client.py b/ddtrace/internal/ci_visibility/_api_client.py index 86f6ab4029a..86e062e23fa 100644 --- a/ddtrace/internal/ci_visibility/_api_client.py +++ b/ddtrace/internal/ci_visibility/_api_client.py @@ -10,8 +10,8 @@ from uuid import uuid4 from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.internal.ci_visibility.constants import AGENTLESS_API_KEY_HEADER_NAME from ddtrace.internal.ci_visibility.constants import AGENTLESS_DEFAULT_SITE from ddtrace.internal.ci_visibility.constants import EVP_PROXY_AGENT_BASE_PATH diff --git a/ddtrace/internal/ci_visibility/api/_base.py b/ddtrace/internal/ci_visibility/api/_base.py index dad5cf56e0b..6eb5a95e35b 100644 --- a/ddtrace/internal/ci_visibility/api/_base.py +++ b/ddtrace/internal/ci_visibility/api/_base.py @@ -18,11 +18,11 @@ from ddtrace.ext import SpanTypes from ddtrace.ext import test from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL -from ddtrace.ext.test_visibility._item_ids import TestId -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId -from ddtrace.ext.test_visibility.api import TestSourceFileInfo -from ddtrace.ext.test_visibility.api import TestStatus +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId +from ddtrace.ext.test_visibility.status import TestSourceFileInfo +from ddtrace.ext.test_visibility.status import TestStatus from ddtrace.internal.ci_visibility._api_client import EarlyFlakeDetectionSettings from ddtrace.internal.ci_visibility._api_client import TestManagementSettings from ddtrace.internal.ci_visibility.api._coverage_data import TestVisibilityCoverageData diff --git a/ddtrace/internal/ci_visibility/api/_module.py b/ddtrace/internal/ci_visibility/api/_module.py index 306f391ec3a..b029954b043 100644 --- a/ddtrace/internal/ci_visibility/api/_module.py +++ b/ddtrace/internal/ci_visibility/api/_module.py @@ -3,8 +3,8 @@ from ddtrace.ext import test from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.internal.ci_visibility.api._base import TestVisibilityChildItem from ddtrace.internal.ci_visibility.api._base import TestVisibilityParentItem from ddtrace.internal.ci_visibility.api._base import TestVisibilitySessionSettings diff --git a/ddtrace/internal/ci_visibility/api/_session.py b/ddtrace/internal/ci_visibility/api/_session.py index 35ef85001a0..90f87b6aa83 100644 --- a/ddtrace/internal/ci_visibility/api/_session.py +++ b/ddtrace/internal/ci_visibility/api/_session.py @@ -4,8 +4,8 @@ from ddtrace.ext import test from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility.api import TestStatus +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility.status import TestStatus from ddtrace.internal.ci_visibility.api._base import TestVisibilityParentItem from ddtrace.internal.ci_visibility.api._base import TestVisibilitySessionSettings from ddtrace.internal.ci_visibility.api._module import TestVisibilityModule diff --git a/ddtrace/internal/ci_visibility/api/_suite.py b/ddtrace/internal/ci_visibility/api/_suite.py index ba24f82c05c..6c8da13ac6e 100644 --- a/ddtrace/internal/ci_visibility/api/_suite.py +++ b/ddtrace/internal/ci_visibility/api/_suite.py @@ -4,10 +4,10 @@ from typing import Optional from ddtrace.ext import test -from ddtrace.ext.test_visibility._item_ids import TestId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId -from ddtrace.ext.test_visibility.api import TestSourceFileInfo -from ddtrace.ext.test_visibility.api import TestStatus +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId +from ddtrace.ext.test_visibility.status import TestSourceFileInfo +from ddtrace.ext.test_visibility.status import TestStatus from ddtrace.internal.ci_visibility.api._base import TestVisibilityChildItem from ddtrace.internal.ci_visibility.api._base import TestVisibilityParentItem from ddtrace.internal.ci_visibility.api._base import TestVisibilitySessionSettings diff --git a/ddtrace/internal/ci_visibility/api/_test.py b/ddtrace/internal/ci_visibility/api/_test.py index a12966d39af..55b91cb3d5d 100644 --- a/ddtrace/internal/ci_visibility/api/_test.py +++ b/ddtrace/internal/ci_visibility/api/_test.py @@ -9,10 +9,10 @@ from ddtrace.ext import SpanTypes from ddtrace.ext import test from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL -from ddtrace.ext.test_visibility._item_ids import TestId -from ddtrace.ext.test_visibility.api import TestExcInfo -from ddtrace.ext.test_visibility.api import TestSourceFileInfo -from ddtrace.ext.test_visibility.api import TestStatus +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility.status import TestExcInfo +from ddtrace.ext.test_visibility.status import TestSourceFileInfo +from ddtrace.ext.test_visibility.status import TestStatus from ddtrace.internal.ci_visibility.api._base import SPECIAL_STATUS from ddtrace.internal.ci_visibility.api._base import TestVisibilityChildItem from ddtrace.internal.ci_visibility.api._base import TestVisibilityItemBase @@ -190,18 +190,18 @@ def _telemetry_record_event_finished(self): def finish_test( self, status: Optional[TestStatus] = None, - reason: Optional[str] = None, + skip_reason: Optional[str] = None, exc_info: Optional[TestExcInfo] = None, override_finish_time: Optional[float] = None, ) -> None: - log.debug("Test Visibility: finishing %s, with status: %s, reason: %s", self, status, reason) + log.debug("Test Visibility: finishing %s, with status: %s, skip_reason: %s", self, status, skip_reason) self.set_tag(test.TYPE, SpanTypes.TEST) if status is not None: self.set_status(status) - if reason is not None: - self.set_tag(test.SKIP_REASON, reason) + if skip_reason is not None: + self.set_tag(test.SKIP_REASON, skip_reason) if exc_info is not None: self._exc_info = exc_info @@ -372,13 +372,19 @@ def efd_add_retry(self, start_immediately=False) -> Optional[int]: def efd_start_retry(self, retry_number: int) -> None: self._efd_get_retry_test(retry_number).start() - def efd_finish_retry(self, retry_number: int, status: TestStatus, exc_info: Optional[TestExcInfo] = None) -> None: + def efd_finish_retry( + self, + retry_number: int, + status: TestStatus, + skip_reason: Optional[str] = None, + exc_info: Optional[TestExcInfo] = None, + ) -> None: retry_test = self._efd_get_retry_test(retry_number) if status is not None: retry_test.set_status(status) - retry_test.finish_test(status, exc_info=exc_info) + retry_test.finish_test(status=status, skip_reason=skip_reason, exc_info=exc_info) def efd_get_final_status(self) -> EFDTestStatus: status_counts: Dict[TestStatus, int] = { @@ -465,7 +471,13 @@ def atr_add_retry(self, start_immediately=False) -> Optional[int]: def atr_start_retry(self, retry_number: int): self._atr_get_retry_test(retry_number).start() - def atr_finish_retry(self, retry_number: int, status: TestStatus, exc_info: Optional[TestExcInfo] = None): + def atr_finish_retry( + self, + retry_number: int, + status: TestStatus, + skip_reason: Optional[str] = None, + exc_info: Optional[TestExcInfo] = None, + ): retry_test = self._atr_get_retry_test(retry_number) if retry_number >= self._session_settings.atr_settings.max_retries: @@ -475,7 +487,7 @@ def atr_finish_retry(self, retry_number: int, status: TestStatus, exc_info: Opti if self.atr_get_final_status() == TestStatus.FAIL: retry_test.set_tag(TEST_HAS_FAILED_ALL_RETRIES, True) - retry_test.finish_test(status, exc_info=exc_info) + retry_test.finish_test(status=status, skip_reason=skip_reason, exc_info=exc_info) def atr_get_final_status(self) -> TestStatus: if self._status in [TestStatus.PASS, TestStatus.SKIP]: @@ -538,7 +550,11 @@ def attempt_to_fix_start_retry(self, retry_number: int): self._attempt_to_fix_get_retry_test(retry_number).start() def attempt_to_fix_finish_retry( - self, retry_number: int, status: TestStatus, exc_info: Optional[TestExcInfo] = None + self, + retry_number: int, + status: TestStatus, + skip_reason: Optional[str] = None, + exc_info: Optional[TestExcInfo] = None, ): retry_test = self._attempt_to_fix_get_retry_test(retry_number) @@ -557,7 +573,7 @@ def attempt_to_fix_finish_retry( retry_test.set_tag(TEST_ATTEMPT_TO_FIX_PASSED, all_passed) - retry_test.finish_test(status, exc_info=exc_info) + retry_test.finish_test(status, skip_reason=skip_reason, exc_info=exc_info) def attempt_to_fix_get_final_status(self) -> TestStatus: if all(retry._status == TestStatus.PASS for retry in self._attempt_to_fix_retries): diff --git a/ddtrace/internal/ci_visibility/recorder.py b/ddtrace/internal/ci_visibility/recorder.py index b2acd2fc250..538f2b09d17 100644 --- a/ddtrace/internal/ci_visibility/recorder.py +++ b/ddtrace/internal/ci_visibility/recorder.py @@ -6,7 +6,6 @@ from typing import Any from typing import Callable from typing import Dict -from typing import List from typing import Optional from typing import Set from typing import Union @@ -14,25 +13,17 @@ import ddtrace from ddtrace import config as ddconfig -from ddtrace._trace.context import Context from ddtrace.contrib import trace_utils from ddtrace.ext import ci from ddtrace.ext import test from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId from ddtrace.ext.test_visibility._test_visibility_base import TestSessionId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.ext.test_visibility._test_visibility_base import TestVisibilityItemId -from ddtrace.ext.test_visibility.api import Test -from ddtrace.ext.test_visibility.api import TestBase -from ddtrace.ext.test_visibility.api import TestId -from ddtrace.ext.test_visibility.api import TestModule -from ddtrace.ext.test_visibility.api import TestModuleId -from ddtrace.ext.test_visibility.api import TestSession -from ddtrace.ext.test_visibility.api import TestStatus -from ddtrace.ext.test_visibility.api import TestSuite -from ddtrace.ext.test_visibility.api import TestSuiteId from ddtrace.internal import agent from ddtrace.internal import atexit -from ddtrace.internal import core from ddtrace.internal import telemetry from ddtrace.internal.ci_visibility._api_client import AgentlessTestVisibilityAPIClient from ddtrace.internal.ci_visibility._api_client import EarlyFlakeDetectionSettings @@ -68,27 +59,20 @@ from ddtrace.internal.ci_visibility.git_client import CIVisibilityGitClient from ddtrace.internal.ci_visibility.git_data import GitData from ddtrace.internal.ci_visibility.git_data import get_git_data_from_tags +from ddtrace.internal.ci_visibility.service_registry import register_ci_visibility_instance +from ddtrace.internal.ci_visibility.service_registry import unregister_ci_visibility_instance from ddtrace.internal.ci_visibility.utils import _get_test_framework_telemetry_name from ddtrace.internal.ci_visibility.writer import CIVisibilityEventClient from ddtrace.internal.ci_visibility.writer import CIVisibilityWriter from ddtrace.internal.codeowners import Codeowners from ddtrace.internal.logger import get_logger from ddtrace.internal.service import Service -from ddtrace.internal.test_visibility._atr_mixins import ATRTestMixin from ddtrace.internal.test_visibility._atr_mixins import AutoTestRetriesSettings -from ddtrace.internal.test_visibility._attempt_to_fix_mixins import AttemptToFixTestMixin -from ddtrace.internal.test_visibility._benchmark_mixin import BenchmarkTestMixin -from ddtrace.internal.test_visibility._efd_mixins import EFDTestMixin -from ddtrace.internal.test_visibility._efd_mixins import EFDTestStatus from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId -from ddtrace.internal.test_visibility._itr_mixins import ITRMixin from ddtrace.internal.test_visibility._library_capabilities import LibraryCapabilities -from ddtrace.internal.test_visibility.api import InternalTest -from ddtrace.internal.test_visibility.coverage_lines import CoverageLines from ddtrace.internal.utils.formats import asbool from ddtrace.settings import IntegrationConfig from ddtrace.settings._agent import config as agent_config -from ddtrace.trace import Span from ddtrace.trace import Tracer @@ -145,6 +129,20 @@ def _get_custom_configurations() -> Dict[str, str]: return custom_configurations +def _is_item_itr_skippable(item_id: TestVisibilityItemId, suite_skipping_mode: bool, itr_data: Optional[ITRData]): + if itr_data is None: + return False + + if isinstance(item_id, TestSuiteId) and not suite_skipping_mode: + log.debug("Skipping mode is at test level, but item is a suite: %s", item_id) + return False + + if isinstance(item_id, TestId) and suite_skipping_mode: + log.debug("Skipping mode is at suite level, but item is a test: %s", item_id) + return False + return item_id in itr_data.skippable_items + + class CIVisibilityTracer(Tracer): def __init__(self, *args, **kwargs) -> None: # Allows for multiple instances of the civis tracer to be created without logging a warning @@ -572,6 +570,10 @@ def _should_skip_path(self, path: str, name: str, test_skipping_mode: Optional[s @classmethod def enable(cls, tracer=None, config=None, service=None) -> None: log.debug("Enabling %s", cls.__name__) + if cls._instance is not None: + log.debug("%s already enabled", cls.__name__) + return + if ddconfig._ci_visibility_agentless_enabled: if not os.getenv("_CI_DD_API_KEY", os.getenv("DD_API_KEY")): log.critical( @@ -582,12 +584,11 @@ def enable(cls, tracer=None, config=None, service=None) -> None: cls.enabled = False return - if cls._instance is not None: - log.debug("%s already enabled", cls.__name__) - return - try: cls._instance = cls(tracer=tracer, config=config, service=service) + # Register with service registry for other modules to access + register_ci_visibility_instance(cls._instance) + except CIVisibilityAuthenticationException: log.warning("Authentication error, disabling CI Visibility, please check Datadog API key") cls.enabled = False @@ -607,11 +608,11 @@ def enable(cls, tracer=None, config=None, service=None) -> None: "Flaky Test Management: %s, " "Known Tests: %s", cls._instance._collect_coverage_enabled, - CIVisibility.test_skipping_enabled(), - CIVisibility.is_efd_enabled(), - CIVisibility.is_atr_enabled(), - CIVisibility.is_test_management_enabled(), - CIVisibility.is_known_tests_enabled(), + cls._instance.test_skipping_enabled(), + cls._instance.is_efd_enabled(), + cls._instance.is_atr_enabled(), + cls._instance.is_test_management_enabled(), + cls._instance.is_known_tests_enabled(), ) @classmethod @@ -622,6 +623,9 @@ def disable(cls) -> None: log.debug("Disabling %s", cls.__name__) atexit.unregister(cls.disable) + # Unregister from service registry first + unregister_ci_visibility_instance() + cls._instance.stop() cls._instance = None cls.enabled = False @@ -807,61 +811,25 @@ def get_instance(cls) -> "CIVisibility": raise CIVisibilityError(error_msg) return cls._instance - @classmethod - def get_tracer(cls) -> Optional[Tracer]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None: - return None - return instance.tracer + def get_tracer(self) -> Optional[Tracer]: + return self.tracer - @classmethod - def get_service(cls) -> Optional[str]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None: - return None - return instance._service + def get_service(self) -> Optional[str]: + return self._service - @classmethod - def get_codeowners(cls) -> Optional[Codeowners]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None: - return None - return instance._codeowners + def get_codeowners(self) -> Optional[Codeowners]: + return self._codeowners - @classmethod - def get_efd_api_settings(cls) -> Optional[EarlyFlakeDetectionSettings]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None or instance._api_settings is None: + def get_efd_api_settings(self) -> Optional[EarlyFlakeDetectionSettings]: + if self._api_settings is None: return None - return instance._api_settings.early_flake_detection + return self._api_settings.early_flake_detection - @classmethod - def get_atr_api_settings(cls) -> Optional[AutoTestRetriesSettings]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None or instance._api_settings is None: + def get_atr_api_settings(self) -> Optional[AutoTestRetriesSettings]: + if self._api_settings is None: return None - if instance._api_settings.flaky_test_retries_enabled: + if self._api_settings.flaky_test_retries_enabled: # NOTE: this is meant to come from integration settings but current plans to rewrite how integration # settings are defined make it better for this logic to be temporarily defined here. @@ -894,68 +862,25 @@ def get_atr_api_settings(cls) -> Optional[AutoTestRetriesSettings]: return None - @classmethod - def get_test_management_api_settings(cls) -> Optional[TestManagementSettings]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None or instance._api_settings is None: + def get_test_management_api_settings(self) -> Optional[TestManagementSettings]: + if self._api_settings is None: return None - return instance._api_settings.test_management - - @classmethod - def get_workspace_path(cls) -> Optional[str]: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None: - return None - return instance._tags.get(ci.WORKSPACE_PATH) - - @classmethod - def is_item_itr_skippable(cls, item_id: TestVisibilityItemId) -> bool: - if not cls.enabled: - error_msg = "CI Visibility is not enabled" - log.warning(error_msg) - raise CIVisibilityError(error_msg) - instance = cls.get_instance() - if instance is None or instance._itr_data is None: - return False + return self._api_settings.test_management - if isinstance(item_id, TestSuiteId) and not instance._suite_skipping_mode: - log.debug("Skipping mode is suite, but item is not a suite: %s", item_id) - return False + def get_workspace_path(self) -> Optional[str]: + return self._tags.get(ci.WORKSPACE_PATH) - if isinstance(item_id, TestId) and instance._suite_skipping_mode: - log.debug("Skipping mode is test, but item is not a test: %s", item_id) - return False - return item_id in instance._itr_data.skippable_items + def is_item_itr_skippable(self, item_id: TestVisibilityItemId) -> bool: + return _is_item_itr_skippable(item_id, self._suite_skipping_mode, self._itr_data) - @classmethod - def is_unknown_ci(cls) -> bool: - instance = cls.get_instance() - if instance is None: - return False + def is_unknown_ci(self) -> bool: + return self._tags.get(ci.PROVIDER_NAME) is None - return instance._tags.get(ci.PROVIDER_NAME) is None + def ci_provider_name_for_telemetry(self) -> str: + return TELEMETRY_BY_PROVIDER_NAME.get(self._tags.get(ci.PROVIDER_NAME, UNSUPPORTED), UNSUPPORTED_PROVIDER) - @classmethod - def ci_provider_name_for_telemetry(cls) -> str: - instance = cls.get_instance() - if instance is None: - return UNSUPPORTED_PROVIDER - return TELEMETRY_BY_PROVIDER_NAME.get(instance._tags.get(ci.PROVIDER_NAME, UNSUPPORTED), UNSUPPORTED_PROVIDER) - - @classmethod - def is_auto_injected(cls) -> bool: - instance = cls.get_instance() - if instance is None: - return False - return instance._is_auto_injected + def is_auto_injected(self) -> bool: + return self._is_auto_injected def _get_ci_visibility_event_client(self) -> Optional[CIVisibilityEventClient]: writer = self.tracer._span_aggregator.writer @@ -966,10 +891,8 @@ def _get_ci_visibility_event_client(self) -> Optional[CIVisibilityEventClient]: return None - @classmethod - def set_test_session_name(cls, test_command: str) -> None: - instance = cls.get_instance() - client = instance._get_ci_visibility_event_client() + def set_test_session_name(self, test_command: str) -> None: + client = self._get_ci_visibility_event_client() if not client: log.debug("Not setting test session name because no CIVisibilityEventClient is active") return @@ -977,52 +900,36 @@ def set_test_session_name(cls, test_command: str) -> None: if ddconfig._test_session_name: test_session_name = ddconfig._test_session_name else: - job_name = instance._tags.get(ci.JOB_NAME) + job_name = self._tags.get(ci.JOB_NAME) test_session_name = f"{job_name}-{test_command}" if job_name else test_command log.debug("Setting test session name: %s", test_session_name) client.set_test_session_name(test_session_name) - @classmethod - def set_library_capabilities(cls, capabilities: LibraryCapabilities) -> None: - instance = cls.get_instance() - client = instance._get_ci_visibility_event_client() + def set_library_capabilities(self, capabilities: LibraryCapabilities) -> None: + client = self._get_ci_visibility_event_client() if not client: log.debug("Not setting library capabilities because no CIVisibilityEventClient is active") return client.set_metadata("test", capabilities.tags()) - @classmethod - def get_ci_tags(cls): - instance = cls.get_instance() - return instance._tags - - @classmethod - def get_dd_env(cls): - instance = cls.get_instance() - return instance._dd_env + def get_ci_tags(self): + return self._tags - @classmethod - def is_known_test(cls, test_id: Union[TestId, InternalTestId]) -> bool: - instance = cls.get_instance() - if instance is None: - return False + def get_dd_env(self): + return self._dd_env + def is_known_test(self, test_id: Union[TestId, InternalTestId]) -> bool: # The assumption that we were not able to fetch unique tests properly if the length is 0 is acceptable # because the current EFD usage would cause the session to be faulty even if the query was successful but # not unique tests exist. In this case, we assume all tests are unique. - if len(instance._known_test_ids) == 0: + if len(self._known_test_ids) == 0: return True - return test_id in instance._known_test_ids + return test_id in self._known_test_ids - @classmethod - def get_test_properties(cls, test_id: Union[TestId, InternalTestId]) -> Optional[TestProperties]: - instance = cls.get_instance() - if instance is None: - return None - - return instance._test_properties.get(test_id) + def get_test_properties(self, test_id: Union[TestId, InternalTestId]) -> Optional[TestProperties]: + return self._test_properties.get(test_id) def _requires_civisibility_enabled(func: Callable) -> Callable: @@ -1036,13 +943,23 @@ def wrapper(*args, **kwargs) -> Any: @_requires_civisibility_enabled -def _on_discover_session(discover_args: TestSession.DiscoverArgs) -> None: +def on_discover_session( + test_command, + reject_duplicates, + test_framework, + test_framework_version, + session_operation_name, + module_operation_name, + suite_operation_name, + test_operation_name, + root_dir, +) -> None: log.debug("Handling session discovery") # _requires_civisibility_enabled prevents us from getting here, but this makes type checkers happy - tracer = CIVisibility.get_tracer() - test_service = CIVisibility.get_service() instance = CIVisibility.get_instance() + test_service = instance.get_service() + tracer = instance.get_tracer() if tracer is None or test_service is None: error_msg = "Tracer or test service is None" @@ -1050,48 +967,48 @@ def _on_discover_session(discover_args: TestSession.DiscoverArgs) -> None: raise CIVisibilityError(error_msg) # If we're not provided a root directory, try and extract it from workspace, defaulting to CWD - workspace_path = discover_args.root_dir or Path(CIVisibility.get_workspace_path() or os.getcwd()) + workspace_path = root_dir or Path(instance.get_workspace_path() or os.getcwd()) # Prevent high cardinality of test framework telemetry tag by matching with known frameworks - test_framework_telemetry_name = _get_test_framework_telemetry_name(discover_args.test_framework) + test_framework_telemetry_name = _get_test_framework_telemetry_name(test_framework) - efd_api_settings = CIVisibility.get_efd_api_settings() - if efd_api_settings is None or not CIVisibility.is_efd_enabled(): + efd_api_settings = instance.get_efd_api_settings() + if efd_api_settings is None or not instance.is_efd_enabled(): efd_api_settings = EarlyFlakeDetectionSettings() - atr_api_settings = CIVisibility.get_atr_api_settings() + atr_api_settings = instance.get_atr_api_settings() if atr_api_settings is None or not CIVisibility.is_atr_enabled(): atr_api_settings = AutoTestRetriesSettings() - test_management_api_settings = CIVisibility.get_test_management_api_settings() - if test_management_api_settings is None or not CIVisibility.is_test_management_enabled(): + test_management_api_settings = instance.get_test_management_api_settings() + if test_management_api_settings is None or not instance.is_test_management_enabled(): test_management_api_settings = TestManagementSettings() session_settings = TestVisibilitySessionSettings( tracer=tracer, test_service=test_service, - test_command=discover_args.test_command, - reject_duplicates=discover_args.reject_duplicates, - test_framework=discover_args.test_framework, + test_command=test_command, + reject_duplicates=reject_duplicates, + test_framework=test_framework, test_framework_metric_name=test_framework_telemetry_name, - test_framework_version=discover_args.test_framework_version, - session_operation_name=discover_args.session_operation_name, - module_operation_name=discover_args.module_operation_name, - suite_operation_name=discover_args.suite_operation_name, - test_operation_name=discover_args.test_operation_name, + test_framework_version=test_framework_version, + session_operation_name=session_operation_name, + module_operation_name=module_operation_name, + suite_operation_name=suite_operation_name, + test_operation_name=test_operation_name, workspace_path=workspace_path, - is_unsupported_ci=CIVisibility.is_unknown_ci(), - itr_enabled=CIVisibility.is_itr_enabled(), - itr_test_skipping_enabled=CIVisibility.test_skipping_enabled(), + is_unsupported_ci=instance.is_unknown_ci(), + itr_enabled=instance.is_itr_enabled(), + itr_test_skipping_enabled=instance.test_skipping_enabled(), itr_test_skipping_level=instance._itr_skipping_level, itr_correlation_id=instance._itr_meta.get(ITR_CORRELATION_ID_TAG_NAME, ""), - coverage_enabled=CIVisibility.should_collect_coverage(), - known_tests_enabled=CIVisibility.is_known_tests_enabled(), + coverage_enabled=instance.should_collect_coverage(), + known_tests_enabled=instance.is_known_tests_enabled(), efd_settings=efd_api_settings, atr_settings=atr_api_settings, test_management_settings=test_management_api_settings, - ci_provider_name=CIVisibility.ci_provider_name_for_telemetry(), - is_auto_injected=CIVisibility.is_auto_injected(), + ci_provider_name=instance.ci_provider_name_for_telemetry(), + is_auto_injected=instance.is_auto_injected(), ) session = TestVisibilitySession( @@ -1099,663 +1016,4 @@ def _on_discover_session(discover_args: TestSession.DiscoverArgs) -> None: ) CIVisibility.add_session(session) - CIVisibility.set_test_session_name(test_command=discover_args.test_command) - - -@_requires_civisibility_enabled -def _on_start_session(distributed_children: bool = False, context: Optional[Context] = None) -> None: - log.debug("Handling start session") - session = CIVisibility.get_session() - session.start(context) - if distributed_children: - session.set_distributed_children() - - -@_requires_civisibility_enabled -def _on_finish_session(finish_args: TestSession.FinishArgs) -> None: - log.debug("Handling finish session") - session = CIVisibility.get_session() - session.finish(finish_args.force_finish_children, finish_args.override_status) - - -@_requires_civisibility_enabled -def _on_session_is_test_skipping_enabled() -> bool: - log.debug("Handling is test skipping enabled") - return CIVisibility.test_skipping_enabled() - - -@_requires_civisibility_enabled -def _on_session_get_workspace_path() -> Optional[Path]: - log.debug("Handling finish for session id %s") - path_str = CIVisibility.get_workspace_path() - return Path(path_str) if path_str is not None else None - - -@_requires_civisibility_enabled -def _on_session_should_collect_coverage() -> bool: - log.debug("Handling should collect coverage") - return CIVisibility.should_collect_coverage() - - -@_requires_civisibility_enabled -def _on_session_get_codeowners() -> Optional[Codeowners]: - log.debug("Getting codeowners") - return CIVisibility.get_codeowners() - - -@_requires_civisibility_enabled -def _on_session_get_tracer() -> Optional[Tracer]: - log.debug("Getting tracer") - return CIVisibility.get_tracer() - - -@_requires_civisibility_enabled -def _on_session_is_atr_enabled() -> bool: - log.debug("Getting Auto Test Retries enabled") - return CIVisibility.is_atr_enabled() - - -@_requires_civisibility_enabled -def _on_session_is_efd_enabled() -> bool: - log.debug("Getting Early Flake Detection enabled") - return CIVisibility.is_efd_enabled() - - -@_requires_civisibility_enabled -def _on_session_set_covered_lines_pct(coverage_pct) -> None: - log.debug("Setting coverage percentage for session to %s", coverage_pct) - CIVisibility.get_session().set_covered_lines_pct(coverage_pct) - - -@_requires_civisibility_enabled -def _on_session_set_library_capabilities(capabilities: LibraryCapabilities) -> None: - log.debug("Setting library capabilities") - CIVisibility.set_library_capabilities(capabilities) - - -@_requires_civisibility_enabled -def _on_session_set_itr_skipped_count(skipped_count: int) -> None: - log.debug("Setting skipped count: %d", skipped_count) - CIVisibility.get_session().set_skipped_count(skipped_count) - - -@_requires_civisibility_enabled -def _on_session_get_path_codeowners(path: Path) -> Optional[List[str]]: - log.debug("Getting codeowners for path %s", path) - codeowners = CIVisibility.get_codeowners() - if codeowners is None: - return None - return codeowners.of(str(path)) - - -def _register_session_handlers() -> None: - log.debug("Registering session handlers") - core.on("test_visibility.session.discover", _on_discover_session) - core.on("test_visibility.session.start", _on_start_session) - core.on("test_visibility.session.finish", _on_finish_session) - core.on("test_visibility.session.get_codeowners", _on_session_get_codeowners, "codeowners") - core.on("test_visibility.session.get_tracer", _on_session_get_tracer, "tracer") - core.on("test_visibility.session.get_path_codeowners", _on_session_get_path_codeowners, "path_codeowners") - core.on("test_visibility.session.get_workspace_path", _on_session_get_workspace_path, "workspace_path") - core.on("test_visibility.session.is_atr_enabled", _on_session_is_atr_enabled, "is_atr_enabled") - core.on("test_visibility.session.is_efd_enabled", _on_session_is_efd_enabled, "is_efd_enabled") - core.on( - "test_visibility.session.should_collect_coverage", - _on_session_should_collect_coverage, - "should_collect_coverage", - ) - core.on( - "test_visibility.session.is_test_skipping_enabled", - _on_session_is_test_skipping_enabled, - "is_test_skipping_enabled", - ) - core.on("test_visibility.session.set_covered_lines_pct", _on_session_set_covered_lines_pct) - core.on("test_visibility.session.set_library_capabilities", _on_session_set_library_capabilities) - core.on("test_visibility.session.set_itr_skipped_count", _on_session_set_itr_skipped_count) - - -@_requires_civisibility_enabled -def _on_discover_module(discover_args: TestModule.DiscoverArgs) -> None: - log.debug("Handling discovery for module %s", discover_args.module_id) - session = CIVisibility.get_session() - - session.add_child( - discover_args.module_id, - TestVisibilityModule( - discover_args.module_id.name, - CIVisibility.get_session_settings(), - discover_args.module_path, - ), - ) - - -@_requires_civisibility_enabled -def _on_start_module(module_id: TestModuleId) -> None: - log.debug("Handling start for module id %s", module_id) - CIVisibility.get_module_by_id(module_id).start() - - -@_requires_civisibility_enabled -def _on_finish_module(finish_args: TestModule.FinishArgs) -> None: - log.debug("Handling finish for module id %s", finish_args.module_id) - CIVisibility.get_module_by_id(finish_args.module_id).finish() - - -def _register_module_handlers() -> None: - log.debug("Registering module handlers") - core.on("test_visibility.module.discover", _on_discover_module) - core.on("test_visibility.module.start", _on_start_module) - core.on("test_visibility.module.finish", _on_finish_module) - - -@_requires_civisibility_enabled -def _on_discover_suite(discover_args: TestSuite.DiscoverArgs) -> None: - log.debug("Handling discovery for suite args %s", discover_args) - module = CIVisibility.get_module_by_id(discover_args.suite_id.parent_id) - - module.add_child( - discover_args.suite_id, - TestVisibilitySuite( - discover_args.suite_id.name, - CIVisibility.get_session_settings(), - discover_args.codeowners, - discover_args.source_file_info, - ), - ) - - -@_requires_civisibility_enabled -def _on_start_suite(suite_id: TestSuiteId) -> None: - log.debug("Handling start for suite id %s", suite_id) - CIVisibility.get_suite_by_id(suite_id).start() - - -@_requires_civisibility_enabled -def _on_finish_suite(finish_args: TestSuite.FinishArgs) -> None: - log.debug("Handling finish for suite id %s", finish_args.suite_id) - CIVisibility.get_suite_by_id(finish_args.suite_id).finish( - finish_args.force_finish_children, finish_args.override_status - ) - - -def _register_suite_handlers() -> None: - log.debug("Registering suite handlers") - core.on("test_visibility.suite.discover", _on_discover_suite) - core.on("test_visibility.suite.start", _on_start_suite) - core.on("test_visibility.suite.finish", _on_finish_suite) - - -@_requires_civisibility_enabled -def _on_discover_test(discover_args: Test.DiscoverArgs) -> None: - log.debug("Handling discovery for test %s", discover_args.test_id) - suite = CIVisibility.get_suite_by_id(discover_args.test_id.parent_id) - - # New tests are currently only considered for EFD: - # - if known tests were fetched properly (enforced by is_known_test) - # - if they have no parameters - if CIVisibility.is_known_tests_enabled() and discover_args.test_id.parameters is None: - is_new = not CIVisibility.is_known_test(discover_args.test_id) - else: - is_new = False - - test_properties = None - if CIVisibility.is_test_management_enabled(): - test_properties = CIVisibility.get_test_properties(discover_args.test_id) - - if not test_properties: - test_properties = TestProperties() - - suite.add_child( - discover_args.test_id, - TestVisibilityTest( - discover_args.test_id.name, - CIVisibility.get_session_settings(), - parameters=discover_args.test_id.parameters, - codeowners=discover_args.codeowners, - source_file_info=discover_args.source_file_info, - resource=discover_args.resource, - is_new=is_new, - is_quarantined=test_properties.quarantined, - is_disabled=test_properties.disabled, - is_attempt_to_fix=test_properties.attempt_to_fix, - ), - ) - - -@_requires_civisibility_enabled -def _on_is_new_test(test_id: Union[TestId, InternalTestId]) -> bool: - log.debug("Handling is new test for test %s", test_id) - return CIVisibility.get_test_by_id(test_id).is_new() - - -@_requires_civisibility_enabled -def _on_is_quarantined_test(test_id: Union[TestId, InternalTestId]) -> bool: - log.debug("Handling is quarantined test for test %s", test_id) - return CIVisibility.get_test_by_id(test_id).is_quarantined() - - -@_requires_civisibility_enabled -def _on_is_disabled_test(test_id: Union[TestId, InternalTestId]) -> bool: - log.debug("Handling is disabled test for test %s", test_id) - return CIVisibility.get_test_by_id(test_id).is_disabled() - - -@_requires_civisibility_enabled -def _on_is_attempt_to_fix(test_id: Union[TestId, InternalTestId]) -> bool: - log.debug("Handling is attempt to fix for test %s", test_id) - return CIVisibility.get_test_by_id(test_id).is_attempt_to_fix() - - -@_requires_civisibility_enabled -def _on_start_test(test_id: TestId) -> None: - log.debug("Handling start for test id %s", test_id) - CIVisibility.get_test_by_id(test_id).start() - - -@_requires_civisibility_enabled -def _on_finish_test(finish_args: Test.FinishArgs) -> None: - log.debug("Handling finish for test id %s, with status %s", finish_args.test_id, finish_args.status) - CIVisibility.get_test_by_id(finish_args.test_id).finish_test( - finish_args.status, finish_args.skip_reason, finish_args.exc_info - ) - - -@_requires_civisibility_enabled -def _on_set_test_parameters(item_id: TestId, parameters: str) -> None: - log.debug("Handling set parameters for test id %s, parameters %s", item_id, parameters) - CIVisibility.get_test_by_id(item_id).set_parameters(parameters) - - -@_requires_civisibility_enabled -def _on_set_benchmark_data(set_benchmark_data_args: BenchmarkTestMixin.SetBenchmarkDataArgs) -> None: - item_id = set_benchmark_data_args.test_id - data = set_benchmark_data_args.benchmark_data - is_benchmark = set_benchmark_data_args.is_benchmark - log.debug("Handling set benchmark data for test id %s, data %s, is_benchmark %s", item_id, data, is_benchmark) - CIVisibility.get_test_by_id(item_id).set_benchmark_data(data, is_benchmark) - - -@_requires_civisibility_enabled -def _on_test_overwrite_attributes(overwrite_attribute_args: InternalTest.OverwriteAttributesArgs) -> None: - item_id = overwrite_attribute_args.test_id - name = overwrite_attribute_args.name - suite_name = overwrite_attribute_args.suite_name - parameters = overwrite_attribute_args.parameters - codeowners = overwrite_attribute_args.codeowners - - log.debug("Handling overwrite attributes: %s", overwrite_attribute_args) - CIVisibility.get_test_by_id(item_id).overwrite_attributes(name, suite_name, parameters, codeowners) - - -def _register_test_handlers(): - log.debug("Registering test handlers") - core.on("test_visibility.test.discover", _on_discover_test) - core.on("test_visibility.test.is_new", _on_is_new_test, "is_new") - core.on("test_visibility.test.is_quarantined", _on_is_quarantined_test, "is_quarantined") - core.on("test_visibility.test.is_disabled", _on_is_disabled_test, "is_disabled") - core.on("test_visibility.test.is_attempt_to_fix", _on_is_attempt_to_fix, "is_attempt_to_fix") - core.on("test_visibility.test.start", _on_start_test) - core.on("test_visibility.test.finish", _on_finish_test) - core.on("test_visibility.test.set_parameters", _on_set_test_parameters) - core.on("test_visibility.test.set_benchmark_data", _on_set_benchmark_data) - core.on("test_visibility.test.overwrite_attributes", _on_test_overwrite_attributes) - - -@_requires_civisibility_enabled -def _on_item_get_span(item_id: TestVisibilityItemId) -> Optional[Span]: - log.debug("Handing get_span for item %s", item_id) - item = CIVisibility.get_item_by_id(item_id) - return item.get_span() - - -@_requires_civisibility_enabled -def _on_item_is_finished(item_id: TestVisibilityItemId) -> bool: - log.debug("Handling is finished for item %s", item_id) - return CIVisibility.get_item_by_id(item_id).is_finished() - - -@_requires_civisibility_enabled -def _on_item_stash_set(item_id: TestVisibilityItemId, key: str, value: object) -> None: - log.debug("Handling stash set for item %s, key %s, value %s", item_id, key, value) - CIVisibility.get_item_by_id(item_id).stash_set(key, value) - - -@_requires_civisibility_enabled -def _on_item_stash_get(item_id: TestVisibilityItemId, key: str) -> Optional[object]: - log.debug("Handling stash get for item %s, key %s", item_id, key) - return CIVisibility.get_item_by_id(item_id).stash_get(key) - - -@_requires_civisibility_enabled -def _on_item_stash_delete(item_id: TestVisibilityItemId, key: str) -> None: - log.debug("Handling stash delete for item %s, key %s", item_id, key) - CIVisibility.get_item_by_id(item_id).stash_delete(key) - - -def _register_item_handlers() -> None: - log.debug("Registering item handlers") - core.on("test_visibility.item.get_span", _on_item_get_span, "span") - core.on("test_visibility.item.is_finished", _on_item_is_finished, "is_finished") - core.on("test_visibility.item.stash_set", _on_item_stash_set) - core.on("test_visibility.item.stash_get", _on_item_stash_get, "stash_value") - core.on("test_visibility.item.stash_delete", _on_item_stash_delete) - - -@_requires_civisibility_enabled -def _on_get_coverage_data(item_id: Union[TestSuiteId, TestId]) -> Optional[Dict[Path, CoverageLines]]: - log.debug("Handling get coverage data for item %s", item_id) - return CIVisibility.get_item_by_id(item_id).get_coverage_data() - - -@_requires_civisibility_enabled -def _on_add_coverage_data(add_coverage_args: ITRMixin.AddCoverageArgs) -> None: - """Adds coverage data to an item, merging with existing coverage data if necessary""" - item_id = add_coverage_args.item_id - coverage_data = add_coverage_args.coverage_data - - log.debug("Handling add coverage data for item id %s", item_id) - - if not isinstance(item_id, (TestSuiteId, TestId)): - log.warning("Coverage data can only be added to suites and tests, not %s", type(item_id)) - return - - CIVisibility.get_item_by_id(item_id).add_coverage_data(coverage_data) - - -def _register_coverage_handlers() -> None: - log.debug("Registering coverage handlers") - core.on("test_visibility.item.get_coverage_data", _on_get_coverage_data, "coverage_data") - core.on("test_visibility.item.add_coverage_data", _on_add_coverage_data) - - -@_requires_civisibility_enabled -def _on_get_tag(get_tag_args: TestBase.GetTagArgs) -> Any: - item_id = get_tag_args.item_id - key = get_tag_args.name - log.debug("Handling get tag for item id %s, key %s", item_id, key) - return CIVisibility.get_item_by_id(item_id).get_tag(key) - - -@_requires_civisibility_enabled -def _on_set_tag(set_tag_args: TestBase.SetTagArgs) -> None: - item_id = set_tag_args.item_id - key = set_tag_args.name - value = set_tag_args.value - log.debug("Handling set tag for item id %s, key %s, value %s", item_id, key, value) - CIVisibility.get_item_by_id(item_id).set_tag(key, value) - - -@_requires_civisibility_enabled -def _on_set_tags(set_tags_args: TestBase.SetTagsArgs) -> None: - item_id = set_tags_args.item_id - tags = set_tags_args.tags - log.debug("Handling set tags for item id %s, tags %s", item_id, tags) - CIVisibility.get_item_by_id(item_id).set_tags(tags) - - -@_requires_civisibility_enabled -def _on_delete_tag(delete_tag_args: TestBase.DeleteTagArgs) -> None: - item_id = delete_tag_args.item_id - key = delete_tag_args.name - log.debug("Handling delete tag for item id %s, key %s", item_id, key) - CIVisibility.get_item_by_id(item_id).delete_tag(key) - - -@_requires_civisibility_enabled -def _on_delete_tags(delete_tags_args: TestBase.DeleteTagsArgs) -> None: - item_id = delete_tags_args.item_id - keys = delete_tags_args.names - log.debug("Handling delete tags for item id %s, keys %s", item_id, keys) - CIVisibility.get_item_by_id(item_id).delete_tags(keys) - - -def _register_tag_handlers() -> None: - log.debug("Registering tag handlers") - core.on("test_visibility.item.get_tag", _on_get_tag, "tag_value") - core.on("test_visibility.item.set_tag", _on_set_tag) - core.on("test_visibility.item.set_tags", _on_set_tags) - core.on("test_visibility.item.delete_tag", _on_delete_tag) - core.on("test_visibility.item.delete_tags", _on_delete_tags) - - -@_requires_civisibility_enabled -def _on_itr_finish_item_skipped(item_id: Union[TestSuiteId, TestId]) -> None: - log.debug("Handling finish ITR skipped for item id %s", item_id) - if not isinstance(item_id, (TestSuiteId, TestId)): - log.warning("Only suites or tests can be skipped, not %s", type(item_id)) - return - CIVisibility.get_item_by_id(item_id).finish_itr_skipped() - - -@_requires_civisibility_enabled -def _on_itr_mark_unskippable(item_id: Union[TestSuiteId, TestId]) -> None: - log.debug("Handling marking %s unskippable", item_id) - CIVisibility.get_item_by_id(item_id).mark_itr_unskippable() - - -@_requires_civisibility_enabled -def _on_itr_mark_forced_run(item_id: Union[TestSuiteId, TestId]) -> None: - log.debug("Handling marking %s as forced run", item_id) - CIVisibility.get_item_by_id(item_id).mark_itr_forced_run() - - -@_requires_civisibility_enabled -def _on_itr_was_forced_run(item_id: TestVisibilityItemId) -> bool: - log.debug("Handling marking %s as forced run", item_id) - return CIVisibility.get_item_by_id(item_id).was_itr_forced_run() - - -@_requires_civisibility_enabled -def _on_itr_is_item_skippable(item_id: Union[TestSuiteId, TestId]) -> bool: - """Skippable items are fetched as part CIVisibility.enable(), so they are assumed to be available.""" - log.debug("Handling is item skippable for item id %s", item_id) - - if not isinstance(item_id, (TestSuiteId, TestId)): - log.warning("Only suites or tests can be skippable, not %s", type(item_id)) - return False - - if not CIVisibility.test_skipping_enabled(): - log.debug("Test skipping is not enabled") - return False - - return CIVisibility.is_item_itr_skippable(item_id) - - -@_requires_civisibility_enabled -def _on_itr_is_item_unskippable(item_id: Union[TestSuiteId, TestId]) -> bool: - log.debug("Handling is item unskippable for %s", item_id) - if not isinstance(item_id, (TestSuiteId, TestId)): - raise CIVisibilityError("Only suites or tests can be unskippable") - return CIVisibility.get_item_by_id(item_id).is_itr_unskippable() - - -@_requires_civisibility_enabled -def _on_itr_was_item_skipped(item_id: Union[TestSuiteId, TestId]) -> bool: - log.debug("Handling was item skipped for %s", item_id) - return CIVisibility.get_item_by_id(item_id).is_itr_skipped() - - -def _register_itr_handlers() -> None: - log.debug("Registering ITR-related handlers") - core.on("test_visibility.itr.finish_skipped_by_itr", _on_itr_finish_item_skipped) - core.on("test_visibility.itr.is_item_skippable", _on_itr_is_item_skippable, "is_item_skippable") - core.on("test_visibility.itr.was_item_skipped", _on_itr_was_item_skipped, "was_item_skipped") - - core.on("test_visibility.itr.is_item_unskippable", _on_itr_is_item_unskippable, "is_item_unskippable") - core.on("test_visibility.itr.mark_forced_run", _on_itr_mark_forced_run) - core.on("test_visibility.itr.mark_unskippable", _on_itr_mark_unskippable) - core.on("test_visibility.itr.was_forced_run", _on_itr_was_forced_run, "was_forced_run") - - -# -# EFD handlers -# - - -@_requires_civisibility_enabled -def _on_efd_is_enabled() -> bool: - return CIVisibility.get_session().efd_is_enabled() - - -@_requires_civisibility_enabled -def _on_efd_session_is_faulty() -> bool: - return CIVisibility.get_session().efd_is_faulty_session() - - -@_requires_civisibility_enabled -def _on_efd_session_has_efd_failed_tests() -> bool: - return CIVisibility.get_session().efd_has_failed_tests() - - -@_requires_civisibility_enabled -def _on_efd_should_retry_test(test_id: InternalTestId) -> bool: - return CIVisibility.get_test_by_id(test_id).efd_should_retry() - - -@_requires_civisibility_enabled -def _on_efd_add_retry(test_id: InternalTestId, retry_number: int) -> Optional[int]: - return CIVisibility.get_test_by_id(test_id).efd_add_retry(retry_number) - - -@_requires_civisibility_enabled -def _on_efd_start_retry(test_id: InternalTestId, retry_number: int) -> None: - CIVisibility.get_test_by_id(test_id).efd_start_retry(retry_number) - - -@_requires_civisibility_enabled -def _on_efd_finish_retry(efd_finish_args: EFDTestMixin.EFDRetryFinishArgs) -> None: - CIVisibility.get_test_by_id(efd_finish_args.test_id).efd_finish_retry( - efd_finish_args.retry_number, efd_finish_args.status, efd_finish_args.exc_info - ) - - -@_requires_civisibility_enabled -def _on_efd_get_final_status(test_id: InternalTestId) -> EFDTestStatus: - return CIVisibility.get_test_by_id(test_id).efd_get_final_status() - - -def _register_efd_handlers() -> None: - log.debug("Registering EFD handlers") - core.on("test_visibility.efd.is_enabled", _on_efd_is_enabled, "is_enabled") - core.on("test_visibility.efd.session_is_faulty", _on_efd_session_is_faulty, "is_faulty_session") - core.on("test_visibility.efd.session_has_failed_tests", _on_efd_session_has_efd_failed_tests, "has_failed_tests") - core.on("test_visibility.efd.should_retry_test", _on_efd_should_retry_test, "should_retry_test") - core.on("test_visibility.efd.add_retry", _on_efd_add_retry, "retry_number") - core.on("test_visibility.efd.start_retry", _on_efd_start_retry) - core.on("test_visibility.efd.finish_retry", _on_efd_finish_retry) - core.on("test_visibility.efd.get_final_status", _on_efd_get_final_status, "efd_final_status") - - -@_requires_civisibility_enabled -def _on_atr_is_enabled() -> bool: - return CIVisibility.is_atr_enabled() - - -@_requires_civisibility_enabled -def _on_atr_session_has_failed_tests() -> bool: - return CIVisibility.get_session().atr_has_failed_tests() - - -@_requires_civisibility_enabled -def _on_atr_should_retry_test(item_id: InternalTestId) -> bool: - return CIVisibility.get_test_by_id(item_id).atr_should_retry() - - -@_requires_civisibility_enabled -def _on_atr_add_retry(item_id: InternalTestId, retry_number: int) -> Optional[int]: - return CIVisibility.get_test_by_id(item_id).atr_add_retry(retry_number) - - -@_requires_civisibility_enabled -def _on_atr_start_retry(test_id: InternalTestId, retry_number: int) -> None: - CIVisibility.get_test_by_id(test_id).atr_start_retry(retry_number) - - -@_requires_civisibility_enabled -def _on_atr_finish_retry(atr_finish_args: ATRTestMixin.ATRRetryFinishArgs) -> None: - CIVisibility.get_test_by_id(atr_finish_args.test_id).atr_finish_retry( - atr_finish_args.retry_number, atr_finish_args.status, atr_finish_args.exc_info - ) - - -@_requires_civisibility_enabled -def _on_atr_get_final_status(test_id: InternalTestId) -> TestStatus: - return CIVisibility.get_test_by_id(test_id).atr_get_final_status() - - -def _register_atr_handlers() -> None: - log.debug("Registering ATR handlers") - core.on("test_visibility.atr.is_enabled", _on_atr_is_enabled, "is_enabled") - core.on("test_visibility.atr.session_has_failed_tests", _on_atr_session_has_failed_tests, "has_failed_tests") - core.on("test_visibility.atr.should_retry_test", _on_atr_should_retry_test, "should_retry_test") - core.on("test_visibility.atr.add_retry", _on_atr_add_retry, "retry_number") - core.on("test_visibility.atr.start_retry", _on_atr_start_retry) - core.on("test_visibility.atr.finish_retry", _on_atr_finish_retry) - core.on("test_visibility.atr.get_final_status", _on_atr_get_final_status, "atr_final_status") - - -@_requires_civisibility_enabled -def _on_attempt_to_fix_should_retry_test(item_id: InternalTestId) -> bool: - return CIVisibility.get_test_by_id(item_id).attempt_to_fix_should_retry() - - -@_requires_civisibility_enabled -def _on_attempt_to_fix_add_retry(item_id: InternalTestId, retry_number: int) -> Optional[int]: - return CIVisibility.get_test_by_id(item_id).attempt_to_fix_add_retry(retry_number) - - -@_requires_civisibility_enabled -def _on_attempt_to_fix_start_retry(test_id: InternalTestId, retry_number: int) -> None: - CIVisibility.get_test_by_id(test_id).attempt_to_fix_start_retry(retry_number) - - -@_requires_civisibility_enabled -def _on_attempt_to_fix_finish_retry( - attempt_to_fix_finish_args: AttemptToFixTestMixin.AttemptToFixRetryFinishArgs, -) -> None: - CIVisibility.get_test_by_id(attempt_to_fix_finish_args.test_id).attempt_to_fix_finish_retry( - attempt_to_fix_finish_args.retry_number, attempt_to_fix_finish_args.status, attempt_to_fix_finish_args.exc_info - ) - - -@_requires_civisibility_enabled -def _on_attempt_to_fix_get_final_status(test_id: InternalTestId) -> TestStatus: - return CIVisibility.get_test_by_id(test_id).attempt_to_fix_get_final_status() - - -@_requires_civisibility_enabled -def _on_attempt_to_fix_session_has_failed_tests() -> bool: - return CIVisibility.get_session().attempt_to_fix_has_failed_tests() - - -def _register_attempt_to_fix_handlers() -> None: - log.debug("Registering AttemptToFix handlers") - core.on( - "test_visibility.attempt_to_fix.should_retry_test", _on_attempt_to_fix_should_retry_test, "should_retry_test" - ) - core.on("test_visibility.attempt_to_fix.add_retry", _on_attempt_to_fix_add_retry, "retry_number") - core.on("test_visibility.attempt_to_fix.start_retry", _on_attempt_to_fix_start_retry) - core.on("test_visibility.attempt_to_fix.finish_retry", _on_attempt_to_fix_finish_retry) - core.on( - "test_visibility.attempt_to_fix.session_has_failed_tests", - _on_attempt_to_fix_session_has_failed_tests, - "has_failed_tests", - ) - core.on( - "test_visibility.attempt_to_fix.get_final_status", - _on_attempt_to_fix_get_final_status, - "attempt_to_fix_final_status", - ) - - -_register_session_handlers() -_register_module_handlers() -_register_suite_handlers() -_register_test_handlers() -_register_item_handlers() -_register_tag_handlers() -_register_coverage_handlers() -_register_itr_handlers() -_register_efd_handlers() -_register_atr_handlers() -_register_attempt_to_fix_handlers() + instance.set_test_session_name(test_command=test_command) diff --git a/ddtrace/internal/ci_visibility/service_registry.py b/ddtrace/internal/ci_visibility/service_registry.py new file mode 100644 index 00000000000..dcf64c5be42 --- /dev/null +++ b/ddtrace/internal/ci_visibility/service_registry.py @@ -0,0 +1,27 @@ +"""Service registry to avoid circular imports in CI Visibility system.""" +import typing as t + + +if t.TYPE_CHECKING: + from ddtrace.internal.ci_visibility.recorder import CIVisibility + +CI_VISIBILITY_INSTANCE = None + + +def register_ci_visibility_instance(service: "CIVisibility") -> None: + """Register the CIVisibility service instance.""" + global CI_VISIBILITY_INSTANCE + CI_VISIBILITY_INSTANCE = service + + +def unregister_ci_visibility_instance() -> None: + """Unregister the current service instance.""" + global CI_VISIBILITY_INSTANCE + CI_VISIBILITY_INSTANCE = None + + +def require_ci_visibility_service() -> "CIVisibility": + """Get the CIVisibility service, raising if not available.""" + if not CI_VISIBILITY_INSTANCE: + raise RuntimeError("CIVisibility service not registered") + return CI_VISIBILITY_INSTANCE diff --git a/ddtrace/internal/test_visibility/_atr_mixins.py b/ddtrace/internal/test_visibility/_atr_mixins.py index 71737bf09e3..1892fc3eee4 100644 --- a/ddtrace/internal/test_visibility/_atr_mixins.py +++ b/ddtrace/internal/test_visibility/_atr_mixins.py @@ -2,8 +2,9 @@ import typing as t from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -import ddtrace.ext.test_visibility.api as ext_api -from ddtrace.internal import core +from ddtrace.ext.test_visibility.status import TestExcInfo +from ddtrace.ext.test_visibility.status import TestStatus +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.logger import get_logger from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId @@ -22,88 +23,53 @@ class ATRSessionMixin: @staticmethod @_catch_and_log_exceptions def atr_is_enabled() -> bool: - log.debug("Checking if Auto Test Retries is enabled for session") - is_enabled = core.dispatch_with_results("test_visibility.atr.is_enabled").is_enabled.value - log.debug("Auto Test Retries enabled: %s", is_enabled) - return is_enabled + log.debug("Checking if ATR is enabled") + return require_ci_visibility_service().is_atr_enabled() @staticmethod @_catch_and_log_exceptions def atr_has_failed_tests() -> bool: - log.debug("Checking if session has failed tests for Auto Test Retries") - has_failed_tests = core.dispatch_with_results( - "test_visibility.atr.session_has_failed_tests" - ).has_failed_tests.value - log.debug("Session has ATR failed tests: %s", has_failed_tests) - return has_failed_tests + log.debug("Checking if ATR session has failed tests") + return require_ci_visibility_service().get_session().atr_has_failed_tests() class ATRTestMixin: @staticmethod @_catch_and_log_exceptions def atr_should_retry(item_id: InternalTestId) -> bool: - log.debug("Checking if item %s should be retried for Auto Test Retries", item_id) - should_retry_test = core.dispatch_with_results( - "test_visibility.atr.should_retry_test", (item_id,) - ).should_retry_test.value - log.debug("Item %s should be retried: %s", item_id, should_retry_test) - return should_retry_test + log.debug("Checking if test %s should be retried by ATR", item_id) + return require_ci_visibility_service().get_test_by_id(item_id).atr_should_retry() @staticmethod @_catch_and_log_exceptions - def atr_add_retry(item_id: InternalTestId, start_immediately: bool = False) -> int: - log.debug("Adding Auto Test Retries retry for item %s", item_id) - retry_number = core.dispatch_with_results( - "test_visibility.atr.add_retry", (item_id, start_immediately) - ).retry_number.value - log.debug("Added Auto Test Retries retry %s for item %s", retry_number, item_id) + def atr_add_retry(item_id: InternalTestId, start_immediately: bool = False) -> t.Optional[int]: + retry_number = require_ci_visibility_service().get_test_by_id(item_id).atr_add_retry(start_immediately) + log.debug("Adding ATR retry %s for test %s", retry_number, item_id) return retry_number @staticmethod @_catch_and_log_exceptions - def atr_start_retry(item_id: InternalTestId) -> None: - log.debug("Starting retry for item %s", item_id) - core.dispatch("test_visibility.atr.start_retry", (item_id,)) - - class ATRRetryFinishArgs(t.NamedTuple): - test_id: InternalTestId - retry_number: int - status: ext_api.TestStatus - skip_reason: t.Optional[str] = None - exc_info: t.Optional[ext_api.TestExcInfo] = None + def atr_start_retry(item_id: InternalTestId, retry_number: int) -> None: + log.debug("Starting ATR retry %s for test %s", retry_number, item_id) + require_ci_visibility_service().get_test_by_id(item_id).atr_start_retry(retry_number) @staticmethod @_catch_and_log_exceptions def atr_finish_retry( item_id: InternalTestId, retry_number: int, - status: ext_api.TestStatus, + status: TestStatus, skip_reason: t.Optional[str] = None, - exc_info: t.Optional[ext_api.TestExcInfo] = None, - ): - log.debug( - "Finishing ATR test retry %s for item %s, status: %s, skip_reason: %s, exc_info: %s", - retry_number, - item_id, - status, - skip_reason, - exc_info, - ) - core.dispatch( - "test_visibility.atr.finish_retry", - ( - ATRTestMixin.ATRRetryFinishArgs( - item_id, retry_number, status, skip_reason=skip_reason, exc_info=exc_info - ), - ), + exc_info: t.Optional[TestExcInfo] = None, + ) -> None: + log.debug("Finishing ATR retry %s for test %s", retry_number, item_id) + require_ci_visibility_service().get_test_by_id(item_id).atr_finish_retry( + retry_number=retry_number, status=status, skip_reason=skip_reason, exc_info=exc_info ) @staticmethod @_catch_and_log_exceptions - def atr_get_final_status(item_id: InternalTestId) -> ext_api.TestStatus: - log.debug("Getting final ATR status for item %s", item_id) - atr_final_status = core.dispatch_with_results( - "test_visibility.atr.get_final_status", (item_id,) - ).atr_final_status.value - log.debug("Final ATR status for item %s: %s", item_id, atr_final_status) - return atr_final_status + def atr_get_final_status(test_id: InternalTestId) -> TestStatus: + log.debug("Getting ATR final status for test %s", test_id) + + return require_ci_visibility_service().get_test_by_id(test_id).atr_get_final_status() diff --git a/ddtrace/internal/test_visibility/_attempt_to_fix_mixins.py b/ddtrace/internal/test_visibility/_attempt_to_fix_mixins.py index a2d0dfec9c5..e2242a04dfb 100644 --- a/ddtrace/internal/test_visibility/_attempt_to_fix_mixins.py +++ b/ddtrace/internal/test_visibility/_attempt_to_fix_mixins.py @@ -1,8 +1,9 @@ import typing as t from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -import ddtrace.ext.test_visibility.api as ext_api -from ddtrace.internal import core +from ddtrace.ext.test_visibility.status import TestExcInfo +from ddtrace.ext.test_visibility.status import TestStatus +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.logger import get_logger from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId @@ -14,80 +15,49 @@ class AttemptToFixSessionMixin: @staticmethod @_catch_and_log_exceptions def attempt_to_fix_has_failed_tests() -> bool: - log.debug("Checking if session has failed tests for Attempt-to-Fix") - has_failed_tests = core.dispatch_with_results( - "test_visibility.attempt_to_fix.session_has_failed_tests" - ).has_failed_tests.value - log.debug("Session has Attempt-to-Fix failed tests: %s", has_failed_tests) - return has_failed_tests + log.debug("Checking if attempt to fix session has failed tests") + + return require_ci_visibility_service().get_session().attempt_to_fix_has_failed_tests() class AttemptToFixTestMixin: @staticmethod @_catch_and_log_exceptions def attempt_to_fix_should_retry(item_id: InternalTestId) -> bool: - log.debug("Checking if item %s should be retried for Attempt-to-Fix", item_id) - should_retry_test = core.dispatch_with_results( - "test_visibility.attempt_to_fix.should_retry_test", (item_id,) - ).should_retry_test.value - log.debug("Item %s should be retried: %s", item_id, should_retry_test) - return should_retry_test + log.debug("Checking if test %s should be retried by attempt to fix", item_id) + return require_ci_visibility_service().get_test_by_id(item_id).attempt_to_fix_should_retry() @staticmethod @_catch_and_log_exceptions - def attempt_to_fix_add_retry(item_id: InternalTestId, start_immediately: bool = False) -> int: - log.debug("Adding Attempt-to-Fix retry for item %s", item_id) - retry_number = core.dispatch_with_results( - "test_visibility.attempt_to_fix.add_retry", (item_id, start_immediately) - ).retry_number.value - log.debug("Added Auto Test Retries retry %s for item %s", retry_number, item_id) + def attempt_to_fix_add_retry(item_id: InternalTestId, start_immediately: bool = False) -> t.Optional[int]: + retry_number = ( + require_ci_visibility_service().get_test_by_id(item_id).attempt_to_fix_add_retry(start_immediately) + ) + log.debug("Adding attempt to fix retry %s for test %s", retry_number, item_id) return retry_number @staticmethod @_catch_and_log_exceptions - def attempt_to_fix_start_retry(item_id: InternalTestId) -> None: - log.debug("Starting retry for item %s", item_id) - core.dispatch("test_visibility.attempt_to_fix.start_retry", (item_id,)) - - class AttemptToFixRetryFinishArgs(t.NamedTuple): - test_id: InternalTestId - retry_number: int - status: ext_api.TestStatus - skip_reason: t.Optional[str] = None - exc_info: t.Optional[ext_api.TestExcInfo] = None + def attempt_to_fix_start_retry(item_id: InternalTestId, retry_number: int) -> None: + log.debug("Starting attempt to fix retry %s for test %s", retry_number, item_id) + require_ci_visibility_service().get_test_by_id(item_id).attempt_to_fix_start_retry(retry_number) @staticmethod @_catch_and_log_exceptions def attempt_to_fix_finish_retry( item_id: InternalTestId, retry_number: int, - status: ext_api.TestStatus, + status: TestStatus, skip_reason: t.Optional[str] = None, - exc_info: t.Optional[ext_api.TestExcInfo] = None, - ): - log.debug( - "Finishing Attempt-to-Fix test retry %s for item %s, status: %s, skip_reason: %s, exc_info: %s", - retry_number, - item_id, - status, - skip_reason, - exc_info, - ) - core.dispatch( - "test_visibility.attempt_to_fix.finish_retry", - ( - AttemptToFixTestMixin.AttemptToFixRetryFinishArgs( - item_id, retry_number, status, skip_reason=skip_reason, exc_info=exc_info - ), - ), + exc_info: t.Optional[TestExcInfo] = None, + ) -> None: + log.debug("Finishing attempt to fix retry %s for test %s", retry_number, item_id) + require_ci_visibility_service().get_test_by_id(item_id).attempt_to_fix_finish_retry( + retry_number=retry_number, status=status, skip_reason=skip_reason, exc_info=exc_info ) @staticmethod @_catch_and_log_exceptions - def attempt_to_fix_get_final_status(item_id: InternalTestId) -> ext_api.TestStatus: - log.debug("Getting final Attempt-to-Fix status for item %s", item_id) - attempt_to_fix_final_status = core.dispatch_with_results( - "test_visibility.attempt_to_fix.get_final_status", (item_id,) - ).attempt_to_fix_final_status.value - log.debug("Final Attempt-to-Fix status for item %s: %s", item_id, attempt_to_fix_final_status) - return attempt_to_fix_final_status + def attempt_to_fix_get_final_status(item_id: InternalTestId) -> TestStatus: + log.debug("Getting attempt to fix final status for test %s", item_id) + return require_ci_visibility_service().get_test_by_id(item_id).attempt_to_fix_get_final_status() diff --git a/ddtrace/internal/test_visibility/_benchmark_mixin.py b/ddtrace/internal/test_visibility/_benchmark_mixin.py index c41d45b10a5..74a341dee25 100644 --- a/ddtrace/internal/test_visibility/_benchmark_mixin.py +++ b/ddtrace/internal/test_visibility/_benchmark_mixin.py @@ -1,7 +1,7 @@ import typing as t from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -from ddtrace.internal import core +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.logger import get_logger from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId @@ -32,11 +32,6 @@ class BenchmarkDurationData(t.NamedTuple): class BenchmarkTestMixin: - class SetBenchmarkDataArgs(t.NamedTuple): - test_id: InternalTestId - benchmark_data: t.Optional[BenchmarkDurationData] - is_benchmark: bool = True - @classmethod @_catch_and_log_exceptions def set_benchmark_data( @@ -46,10 +41,7 @@ def set_benchmark_data( is_benchmark: bool = True, ): log.debug("Setting benchmark data for test %s: %s", item_id, benchmark_data) - core.dispatch( - "test_visibility.test.set_benchmark_data", - (BenchmarkTestMixin.SetBenchmarkDataArgs(item_id, benchmark_data, is_benchmark),), - ) + require_ci_visibility_service().get_test_by_id(item_id).set_benchmark_data(benchmark_data, is_benchmark) BENCHMARK_TAG_MAP = { diff --git a/ddtrace/internal/test_visibility/_efd_mixins.py b/ddtrace/internal/test_visibility/_efd_mixins.py index ac49c5641c3..b98742e5f5d 100644 --- a/ddtrace/internal/test_visibility/_efd_mixins.py +++ b/ddtrace/internal/test_visibility/_efd_mixins.py @@ -2,8 +2,9 @@ import typing as t from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -import ddtrace.ext.test_visibility.api as ext_api -from ddtrace.internal import core +from ddtrace.ext.test_visibility.status import TestExcInfo +from ddtrace.ext.test_visibility.status import TestStatus +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.logger import get_logger from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId @@ -22,76 +23,55 @@ class EFDSessionMixin: @staticmethod @_catch_and_log_exceptions def efd_enabled() -> bool: - log.debug("Checking if Early Flake Detection is enabled for the session") - is_enabled = core.dispatch_with_results("test_visibility.efd.is_enabled").is_enabled.value - log.debug("Early Flake Detection enabled: %s", is_enabled) - return is_enabled + log.debug("Checking if EFD is enabled") + + return require_ci_visibility_service().get_session().efd_is_enabled() @staticmethod @_catch_and_log_exceptions def efd_is_faulty_session() -> bool: - log.debug("Checking if session is faulty for Early Flake Detection") - is_faulty_session = core.dispatch_with_results("test_visibility.efd.session_is_faulty").is_faulty_session.value - log.debug("Session faulty: %s", is_faulty_session) - return is_faulty_session + log.debug("Checking if EFD session is faulty") + + return require_ci_visibility_service().get_session().efd_is_faulty_session() @staticmethod @_catch_and_log_exceptions def efd_has_failed_tests() -> bool: - log.debug("Checking if session has failed tests for Early Flake Detection") - has_failed_tests = core.dispatch_with_results( - "test_visibility.efd.session_has_failed_tests" - ).has_failed_tests.value - log.debug("Session has EFD failed tests: %s", has_failed_tests) - return has_failed_tests + log.debug("Checking if EFD session has failed tests") + + return require_ci_visibility_service().get_session().efd_has_failed_tests() class EFDTestMixin: @staticmethod @_catch_and_log_exceptions def efd_should_retry(item_id: InternalTestId) -> bool: - """Checks whether a test should be retried + log.debug("Checking if test %s should be retried by EFD", item_id) - This does not differentiate between the feature being disabled, the test retries having completed, or the - session maximums having been reached. - """ - log.debug("Checking if item %s should be retried for Early Flake Detection", item_id) - should_retry_test = core.dispatch_with_results( - "test_visibility.efd.should_retry_test", (item_id,) - ).should_retry_test.value - return should_retry_test + return require_ci_visibility_service().get_test_by_id(item_id).efd_should_retry() @staticmethod @_catch_and_log_exceptions def efd_add_retry(item_id: InternalTestId, start_immediately: bool = False) -> t.Optional[int]: - log.debug("Adding Early Flake Detection retry for item %s", item_id) - retry_number = core.dispatch_with_results( - "test_visibility.efd.add_retry", (item_id, start_immediately) - ).retry_number.value - log.debug("Added Early Flake Detection retry number %s for item %s", retry_number, item_id) + retry_number = require_ci_visibility_service().get_test_by_id(item_id).efd_add_retry(start_immediately) + log.debug("Adding EFD retry %s for test %s", retry_number, item_id) return retry_number @staticmethod @_catch_and_log_exceptions - def efd_start_retry(item_id: InternalTestId, retry_number: int): - log.debug("Starting Early Flake Detection retry number %s for item %s", retry_number, item_id) - core.dispatch("test_visibility.efd.start_retry", (item_id, retry_number)) + def efd_start_retry(item_id: InternalTestId, retry_number: int) -> None: + log.debug("Starting EFD retry %s for test %s", retry_number, item_id) - class EFDRetryFinishArgs(t.NamedTuple): - test_id: InternalTestId - retry_number: int - status: ext_api.TestStatus - skip_reason: t.Optional[str] = None - exc_info: t.Optional[ext_api.TestExcInfo] = None + require_ci_visibility_service().get_test_by_id(item_id).efd_start_retry(retry_number) @staticmethod @_catch_and_log_exceptions def efd_finish_retry( item_id: InternalTestId, retry_number: int, - status: ext_api.TestStatus, + status: TestStatus, skip_reason: t.Optional[str] = None, - exc_info: t.Optional[ext_api.TestExcInfo] = None, + exc_info: t.Optional[TestExcInfo] = None, ): log.debug( "Finishing EFD test retry %s for item %s, status: %s, skip_reason: %s, exc_info: %s", @@ -101,20 +81,13 @@ def efd_finish_retry( skip_reason, exc_info, ) - core.dispatch( - "test_visibility.efd.finish_retry", - ( - EFDTestMixin.EFDRetryFinishArgs( - item_id, retry_number, status, skip_reason=skip_reason, exc_info=exc_info - ), - ), + require_ci_visibility_service().get_test_by_id(item_id).efd_finish_retry( + retry_number=retry_number, status=status, skip_reason=skip_reason, exc_info=exc_info ) @staticmethod @_catch_and_log_exceptions - def efd_get_final_status(item_id) -> EFDTestStatus: - log.debug("Getting final status for item %s in Early Flake Detection", item_id) - final_status = core.dispatch_with_results( - "test_visibility.efd.get_final_status", (item_id,) - ).efd_final_status.value - return final_status + def efd_get_final_status(item_id: InternalTestId) -> EFDTestStatus: + log.debug("Getting EFD final status for test %s", item_id) + + return require_ci_visibility_service().get_test_by_id(item_id).efd_get_final_status() diff --git a/ddtrace/internal/test_visibility/_itr_mixins.py b/ddtrace/internal/test_visibility/_itr_mixins.py index 60572885a21..ce66272af4a 100644 --- a/ddtrace/internal/test_visibility/_itr_mixins.py +++ b/ddtrace/internal/test_visibility/_itr_mixins.py @@ -3,7 +3,8 @@ from ddtrace.ext.test_visibility import api as ext_api from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -from ddtrace.internal import core +from ddtrace.internal.ci_visibility.errors import CIVisibilityError +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.logger import get_logger from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId from ddtrace.internal.test_visibility.coverage_lines import CoverageLines @@ -19,77 +20,76 @@ class ITRMixin: @_catch_and_log_exceptions def mark_itr_skipped(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]): log.debug("Marking item %s as skipped by ITR", item_id) - core.dispatch("test_visibility.itr.finish_skipped_by_itr", (item_id,)) + + if not isinstance(item_id, (ext_api.TestSuiteId, InternalTestId)): + log.warning("Only suites or tests can be skipped, not %s", type(item_id)) + return + require_ci_visibility_service().get_item_by_id(item_id).finish_itr_skipped() @staticmethod @_catch_and_log_exceptions def mark_itr_unskippable(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]): log.debug("Marking item %s as unskippable by ITR", item_id) - core.dispatch("test_visibility.itr.mark_unskippable", (item_id,)) + + require_ci_visibility_service().get_item_by_id(item_id).mark_itr_unskippable() @staticmethod @_catch_and_log_exceptions def mark_itr_forced_run(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]): - log.debug("Marking item %s as unskippable by ITR", item_id) - core.dispatch("test_visibility.itr.mark_forced_run", (item_id,)) + log.debug("Marking item %s as forced run by ITR", item_id) + + require_ci_visibility_service().get_item_by_id(item_id).mark_itr_forced_run() @staticmethod @_catch_and_log_exceptions - def was_forced_run(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]) -> bool: - """Skippable items are not currently tied to a test session, so no session ID is passed""" - log.debug("Checking if item %s was forced to run", item_id) - _was_forced_run = bool( - core.dispatch_with_results("test_visibility.itr.was_forced_run", (item_id,)).was_forced_run.value - ) - log.debug("Item %s was forced run: %s", item_id, _was_forced_run) - return _was_forced_run + def was_itr_forced_run(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]) -> bool: + log.debug("Checking if item %s was forced run by ITR", item_id) + + return require_ci_visibility_service().get_item_by_id(item_id).was_itr_forced_run() @staticmethod @_catch_and_log_exceptions def is_itr_skippable(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]) -> bool: - """Skippable items are not currently tied to a test session, so no session ID is passed""" - log.debug("Checking if item %s is skippable", item_id) - is_item_skippable = bool( - core.dispatch_with_results("test_visibility.itr.is_item_skippable", (item_id,)).is_item_skippable.value - ) - log.debug("Item %s is skippable: %s", item_id, is_item_skippable) + log.debug("Checking if item %s is skippable by ITR", item_id) + ci_visibility_instance = require_ci_visibility_service() - return is_item_skippable + if not isinstance(item_id, (ext_api.TestSuiteId, InternalTestId)): + log.warning("Only suites or tests can be skippable, not %s", type(item_id)) + return False + + if not ci_visibility_instance.test_skipping_enabled(): + log.debug("Test skipping is not enabled") + return False + + return ci_visibility_instance.is_item_itr_skippable(item_id) @staticmethod @_catch_and_log_exceptions def is_itr_unskippable(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]) -> bool: - """Skippable items are not currently tied to a test session, so no session ID is passed""" - log.debug("Checking if item %s is unskippable", item_id) - is_item_unskippable = bool( - core.dispatch_with_results("test_visibility.itr.is_item_unskippable", (item_id,)).is_item_unskippable.value - ) - log.debug("Item %s is unskippable: %s", item_id, is_item_unskippable) + log.debug("Checking if item %s is unskippable by ITR", item_id) - return is_item_unskippable + if not isinstance(item_id, (ext_api.TestSuiteId, InternalTestId)): + raise CIVisibilityError("Only suites or tests can be unskippable") + return require_ci_visibility_service().get_item_by_id(item_id).is_itr_unskippable() @staticmethod @_catch_and_log_exceptions - def was_skipped_by_itr(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]) -> bool: - """Skippable items are not currently tied to a test session, so no session ID is passed""" + def was_itr_skipped(item_id: t.Union[ext_api.TestSuiteId, InternalTestId]) -> bool: log.debug("Checking if item %s was skipped by ITR", item_id) - was_item_skipped = bool( - core.dispatch_with_results("test_visibility.itr.was_item_skipped", (item_id,)).was_item_skipped.value - ) - log.debug("Item %s was skipped by ITR: %s", item_id, was_item_skipped) - return was_item_skipped - class AddCoverageArgs(t.NamedTuple): - item_id: t.Union[ext_api.TestSuiteId, ext_api.TestId] - coverage_data: t.Dict[Path, CoverageLines] + return require_ci_visibility_service().get_item_by_id(item_id).is_itr_skipped() @staticmethod @_catch_and_log_exceptions - def add_coverage_data( - item_id: t.Union[ext_api.TestSuiteId, InternalTestId], coverage_data: t.Dict[Path, CoverageLines] - ): - log.debug("Adding coverage data for item %s: %s", item_id, coverage_data) - core.dispatch("test_visibility.item.add_coverage_data", (ITRMixin.AddCoverageArgs(item_id, coverage_data),)) + def add_coverage_data(item_id, coverage_data) -> None: + """Adds coverage data to an item, merging with existing coverage data if necessary""" + log.debug("Adding coverage data for item id %s", item_id) + + if not isinstance(item_id, (ext_api.TestSuiteId, InternalTestId)): + log.warning("Coverage data can only be added to suites and tests, not %s", type(item_id)) + return + + require_ci_visibility_service().get_item_by_id(item_id).add_coverage_data(coverage_data) @staticmethod @_catch_and_log_exceptions @@ -97,8 +97,5 @@ def get_coverage_data( item_id: t.Union[ext_api.TestSuiteId, InternalTestId] ) -> t.Optional[t.Dict[Path, CoverageLines]]: log.debug("Getting coverage data for item %s", item_id) - coverage_data = core.dispatch_with_results( - "test_visibility.item.get_coverage_data", (item_id,) - ).coverage_data.value - log.debug("Coverage data for item %s: %s", item_id, coverage_data) - return coverage_data + + return require_ci_visibility_service().get_item_by_id(item_id).get_coverage_data() diff --git a/ddtrace/internal/test_visibility/_utils.py b/ddtrace/internal/test_visibility/_utils.py deleted file mode 100644 index 0e22a15dc94..00000000000 --- a/ddtrace/internal/test_visibility/_utils.py +++ /dev/null @@ -1,13 +0,0 @@ -from ddtrace.ext.test_visibility._test_visibility_base import TestVisibilityItemId -from ddtrace.internal import core -from ddtrace.internal.logger import get_logger -from ddtrace.trace import Span - - -log = get_logger(__name__) - - -def _get_item_span(item_id: TestVisibilityItemId) -> Span: - log.debug("Getting span for item %s", item_id) - span: Span = core.dispatch_with_results("test_visibility.item.get_span", (item_id,)).span.value - return span diff --git a/ddtrace/internal/test_visibility/api.py b/ddtrace/internal/test_visibility/api.py index 4b8efaada61..0c74b66b4e3 100644 --- a/ddtrace/internal/test_visibility/api.py +++ b/ddtrace/internal/test_visibility/api.py @@ -1,14 +1,10 @@ from pathlib import Path import typing as t -from typing import NamedTuple from ddtrace.ext.test_visibility import api as ext_api from ddtrace.ext.test_visibility._test_visibility_base import TestSessionId from ddtrace.ext.test_visibility._utils import _catch_and_log_exceptions -from ddtrace.ext.test_visibility._utils import _is_item_finished -from ddtrace.ext.test_visibility.api import TestExcInfo -from ddtrace.ext.test_visibility.api import TestStatus -from ddtrace.internal import core +from ddtrace.internal.ci_visibility.service_registry import require_ci_visibility_service from ddtrace.internal.codeowners import Codeowners as _Codeowners from ddtrace.internal.logger import get_logger from ddtrace.internal.test_visibility._atr_mixins import ATRSessionMixin @@ -21,7 +17,6 @@ from ddtrace.internal.test_visibility._internal_item_ids import InternalTestId from ddtrace.internal.test_visibility._itr_mixins import ITRMixin from ddtrace.internal.test_visibility._library_capabilities import LibraryCapabilities -from ddtrace.internal.test_visibility._utils import _get_item_span from ddtrace.trace import Span from ddtrace.trace import Tracer @@ -29,6 +24,10 @@ log = get_logger(__name__) +def _get_item_span(item_id: t.Union[ext_api.TestVisibilityItemId, InternalTestId]) -> Span: + return require_ci_visibility_service().get_item_by_id(item_id).get_span() + + class InternalTestBase(ext_api.TestBase): @staticmethod @_catch_and_log_exceptions @@ -39,21 +38,40 @@ def get_span(item_id: t.Union[ext_api.TestVisibilityItemId, InternalTestId]) -> @_catch_and_log_exceptions def stash_set(item_id, key: str, value: object): log.debug("Stashing value %s for key %s in item %s", value, key, item_id) - core.dispatch("test_visibility.item.stash_set", (item_id, key, value)) + + require_ci_visibility_service().get_item_by_id(item_id).stash_set(key, value) @staticmethod @_catch_and_log_exceptions - def stash_get(item_id: ext_api.TestVisibilityItemId, key: str): + def stash_get(item_id: ext_api.TestVisibilityItemId, key: str) -> t.Optional[object]: log.debug("Getting stashed value for key %s in item %s", key, item_id) - stash_value = core.dispatch_with_results("test_visibility.item.stash_get", (item_id, key)).stash_value.value - log.debug("Got stashed value %s for key %s in item %s", stash_value, key, item_id) - return stash_value + + return require_ci_visibility_service().get_item_by_id(item_id).stash_get(key) @staticmethod @_catch_and_log_exceptions def stash_delete(item_id: ext_api.TestVisibilityItemId, key: str): log.debug("Deleting stashed value for key %s in item %s", key, item_id) - core.dispatch("test_visibility.item.stash_delete", (item_id, key)) + + require_ci_visibility_service().get_item_by_id(item_id).stash_delete(key) + + @staticmethod + @_catch_and_log_exceptions + def overwrite_attributes( + item_id: InternalTestId, + name: t.Optional[str] = None, + suite_name: t.Optional[str] = None, + parameters: t.Optional[str] = None, + codeowners: t.Optional[t.List[str]] = None, + ) -> None: + log.debug("Overwriting attributes for: %s", item_id) + + require_ci_visibility_service().get_test_by_id(item_id).overwrite_attributes( + name, + suite_name, + parameters, + codeowners, + ) class InternalTestSession(ext_api.TestSession, EFDSessionMixin, ATRSessionMixin, AttemptToFixSessionMixin): @@ -63,89 +81,74 @@ def get_span() -> Span: @staticmethod def is_finished() -> bool: - return _is_item_finished(TestSessionId()) + return ext_api._is_item_finished(TestSessionId()) @staticmethod @_catch_and_log_exceptions def get_codeowners() -> t.Optional[_Codeowners]: - log.debug("Getting codeowners object") + log.debug("Getting codeowners") - codeowners: t.Optional[_Codeowners] = core.dispatch_with_results( - "test_visibility.session.get_codeowners", - ).codeowners.value - return codeowners + return require_ci_visibility_service().get_codeowners() @staticmethod @_catch_and_log_exceptions def get_tracer() -> t.Optional[Tracer]: - log.debug("Getting test session tracer") - tracer: t.Optional[Tracer] = core.dispatch_with_results("test_visibility.session.get_tracer").tracer.value - log.debug("Got test session tracer: %s", tracer) - return tracer + log.debug("Getting tracer") + + return require_ci_visibility_service().get_tracer() @staticmethod @_catch_and_log_exceptions def get_workspace_path() -> t.Optional[Path]: - log.debug("Getting session workspace path") + log.debug("Getting workspace path") - workspace_path: Path = core.dispatch_with_results( - "test_visibility.session.get_workspace_path" - ).workspace_path.value - return workspace_path + path_str = require_ci_visibility_service().get_workspace_path() + return Path(path_str) if path_str is not None else None @staticmethod @_catch_and_log_exceptions def should_collect_coverage() -> bool: - log.debug("Checking if coverage should be collected for session") - - _should_collect_coverage = bool( - core.dispatch_with_results("test_visibility.session.should_collect_coverage").should_collect_coverage.value - ) - log.debug("Coverage should be collected: %s", _should_collect_coverage) + log.debug("Checking if should collect coverage") - return _should_collect_coverage + return require_ci_visibility_service().should_collect_coverage() @staticmethod @_catch_and_log_exceptions def is_test_skipping_enabled() -> bool: log.debug("Checking if test skipping is enabled") - _is_test_skipping_enabled = bool( - core.dispatch_with_results( - "test_visibility.session.is_test_skipping_enabled" - ).is_test_skipping_enabled.value - ) - log.debug("Test skipping is enabled: %s", _is_test_skipping_enabled) - - return _is_test_skipping_enabled + return require_ci_visibility_service().test_skipping_enabled() @staticmethod @_catch_and_log_exceptions - def set_covered_lines_pct(coverage_pct: float): - log.debug("Setting covered lines percentage for session to %s", coverage_pct) + def set_covered_lines_pct(coverage_pct: float) -> None: + log.debug("Setting coverage percentage for session to %s", coverage_pct) - core.dispatch("test_visibility.session.set_covered_lines_pct", (coverage_pct,)) + require_ci_visibility_service().get_session().set_covered_lines_pct(coverage_pct) @staticmethod @_catch_and_log_exceptions def get_path_codeowners(path: Path) -> t.Optional[t.List[str]]: - log.debug("Getting codeowners object for path %s", path) + log.debug("Getting codeowners for path %s", path) - path_codeowners: t.Optional[t.List[str]] = core.dispatch_with_results( - "test_visibility.session.get_path_codeowners", (path,) - ).path_codeowners.value - return path_codeowners + codeowners = require_ci_visibility_service().get_codeowners() + if codeowners is None: + return None + return codeowners.of(str(path)) @staticmethod @_catch_and_log_exceptions def set_library_capabilities(capabilities: LibraryCapabilities) -> None: - core.dispatch("test_visibility.session.set_library_capabilities", (capabilities,)) + log.debug("Setting library capabilities") + + require_ci_visibility_service().set_library_capabilities(capabilities) @staticmethod @_catch_and_log_exceptions - def set_itr_tags(skipped_count: int): - log.debug("Setting itr session tags: %d", skipped_count) - core.dispatch("test_visibility.session.set_itr_skipped_count", (skipped_count,)) + def set_itr_skipped_count(skipped_count: int) -> None: + log.debug("Setting skipped count: %d", skipped_count) + + require_ci_visibility_service().get_session().set_skipped_count(skipped_count) class InternalTestModule(ext_api.TestModule, InternalTestBase): @@ -159,72 +162,47 @@ class InternalTestSuite(ext_api.TestSuite, InternalTestBase, ITRMixin): class InternalTest( ext_api.Test, InternalTestBase, ITRMixin, EFDTestMixin, ATRTestMixin, AttemptToFixTestMixin, BenchmarkTestMixin ): - class FinishArgs(NamedTuple): - """InternalTest allows finishing with an overridden finish time (for EFD and other retry purposes)""" - - test_id: InternalTestId - status: t.Optional[TestStatus] = None - skip_reason: t.Optional[str] = None - exc_info: t.Optional[TestExcInfo] = None - override_finish_time: t.Optional[float] = None - @staticmethod @_catch_and_log_exceptions def finish( item_id: InternalTestId, status: t.Optional[ext_api.TestStatus] = None, - reason: t.Optional[str] = None, + skip_reason: t.Optional[str] = None, exc_info: t.Optional[ext_api.TestExcInfo] = None, override_finish_time: t.Optional[float] = None, ): - log.debug("Finishing test with status: %s, reason: %s", status, reason) - core.dispatch( - "test_visibility.test.finish", - (InternalTest.FinishArgs(item_id, status, reason, exc_info, override_finish_time),), + log.debug("Finishing test with status: %s, skip_reason: %s", status, skip_reason) + require_ci_visibility_service().get_test_by_id(item_id).finish_test( + status=status, skip_reason=skip_reason, exc_info=exc_info, override_finish_time=override_finish_time ) @staticmethod @_catch_and_log_exceptions - def is_new_test(item_id: InternalTestId) -> bool: - log.debug("Checking if test %s is new", item_id) - is_new = bool(core.dispatch_with_results("test_visibility.test.is_new", (item_id,)).is_new.value) - log.debug("Test %s is new: %s", item_id, is_new) - return is_new + def is_new_test(test_id: t.Union[ext_api.TestId, InternalTestId]) -> bool: + log.debug("Checking if test %s is new", test_id) + + return require_ci_visibility_service().get_test_by_id(test_id).is_new() @staticmethod @_catch_and_log_exceptions - def is_quarantined_test(item_id: InternalTestId) -> bool: - log.debug("Checking if test %s is quarantined", item_id) - is_quarantined = bool( - core.dispatch_with_results("test_visibility.test.is_quarantined", (item_id,)).is_quarantined.value - ) - log.debug("Test %s is quarantined: %s", item_id, is_quarantined) - return is_quarantined + def is_quarantined_test(test_id: t.Union[ext_api.TestId, InternalTestId]) -> bool: + log.debug("Checking if test %s is quarantined", test_id) + + return require_ci_visibility_service().get_test_by_id(test_id).is_quarantined() @staticmethod @_catch_and_log_exceptions - def is_disabled_test(item_id: InternalTestId) -> bool: - log.debug("Checking if test %s is disabled", item_id) - is_disabled = bool(core.dispatch_with_results("test_visibility.test.is_disabled", (item_id,)).is_disabled.value) - log.debug("Test %s is disabled: %s", item_id, is_disabled) - return is_disabled + def is_disabled_test(test_id: t.Union[ext_api.TestId, InternalTestId]) -> bool: + log.debug("Checking if test %s is disabled", test_id) + + return require_ci_visibility_service().get_test_by_id(test_id).is_disabled() @staticmethod @_catch_and_log_exceptions - def is_attempt_to_fix(item_id: InternalTestId) -> bool: - log.debug("Checking if test %s is attempt to fix", item_id) - is_attempt_to_fix = bool( - core.dispatch_with_results("test_visibility.test.is_attempt_to_fix", (item_id,)).is_attempt_to_fix.value - ) - log.debug("Test %s is attempt to fix: %s", item_id, is_attempt_to_fix) - return is_attempt_to_fix + def is_attempt_to_fix(test_id: t.Union[ext_api.TestId, InternalTestId]) -> bool: + log.debug("Checking if test %s is attempt to fix", test_id) - class OverwriteAttributesArgs(NamedTuple): - test_id: InternalTestId - name: t.Optional[str] = None - suite_name: t.Optional[str] = None - parameters: t.Optional[str] = None - codeowners: t.Optional[t.List[str]] = None + return require_ci_visibility_service().get_test_by_id(test_id).is_attempt_to_fix() @staticmethod @_catch_and_log_exceptions @@ -235,15 +213,8 @@ def overwrite_attributes( parameters: t.Optional[str] = None, codeowners: t.Optional[t.List[str]] = None, ): - log.debug( - "Overwriting attributes for test %s: name=%s" ", suite_name=%s" ", parameters=%s" ", codeowners=%s", - item_id, - name, - suite_name, - parameters, - codeowners, - ) - core.dispatch( - "test_visibility.test.overwrite_attributes", - (InternalTest.OverwriteAttributesArgs(item_id, name, suite_name, parameters, codeowners),), + log.debug("Overwriting attributes for test %s", item_id) + + require_ci_visibility_service().get_test_by_id(item_id).overwrite_attributes( + name, suite_name, parameters, codeowners ) diff --git a/tests/ci_visibility/api/test_internal_ci_visibility_api.py b/tests/ci_visibility/api/test_internal_ci_visibility_api.py index e463f042241..37eea33989d 100644 --- a/tests/ci_visibility/api/test_internal_ci_visibility_api.py +++ b/tests/ci_visibility/api/test_internal_ci_visibility_api.py @@ -2,9 +2,9 @@ import pytest -from ddtrace.ext.test_visibility._item_ids import TestId -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.ext.test_visibility.api import TestSourceFileInfo from ddtrace.internal.ci_visibility.api._base import TestVisibilitySessionSettings from ddtrace.internal.ci_visibility.api._suite import TestVisibilitySuite diff --git a/tests/ci_visibility/api_client/_util.py b/tests/ci_visibility/api_client/_util.py index 35826186c08..949d3de97a2 100644 --- a/tests/ci_visibility/api_client/_util.py +++ b/tests/ci_visibility/api_client/_util.py @@ -7,8 +7,8 @@ import ddtrace from ddtrace.ext import ci from ddtrace.ext.test_visibility import ITR_SKIPPING_LEVEL -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.internal.ci_visibility import CIVisibility from ddtrace.internal.ci_visibility._api_client import AgentlessTestVisibilityAPIClient from ddtrace.internal.ci_visibility._api_client import EVPProxyTestVisibilityAPIClient diff --git a/tests/ci_visibility/test_ci_visibility.py b/tests/ci_visibility/test_ci_visibility.py index ccf48ad315b..c4c5566648a 100644 --- a/tests/ci_visibility/test_ci_visibility.py +++ b/tests/ci_visibility/test_ci_visibility.py @@ -7,7 +7,6 @@ import textwrap import time from typing import Set -from unittest.mock import Mock import mock import msgpack @@ -32,6 +31,7 @@ from ddtrace.internal.ci_visibility.git_client import CIVisibilityGitClientSerializerV1 from ddtrace.internal.ci_visibility.recorder import CIVisibilityTracer from ddtrace.internal.ci_visibility.recorder import _extract_repository_name_from_url +from ddtrace.internal.ci_visibility.recorder import _is_item_itr_skippable import ddtrace.internal.test_visibility._internal_item_ids from ddtrace.internal.test_visibility._library_capabilities import LibraryCapabilities from ddtrace.internal.utils.http import Response @@ -1382,65 +1382,54 @@ def _get_all_test_ids(self): return {getattr(self, test_id) for test_id in vars(self.__class__) if re.match(r"^m\d_s\d_t\d$", test_id)} def test_is_item_itr_skippable_test_level(self): - with mock.patch.object(CIVisibility, "enabled", True), mock.patch.object( - CIVisibility, "_instance", Mock() - ) as mock_instance: - mock_instance._itr_data = ITRData(skippable_items=self.test_level_tests_to_skip) - mock_instance._suite_skipping_mode = False - - expected_skippable_test_ids = { - self.m1_s1_t1, - self.m1_s1_t2, - self.m1_s1_t5, - self.m2_s1_t3, - self.m2_s2_t2, - self.m2_s2_t4, - self.m2_s2_t6, - self.m2_s2_t7, - self.m3_s1_t5, - self.m3_s2_t1, - self.m3_s2_t6, - self.m3_s2_t7, - } - expected_non_skippable_test_ids = self._get_all_test_ids() - expected_skippable_test_ids - - assert CIVisibility._instance is not None + expected_skippable_test_ids = { + self.m1_s1_t1, + self.m1_s1_t2, + self.m1_s1_t5, + self.m2_s1_t3, + self.m2_s2_t2, + self.m2_s2_t4, + self.m2_s2_t6, + self.m2_s2_t7, + self.m3_s1_t5, + self.m3_s2_t1, + self.m3_s2_t6, + self.m3_s2_t7, + } + expected_non_skippable_test_ids = self._get_all_test_ids() - expected_skippable_test_ids + itr_data = ITRData(skippable_items=self.test_level_tests_to_skip) + suite_skipping_mode = False - # Check skippable tests are correct - for test_id in expected_skippable_test_ids: - assert CIVisibility.is_item_itr_skippable(test_id) is True + # Check skippable tests are correct + for test_id in expected_skippable_test_ids: + assert _is_item_itr_skippable(test_id, suite_skipping_mode, itr_data) is True - # Check non-skippable tests are correct - for test_id in expected_non_skippable_test_ids: - assert CIVisibility.is_item_itr_skippable(test_id) is False + # Check non-skippable tests are correct + for test_id in expected_non_skippable_test_ids: + assert _is_item_itr_skippable(test_id, suite_skipping_mode, itr_data) is False - # Check all suites are not skippable - for suite_id in self._get_all_suite_ids(): - assert CIVisibility.is_item_itr_skippable(suite_id) is False + # Check all suites are not skippable + for suite_id in self._get_all_suite_ids(): + assert _is_item_itr_skippable(suite_id, suite_skipping_mode, itr_data) is False def test_is_item_itr_skippable_suite_level(self): - with mock.patch.object(CIVisibility, "enabled", True), mock.patch.object( - CIVisibility, "_instance", Mock() - ) as mock_instance: - mock_instance._itr_data = ITRData(skippable_items=self.suite_level_test_suites_to_skip) - mock_instance._suite_skipping_mode = True - - expected_skippable_suite_ids = {self.m1_s1, self.m2_s1, self.m2_s2, self.m3_s1} - expected_non_skippable_suite_ids = self._get_all_suite_ids() - set(expected_skippable_suite_ids) + itr_data = ITRData(skippable_items=self.suite_level_test_suites_to_skip) + suite_skipping_mode = True - assert CIVisibility._instance is not None + expected_skippable_suite_ids = {self.m1_s1, self.m2_s1, self.m2_s2, self.m3_s1} + expected_non_skippable_suite_ids = self._get_all_suite_ids() - set(expected_skippable_suite_ids) - # Check skippable suites are correct - for suite_id in expected_skippable_suite_ids: - assert CIVisibility.is_item_itr_skippable(suite_id) is True + # Check skippable suites are correct + for suite_id in expected_skippable_suite_ids: + assert _is_item_itr_skippable(suite_id, suite_skipping_mode, itr_data) is True - # Check non-skippable suites are correct - for suite_id in expected_non_skippable_suite_ids: - assert CIVisibility.is_item_itr_skippable(suite_id) is False + # Check non-skippable suites are correct + for suite_id in expected_non_skippable_suite_ids: + assert _is_item_itr_skippable(suite_id, suite_skipping_mode, itr_data) is False - # Check all tests are not skippable - for test_id in self._get_all_test_ids(): - assert CIVisibility.is_item_itr_skippable(test_id) is False + # Check all tests are not skippable + for test_id in self._get_all_test_ids(): + assert _is_item_itr_skippable(test_id, suite_skipping_mode, itr_data) is False class TestCIVisibilitySetTestSessionName(TracerTestCase): @@ -1468,7 +1457,7 @@ def test_set_test_session_name_from_command(self): """ with _ci_override_env(dict()), set_up_mock_civisibility(), _patch_dummy_writer(): CIVisibility.enable() - CIVisibility.set_test_session_name(test_command="some_command") + CIVisibility._instance.set_test_session_name(test_command="some_command") self.assert_test_session_name("some_command") def test_set_test_session_name_from_dd_test_session_name_env_var(self): @@ -1479,7 +1468,7 @@ def test_set_test_session_name_from_dd_test_session_name_env_var(self): ) ), set_up_mock_civisibility(), _patch_dummy_writer(): CIVisibility.enable() - CIVisibility.set_test_session_name(test_command="some_command") + CIVisibility._instance.set_test_session_name(test_command="some_command") self.assert_test_session_name("the_name") def test_set_test_session_name_from_job_name_and_command(self): @@ -1493,7 +1482,7 @@ def test_set_test_session_name_from_job_name_and_command(self): ) ), set_up_mock_civisibility(), _patch_dummy_writer(): CIVisibility.enable() - CIVisibility.set_test_session_name(test_command="some_command") + CIVisibility._instance.set_test_session_name(test_command="some_command") self.assert_test_session_name("the_job-some_command") def test_set_test_session_name_from_dd_test_session_name_env_var_priority(self): @@ -1506,7 +1495,7 @@ def test_set_test_session_name_from_dd_test_session_name_env_var_priority(self): ) ), set_up_mock_civisibility(), _patch_dummy_writer(): CIVisibility.enable() - CIVisibility.set_test_session_name(test_command="some_command") + CIVisibility._instance.set_test_session_name(test_command="some_command") self.assert_test_session_name("the_name") @@ -1522,7 +1511,7 @@ def tearDown(self): def test_set_library_capabilities(self): with _ci_override_env(), set_up_mock_civisibility(), _patch_dummy_writer(): CIVisibility.enable() - CIVisibility.set_library_capabilities( + CIVisibility._instance.set_library_capabilities( LibraryCapabilities( early_flake_detection="1", auto_test_retries=None, diff --git a/tests/ci_visibility/test_efd.py b/tests/ci_visibility/test_efd.py index 0e2de603c6a..d47f824a0e2 100644 --- a/tests/ci_visibility/test_efd.py +++ b/tests/ci_visibility/test_efd.py @@ -4,8 +4,8 @@ import pytest -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.ext.test_visibility.api import TestStatus from ddtrace.internal.ci_visibility._api_client import EarlyFlakeDetectionSettings from ddtrace.internal.ci_visibility.api._base import TestVisibilitySessionSettings diff --git a/tests/contrib/pytest/test_pytest_xdist_itr.py b/tests/contrib/pytest/test_pytest_xdist_itr.py index fc8491e1bfd..6efa7f41a2d 100644 --- a/tests/contrib/pytest/test_pytest_xdist_itr.py +++ b/tests/contrib/pytest/test_pytest_xdist_itr.py @@ -13,9 +13,9 @@ from ddtrace.contrib.internal.pytest._utils import _USE_PLUGIN_V2 from ddtrace.contrib.internal.pytest._utils import _pytest_version_supports_itr from ddtrace.ext import test -from ddtrace.ext.test_visibility._item_ids import TestId -from ddtrace.ext.test_visibility._item_ids import TestModuleId -from ddtrace.ext.test_visibility._item_ids import TestSuiteId +from ddtrace.ext.test_visibility._test_visibility_base import TestId +from ddtrace.ext.test_visibility._test_visibility_base import TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId from ddtrace.internal.ci_visibility._api_client import EarlyFlakeDetectionSettings from ddtrace.internal.ci_visibility._api_client import TestManagementSettings from ddtrace.internal.ci_visibility._api_client import TestVisibilityAPISettings @@ -97,7 +97,7 @@ def test_pytest_xdist_itr_skips_tests(self): from ddtrace.internal.ci_visibility._api_client import EarlyFlakeDetectionSettings from ddtrace.internal.ci_visibility._api_client import TestManagementSettings from ddtrace.internal.ci_visibility._api_client import ITRData -from ddtrace.ext.test_visibility._item_ids import TestSuiteId, TestModuleId +from ddtrace.ext.test_visibility._test_visibility_base import TestSuiteId, TestModuleId # Create ITR settings and data diff --git a/tests/contrib/pytest/test_report_links.py b/tests/contrib/pytest/test_report_links.py index e41d5e22533..11232e034e2 100644 --- a/tests/contrib/pytest/test_report_links.py +++ b/tests/contrib/pytest/test_report_links.py @@ -88,9 +88,13 @@ def line(self, text): def test_print_report_links_full(mocker): terminalreporter = TerminalReporterMock() - + ci_visibility_instance = mocker.Mock(spec=CIVisibility) + mocker.patch( + "ddtrace.contrib.internal.pytest._report_links.require_ci_visibility_service", + return_value=ci_visibility_instance, + ) mocker.patch.object( - CIVisibility, + ci_visibility_instance, "get_ci_tags", lambda: { ci.git.REPOSITORY_URL: "https://github.com/some-org/some-repo", @@ -100,8 +104,8 @@ def test_print_report_links_full(mocker): ci.PIPELINE_ID: "123456", }, ) - mocker.patch.object(CIVisibility, "get_session_settings", lambda: _get_session_settings()) - mocker.patch.object(CIVisibility, "get_dd_env", lambda: None) + mocker.patch.object(ci_visibility_instance, "get_session_settings", lambda: _get_session_settings()) + mocker.patch.object(ci_visibility_instance, "get_dd_env", lambda: None) with override_env({}): _report_links.print_test_report_links(terminalreporter) @@ -122,8 +126,13 @@ def test_print_report_links_full(mocker): def test_print_report_links_only_commit_report(mocker): terminalreporter = TerminalReporterMock() + ci_visibility_instance = mocker.Mock(spec=CIVisibility) + mocker.patch( + "ddtrace.contrib.internal.pytest._report_links.require_ci_visibility_service", + return_value=ci_visibility_instance, + ) mocker.patch.object( - CIVisibility, + ci_visibility_instance, "get_ci_tags", lambda: { ci.git.REPOSITORY_URL: "https://github.com/some-org/some-repo", @@ -131,8 +140,8 @@ def test_print_report_links_only_commit_report(mocker): ci.git.COMMIT_SHA: "abcd0123", }, ) - mocker.patch.object(CIVisibility, "get_session_settings", lambda: _get_session_settings()) - mocker.patch.object(CIVisibility, "get_dd_env", lambda: None) + mocker.patch.object(ci_visibility_instance, "get_session_settings", lambda: _get_session_settings()) + mocker.patch.object(ci_visibility_instance, "get_dd_env", lambda: None) with override_env({}): _report_links.print_test_report_links(terminalreporter) @@ -149,16 +158,21 @@ def test_print_report_links_only_commit_report(mocker): def test_print_report_links_only_test_runs_report(mocker): terminalreporter = TerminalReporterMock() + ci_visibility_instance = mocker.Mock(spec=CIVisibility) + mocker.patch( + "ddtrace.contrib.internal.pytest._report_links.require_ci_visibility_service", + return_value=ci_visibility_instance, + ) mocker.patch.object( - CIVisibility, + ci_visibility_instance, "get_ci_tags", lambda: { ci.JOB_NAME: "the_job", ci.PIPELINE_ID: "123456", }, ) - mocker.patch.object(CIVisibility, "get_session_settings", lambda: _get_session_settings()) - mocker.patch.object(CIVisibility, "get_dd_env", lambda: None) + mocker.patch.object(ci_visibility_instance, "get_session_settings", lambda: _get_session_settings()) + mocker.patch.object(ci_visibility_instance, "get_dd_env", lambda: None) with override_env({}): _report_links.print_test_report_links(terminalreporter) @@ -176,13 +190,18 @@ def test_print_report_links_only_test_runs_report(mocker): def test_print_report_links_no_report(mocker): terminalreporter = TerminalReporterMock() + ci_visibility_instance = mocker.Mock(spec=CIVisibility) + mocker.patch( + "ddtrace.contrib.internal.pytest._report_links.require_ci_visibility_service", + return_value=ci_visibility_instance, + ) mocker.patch.object( - CIVisibility, + ci_visibility_instance, "get_ci_tags", lambda: {}, ) - mocker.patch.object(CIVisibility, "get_session_settings", lambda: _get_session_settings()) - mocker.patch.object(CIVisibility, "get_dd_env", lambda: None) + mocker.patch.object(ci_visibility_instance, "get_session_settings", lambda: _get_session_settings()) + mocker.patch.object(ci_visibility_instance, "get_dd_env", lambda: None) with override_env({}): _report_links.print_test_report_links(terminalreporter) @@ -193,8 +212,13 @@ def test_print_report_links_no_report(mocker): def test_print_report_links_escape_names(mocker): terminalreporter = TerminalReporterMock() + ci_visibility_instance = mocker.Mock(spec=CIVisibility) + mocker.patch( + "ddtrace.contrib.internal.pytest._report_links.require_ci_visibility_service", + return_value=ci_visibility_instance, + ) mocker.patch.object( - CIVisibility, + ci_visibility_instance, "get_ci_tags", lambda: { ci.git.REPOSITORY_URL: "https://github.com/some-org/some-repo", @@ -204,8 +228,8 @@ def test_print_report_links_escape_names(mocker): ci.PIPELINE_ID: 'a "strange" id', }, ) - mocker.patch.object(CIVisibility, "get_session_settings", lambda: _get_session_settings()) - mocker.patch.object(CIVisibility, "get_dd_env", lambda: None) + mocker.patch.object(ci_visibility_instance, "get_session_settings", lambda: _get_session_settings()) + mocker.patch.object(ci_visibility_instance, "get_dd_env", lambda: None) with override_env({}): _report_links.print_test_report_links(terminalreporter) @@ -225,8 +249,13 @@ def test_print_report_links_escape_names(mocker): def test_print_report_links_commit_report_with_env(mocker): terminalreporter = TerminalReporterMock() + ci_visibility_instance = mocker.Mock(spec=CIVisibility) + mocker.patch( + "ddtrace.contrib.internal.pytest._report_links.require_ci_visibility_service", + return_value=ci_visibility_instance, + ) mocker.patch.object( - CIVisibility, + ci_visibility_instance, "get_ci_tags", lambda: { ci.git.REPOSITORY_URL: "https://github.com/some-org/some-repo", @@ -234,8 +263,8 @@ def test_print_report_links_commit_report_with_env(mocker): ci.git.COMMIT_SHA: "abcd0123", }, ) - mocker.patch.object(CIVisibility, "get_session_settings", lambda: _get_session_settings()) - mocker.patch.object(CIVisibility, "get_dd_env", lambda: "the_env") + mocker.patch.object(ci_visibility_instance, "get_session_settings", lambda: _get_session_settings()) + mocker.patch.object(ci_visibility_instance, "get_dd_env", lambda: "the_env") with override_env({}): _report_links.print_test_report_links(terminalreporter)