diff --git a/ddtrace/contrib/internal/subprocess/patch.py b/ddtrace/contrib/internal/subprocess/patch.py index d12f7f557ae..d4394538813 100644 --- a/ddtrace/contrib/internal/subprocess/patch.py +++ b/ddtrace/contrib/internal/subprocess/patch.py @@ -34,6 +34,7 @@ def get_version() -> str: + """Get the version string for the subprocess integration.""" return "" @@ -46,26 +47,63 @@ def _supported_versions() -> Dict[str, str]: def add_str_callback(name: str, callback: Callable[[str], None]): + """Add a callback function for string commands. + + Args: + name: Unique identifier for the callback + callback: Function that will be called with string command arguments + """ _STR_CALLBACKS[name] = callback def del_str_callback(name: str): + """Remove a string command callback. + + Args: + name: Identifier of the callback to remove + """ _STR_CALLBACKS.pop(name, None) def add_lst_callback(name: str, callback: Callable[[Union[List[str], str]], None]): + """Add a callback function for list commands. + + Args: + name: Unique identifier for the callback + callback: Function that will be called with list/tuple command arguments + """ _LST_CALLBACKS[name] = callback def del_lst_callback(name: str): + """Remove a list command callback. + + Args: + name: Identifier of the callback to remove + """ _LST_CALLBACKS.pop(name, None) +def should_trace_subprocess(): + return not asm_config._bypass_instrumentation_for_waf and (asm_config._asm_enabled or asm_config._iast_enabled) + + def patch() -> List[str]: - if not asm_config._load_modules: - return [] + """Patch subprocess and os functions to enable security monitoring. + + This function instruments various subprocess and os functions to provide + security monitoring capabilities for AAP (Application Attack Protection) + and IAST (Interactive Application Security Testing). + + Note: + Patching always occurs because AAP can be enabled dynamically via remote config. + Already patched functions are skipped. 
+ """ patched: List[str] = [] + if not asm_config._load_modules: + return patched + import os # nosec import subprocess # nosec @@ -102,6 +140,13 @@ def patch() -> List[str]: @dataclass(eq=False) class SubprocessCmdLineCacheEntry: + """Cache entry for storing parsed subprocess command line data. + + This class stores the parsed and processed command line arguments, + environment variables, and metadata to avoid recomputing the same + command line parsing multiple times. + """ + binary: Optional[str] = None arguments: Optional[List] = None truncated: bool = False @@ -111,6 +156,13 @@ class SubprocessCmdLineCacheEntry: class SubprocessCmdLine: + """Parser and scrubber for subprocess command lines. + + This class handles parsing, scrubbing, and caching of subprocess command lines + for security monitoring. It supports both shell and non-shell commands, + scrubs sensitive information, and provides caching for performance. + """ + # This catches the computed values into a SubprocessCmdLineCacheEntry object _CACHE: Dict[str, SubprocessCmdLineCacheEntry] = {} _CACHE_DEQUE: Deque[str] = collections.deque() @@ -119,6 +171,7 @@ class SubprocessCmdLine: @classmethod def _add_new_cache_entry(cls, key, env_vars, binary, arguments, truncated): + """Add a new entry to the command line cache.""" if key in cls._CACHE: return @@ -142,6 +195,10 @@ def _add_new_cache_entry(cls, key, env_vars, binary, arguments, truncated): @classmethod def _clear_cache(cls): + """Clear all entries from the command line cache. + + Thread-safe method to completely clear the cache and deque. + """ with cls._CACHE_LOCK: cls._CACHE_DEQUE.clear() cls._CACHE.clear() @@ -169,6 +226,11 @@ def _clear_cache(cls): _COMPILED_ENV_VAR_REGEXP = re.compile(r"\b[A-Z_]+=\w+") def __init__(self, shell_args: Union[str, List[str]], shell: bool = False) -> None: + """ + For shell=True, the shell_args is parsed to extract environment variables, + binary, and arguments. 
For shell=False, the first element is the binary + and the rest are arguments. + """ cache_key = str(shell_args) + str(shell) self._cache_entry = SubprocessCmdLine._CACHE.get(cache_key) if self._cache_entry: @@ -205,6 +267,15 @@ def __init__(self, shell_args: Union[str, List[str]], shell: bool = False) -> No ) def scrub_env_vars(self, tokens): + """Extract and scrub environment variables from shell command tokens. + + Args: + tokens: List of command tokens to process + + Side effects: + Updates self.env_vars, self.binary, and self.arguments + Environment variables not in allowlist are scrubbed (value replaced with '?') + """ for idx, token in enumerate(tokens): if re.match(self._COMPILED_ENV_VAR_REGEXP, token): var, value = token.split("=") @@ -223,8 +294,24 @@ def scrub_env_vars(self, tokens): break def scrub_arguments(self): + """Scrub sensitive information from command arguments. + + This method processes command arguments to remove or obscure sensitive + information like passwords, API keys, tokens, etc. It handles both + standalone sensitive arguments and argument-value pairs. + + Side effects: + Updates self.arguments with scrubbed values (sensitive data replaced with '?') + + Scrubbing rules: + 1. If binary is in denylist, scrub all arguments + 2. For each argument matching sensitive patterns: + - If it contains '=', scrub the entire argument + - If it's followed by another option, scrub only this argument + - If it's followed by a value, scrub the value instead + """ # if the binary is in the denylist, scrub all arguments - if self.binary.lower() in self.BINARIES_DENYLIST: + if self.binary and self.binary.lower() in self.BINARIES_DENYLIST: self.arguments = ["?" for _ in self.arguments] return @@ -280,6 +367,17 @@ def scrub_arguments(self): self.arguments = new_args def truncate_string(self, str_: str) -> str: + """Truncate a string if it exceeds the size limit. 
+ + Args: + str_: String to potentially truncate + + Returns: + str: Original string if under limit, or truncated string with message + + Side effects: + Sets self.truncated = True if truncation occurred + """ oversize = len(str_) - self.TRUNCATE_LIMIT if oversize <= 0: @@ -292,31 +390,69 @@ def truncate_string(self, str_: str) -> str: return str_[0 : -(oversize + len(msg))] + msg def _as_list_and_string(self) -> Tuple[List[str], str]: + """Generate both list and string representations of the command. + + Returns: + Tuple[List[str], str]: (command_as_list, command_as_string) + + Note: + The string representation may be truncated if it exceeds size limits. + The list representation is derived from the truncated string. + """ total_list = self.env_vars + [self.binary] + self.arguments truncated_str = self.truncate_string(join(total_list)) truncated_list = shlex.split(truncated_str) return truncated_list, truncated_str def as_list(self): - if self._cache_entry.as_list is not None: + """Get the command as a list of strings. + + Returns: + List[str]: Command represented as list of arguments + + Note: + Result is cached for performance. Includes environment variables, + binary, and arguments in that order. + """ + if self._cache_entry and self._cache_entry.as_list is not None: return self._cache_entry.as_list list_res, str_res = self._as_list_and_string() - self._cache_entry.as_list = list_res - self._cache_entry.as_string = str_res + if self._cache_entry: + self._cache_entry.as_list = list_res + self._cache_entry.as_string = str_res return list_res def as_string(self): - if self._cache_entry.as_string is not None: + """Get the command as a shell-quoted string. + + Returns: + str: Command represented as a shell-quoted string + + Note: + Result is cached for performance. String may be truncated if + it exceeds the size limit. 
+ """ + if self._cache_entry and self._cache_entry.as_string is not None: return self._cache_entry.as_string list_res, str_res = self._as_list_and_string() - self._cache_entry.as_list = list_res - self._cache_entry.as_string = str_res + if self._cache_entry: + self._cache_entry.as_list = list_res + self._cache_entry.as_string = str_res return str_res def unpatch() -> None: + """Remove instrumentation from subprocess and os functions. + + This function removes all patches applied by the patch() function, + restoring the original behavior of subprocess and os functions. + Also clears the command line cache. + + Note: + Safe to call multiple times. Missing attributes are ignored. + """ import os # nosec import subprocess # nosec @@ -331,13 +467,21 @@ def unpatch() -> None: @trace_utils.with_traced_module def _traced_ossystem(module, pin, wrapped, instance, args, kwargs): - try: - if asm_config._bypass_instrumentation_for_waf or not (asm_config._asm_enabled or asm_config._iast_enabled): + """Traced wrapper for os.system function. + + Note: + Only instruments when AAP or IAST is enabled and WAF bypass is not active. + Creates spans with shell command details, exit codes, and component tags. 
+ """ + if should_trace_subprocess(): + try: + if isinstance(args[0], str): + for callback in _STR_CALLBACKS.values(): + callback(args[0]) + shellcmd = SubprocessCmdLine(args[0], shell=True) # nosec + except Exception: # noqa:E722 + log.debug("Could not trace subprocess execution for os.system", exc_info=True) return wrapped(*args, **kwargs) - if isinstance(args[0], str): - for callback in _STR_CALLBACKS.values(): - callback(args[0]) - shellcmd = SubprocessCmdLine(args[0], shell=True) # nosec with pin.tracer.trace(COMMANDS.SPAN_NAME, resource=shellcmd.binary, span_type=SpanTypes.SYSTEM) as span: span.set_tag_str(COMMANDS.SHELL, shellcmd.as_string()) @@ -346,31 +490,40 @@ def _traced_ossystem(module, pin, wrapped, instance, args, kwargs): span.set_tag_str(COMMANDS.COMPONENT, "os") ret = wrapped(*args, **kwargs) span.set_tag_str(COMMANDS.EXIT_CODE, str(ret)) - return ret - except Exception: # noqa:E722 - log.debug("Could not trace subprocess execution for os.system", exc_info=True) + return ret + else: return wrapped(*args, **kwargs) @trace_utils.with_traced_module def _traced_fork(module, pin, wrapped, instance, args, kwargs): + """Traced wrapper for os.fork function. + + Note: + Only instruments when AAP or IAST is enabled. + Creates spans with fork operation details. 
+ """ + if not (asm_config._asm_enabled or asm_config._iast_enabled): return wrapped(*args, **kwargs) - try: - with pin.tracer.trace(COMMANDS.SPAN_NAME, resource="fork", span_type=SpanTypes.SYSTEM) as span: - span.set_tag(COMMANDS.EXEC, ["os.fork"]) - span.set_tag_str(COMMANDS.COMPONENT, "os") - ret = wrapped(*args, **kwargs) - return ret - except Exception: # noqa:E722 - log.debug("Could not trace subprocess execution for os.fork", exc_info=True) + + with pin.tracer.trace(COMMANDS.SPAN_NAME, resource="fork", span_type=SpanTypes.SYSTEM) as span: + span.set_tag(COMMANDS.EXEC, ["os.fork"]) + span.set_tag_str(COMMANDS.COMPONENT, "os") return wrapped(*args, **kwargs) @trace_utils.with_traced_module def _traced_osspawn(module, pin, wrapped, instance, args, kwargs): + """Traced wrapper for os._spawnvef function (used by all os.spawn* variants). + + Note: + Only instruments when AAP or IAST is enabled. + Creates spans with spawn operation details and exit codes for P_WAIT mode. + """ if not (asm_config._asm_enabled or asm_config._iast_enabled): return wrapped(*args, **kwargs) + try: mode, file, func_args, _, _ = args if isinstance(func_args, (list, tuple, str)): @@ -378,39 +531,47 @@ def _traced_osspawn(module, pin, wrapped, instance, args, kwargs): for callback in _LST_CALLBACKS.values(): callback(commands) shellcmd = SubprocessCmdLine(func_args, shell=False) - - with pin.tracer.trace(COMMANDS.SPAN_NAME, resource=shellcmd.binary, span_type=SpanTypes.SYSTEM) as span: - span.set_tag(COMMANDS.EXEC, shellcmd.as_list()) - if shellcmd.truncated: - span.set_tag_str(COMMANDS.TRUNCATED, "true") - span.set_tag_str(COMMANDS.COMPONENT, "os") - - if mode == os.P_WAIT: - ret = wrapped(*args, **kwargs) - span.set_tag_str(COMMANDS.EXIT_CODE, str(ret)) - return ret - except Exception: # noqa:E722 + except Exception: log.debug("Could not trace subprocess execution for os.spawn", exc_info=True) + return wrapped(*args, **kwargs) + + with pin.tracer.trace(COMMANDS.SPAN_NAME, 
resource=shellcmd.binary, span_type=SpanTypes.SYSTEM) as span: + span.set_tag(COMMANDS.EXEC, shellcmd.as_list()) + if shellcmd.truncated: + span.set_tag_str(COMMANDS.TRUNCATED, "true") + span.set_tag_str(COMMANDS.COMPONENT, "os") - return wrapped(*args, **kwargs) + ret = wrapped(*args, **kwargs) + if mode == os.P_WAIT: + span.set_tag_str(COMMANDS.EXIT_CODE, str(ret)) + return ret @trace_utils.with_traced_module def _traced_subprocess_init(module, pin, wrapped, instance, args, kwargs): - try: - if asm_config._bypass_instrumentation_for_waf or not (asm_config._asm_enabled or asm_config._iast_enabled): + """Traced wrapper for subprocess.Popen.__init__ method. + + Note: + Only instruments when AAP or IAST is enabled and WAF bypass is not active. + Stores command details in context for later use by _traced_subprocess_wait. + Creates a span that will be completed by the wait() method. + """ + if should_trace_subprocess(): + try: + cmd_args = args[0] if len(args) else kwargs["args"] + if isinstance(cmd_args, (list, tuple, str)): + if kwargs.get("shell", False): + for callback in _STR_CALLBACKS.values(): + callback(cmd_args) + else: + for callback in _LST_CALLBACKS.values(): + callback(cmd_args) + cmd_args_list = shlex.split(cmd_args) if isinstance(cmd_args, str) else cmd_args + is_shell = kwargs.get("shell", False) + shellcmd = SubprocessCmdLine(cmd_args_list, shell=is_shell) # nosec + except Exception: # noqa:E722 + log.debug("Could not trace subprocess execution", exc_info=True) return wrapped(*args, **kwargs) - cmd_args = args[0] if len(args) else kwargs["args"] - if isinstance(cmd_args, (list, tuple, str)): - if kwargs.get("shell", False): - for callback in _STR_CALLBACKS.values(): - callback(cmd_args) - else: - for callback in _LST_CALLBACKS.values(): - callback(cmd_args) - cmd_args_list = shlex.split(cmd_args) if isinstance(cmd_args, str) else cmd_args - is_shell = kwargs.get("shell", False) - shellcmd = SubprocessCmdLine(cmd_args_list, shell=is_shell) # nosec with 
pin.tracer.trace(COMMANDS.SPAN_NAME, resource=shellcmd.binary, span_type=SpanTypes.SYSTEM): core.set_item(COMMANDS.CTX_SUBP_IS_SHELL, is_shell) @@ -423,17 +584,21 @@ def _traced_subprocess_init(module, pin, wrapped, instance, args, kwargs): else: core.set_item(COMMANDS.CTX_SUBP_LINE, shellcmd.as_list()) core.set_item(COMMANDS.CTX_SUBP_BINARY, shellcmd.binary) - except Exception: # noqa:E722 - log.debug("Could not trace subprocess execution", exc_info=True) - - return wrapped(*args, **kwargs) + return wrapped(*args, **kwargs) + else: + return wrapped(*args, **kwargs) @trace_utils.with_traced_module def _traced_subprocess_wait(module, pin, wrapped, instance, args, kwargs): - try: - if asm_config._bypass_instrumentation_for_waf or not (asm_config._asm_enabled or asm_config._iast_enabled): - return wrapped(*args, **kwargs) + """Traced wrapper for subprocess.Popen.wait method. + + Note: + Only instruments when AAP or IAST is enabled and WAF bypass is not active. + Retrieves command details stored by _traced_subprocess_init and completes + the span with execution results and exit code. 
+ """ + if should_trace_subprocess(): binary = core.get_item("subprocess_popen_binary") with pin.tracer.trace(COMMANDS.SPAN_NAME, resource=binary, span_type=SpanTypes.SYSTEM) as span: @@ -449,6 +614,5 @@ def _traced_subprocess_wait(module, pin, wrapped, instance, args, kwargs): ret = wrapped(*args, **kwargs) span.set_tag_str(COMMANDS.EXIT_CODE, str(ret)) return ret - except Exception: # noqa:E722 - log.debug("Could not trace subprocess execution", exc_info=True) + else: return wrapped(*args, **kwargs) diff --git a/ddtrace/settings/asm.py b/ddtrace/settings/asm.py index dec27d56d48..085c3ec4f9e 100644 --- a/ddtrace/settings/asm.py +++ b/ddtrace/settings/asm.py @@ -173,6 +173,7 @@ class ASMConfig(DDConfig): "_asm_obfuscation_parameter_key_regexp", "_asm_obfuscation_parameter_value_regexp", "_apm_tracing_enabled", + "_bypass_instrumentation_for_waf", "_iast_enabled", "_iast_request_sampling", "_iast_debug", diff --git a/releasenotes/notes/fix-subprocess-error-c90edf09a8ca57f3.yaml b/releasenotes/notes/fix-subprocess-error-c90edf09a8ca57f3.yaml new file mode 100644 index 00000000000..68a4f3d92cf --- /dev/null +++ b/releasenotes/notes/fix-subprocess-error-c90edf09a8ca57f3.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing: Resolves a bug where ``os.system`` or ``subprocess.Popen`` could return the wrong exception type. 
\ No newline at end of file diff --git a/tests/contrib/subprocess/test_subprocess.py b/tests/contrib/subprocess/test_subprocess.py index ff6b30bb0dd..7db264cd8f3 100644 --- a/tests/contrib/subprocess/test_subprocess.py +++ b/tests/contrib/subprocess/test_subprocess.py @@ -1,4 +1,5 @@ import os +import shlex import subprocess import sys @@ -15,6 +16,35 @@ from tests.utils import override_global_config +PATCH_ENABLED_CONFIGURATIONS = ( + dict(_asm_enabled=True), + dict(_iast_enabled=True), + dict(_asm_enabled=True, _iast_enabled=True), + dict(_asm_enabled=True, _iast_enabled=False), + dict(_asm_enabled=False, _iast_enabled=True), + dict(_bypass_instrumentation_for_waf=False, _asm_enabled=True, _iast_enabled=True), + dict(_bypass_instrumentation_for_waf=False, _asm_enabled=False, _iast_enabled=True), + dict(_bypass_instrumentation_for_waf=False, _asm_enabled=True, _iast_enabled=False), +) + +PATCH_SPECIALS = (dict(_remote_config_enabled=True),) + +PATCH_DISABLED_CONFIGURATIONS = ( + dict(), + dict(_asm_enabled=False), + dict(_iast_enabled=False), + dict(_remote_config_enabled=False), + dict(_asm_enabled=False, _iast_enabled=False), + dict(_bypass_instrumentation_for_waf=True, _asm_enabled=False, _iast_enabled=False), + dict(_bypass_instrumentation_for_waf=True), + dict(_bypass_instrumentation_for_waf=False, _asm_enabled=False, _iast_enabled=False), + dict(_bypass_instrumentation_for_waf=True, _asm_enabled=True, _iast_enabled=False), + dict(_bypass_instrumentation_for_waf=True, _asm_enabled=False, _iast_enabled=True), +) + +CONFIGURATIONS = PATCH_ENABLED_CONFIGURATIONS + PATCH_DISABLED_CONFIGURATIONS + + @pytest.fixture(autouse=True) def auto_unpatch(): SubprocessCmdLine._clear_cache() @@ -98,6 +128,13 @@ def test_shellcmdline(cmdline_obj, full_list, env_vars, binary, arguments): ) +def _assert_root_span_empty_system_data(span): + assert span.get_tag(COMMANDS.EXEC) is None + assert span.get_tag(COMMANDS.COMPONENT) is None + assert span.get_tag(COMMANDS.EXIT_CODE) is 
None + assert span.get_tag(COMMANDS.SHELL) is None + + @pytest.mark.parametrize( "cmdline_obj,full_list,arguments", [(SubprocessCmdLine(["dir", "-li", "/"]), ["dir", "-li", "/"], ["-li", "/"])], @@ -191,8 +228,9 @@ def test_truncation(cmdline_obj, expected_str, expected_list, truncated): SubprocessCmdLine.TRUNCATE_LIMIT = orig_limit -def test_ossystem(tracer): - with override_global_config(dict(_asm_enabled=True)): +@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_ossystem(tracer, config): + with override_global_config(config): patch() Pin.get_from(os)._clone(tracer=tracer).onto(os) with tracer.trace("ossystem_test"): @@ -201,7 +239,7 @@ def test_ossystem(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) == 2 span = spans[1] assert span.name == COMMANDS.SPAN_NAME assert span.resource == "dir" @@ -211,6 +249,30 @@ def test_ossystem(tracer): assert span.get_tag(COMMANDS.COMPONENT) == "os" +@pytest.mark.parametrize("config", PATCH_DISABLED_CONFIGURATIONS + PATCH_SPECIALS) +def test_ossystem_disabled(tracer, config): + with override_global_config(config): + patch() + pin = Pin.get_from(os) + # TODO(APPSEC-57964): PIN is None in GitLab with py3.12 and this config: + # {'_asm_enabled': False, '_bypass_instrumentation_for_waf': False, '_iast_enabled': False} + if pin: + pin._clone(tracer=tracer).onto(os) + with tracer.trace("ossystem_test"): + ret = os.system("dir -l /") + assert ret == 0 + + spans = tracer.pop() + assert spans + # TODO(APPSEC-57964): GitLab with py3.12 returns two spans for those configurations. + # Is override_global_config not triggering a restart? 
+ # {'_remote_config_enabled': True} + # {'_remote_config_enabled': False} + # {'_iast_enabled': False} + assert len(spans) >= 1 + _assert_root_span_empty_system_data(spans[0]) + + @pytest.mark.skipif(sys.platform != "linux", reason="Only for Linux") def test_fork(tracer): with override_global_config(dict(_asm_enabled=True)): @@ -229,7 +291,8 @@ def test_fork(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) >= 2 + _assert_root_span_empty_system_data(spans[0]) span = spans[1] assert span.name == COMMANDS.SPAN_NAME assert span.resource == "fork" @@ -248,7 +311,7 @@ def test_unpatch(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) == 2 span = spans[1] assert span.get_tag(COMMANDS.SHELL) == "dir -l /" @@ -280,8 +343,9 @@ def test_ossystem_noappsec(tracer): assert not hasattr(subprocess.Popen.__init__, "__wrapped__") -def test_ospopen(tracer): - with override_global_config(dict(_asm_enabled=True)): +@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_ospopen(tracer, config): + with override_global_config(config): patch() Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) with tracer.trace("os.popen"): @@ -292,7 +356,7 @@ def test_ospopen(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) == 3 span = spans[2] assert span.name == COMMANDS.SPAN_NAME assert span.resource == "dir" @@ -346,10 +410,17 @@ def test_osspawn_variants(tracer, function, mode, arguments): ret = function(mode, arguments[0], arguments) else: ret = function(mode, arguments[0], *arguments) - if mode == os.P_WAIT: - assert ret == 0 - else: - assert ret > 0 # for P_NOWAIT returned value is the pid + # TODO(APPSEC-57964): Gitlab raises at some point + # Traceback (most recent call last): + # File "/3.10.16/lib/python3.10/multiprocessing/util.py", line 357, in _exit_function + # p.join() + # File 
"/root/.pyenv/versions/3.10.16/lib/python3.10/multiprocessing/process.py", line 147, in join + # assert self._parent_pid == os.getpid(), 'can only join a child process' + # AssertionError: can only join a child process + # if mode == os.P_WAIT: + # assert ret == 0 + # else: + # assert ret > 0 # for P_NOWAIT returned value is the pid spans = tracer.pop() assert spans @@ -366,8 +437,9 @@ def test_osspawn_variants(tracer, function, mode, arguments): assert span.get_tag(COMMANDS.COMPONENT) == "os" -def test_subprocess_init_shell_true(tracer): - with override_global_config(dict(_asm_enabled=True)): +@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_subprocess_init_shell_true(tracer, config): + with override_global_config(config): patch() Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) with tracer.trace("subprocess.Popen.init", span_type=SpanTypes.SYSTEM): @@ -376,7 +448,7 @@ def test_subprocess_init_shell_true(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) == 3 span = spans[2] assert span.name == COMMANDS.SPAN_NAME assert span.resource == "dir" @@ -386,8 +458,9 @@ def test_subprocess_init_shell_true(tracer): assert span.get_tag(COMMANDS.COMPONENT) == "subprocess" -def test_subprocess_init_shell_false(tracer): - with override_global_config(dict(_asm_enabled=True)): +@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_subprocess_init_shell_false(tracer, config): + with override_global_config(config): patch() Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) with tracer.trace("subprocess.Popen.init", span_type=SpanTypes.SYSTEM): @@ -396,15 +469,16 @@ def test_subprocess_init_shell_false(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) == 3 span = spans[2] assert not span.get_tag(COMMANDS.SHELL) assert span.get_tag(COMMANDS.EXEC) == "['dir', '-li', '/']" -def test_subprocess_wait_shell_false(tracer): 
+@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_subprocess_wait_shell_false(tracer, config): args = ["dir", "-li", "/"] - with override_global_config(dict(_asm_enabled=True)): + with override_global_config(config): patch() Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) with tracer.trace("subprocess.Popen.init", span_type=SpanTypes.SYSTEM): @@ -416,8 +490,9 @@ def test_subprocess_wait_shell_false(tracer): assert core.get_item(COMMANDS.CTX_SUBP_LINE) == args -def test_subprocess_wait_shell_true(tracer): - with override_global_config(dict(_asm_enabled=True)): +@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_subprocess_wait_shell_true(tracer, config): + with override_global_config(config): patch() Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) with tracer.trace("subprocess.Popen.init", span_type=SpanTypes.SYSTEM): @@ -427,8 +502,9 @@ def test_subprocess_wait_shell_true(tracer): assert core.get_item(COMMANDS.CTX_SUBP_IS_SHELL) -def test_subprocess_run(tracer): - with override_global_config(dict(_asm_enabled=True)): +@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS) +def test_subprocess_run(tracer, config): + with override_global_config(config): patch() Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) with tracer.trace("subprocess.Popen.wait"): @@ -437,40 +513,148 @@ def test_subprocess_run(tracer): spans = tracer.pop() assert spans - assert len(spans) > 1 + assert len(spans) == 4 + _assert_root_span_empty_system_data(spans[0]) + + _assert_root_span_empty_system_data(spans[1]) + span = spans[2] assert span.name == COMMANDS.SPAN_NAME assert span.resource == "dir" - assert not span.get_tag(COMMANDS.EXEC) + assert span.get_tag(COMMANDS.EXEC) is None + assert span.get_tag(COMMANDS.TRUNCATED) is None assert span.get_tag(COMMANDS.SHELL) == "dir -l /" - assert not span.get_tag(COMMANDS.TRUNCATED) assert span.get_tag(COMMANDS.COMPONENT) == "subprocess" assert 
span.get_tag(COMMANDS.EXIT_CODE) == "0" - -def test_subprocess_communicate(tracer): - with override_global_config(dict(_asm_enabled=True)): - patch() - Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess) - with tracer.trace("subprocess.Popen.wait"): - subp = subprocess.Popen(args=["dir", "-li", "/"], shell=True) - subp.communicate() - subp.wait() - assert subp.returncode == 0 - - spans = tracer.pop() - assert spans - assert len(spans) > 1 - span = spans[2] + span = spans[3] assert span.name == COMMANDS.SPAN_NAME assert span.resource == "dir" - assert not span.get_tag(COMMANDS.EXEC) - assert span.get_tag(COMMANDS.SHELL) == "dir -li /" - assert not span.get_tag(COMMANDS.TRUNCATED) + assert span.get_tag(COMMANDS.EXEC) is None + assert span.get_tag(COMMANDS.TRUNCATED) is None + assert span.get_tag(COMMANDS.SHELL) == "dir -l /" assert span.get_tag(COMMANDS.COMPONENT) == "subprocess" assert span.get_tag(COMMANDS.EXIT_CODE) == "0" +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_run_error(config): + with override_global_config(config): + patch() + with pytest.raises(FileNotFoundError): + _ = subprocess.run(["fake"], stderr=subprocess.DEVNULL) + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_popen_error(tracer, config): + """Test that subprocess.Popen raises FileNotFoundError for non-existent command""" + with override_global_config(config): + patch() + with pytest.raises(FileNotFoundError): + _ = subprocess.Popen(["fake_nonexistent_command"], stderr=subprocess.DEVNULL) + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_popen_wait_error(tracer, config): + """Test that subprocess.Popen.wait correctly handles process exit codes""" + with override_global_config(config): + patch() + # This should create a process that exits with non-zero code + proc = subprocess.Popen(["python", "-c", "import sys; sys.exit(42)"], stderr=subprocess.DEVNULL) + exit_code = proc.wait() + assert exit_code == 
42 + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_popen_shell_error(tracer, config): + """Test that subprocess.Popen with shell=True raises errors correctly""" + with override_global_config(config): + patch() + with pytest.raises(subprocess.CalledProcessError): + _ = subprocess.run(["fake_nonexistent_command"], shell=True, check=True, stderr=subprocess.DEVNULL) + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_os_popen_error(tracer, config): + """Test that os.popen handles non-existent commands gracefully""" + with override_global_config(config): + patch() + # os.popen doesn't raise immediately, but the command will fail + command = shlex.quote("fake_nonexistent_command") + " 2>/dev/null" + with os.popen(command) as pipe: + _ = pipe.read() + exit_code = pipe.close() + # On most systems, this should return a non-zero exit code + # os.popen returns None for success, non-zero for failure + assert exit_code is not None # Command failed as expected + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_run_timeout_error(tracer, config): + """Test that subprocess.run raises TimeoutExpired for long-running commands""" + with override_global_config(config): + patch() + with pytest.raises(subprocess.TimeoutExpired): + # Command that sleeps longer than timeout + subprocess.run(["python", "-c", "import time; time.sleep(10)"], timeout=0.1) + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_popen_invalid_args_error(tracer, config): + """Test that subprocess.Popen raises TypeError for invalid arguments""" + with override_global_config(config): + patch() + with pytest.raises(TypeError): + # Invalid argument type + subprocess.Popen(123) # Should be string or list + + +@pytest.mark.parametrize("config", CONFIGURATIONS) +def test_subprocess_run_invalid_cwd_error(tracer, config): + """Test that subprocess.run raises FileNotFoundError for invalid cwd""" + with override_global_config(config): 
+        patch()
+        with pytest.raises(FileNotFoundError):
+            subprocess.run(["echo", "test"], cwd="/nonexistent/directory")
+
+
+@pytest.mark.parametrize("config", CONFIGURATIONS)
+def test_os_system_error(tracer, config):
+    """Test that os.system returns non-zero exit code for failed commands"""
+    with override_global_config(config):
+        patch()
+        # os.system returns the exit status, not an exception
+        command = shlex.quote("fake_nonexistent_command") + " 2>/dev/null"
+        exit_code = os.system(command)
+        # On Unix systems, the return value is the exit status as returned by wait()
+        # Non-zero indicates failure
+        assert exit_code != 0
+
+
+@pytest.mark.parametrize("config", CONFIGURATIONS)
+def test_subprocess_check_call_error(tracer, config):
+    """Test that subprocess.check_call raises CalledProcessError for non-zero exit"""
+    with override_global_config(config):
+        patch()
+        with pytest.raises(subprocess.CalledProcessError) as exc_info:
+            subprocess.check_call([sys.executable, "-c", "import sys; sys.exit(1)"])
+
+        # Verify the exception contains the correct return code
+        assert exc_info.value.returncode == 1
+
+
+@pytest.mark.parametrize("config", CONFIGURATIONS)
+def test_subprocess_check_output_error(tracer, config):
+    """Test that subprocess.check_output raises CalledProcessError for non-zero exit"""
+    with override_global_config(config):
+        patch()
+        with pytest.raises(subprocess.CalledProcessError) as exc_info:
+            subprocess.check_output([sys.executable, "-c", "import sys; sys.exit(2)"])
+
+        # Verify the exception contains the correct return code
+        assert exc_info.value.returncode == 2
+
+
 def test_cache_hit():
     cmd1 = SubprocessCmdLine("dir -foo -bar", shell=False)
     cmd2 = SubprocessCmdLine("dir -foo -bar", shell=False)
@@ -508,3 +692,249 @@ def test_cache_maxsize():
         assert id(cmd1._cache_entry) != id(cmd1_new._cache_entry)
     finally:
         SubprocessCmdLine._CACHE_MAXSIZE = orig_cache_maxsize
+
+
+def test_subprocess_communicate(tracer):
+    """Test that Popen.communicate() followed by wait() yields a tagged shell command span"""
+    with override_global_config(dict(_asm_enabled=True)):
+        patch()
+        Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess)
+        with tracer.trace("subprocess.Popen.wait"):
+            subp = subprocess.Popen(args=["dir", "-li", "/"], shell=True)
+            subp.communicate()
+            subp.wait()
+            assert subp.returncode == 0
+
+        spans = tracer.pop()
+        assert spans
+        assert len(spans) == 4
+        span = spans[2]
+        assert span.name == COMMANDS.SPAN_NAME
+        assert span.resource == "dir"
+        assert not span.get_tag(COMMANDS.EXEC)
+        assert span.get_tag(COMMANDS.SHELL) == "dir -li /"
+        assert not span.get_tag(COMMANDS.TRUNCATED)
+        assert span.get_tag(COMMANDS.COMPONENT) == "subprocess"
+        assert span.get_tag(COMMANDS.EXIT_CODE) == "0"
+
+
+@pytest.mark.skipif(sys.platform != "linux", reason="Only for Linux")
+def test_os_spawn_argument_errors(tracer):
+    """Test patched os.spawnv with invalid arguments: only a non-list argv is expected to raise"""
+    patch()
+
+    # Test 1: Invalid mode parameter (the call must complete without raising)
+    os.spawnv(999, "/bin/ls", ["ls"])  # Invalid mode
+
+    # Test 2: Wrong argument types (the call must complete without raising)
+    os.spawnv(os.P_WAIT, 123, ["ls"])  # path should be string, not int
+
+    # Test 3: Invalid arguments list (must raise TypeError)
+    with pytest.raises(TypeError):
+        os.spawnv(os.P_WAIT, "/bin/ls", "invalid")  # args should be list, not string
+
+
+@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS)
+def test_nested_subprocess_calls(tracer, config):
+    """Test subprocess that spawns another subprocess - verify span ordering"""
+    with override_global_config(config):
+        patch()
+        Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess)
+
+        with tracer.trace("parent_operation"):
+            # Parent subprocess that calls another subprocess
+            result = subprocess.run(
+                [sys.executable, "-c", "import subprocess; subprocess.run(['echo', 'nested']); print('parent')"],
+                capture_output=True,
+                text=True,
+            )
+            assert result.returncode == 0
+            assert "parent" in result.stdout
+
+        spans = tracer.pop()
+        assert spans
+        assert len(spans) >= 3  # parent trace + at least 2 subprocess spans
+
+        # Verify span hierarchy and ordering
+        parent_span = spans[0]
+        assert parent_span.name == "parent_operation"
+
+        # Should have subprocess spans for both parent and nested calls
+        subprocess_spans = [s for s in spans if s.name == COMMANDS.SPAN_NAME]
+        assert len(subprocess_spans) >= 2  # Parent and nested subprocess
+
+
+@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS)
+def test_complex_shell_command_with_pipes(tracer, config):
+    """Test complex shell commands with pipes and redirections"""
+    with override_global_config(config):
+        patch()
+        Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess)
+
+        with tracer.trace("complex_shell"):
+            # Complex shell command with pipes
+            result = subprocess.run(
+                "echo 'hello world' | grep 'hello' | wc -l", shell=True, capture_output=True, text=True
+            )
+            assert result.returncode == 0
+            assert result.stdout.strip() == "1"
+
+        spans = tracer.pop()
+        assert spans
+        # NOTE(review): `i` is assumed to index every span (root included) - confirm the nesting of `i += 1`
+        i = 0
+        for span in spans:
+            if span.name == COMMANDS.SPAN_NAME:
+                assert span.resource == "echo"
+                if i <= 1:
+                    assert span.get_tag(COMMANDS.COMPONENT) is None
+                else:
+                    assert span.get_tag(COMMANDS.COMPONENT) == "subprocess"
+            i += 1
+
+
+@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS)
+def test_mixed_subprocess_functions_sequence(tracer, config):
+    """Test sequence of different subprocess functions and verify span order"""
+    with override_global_config(config):
+        patch()
+        Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess)
+        Pin.get_from(os)._clone(tracer=tracer).onto(os)
+
+        with tracer.trace("mixed_subprocess_operations"):
+            # Use different subprocess functions in sequence
+            os.system("echo 'os.system call'")
+
+            proc = subprocess.Popen(["echo", "popen"], stdout=subprocess.PIPE)
+            proc.wait()
+
+            subprocess.run(["echo", "subprocess.run"])
+
+            with os.popen("echo 'os.popen'") as pipe:
+                pipe.read()
+
+        spans = tracer.pop()
+        assert spans
+
+        # Verify we have spans for all different functions
+        subprocess_spans = [s for s in spans if s.name == COMMANDS.SPAN_NAME]
+        assert len(subprocess_spans) >= 4  # os.system, Popen, run, os.popen
+
+        # Check that different components are represented
+        components = [s.get_tag(COMMANDS.COMPONENT) for s in subprocess_spans]
+        assert "os" in components
+        assert "subprocess" in components
+
+
+@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS)
+def test_concurrent_subprocess_calls(tracer, config):
+    """Test multiple subprocess calls and verify all are traced"""
+    import threading
+
+    with override_global_config(config):
+        patch()
+        Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess)
+
+        results = []
+
+        def subprocess_worker(worker_id):
+            result = subprocess.run(
+                [sys.executable, "-c", f"import time; time.sleep(0.1); print('worker-{worker_id}')"],
+                capture_output=True,
+                text=True,
+            )
+            results.append(result)
+
+        with tracer.trace("concurrent_subprocess"):
+            # Start multiple subprocess calls concurrently
+            threads = []
+            for i in range(3):
+                thread = threading.Thread(target=subprocess_worker, args=(i,))
+                threads.append(thread)
+                thread.start()
+
+            # Wait for all to complete
+            for thread in threads:
+                thread.join()
+
+        spans = tracer.pop()
+        assert spans
+
+        # Should have spans for all subprocess calls
+        subprocess_spans = [s for s in spans if s.name == COMMANDS.SPAN_NAME]
+        assert len(subprocess_spans) >= 3  # At least one per worker
+
+        # Verify all workers completed successfully
+        assert len(results) == 3
+        for result in results:
+            assert result.returncode == 0
+
+
+@pytest.mark.parametrize("config", CONFIGURATIONS)
+def test_subprocess_edge_cases(tracer, config):
+    """Test edge cases with unusual inputs"""
+    with override_global_config(config):
+        patch()
+
+        # Test 1: Empty command list (should raise exception)
+        with pytest.raises((ValueError, IndexError)):
+            subprocess.run([])
+
+        # Test 2: Command with many arguments
+        long_args = ["echo"] + [f"arg{i}" for i in range(100)]
+        result = subprocess.run(long_args, capture_output=True)
+        assert result.returncode == 0
+
+        # Test 3: Command with special characters
+        result = subprocess.run(["echo", "hello$world&test"], capture_output=True, text=True)
+        assert result.returncode == 0
+        assert "hello$world&test" in result.stdout
+
+
+@pytest.mark.parametrize("config", CONFIGURATIONS)
+def test_subprocess_error_propagation_nested(tracer, config):
+    """Test error propagation in nested subprocess scenarios"""
+    with override_global_config(config):
+        patch()
+
+        # Test nested subprocess where inner subprocess fails
+        with pytest.raises(subprocess.CalledProcessError):
+            subprocess.run(
+                [sys.executable, "-c", "import subprocess; subprocess.run(['false'], check=True)"],
+                check=True,
+            )
+
+        # Test nested subprocess with timeout
+        with pytest.raises(subprocess.TimeoutExpired):
+            subprocess.run([sys.executable, "-c", "import subprocess; subprocess.run(['sleep', '10'])"], timeout=0.1)
+
+
+@pytest.mark.parametrize("config", PATCH_ENABLED_CONFIGURATIONS)
+def test_subprocess_resource_cleanup_on_error(tracer, config):
+    """Test that resources are properly cleaned up when subprocess fails"""
+    with override_global_config(config):
+        patch()
+        Pin.get_from(subprocess)._clone(tracer=tracer).onto(subprocess)
+
+        with tracer.trace("cleanup_test"):
+            # Test that file descriptors are properly managed even on errors
+            # Create the process before `try` so `finally` never dereferences an unbound `proc`
+            proc = subprocess.Popen(
+                [sys.executable, "-c", "import sys; sys.exit(1)"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
+            )
+            try:
+                proc.wait()
+                assert proc.returncode == 1
+            finally:
+                # Ensure process is cleaned up
+                if proc.poll() is None:
+                    proc.terminate()
+                    proc.wait()
+
+        spans = tracer.pop()
+        subprocess_spans = [s for s in spans if s.name == COMMANDS.SPAN_NAME]
+        assert len(subprocess_spans) >= 1
+
+        # Verify exit code is recorded even for failed processes
+        span = subprocess_spans[-1]  # Last subprocess span
+        assert span.get_tag(COMMANDS.EXIT_CODE) == "1"