From 5e349664e29cda78e33e580f527e5ed09264af05 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 11 Jun 2025 14:52:48 -0400 Subject: [PATCH 1/8] perf(rc): improve performance of subprocess service sharing This change will only write service names to the shared file queue only if we have not shared that service name yet. This improvement avoids locking and writing to disk every time we call Pin.onto which can happen a lot when database connections are cycled/recreated frequently. It also helps ensure that the same process doesn't continually write the same service name to the same file over and over again. --- ddtrace/settings/_config.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ddtrace/settings/_config.py b/ddtrace/settings/_config.py index 91eb9d084fb..628e2e1579b 100644 --- a/ddtrace/settings/_config.py +++ b/ddtrace/settings/_config.py @@ -504,6 +504,7 @@ def __init__(self): self.version = _get_config("DD_VERSION", self.tags.get("version")) self._http_server = self._HTTPServerConfig() + self._extra_services_sent = set() # type: set[str] self._extra_services_queue = None if self._remote_config_enabled and not in_aws_lambda(): # lazy load slow import @@ -667,8 +668,12 @@ def __getattr__(self, name) -> Any: def _add_extra_service(self, service_name: str) -> None: if self._extra_services_queue is None: return - if service_name != self.service: - self._extra_services_queue.put(service_name) + + if service_name == self.service or service_name in self._extra_services_sent: + return + + self._extra_services_queue.put(service_name) + self._extra_services_sent.add(service_name) def _get_extra_services(self): # type: () -> set[str] From 9f36e6300a6491d9d492a30670bbffae8bf57482 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 11 Jun 2025 15:33:27 -0400 Subject: [PATCH 2/8] refactor extra services tests to use subprocesses --- .../service_name/test_extra_services_names.py | 71 +++++++++---------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index f20e752d605..400e647dce6 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -1,42 +1,41 @@ -import random -import re -import threading -import time +import os import pytest -import ddtrace - - -MAX_NAMES = 64 - - -@pytest.mark.parametrize("nb_service", [2, 16, 64, 256]) -def test_service_name(nb_service): - ddtrace.config._extra_services = set() - def write_in_subprocess(id_nb): - time.sleep(random.random()) - ddtrace.config._add_extra_service(f"extra_service_{id_nb}") - - default_remote_config_enabled = ddtrace.config._remote_config_enabled - ddtrace.config._remote_config_enabled = True - if ddtrace.config._extra_services_queue is None: - import ddtrace.internal._file_queue as file_queue - - ddtrace.config._extra_services_queue = file_queue.File_Queue() - - threads = [threading.Thread(target=write_in_subprocess, args=(i,)) for i in range(nb_service)] - for thread in threads: - thread.start() - for thread in threads: - thread.join() +@pytest.mark.parametrize("child_services", [1, 20]) +def test_config_extra_service_names(child_services, run_python_code_in_subprocess): + code = f""" +import ddtrace.auto +import ddtrace - extra_services = ddtrace.config._get_extra_services() - assert len(extra_services) == min(nb_service, MAX_NAMES) - assert all(re.match(r"extra_service_\d+", service) for service in extra_services) +import re +import os +import sys +import time - ddtrace.config._remote_config_enabled = default_remote_config_enabled - if not default_remote_config_enabled: - ddtrace.config._extra_services_queue = None - ddtrace.config._extra_services = set() +children = [] +for i in range(10): + pid = os.fork() + if pid == 0: + # Child process + print(ddtrace.config._extra_services_queue) + for c in range({child_services}): + ddtrace.config._add_extra_service(f"extra_service_{{i}}_{{c}}") + sys.exit(0) + else: + # Parent process + children.append(pid) + +for pid in children: + os.waitpid(pid, 0) + +extra_services = ddtrace.config._get_extra_services() +assert len(extra_services) == min(10 * {child_services}, 64), extra_services +assert all(re.match(r"extra_service_\\d+_\\d+", service) for service in extra_services) +""" + + env = os.environ.copy() + env["DD_REMOTE_CONFIGURATION_ENABLED"] = "true" + stdout, stderr, status, _ = run_python_code_in_subprocess(code, env=env) + assert status == 0, (stdout, stderr, status) From a48bd99e05e8f657074b84f29ce8fcd85350ee73 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 11 Jun 2025 15:48:34 -0400 Subject: [PATCH 3/8] ignore coverage service name --- tests/internal/service_name/test_extra_services_names.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index 400e647dce6..45f8d4a6270 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -31,6 +31,7 @@ def test_config_extra_service_names(child_services, run_python_code_in_subproces os.waitpid(pid, 0) extra_services = ddtrace.config._get_extra_services() +extra_services.discard("sqlite") # coverage assert len(extra_services) == min(10 * {child_services}, 64), extra_services assert all(re.match(r"extra_service_\\d+_\\d+", service) for service in extra_services) """ From bf07200b0ab666d9c116ba1429d17081deeb3d0a Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Thu, 12 Jun 2025 11:19:11 -0400 Subject: [PATCH 4/8] add additional non-forking tests --- .../service_name/test_extra_services_names.py | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index 45f8d4a6270..44c9c563bbe 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -1,10 +1,12 @@ import os +import sys import pytest @pytest.mark.parametrize("child_services", [1, 20]) -def test_config_extra_service_names(child_services, run_python_code_in_subprocess): +@pytest.mark.skipif(sys.platform in ("win32", "cygwin"), reason="Fork not supported on Windows") +def test_config_extra_service_names_fork(child_services, run_python_code_in_subprocess): code = f""" import ddtrace.auto import ddtrace @@ -40,3 +42,48 @@ def test_config_extra_service_names(child_services, run_python_code_in_subproces env["DD_REMOTE_CONFIGURATION_ENABLED"] = "true" stdout, stderr, status, _ = run_python_code_in_subprocess(code, env=env) assert status == 0, (stdout, stderr, status) + + +def test_config_extra_service_names_duplicates(run_python_code_in_subprocess): + code = """ +import ddtrace.auto +import ddtrace +import re +import os +import sys +import time + +for _ in range(10): + ddtrace.config._add_extra_service("extra_service_1") + +extra_services = ddtrace.config._get_extra_services() +extra_services.discard("sqlite") # coverage +assert extra_services == {"extra_service_1"} + """ + + env = os.environ.copy() + env["DD_REMOTE_CONFIGURATION_ENABLED"] = "true" + stdout, stderr, status, _ = run_python_code_in_subprocess(code, env=env) + assert status == 0, (stdout, stderr, status) + + +def test_config_extra_service_names_rc_disabled(run_python_code_in_subprocess): + code = """ +import ddtrace.auto +import ddtrace +import re +import os +import sys +import time + +for _ in range(10): + ddtrace.config._add_extra_service("extra_service_1") + +extra_services = ddtrace.config._get_extra_services() +assert len(extra_services) == 0 + """ + + env = os.environ.copy() + env["DD_REMOTE_CONFIGURATION_ENABLED"] = "false" + stdout, stderr, status, _ = run_python_code_in_subprocess(code, env=env) + assert status == 0, (stdout, stderr, status) From be755c6123c2c6f310b0c3e150d109459dd43bb5 Mon Sep 17 00:00:00 2001 From: Brett Langdon Date: Thu, 12 Jun 2025 11:57:37 -0400 Subject: [PATCH 5/8] Update tests/internal/service_name/test_extra_services_names.py --- tests/internal/service_name/test_extra_services_names.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index 44c9c563bbe..b7692899b1a 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -21,7 +21,6 @@ def test_config_extra_service_names_fork(child_services, run_python_code_in_subp pid = os.fork() if pid == 0: # Child process - print(ddtrace.config._extra_services_queue) for c in range({child_services}): ddtrace.config._add_extra_service(f"extra_service_{{i}}_{{c}}") sys.exit(0) From f15583f9c9ec63f69b3b95cf8608db1282e56986 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Thu, 12 Jun 2025 12:02:21 -0400 Subject: [PATCH 6/8] make test more reliable --- tests/internal/service_name/test_extra_services_names.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index b7692899b1a..5eb2e6c7f7b 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -4,9 +4,8 @@ import pytest -@pytest.mark.parametrize("child_services", [1, 20]) @pytest.mark.skipif(sys.platform in ("win32", "cygwin"), reason="Fork not supported on Windows") -def test_config_extra_service_names_fork(child_services, run_python_code_in_subprocess): +def test_config_extra_service_names_fork(run_python_code_in_subprocess): code = f""" import ddtrace.auto import ddtrace @@ -21,8 +20,8 @@ def test_config_extra_service_names_fork(child_services, run_python_code_in_subp pid = os.fork() if pid == 0: # Child process - for c in range({child_services}): - ddtrace.config._add_extra_service(f"extra_service_{{i}}_{{c}}") + ddtrace.config._add_extra_service(f"extra_service_{{i}}") + time.sleep(0.1) # Ensure the child has time to save the service sys.exit(0) else: # Parent process @@ -33,7 +32,7 @@ def test_config_extra_service_names_fork(child_services, run_python_code_in_subp extra_services = ddtrace.config._get_extra_services() extra_services.discard("sqlite") # coverage -assert len(extra_services) == min(10 * {child_services}, 64), extra_services +assert len(extra_services) == 10, extra_services assert all(re.match(r"extra_service_\\d+_\\d+", service) for service in extra_services) """ From e12bfb2e1aade785be9cf50364570da65b681145 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Mon, 16 Jun 2025 08:21:58 -0400 Subject: [PATCH 7/8] fix linting --- tests/internal/service_name/test_extra_services_names.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index 5eb2e6c7f7b..38f069a1782 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -6,7 +6,7 @@ @pytest.mark.skipif(sys.platform in ("win32", "cygwin"), reason="Fork not supported on Windows") def test_config_extra_service_names_fork(run_python_code_in_subprocess): - code = f""" + code = """ import ddtrace.auto import ddtrace @@ -20,7 +20,7 @@ def test_config_extra_service_names_fork(run_python_code_in_subprocess): pid = os.fork() if pid == 0: # Child process - ddtrace.config._add_extra_service(f"extra_service_{{i}}") + ddtrace.config._add_extra_service(f"extra_service_{i}") time.sleep(0.1) # Ensure the child has time to save the service sys.exit(0) else: From 484844499503f78e313f88eb076bea1437b29d77 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Mon, 16 Jun 2025 12:51:39 -0400 Subject: [PATCH 8/8] fix missed assertion update --- tests/internal/service_name/test_extra_services_names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/internal/service_name/test_extra_services_names.py b/tests/internal/service_name/test_extra_services_names.py index 38f069a1782..9c0a5f04beb 100644 --- a/tests/internal/service_name/test_extra_services_names.py +++ b/tests/internal/service_name/test_extra_services_names.py @@ -33,7 +33,7 @@ def test_config_extra_service_names_fork(run_python_code_in_subprocess): extra_services = ddtrace.config._get_extra_services() extra_services.discard("sqlite") # coverage assert len(extra_services) == 10, extra_services -assert all(re.match(r"extra_service_\\d+_\\d+", service) for service in extra_services) +assert all(re.match(r"extra_service_\\d+", service) for service in extra_services), extra_services """ env = os.environ.copy()