diff --git a/ddtrace/appsec/_common_module_patches.py b/ddtrace/appsec/_common_module_patches.py index 7031301e104..388dccc2052 100644 --- a/ddtrace/appsec/_common_module_patches.py +++ b/ddtrace/appsec/_common_module_patches.py @@ -328,7 +328,7 @@ def try_unwrap(module, name): original = _DD_ORIGINAL_ATTRIBUTES[(parent, attribute)] apply_patch(parent, attribute, original) del _DD_ORIGINAL_ATTRIBUTES[(parent, attribute)] - except ModuleNotFoundError: + except (ModuleNotFoundError, AttributeError): log.debug("ERROR unwrapping %s.%s ", module, name) diff --git a/ddtrace/appsec/_iast/_patch_modules.py b/ddtrace/appsec/_iast/_patch_modules.py index e4b7a5488ce..ff0a8763482 100644 --- a/ddtrace/appsec/_iast/_patch_modules.py +++ b/ddtrace/appsec/_iast/_patch_modules.py @@ -10,8 +10,9 @@ The module uses wrapt's function wrapping capabilities to intercept calls to security-sensitive functions and enable taint tracking and vulnerability detection. """ - +import functools from typing import Callable +from typing import Optional from typing import Set from typing import Text @@ -21,6 +22,12 @@ from ddtrace.appsec._common_module_patches import try_wrap_function_wrapper from ddtrace.appsec._common_module_patches import wrap_object from ddtrace.appsec._iast._logs import iast_instrumentation_wrapt_debug_log +from ddtrace.appsec._iast.secure_marks import SecurityControl +from ddtrace.appsec._iast.secure_marks import get_security_controls_from_env +from ddtrace.appsec._iast.secure_marks.configuration import SC_SANITIZER +from ddtrace.appsec._iast.secure_marks.configuration import SC_VALIDATOR +from ddtrace.appsec._iast.secure_marks.sanitizers import create_sanitizer +from ddtrace.appsec._iast.secure_marks.validators import create_validator from ddtrace.internal.logger import get_logger from ddtrace.settings.asm import config as asm_config @@ -181,3 +188,55 @@ def _testing_unpatch_iast(): """ iast_funcs = WrapFunctonsForIAST() iast_funcs.testing_unpatch() + + +def _apply_custom_security_controls(iast_funcs: Optional[WrapFunctonsForIAST] = None): + """Apply custom security controls from DD_IAST_SECURITY_CONTROLS_CONFIGURATION environment variable.""" + try: + if iast_funcs is None: + iast_funcs = WrapFunctonsForIAST() + security_controls = get_security_controls_from_env() + + if not security_controls: + log.debug("No custom security controls configured") + return + + log.debug("Applying %s custom security controls", len(security_controls)) + + for control in security_controls: + try: + _apply_security_control(iast_funcs, control) + except Exception: + log.warning("Failed to apply security control %s", control, exc_info=True) + return iast_funcs + except Exception: + log.warning("Failed to load custom security controls", exc_info=True) + + +def _apply_security_control(iast_funcs: WrapFunctonsForIAST, control: SecurityControl): + """Apply a single security control configuration. + + Args: + control: SecurityControl object containing the configuration + """ + # Create the appropriate wrapper function + if control.control_type == SC_SANITIZER: + wrapper_func = functools.partial(create_sanitizer, control.vulnerability_types) + elif control.control_type == SC_VALIDATOR: + wrapper_func = functools.partial(create_validator, control.vulnerability_types, control.parameters) + else: + log.warning("Unknown control type: %s", control.control_type) + return + + iast_funcs.wrap_function( + control.module_path, + control.method_name, + wrapper_func, + ) + log.debug( + "Configured %s for %s.%s (vulnerabilities: %s)", + control.control_type, + control.module_path, + control.method_name, + [v.name for v in control.vulnerability_types], + ) diff --git a/ddtrace/appsec/_iast/main.py b/ddtrace/appsec/_iast/main.py index 9a46fab0215..a702b21c1bb 100644 --- a/ddtrace/appsec/_iast/main.py +++ b/ddtrace/appsec/_iast/main.py @@ -23,6 +23,7 @@ from wrapt import when_imported from ddtrace.appsec._iast._patch_modules import WrapFunctonsForIAST +from ddtrace.appsec._iast._patch_modules import _apply_custom_security_controls from ddtrace.appsec._iast.secure_marks import cmdi_sanitizer from ddtrace.appsec._iast.secure_marks import path_traversal_sanitizer from ddtrace.appsec._iast.secure_marks import sqli_sanitizer @@ -72,6 +73,9 @@ def patch_iast(patch_modules=IAST_PATCH): when_imported("hashlib")(_on_import_factory(module, "ddtrace.appsec._iast.taint_sinks.%s", raise_errors=False)) iast_funcs = WrapFunctonsForIAST() + + _apply_custom_security_controls(iast_funcs) + # CMDI sanitizers iast_funcs.wrap_function("shlex", "quote", cmdi_sanitizer) diff --git a/ddtrace/appsec/_iast/secure_marks/README_CONFIGURATION.md b/ddtrace/appsec/_iast/secure_marks/README_CONFIGURATION.md new file mode 100644 index 00000000000..5a98f8ebc52 --- /dev/null +++ b/ddtrace/appsec/_iast/secure_marks/README_CONFIGURATION.md @@ -0,0 +1,118 @@ +# IAST Security Controls Configuration + +This document explains how to configure custom security controls for IAST using the `DD_IAST_SECURITY_CONTROLS_CONFIGURATION` environment variable. + +## Overview + +The `DD_IAST_SECURITY_CONTROLS_CONFIGURATION` environment variable allows you to specify custom sanitizers and validators that IAST should recognize when analyzing your application for security vulnerabilities. + +## Format + +The configuration uses the following format: + +``` +CONTROL_TYPE:VULNERABILITY_TYPES:MODULE:METHOD[:PARAMETER_POSITIONS] +``` + +Multiple security controls are separated by semicolons (`;`). + +### Fields + +1. **CONTROL_TYPE**: Either `INPUT_VALIDATOR` or `SANITIZER` +2. **VULNERABILITY_TYPES**: Comma-separated list of vulnerability types or `*` for all types +3. **MODULE**: Python module path (e.g., `shlex`, `django.utils.http`) +4. **METHOD**: Method name to instrument +5. **PARAMETER_POSITIONS** (Optional): Zero-based parameter positions to validate (INPUT_VALIDATOR only) + +### Vulnerability Types + +Supported vulnerability types: +- `COMMAND_INJECTION` / `CMDI` +- `CODE_INJECTION` +- `SQL_INJECTION` / `SQLI` +- `XSS` +- `HEADER_INJECTION` +- `PATH_TRAVERSAL` +- `SSRF` +- `UNVALIDATED_REDIRECT` +- `INSECURE_COOKIE` +- `NO_HTTPONLY_COOKIE` +- `NO_SAMESITE_COOKIE` +- `WEAK_CIPHER` +- `WEAK_HASH` +- `WEAK_RANDOMNESS` +- `STACKTRACE_LEAK` + +Use `*` to apply to all vulnerability types. + +## Examples + +### Basic Examples + +#### Input Validator for Command Injection +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="INPUT_VALIDATOR:COMMAND_INJECTION:shlex:quote" +``` + +#### Sanitizer for XSS +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="SANITIZER:XSS:html:escape" +``` + +#### Multiple Vulnerability Types +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="INPUT_VALIDATOR:COMMAND_INJECTION,XSS:custom.validator:validate_input" +``` + +#### All Vulnerability Types +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="SANITIZER:*:custom.sanitizer:sanitize_all" +``` + +### Advanced Examples + +#### Multiple Security Controls +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="INPUT_VALIDATOR:COMMAND_INJECTION:shlex:quote;SANITIZER:XSS:html:escape;SANITIZER:SQLI:custom.db:escape_sql" +``` + +#### Validator with Specific Parameter Positions +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="INPUT_VALIDATOR:COMMAND_INJECTION:custom.validator:validate:0,2" +``` +This validates only the 1st and 3rd parameters (0-based indexing). + +#### Complex Configuration +```bash +export DD_IAST_SECURITY_CONTROLS_CONFIGURATION="INPUT_VALIDATOR:COMMAND_INJECTION,XSS:security.validators:validate_user_input:0,1;SANITIZER:SQLI:database.utils:escape_sql_string;SANITIZER:*:security.sanitizers:clean_all_inputs" +``` + +## How It Works + +### Input Validators +- **Purpose**: Mark input parameters as safe after validation +- **When to use**: When your function validates input and returns a boolean or throws an exception +- **Effect**: Parameters are marked as secure for the specified vulnerability types +- **Parameter positions**: Optionally specify which parameters to mark (0-based index) + +### Sanitizers +- **Purpose**: Mark return values as safe after sanitization +- **When to use**: When your function cleans/escapes input and returns the sanitized value +- **Effect**: Return value is marked as secure for the specified vulnerability types + +## Integration with Existing Controls + +Your custom security controls work alongside the built-in IAST security controls: + +- `shlex.quote` (Command injection sanitizer) +- `html.escape` (XSS sanitizer) +- Database escape functions (SQL injection sanitizers) +- Django validators (Various validators) +- And more... + +## Error Handling + +If there are errors in the configuration: +- Invalid configurations are logged and skipped +- The application continues to run with built-in security controls +- Check application logs for configuration warnings/errors \ No newline at end of file diff --git a/ddtrace/appsec/_iast/secure_marks/__init__.py b/ddtrace/appsec/_iast/secure_marks/__init__.py index d9692780aa5..72ac3a122b8 100644 --- a/ddtrace/appsec/_iast/secure_marks/__init__.py +++ b/ddtrace/appsec/_iast/secure_marks/__init__.py @@ -3,8 +3,15 @@ This package provides functions to mark values as secure from specific vulnerabilities. It includes both sanitizers (which transform and secure values) and validators (which verify values are secure). + +It also provides configuration capabilities for custom security controls via the +DD_IAST_SECURITY_CONTROLS_CONFIGURATION environment variable. """ +from .configuration import VULNERABILITY_TYPE_MAPPING +from .configuration import SecurityControl +from .configuration import get_security_controls_from_env +from .configuration import parse_security_controls_config from .sanitizers import cmdi_sanitizer from .sanitizers import path_traversal_sanitizer from .sanitizers import sqli_sanitizer @@ -22,4 +29,9 @@ "path_traversal_validator", "sqli_validator", "cmdi_validator", + # Configuration + "get_security_controls_from_env", + "parse_security_controls_config", + "SecurityControl", + "VULNERABILITY_TYPE_MAPPING", ] diff --git a/ddtrace/appsec/_iast/secure_marks/configuration.py b/ddtrace/appsec/_iast/secure_marks/configuration.py new file mode 100644 index 00000000000..1b62b5ff6cc --- /dev/null +++ b/ddtrace/appsec/_iast/secure_marks/configuration.py @@ -0,0 +1,204 @@ +"""Module for parsing and applying DD_IAST_SECURITY_CONTROLS_CONFIGURATION. + +This module handles the configuration of custom security controls via environment variables. +It supports both INPUT_VALIDATOR and SANITIZER types with configurable vulnerability types, +modules, methods, and parameter positions. + +Format: CONTROL_TYPE:VULNERABILITY_TYPES:MODULE:METHOD[:PARAMETER_POSITIONS] +Example: INPUT_VALIDATOR:COMMAND_INJECTION,XSS:shlex:quote +""" + +from typing import List +from typing import Optional + +from ddtrace.appsec._iast._taint_tracking import VulnerabilityType +from ddtrace.internal.logger import get_logger +from ddtrace.settings.asm import config as asm_config + + +log = get_logger(__name__) + +# Mapping from string names to VulnerabilityType enum values +VULNERABILITY_TYPE_MAPPING = { + "CODE_INJECTION": VulnerabilityType.CODE_INJECTION, + "COMMAND_INJECTION": VulnerabilityType.COMMAND_INJECTION, + "HEADER_INJECTION": VulnerabilityType.HEADER_INJECTION, + "UNVALIDATED_REDIRECT": VulnerabilityType.UNVALIDATED_REDIRECT, + "INSECURE_COOKIE": VulnerabilityType.INSECURE_COOKIE, + "NO_HTTPONLY_COOKIE": VulnerabilityType.NO_HTTPONLY_COOKIE, + "NO_SAMESITE_COOKIE": VulnerabilityType.NO_SAMESITE_COOKIE, + "PATH_TRAVERSAL": VulnerabilityType.PATH_TRAVERSAL, + "SQL_INJECTION": VulnerabilityType.SQL_INJECTION, + "SQLI": VulnerabilityType.SQL_INJECTION, # Alias + "SSRF": VulnerabilityType.SSRF, + "STACKTRACE_LEAK": VulnerabilityType.STACKTRACE_LEAK, + "WEAK_CIPHER": VulnerabilityType.WEAK_CIPHER, + "WEAK_HASH": VulnerabilityType.WEAK_HASH, + "WEAK_RANDOMNESS": VulnerabilityType.WEAK_RANDOMNESS, + "XSS": VulnerabilityType.XSS, +} + +SC_SANITIZER = "SANITIZER" +SC_VALIDATOR = "INPUT_VALIDATOR" + + +class SecurityControl: + """Represents a single security control configuration.""" + + def __init__( + self, + control_type: str, + vulnerability_types: List[VulnerabilityType], + module_path: str, + method_name: str, + parameters: Optional[List[int]] = None, + ): + """Initialize a security control configuration. + + Args: + control_type: Either SC_VALIDATOR or SC_SANITIZER + vulnerability_types: List of vulnerability types this control applies to + module_path: Python module path (e.g., "shlex", "django.utils.http") + method_name: Name of the method to wrap + parameters: Optional list of parameter types for overloaded methods + """ + self.control_type = control_type.upper() + self.vulnerability_types = vulnerability_types + self.module_path = module_path + self.method_name = method_name + self.parameters = parameters or [] + + if self.control_type not in (SC_VALIDATOR, SC_SANITIZER): + raise ValueError(f"Invalid control type: {control_type}") + + def __repr__(self): + return ( + f"SecurityControl(type={self.control_type}, " + f"vulns={[v.name for v in self.vulnerability_types]}, " + f"module={self.module_path}, method={self.method_name})" + ) + + +def parse_vulnerability_types(vuln_string: str) -> List[VulnerabilityType]: + """Parse comma-separated vulnerability types or '*' for all types. + + Args: + vuln_string: Comma-separated vulnerability type names or '*' + + Returns: + List of VulnerabilityType enum values + + Raises: + ValueError: If an unknown vulnerability type is specified + """ + if vuln_string.strip() == "*": + return list(VULNERABILITY_TYPE_MAPPING.values()) + + vulnerability_types = [] + for vuln_name in vuln_string.split(","): + vuln_name = vuln_name.strip().upper() + if vuln_name not in VULNERABILITY_TYPE_MAPPING: + raise ValueError(f"Unknown vulnerability type: {vuln_name}") + vulnerability_types.append(VULNERABILITY_TYPE_MAPPING[vuln_name]) + + return vulnerability_types + + +def parse_parameters(positions_string: str) -> List[int]: + """Parse comma-separated parameter positions. + + Args: + positions_string: Comma-separated parameter positions (e.g., "0,1,3") + + Returns: + List of integer positions + + Raises: + ValueError: If positions cannot be parsed as integers + """ + if not positions_string.strip(): + return [] + + try: + return [int(pos.strip()) for pos in positions_string.split(",")] + except ValueError as e: + raise ValueError(f"Invalid parameter positions: {positions_string}") from e + + +def parse_security_controls_config(config_string: str) -> List[SecurityControl]: + """Parse the DD_IAST_SECURITY_CONTROLS_CONFIGURATION environment variable. + + Args: + config_string: Configuration string with format: + CONTROL_TYPE:VULNERABILITY_TYPES:MODULE:METHOD[:PARAMETERS][:PARAMETER_POSITIONS] + + Returns: + List of SecurityControl objects + + Raises: + ValueError: If the configuration format is invalid + """ + if not config_string.strip(): + return [] + + security_controls = [] + + # Split by semicolon to get individual security controls + for control_config in config_string.split(";"): + control_config = control_config.strip() + if not control_config: + continue + + # Split by colon to get control fields + fields = control_config.split(":") + if len(fields) < 4: + log.warning("Invalid security control configuration (missing fields): %s", control_config) + continue + + try: + control_type = fields[0].strip() + vulnerability_types = parse_vulnerability_types(fields[1].strip()) + module_path = fields[2].strip() + method_name = fields[3].strip() + + # Optional fields + parameters = None + + if len(fields) > 4 and fields[4].strip(): + parameters = parse_parameters(fields[4]) + + security_control = SecurityControl( + control_type=control_type, + vulnerability_types=vulnerability_types, + module_path=module_path, + method_name=method_name, + parameters=parameters, + ) + + security_controls.append(security_control) + log.debug("Parsed security control: %s", security_control) + + except Exception: + log.warning("Failed to parse security control %s", control_config, exc_info=True) + continue + + return security_controls + + +def get_security_controls_from_env() -> List[SecurityControl]: + """Get security controls configuration from DD_IAST_SECURITY_CONTROLS_CONFIGURATION environment variable. + + Returns: + List of SecurityControl objects parsed from the environment variable + """ + config_string = asm_config._iast_security_controls + if not config_string: + return [] + + try: + controls = parse_security_controls_config(config_string) + log.info("Loaded %s custom security controls from environment", len(controls)) + return controls + except Exception: + log.error("Failed to parse DD_IAST_SECURITY_CONTROLS_CONFIGURATION", exc_info=True) + return [] diff --git a/ddtrace/appsec/_iast/secure_marks/sanitizers.py b/ddtrace/appsec/_iast/secure_marks/sanitizers.py index 2b28c60e037..bb067aac8af 100644 --- a/ddtrace/appsec/_iast/secure_marks/sanitizers.py +++ b/ddtrace/appsec/_iast/secure_marks/sanitizers.py @@ -6,6 +6,7 @@ from typing import Any from typing import Callable +from typing import List from typing import Sequence from ddtrace.appsec._iast._taint_tracking import VulnerabilityType @@ -13,17 +14,18 @@ def create_sanitizer( - vulnerability_type: VulnerabilityType, wrapped: Callable, instance: Any, args: Sequence, kwargs: dict + vulnerability_types: List[VulnerabilityType], wrapped: Callable, instance: Any, args: Sequence, kwargs: dict ) -> Callable: """Create a sanitizer function wrapper that marks return values as secure for a specific vulnerability type.""" # Apply the sanitizer function result = wrapped(*args, **kwargs) # If result is a string, mark it as secure - ranges = get_tainted_ranges(result) - if ranges: - for _range in ranges: - _range.add_secure_mark(vulnerability_type) + for vuln_type in vulnerability_types: + ranges = get_tainted_ranges(result) + if ranges: + for _range in ranges: + _range.add_secure_mark(vuln_type) return result @@ -40,7 +42,7 @@ def path_traversal_sanitizer(wrapped: Callable, instance: Any, args: Sequence, k Returns: The sanitized filename """ - return create_sanitizer(VulnerabilityType.PATH_TRAVERSAL, wrapped, instance, args, kwargs) + return create_sanitizer([VulnerabilityType.PATH_TRAVERSAL], wrapped, instance, args, kwargs) def xss_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> Any: @@ -55,7 +57,7 @@ def xss_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict Returns: The sanitized string """ - return create_sanitizer(VulnerabilityType.XSS, wrapped, instance, args, kwargs) + return create_sanitizer([VulnerabilityType.XSS], wrapped, instance, args, kwargs) def sqli_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> Any: @@ -70,7 +72,7 @@ def sqli_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dic Returns: The quoted SQL value """ - return create_sanitizer(VulnerabilityType.SQL_INJECTION, wrapped, instance, args, kwargs) + return create_sanitizer([VulnerabilityType.SQL_INJECTION], wrapped, instance, args, kwargs) def cmdi_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> Any: @@ -85,8 +87,8 @@ def cmdi_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dic Returns: The quoted shell command """ - return create_sanitizer(VulnerabilityType.COMMAND_INJECTION, wrapped, instance, args, kwargs) + return create_sanitizer([VulnerabilityType.COMMAND_INJECTION], wrapped, instance, args, kwargs) def header_injection_sanitizer(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> Any: - return create_sanitizer(VulnerabilityType.HEADER_INJECTION, wrapped, instance, args, kwargs) + return create_sanitizer([VulnerabilityType.HEADER_INJECTION], wrapped, instance, args, kwargs) diff --git a/ddtrace/appsec/_iast/secure_marks/validators.py b/ddtrace/appsec/_iast/secure_marks/validators.py index 0a6c7ebaa25..660344a5f68 100644 --- a/ddtrace/appsec/_iast/secure_marks/validators.py +++ b/ddtrace/appsec/_iast/secure_marks/validators.py @@ -6,6 +6,8 @@ from typing import Any from typing import Callable +from typing import List +from typing import Optional from typing import Sequence from ddtrace.appsec._iast._taint_tracking import VulnerabilityType @@ -13,24 +15,37 @@ def create_validator( - vulnerability_type: VulnerabilityType, wrapped: Callable, instance: Any, args: Sequence, kwargs: dict + vulnerability_types: List[VulnerabilityType], + parameter_positions: Optional[List[int]], + wrapped: Callable, + instance: Any, + args: Sequence, + kwargs: dict, ) -> Any: """Create a validator function wrapper that marks arguments as secure for a specific vulnerability type.""" # Apply the validator function result = wrapped(*args, **kwargs) + i = 0 for arg in args: + if parameter_positions != [] and isinstance(parameter_positions, list): + if i not in parameter_positions: + i += 1 + continue if isinstance(arg, str): ranges = get_tainted_ranges(arg) if ranges: for _range in ranges: - _range.add_secure_mark(vulnerability_type) + for vuln_type in vulnerability_types: + _range.add_secure_mark(vuln_type) + i += 1 for arg in kwargs.values(): if isinstance(arg, str): ranges = get_tainted_ranges(arg) if ranges: for _range in ranges: - _range.add_secure_mark(vulnerability_type) + for vuln_type in vulnerability_types: + _range.add_secure_mark(vuln_type) return result @@ -47,7 +62,7 @@ def path_traversal_validator(wrapped: Callable, instance: Any, args: Sequence, k Returns: True if validation passed, False otherwise """ - return create_validator(VulnerabilityType.PATH_TRAVERSAL, wrapped, instance, args, kwargs) + return create_validator([VulnerabilityType.PATH_TRAVERSAL], None, wrapped, instance, args, kwargs) def sqli_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> bool: @@ -62,7 +77,7 @@ def sqli_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dic Returns: True if validation passed, False otherwise """ - return create_validator(VulnerabilityType.SQL_INJECTION, wrapped, instance, args, kwargs) + return create_validator([VulnerabilityType.SQL_INJECTION], None, wrapped, instance, args, kwargs) def cmdi_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> bool: @@ -77,7 +92,7 @@ def cmdi_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dic Returns: True if validation passed, False otherwise """ - return create_validator(VulnerabilityType.COMMAND_INJECTION, wrapped, instance, args, kwargs) + return create_validator([VulnerabilityType.COMMAND_INJECTION], None, wrapped, instance, args, kwargs) def unvalidated_redirect_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> bool: @@ -92,11 +107,11 @@ def unvalidated_redirect_validator(wrapped: Callable, instance: Any, args: Seque Returns: True if validation passed, False otherwise """ - return create_validator(VulnerabilityType.UNVALIDATED_REDIRECT, wrapped, instance, args, kwargs) + return create_validator([VulnerabilityType.UNVALIDATED_REDIRECT], None, wrapped, instance, args, kwargs) def header_injection_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> bool: - return create_validator(VulnerabilityType.HEADER_INJECTION, wrapped, instance, args, kwargs) + return create_validator([VulnerabilityType.HEADER_INJECTION], None, wrapped, instance, args, kwargs) def ssrf_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dict) -> bool: @@ -111,4 +126,4 @@ def ssrf_validator(wrapped: Callable, instance: Any, args: Sequence, kwargs: dic Returns: True if validation passed, False otherwise """ - return create_validator(VulnerabilityType.SSRF, wrapped, instance, args, kwargs) + return create_validator([VulnerabilityType.SSRF], None, wrapped, instance, args, kwargs) diff --git a/ddtrace/settings/asm.py b/ddtrace/settings/asm.py index 0043a490305..1df56ba7f8b 100644 --- a/ddtrace/settings/asm.py +++ b/ddtrace/settings/asm.py @@ -140,6 +140,7 @@ class ASMConfig(DDConfig): ) _iast_lazy_taint = DDConfig.var(bool, IAST.LAZY_TAINT, default=False) _iast_deduplication_enabled = DDConfig.var(bool, "DD_IAST_DEDUPLICATION_ENABLED", default=True) + _iast_security_controls = DDConfig.var(str, "DD_IAST_SECURITY_CONTROLS_CONFIGURATION", default="") _iast_is_testing = False @@ -182,6 +183,7 @@ class ASMConfig(DDConfig): "_iast_debug", "_iast_propagation_debug", "_iast_telemetry_report_lvl", + "_iast_security_controls", "_iast_is_testing", "_ep_enabled", "_use_metastruct_for_triggers", diff --git a/docs/configuration.rst b/docs/configuration.rst index 89172c84cd5..79a8a66a0dd 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -564,6 +564,14 @@ AppSec default: "DES,Blowfish,RC2,RC4,IDEA" description: Weak cipher algorithms that should be reported, comma separated. + DD_IAST_SECURITY_CONTROLS_CONFIGURATION: + type: String + default: "" + description: | + Allows you to specify custom sanitizers and validators that IAST should recognize when + analyzing your application for security vulnerabilities. + See the `Security Controls `_ + documentation for more information about this feature. Test Visibility --------------- diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 75b8863efac..ee462ba1dc4 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -338,3 +338,4 @@ wsgi xfail yaaredis openai-agents +validators \ No newline at end of file diff --git a/releasenotes/notes/iast-security-controls-9cb913d485cd5e4b.yaml b/releasenotes/notes/iast-security-controls-9cb913d485cd5e4b.yaml new file mode 100644 index 00000000000..264fa976339 --- /dev/null +++ b/releasenotes/notes/iast-security-controls-9cb913d485cd5e4b.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Code Security (IAST): Handle IAST security controls custom validation and sanitization methods. + See the `Security Controls `_ documentation for more information about this feature. \ No newline at end of file diff --git a/tests/appsec/iast/secure_marks/__init__.py b/tests/appsec/iast/secure_marks/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/appsec/iast/secure_marks/test_security_controls_configuration.py b/tests/appsec/iast/secure_marks/test_security_controls_configuration.py new file mode 100644 index 00000000000..d5653320940 --- /dev/null +++ b/tests/appsec/iast/secure_marks/test_security_controls_configuration.py @@ -0,0 +1,386 @@ +"""Tests for DD_IAST_SECURITY_CONTROLS_CONFIGURATION environment variable functionality.""" +import functools +import os +from unittest.mock import patch + +import pytest + +from ddtrace.appsec._iast._taint_tracking import VulnerabilityType +from ddtrace.appsec._iast.secure_marks.configuration import SC_SANITIZER +from ddtrace.appsec._iast.secure_marks.configuration import SC_VALIDATOR +from ddtrace.appsec._iast.secure_marks.configuration import VULNERABILITY_TYPE_MAPPING +from ddtrace.appsec._iast.secure_marks.configuration import SecurityControl +from ddtrace.appsec._iast.secure_marks.configuration import get_security_controls_from_env +from ddtrace.appsec._iast.secure_marks.configuration import parse_parameters +from ddtrace.appsec._iast.secure_marks.configuration import parse_security_controls_config +from ddtrace.appsec._iast.secure_marks.configuration import parse_vulnerability_types +from ddtrace.appsec._iast.secure_marks.sanitizers import create_sanitizer +from ddtrace.appsec._iast.secure_marks.validators import create_validator +from tests.utils import override_global_config + + +def test_vulnerability_type_mapping(): + """Test that all expected vulnerability types are mapped correctly.""" + expected_types = { + "CODE_INJECTION", + "COMMAND_INJECTION", + "HEADER_INJECTION", + "UNVALIDATED_REDIRECT", + "INSECURE_COOKIE", + "NO_HTTPONLY_COOKIE", + "NO_SAMESITE_COOKIE", + "PATH_TRAVERSAL", + "SQL_INJECTION", + "SQLI", # Alias + "SSRF", + "STACKTRACE_LEAK", + "WEAK_CIPHER", + "WEAK_HASH", + "WEAK_RANDOMNESS", + "XSS", + } + + assert set(VULNERABILITY_TYPE_MAPPING.keys()) == expected_types + assert VULNERABILITY_TYPE_MAPPING["SQLI"] == VulnerabilityType.SQL_INJECTION + + +def test_parse_vulnerability_types_single(): + """Test parsing a single vulnerability type.""" + result = parse_vulnerability_types("COMMAND_INJECTION") + assert result == [VulnerabilityType.COMMAND_INJECTION] + + +def test_parse_vulnerability_types_multiple(): + """Test parsing multiple vulnerability types.""" + result = parse_vulnerability_types("COMMAND_INJECTION,XSS,SQLI") + expected = [ + VulnerabilityType.COMMAND_INJECTION, + VulnerabilityType.XSS, + VulnerabilityType.SQL_INJECTION, + ] + assert result == expected + + +def test_parse_vulnerability_types_all(): + """Test parsing '*' for all vulnerability types.""" + result = parse_vulnerability_types("*") + assert len(result) == len(VULNERABILITY_TYPE_MAPPING) + assert VulnerabilityType.COMMAND_INJECTION in result + assert VulnerabilityType.XSS in result + + +def test_parse_vulnerability_types_invalid(): + """Test parsing invalid vulnerability type raises error.""" + with pytest.raises(ValueError, match="Unknown vulnerability type: INVALID_TYPE"): + parse_vulnerability_types("INVALID_TYPE") + + +def test_parse_parameter_positions_empty(): + """Test parsing empty parameter positions.""" + assert parse_parameters("") == [] + assert parse_parameters(" ") == [] + + +def test_parse_parameter_positions_single(): + """Test parsing single parameter position.""" + assert parse_parameters("0") == [0] + assert parse_parameters("3") == [3] + + +def test_parse_parameter_positions_multiple(): + """Test parsing multiple parameter positions.""" + assert parse_parameters("0,1,3") == [0, 1, 3] + assert parse_parameters("1, 2, 4") == [1, 2, 4] # With spaces + + +def test_parse_parameter_positions_invalid(): + """Test parsing invalid parameter positions raises error.""" + with pytest.raises(ValueError, match="Invalid parameter positions"): + parse_parameters("0,invalid,2") + + +def test_security_control_creation(): + """Test SecurityControl object creation.""" + control = SecurityControl( + control_type=SC_VALIDATOR, + vulnerability_types=[VulnerabilityType.COMMAND_INJECTION], + module_path="shlex", + method_name="quote", + ) + + assert control.control_type == SC_VALIDATOR + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert control.module_path == "shlex" + assert control.method_name == "quote" + assert control.parameters == [] + + +def test_security_control_invalid_type(): + """Test SecurityControl with invalid control type raises error.""" + with pytest.raises(ValueError, match="Invalid control type: INVALID"): + SecurityControl( + control_type="INVALID", + vulnerability_types=[VulnerabilityType.COMMAND_INJECTION], + module_path="shlex", + method_name="quote", + ) + + +def test_parse_security_controls_config_empty(): + """Test parsing empty configuration.""" + assert parse_security_controls_config("") == [] + assert parse_security_controls_config(" ") == [] + + +def test_parse_security_controls_config_single(): + """Test parsing single security control configuration.""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION:shlex:quote" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_VALIDATOR + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert control.module_path == "shlex" + assert control.method_name == "quote" + + +def test_parse_security_controls_config_multiple(): + """Test parsing multiple security control configurations.""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION:shlex:quote;" "SANITIZER:XSS,SQLI:html:escape" + result = parse_security_controls_config(config) + + assert len(result) == 2 + + # First control + assert result[0].control_type == SC_VALIDATOR + assert result[0].vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert result[0].module_path == "shlex" + assert result[0].method_name == "quote" + + # Second control + assert result[1].control_type == SC_SANITIZER + assert result[1].vulnerability_types == [VulnerabilityType.XSS, VulnerabilityType.SQL_INJECTION] + assert result[1].module_path == "html" + assert result[1].method_name == "escape" + + +def test_parse_security_controls_config_with_parameters(): + """Test parsing security control configuration with parameters.""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION:bar.foo:CustomValidator.validate:0,1" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_VALIDATOR + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert control.module_path == "bar.foo" + assert control.method_name == "CustomValidator.validate" + assert control.parameters == [0, 1] + + +def test_parse_security_controls_config_with_all_vulnerabilities(): + """Test parsing security control configuration with '*' for all vulnerabilities.""" + config = "SANITIZER:*:custom.sanitizer:clean_all" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_SANITIZER + assert len(control.vulnerability_types) == len(VULNERABILITY_TYPE_MAPPING) + assert VulnerabilityType.COMMAND_INJECTION in control.vulnerability_types + assert VulnerabilityType.XSS in control.vulnerability_types + + +def test_get_security_controls_from_env_empty(): + """Test getting security controls from empty environment variable.""" + result = get_security_controls_from_env() + assert result == [] + + +def test_get_security_controls_from_env_valid(): + """Test getting security controls from valid environment variable.""" + with override_global_config( + dict(_iast_security_controls="INPUT_VALIDATOR:COMMAND_INJECTION:shlex:quote;SANITIZER:XSS:html:escape") + ): + result = get_security_controls_from_env() + + assert len(result) == 2 + assert result[0].control_type == SC_VALIDATOR + assert result[0].module_path == "shlex" + assert result[1].control_type == SC_SANITIZER + assert result[1].module_path == "html" + + +@patch.dict(os.environ, {"DD_IAST_SECURITY_CONTROLS_CONFIGURATION": "INVALID:FORMAT"}) +def test_get_security_controls_from_env_invalid(): + """Test getting security controls from invalid environment variable.""" + result = get_security_controls_from_env() + assert result == [] # Should return empty list on parse error + + +def test_create_generic_sanitizer(): + """Test creating a generic sanitizer function.""" + vulnerability_types = [VulnerabilityType.COMMAND_INJECTION, VulnerabilityType.XSS] + sanitizer = functools.partial(create_sanitizer, vulnerability_types) + + assert callable(sanitizer) + + # Mock wrapped function + def mock_wrapped(*args, **kwargs): + return "sanitized_output" + + # Test the sanitizer + result = sanitizer(mock_wrapped, None, ["input"], {}) + assert result == "sanitized_output" + + +def test_create_generic_validator(): + """Test creating a generic validator function.""" + vulnerability_types = [VulnerabilityType.COMMAND_INJECTION] + validator = functools.partial(create_validator, vulnerability_types, None) + + assert callable(validator) + + # Mock wrapped function + def mock_wrapped(*args, **kwargs): + return True + + # Test the validator + result = validator(mock_wrapped, None, ["input"], {}) + assert result is True + + +def test_create_generic_validator_with_positions(): + """Test creating a generic validator function with specific parameter positions.""" + vulnerability_types = [VulnerabilityType.COMMAND_INJECTION] + parameter_positions = [0, 2] + validator = functools.partial(create_validator, vulnerability_types, parameter_positions) + + assert callable(validator) + + # Mock wrapped function + def mock_wrapped(*args, **kwargs): + return True + + # Test the validator + result = validator(mock_wrapped, None, ["input1", "input2", "input3"], {}) + assert result is True + + +def test_input_validator_example(): + """Test input validator example.""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION:bar.foo.CustomInputValidator:validate" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_VALIDATOR + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert control.module_path == "bar.foo.CustomInputValidator" + assert control.method_name == "validate" + + +def test_input_validator_with_positions_example(): + """Test input validator with parameter positions example.""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION:bar.foo.CustomInputValidator:validate:1,2" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_VALIDATOR + assert control.parameters == [1, 2] + + +def test_multiple_vulnerabilities_example(): + """Test multiple vulnerabilities example.""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION,CODE_INJECTION:bar.foo.CustomInputValidator:validate" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION, VulnerabilityType.CODE_INJECTION] + + +def test_all_vulnerabilities_example(): + """Test all vulnerabilities example.""" + config = "INPUT_VALIDATOR:*:bar.foo.CustomInputValidator:validate" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert len(control.vulnerability_types) > 10 # Should include all vulnerability types + + +def test_sanitizer_example(): + """Test sanitizer example.""" + config = "SANITIZER:COMMAND_INJECTION:bar.foo.CustomSanitizer:sanitize" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_SANITIZER + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert control.module_path == "bar.foo.CustomSanitizer" + assert control.method_name == "sanitize" + + +def test_nodejs_input_validator_example(): + """Test Node.js input validator example (adapted for Python).""" + config = "INPUT_VALIDATOR:COMMAND_INJECTION:bar.foo.custom_input_validator:validate" + result = parse_security_controls_config(config) + + assert len(result) == 1 + control = result[0] + assert control.control_type == SC_VALIDATOR + assert control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION] + assert control.module_path == "bar.foo.custom_input_validator" + assert control.method_name == "validate" + + +def test_complex_multi_control_example(): + """Test complex example with multiple controls.""" + config = ( + "INPUT_VALIDATOR:COMMAND_INJECTION,XSS:com.example:Validator.validateInput:0,1;" + "SANITIZER:*:com.example:Sanitizer.sanitizeInput" + ) + result = parse_security_controls_config(config) + + assert len(result) == 2 + + # First control - Input validator + validator_control = result[0] + assert validator_control.control_type == SC_VALIDATOR + assert validator_control.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION, VulnerabilityType.XSS] + assert validator_control.module_path == "com.example" + assert validator_control.method_name == "Validator.validateInput" + assert validator_control.parameters == [0, 1] + + # Second control - Sanitizer for all vulnerabilities + sanitizer_control = result[1] + assert sanitizer_control.control_type == SC_SANITIZER + assert len(sanitizer_control.vulnerability_types) > 10 + assert sanitizer_control.module_path == "com.example" + assert sanitizer_control.method_name == "Sanitizer.sanitizeInput" + + +def test_complex_example(): + """Test complex example with multiple controls.""" + config = ( + "INPUT_VALIDATOR:COMMAND_INJECTION,XSS:custom.validator:validate_input:0,1;" + "SANITIZER:*:custom.sanitizer:sanitize_all" + ) + result = parse_security_controls_config(config) + + assert len(result) == 2 + + # Input validator + validator = result[0] + assert validator.control_type == SC_VALIDATOR + assert validator.vulnerability_types == [VulnerabilityType.COMMAND_INJECTION, VulnerabilityType.XSS] + assert validator.parameters == [0, 1] + + # Sanitizer for all + sanitizer = result[1] + assert sanitizer.control_type == SC_SANITIZER + assert len(sanitizer.vulnerability_types) >= 10 diff --git a/tests/appsec/integrations/django_tests/django_app/urls.py b/tests/appsec/integrations/django_tests/django_app/urls.py index 29cde8165b7..b1f56ebf2f6 100644 --- a/tests/appsec/integrations/django_tests/django_app/urls.py +++ b/tests/appsec/integrations/django_tests/django_app/urls.py @@ -37,6 +37,11 @@ def shutdown(request): views.command_injection_secure_mark, name="command_injection_secure_mark", ), + handler( + "appsec/command-injection/security-control/$", + views.command_injection_security_control, + name="command_injection_security_control", + ), handler( "appsec/xss/secure-mark/$", views.xss_secure_mark, diff --git a/tests/appsec/integrations/django_tests/django_app/views.py b/tests/appsec/integrations/django_tests/django_app/views.py index afca0282757..32f4df57826 100644 --- a/tests/appsec/integrations/django_tests/django_app/views.py +++ b/tests/appsec/integrations/django_tests/django_app/views.py @@ -47,6 +47,14 @@ def assert_origin(parameter: Any, origin_type: Any) -> None: assert sources[0].origin == origin_type +def _security_control_sanitizer(parameter): + return parameter + + +def _security_control_validator(param1, param2, parameter_to_validate, param3): + return None + + def index(request): response = HttpResponse("Hello, test app.") response["my-response-header"] = "my_response_value" @@ -388,6 +396,15 @@ def command_injection_secure_mark(request): return HttpResponse("OK", status=200) +def command_injection_security_control(request): + value = request.body.decode() + _security_control_validator(None, None, value, None) + # label iast_command_injection + os.system("dir -l " + _security_control_sanitizer(value)) + + return HttpResponse("OK", status=200) + + def xss_secure_mark(request): value = request.body.decode() diff --git a/tests/appsec/integrations/django_tests/test_iast_django.py b/tests/appsec/integrations/django_tests/test_iast_django.py index d652fb96924..176c8c9e80c 100644 --- a/tests/appsec/integrations/django_tests/test_iast_django.py +++ b/tests/appsec/integrations/django_tests/test_iast_django.py @@ -7,6 +7,8 @@ from ddtrace.appsec._constants import IAST from ddtrace.appsec._constants import IAST_SPAN_TAGS from ddtrace.appsec._constants import STACK_TRACE +from ddtrace.appsec._iast._patch_modules import _apply_custom_security_controls +from ddtrace.appsec._iast._patch_modules import _testing_unpatch_iast from ddtrace.appsec._iast.constants import VULN_CMDI from ddtrace.appsec._iast.constants import VULN_HEADER_INJECTION from ddtrace.appsec._iast.constants import VULN_INSECURE_COOKIE @@ -15,8 +17,12 @@ from ddtrace.appsec._iast.constants import VULN_STACKTRACE_LEAK from ddtrace.appsec._iast.constants import VULN_UNVALIDATED_REDIRECT from ddtrace.settings.asm import config as asm_config +from tests.appsec.iast.iast_utils import _end_iast_context_and_oce +from tests.appsec.iast.iast_utils import _start_iast_context_and_oce from tests.appsec.iast.iast_utils import get_line_and_hash from tests.appsec.integrations.django_tests.utils import _aux_appsec_get_root_span +from tests.utils import TracerSpanContainer +from tests.utils import flaky from tests.utils import override_global_config @@ -939,6 +945,68 @@ def test_django_xss_secure_mark(client, iast_span, tracer): assert loaded is None +@pytest.mark.parametrize( + ("security_control", "match_function"), + [ + ( + "SANITIZER:COMMAND_INJECTION:tests.appsec.integrations.django_tests.django_app.views:_security_control_sanitizer", + True, + ), + ( + "INPUT_VALIDATOR:COMMAND_INJECTION:tests.appsec.integrations.django_tests.django_app.views:_security_control_validator:1,2", + True, + ), + ( + "INPUT_VALIDATOR:COMMAND_INJECTION:tests.appsec.integrations.django_tests.django_app.views:_security_control_validator:2", + True, + ), + ( + "INPUT_VALIDATOR:COMMAND_INJECTION:tests.appsec.integrations.django_tests.django_app.views:_security_control_validator:1,3,4", + False, + ), + ( + "INPUT_VALIDATOR:COMMAND_INJECTION:tests.appsec.integrations.django_tests.django_app.views:_security_control_validator:1,3", + False, + ), + ( + "INPUT_VALIDATOR:COMMAND_INJECTION:tests.appsec.integrations.django_tests.django_app.views:_security_control_validator", + True, + ), + ], +) +def test_django_command_injection_security_control(client, tracer, security_control, match_function): + with override_global_config( + dict( + _iast_enabled=True, + _appsec_enabled=False, + _iast_deduplication_enabled=False, + _iast_is_testing=True, + _iast_request_sampling=100.0, + _iast_security_controls=security_control, + ) + ): + _apply_custom_security_controls().patch() + span = TracerSpanContainer(tracer) + _start_iast_context_and_oce() + root_span, _ = _aux_appsec_get_root_span( + client, + span, + tracer, + url="/appsec/command-injection/security-control/", + payload="master", + content_type="application/json", + ) + + loaded = root_span.get_tag(IAST.JSON) + if match_function: + assert loaded is None + else: + assert loaded is not None + _end_iast_context_and_oce() + span.reset() + _testing_unpatch_iast() + + def test_django_header_injection_secure(client, iast_span, tracer): root_span, response = _aux_appsec_get_root_span( client, @@ -1234,6 +1302,7 @@ def test_django_stacktrace_leak(client, iast_span, tracer): assert vulnerability["hash"] +@flaky(until=1767220930, reason="This test fails on Python 3.10 and below, and on Django versions below 4.2") def test_django_stacktrace_from_technical_500_response(client, iast_span, tracer, debug_mode): root_span, response = _aux_appsec_get_root_span( client, diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index e3d1dfb84ec..7082fc0a1c8 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -398,6 +398,7 @@ def test_app_started_event_configuration_override(test_agent_session, run_python "[^\\-]+[\\-]{5}END[a-z\\s]+PRIVATE\\sKEY|ssh-rsa\\s*[a-z0-9\\/\\.+]{100,}", }, {"name": "DD_IAST_REQUEST_SAMPLING", "origin": "default", "value": 30.0}, + {"name": "DD_IAST_SECURITY_CONTROLS_CONFIGURATION", "origin": "default", "value": ""}, {"name": "DD_IAST_STACK_TRACE_ENABLED", "origin": "default", "value": True}, {"name": "DD_IAST_TELEMETRY_VERBOSITY", "origin": "default", "value": "INFORMATION"}, {"name": "DD_IAST_VULNERABILITIES_PER_REQUEST", "origin": "default", "value": 2},