diff --git a/Pipfile b/Pipfile index 926fbe2a7d..d835879173 100644 --- a/Pipfile +++ b/Pipfile @@ -82,7 +82,6 @@ spdx-tools = ">=0.8.0,<0.9.0" license-expression = ">=30.1.0,<31.0.0" rustworkx = ">=0.13.0,<1.0.0" pydantic = ">=2.0.0,<3.0.0" -asteval = "==1.0.5" bc-detect-secrets = "==1.5.44" urllib3 = "==1.26.20" bc-python-hcl2 = "==0.4.3" diff --git a/Pipfile.lock b/Pipfile.lock index 03fb7631a6..ff6b248cc1 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -164,14 +164,13 @@ "markers": "python_version >= '3.8'", "version": "==3.6.2" }, - "asteval": { + "async-timeout": { "hashes": [ - "sha256:082b95312578affc8a6d982f7d92b7ac5de05634985c87e7eedd3188d31149fa", - "sha256:bac3c8dd6d2b789e959cfec9bb296fb8338eec066feae618c462132701fbc665" + "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c", + "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3" ], - "index": "pypi", "markers": "python_version >= '3.8'", - "version": "==1.0.5" + "version": "==5.0.1" }, "async-timeout": { "hashes": [ diff --git a/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py b/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py index 56ab94ef6c..c6f902a069 100644 --- a/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py +++ b/checkov/terraform/graph_builder/variable_rendering/evaluate_terraform.py @@ -20,7 +20,6 @@ # exclude "']" one the right side of the compare via (?!']), this can happen with a base64 encoded string COMPARE_REGEX = re.compile(r"^(?P.+?)\s*(?P==|!=|>=|>|<=|<|&&|\|\|)\s*(?P(?!']).+)$") COMPARE_OPERATORS = (" == ", " != ", " < ", " <= ", " > ", " >= ", " && ", " || ") -REMOVE_TRAILING_COMMAS = re.compile(r',(\s*[}\]])') CHECKOV_RENDER_MAX_LEN = force_int(os.getenv("CHECKOV_RENDER_MAX_LEN", "10000")) @@ -85,10 +84,7 @@ def _eval_merge_as_list(eval_value: Any) -> Any: def _try_evaluate(input_str: Union[str, bool]) -> Any: try: - result = evaluate(input_str) # type:ignore[arg-type] - if result is None: - raise Exception(f"Can't evaluate {input_str}") - return result + return evaluate(input_str) # type:ignore[arg-type] except Exception: try: return evaluate(f'"{input_str}"') @@ -101,12 +97,7 @@ def _try_evaluate(input_str: Union[str, bool]) -> Any: return json.loads(input_str) return input_str except Exception: - try: - # Remove trailing commas before } or ] - input_str_no_trailing = REMOVE_TRAILING_COMMAS.sub(r'\1', input_str) # type:ignore[arg-type] - return json.loads(input_str_no_trailing) - except Exception: - return input_str + return input_str def replace_string_value(original_str: Any, str_to_replace: str, replaced_value: str, keep_origin: bool = True) -> Any: diff --git a/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py b/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py index db7c3ff4d9..3d7616a2a6 100644 --- a/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py +++ b/checkov/terraform/graph_builder/variable_rendering/safe_eval_functions.py @@ -8,7 +8,6 @@ from functools import reduce from math import ceil, floor, log from typing import Union, Any, Dict, Callable, List, Optional -from asteval import Interpreter from checkov.terraform.parser_functions import tonumber, FUNCTION_FAILED, create_map, tobool, tostring @@ -356,21 +355,13 @@ def terraform_try(*args: Any) -> Any: SAFE_EVAL_DICT["formatdate"] = formatdate -def get_asteval() -> Interpreter: - # asteval provides a safer environment for evaluating expressions by restricting the operations to a secure subset, significantly reducing the risk of executing malicious code. - return Interpreter( - symtable=SAFE_EVAL_DICT, - use_numpy=False, - minimal=True - ) - - def evaluate(input_str: str) -> Any: - """ - Safely evaluate a Terraform-like function expression using a predefined function map. - Falls back gracefully if evaluation fails. - """ - if not input_str or input_str == "...": + count_underscores = input_str.count("__") + # We are operating under the assumption that the function name will start and end with "__", ensuring that we have at least two of them + if count_underscores >= 2: + logging.debug(f"got a substring with double underscore, which is not allowed. origin string: {input_str}") + return input_str + if input_str == "...": # don't create an Ellipsis object return input_str if input_str.startswith("try"): @@ -381,19 +372,11 @@ def evaluate(input_str: str) -> Any: input_str = f"{TRY_STR_REPLACEMENT}{input_str[3:]}" if input_str == "continue": return input_str - asteval = get_asteval() - log_level = os.getenv("LOG_LEVEL") - should_log_asteval_errors = log_level == "DEBUG" if RANGE_PATTERN.match(input_str): - temp_eval = asteval(input_str, show_errors=should_log_asteval_errors) + temp_eval = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec evaluated = input_str if temp_eval < 0 else temp_eval else: - evaluated = asteval(input_str, show_errors=should_log_asteval_errors) - - if asteval.error: - error_messages = [err.get_error() for err in asteval.error] - raise ValueError(f"Safe evaluation error: {error_messages}") - + evaluated = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec return evaluated if not isinstance(evaluated, str) else remove_unicode_null(evaluated) diff --git a/mypy.ini b/mypy.ini index 856e9b28ac..cd42da8d15 100644 --- a/mypy.ini +++ b/mypy.ini @@ -19,6 +19,3 @@ ignore_missing_imports = True [mypy-checkov.*] follow_imports = skip - -[mypy-asteval.*] -ignore_missing_imports = True \ No newline at end of file diff --git a/performance_tests/test_checkov_performance.py b/performance_tests/test_checkov_performance.py index 21f65f31bf..cfe4cfb87a 100644 --- a/performance_tests/test_checkov_performance.py +++ b/performance_tests/test_checkov_performance.py @@ -18,7 +18,7 @@ 'repo_name': 'terraform-aws-components', 'threshold': { "Darwin": 19.0, - "Linux": 13.0, + "Linux": 11.0, "Windows": 15.0, } }, @@ -114,5 +114,6 @@ def run_kubernetes_scan(): runner_registry = RunnerRegistry(banner, runner_filter, k8_runner()) reports = runner_registry.run(root_folder=test_files_dir) assert len(reports) > 0 + benchmark(run_kubernetes_scan) assert benchmark.stats.stats.mean <= repo_threshold + (DEVIATION_PERCENT / 100) * repo_threshold diff --git a/setup.py b/setup.py index 20c741f1eb..b88f7c2f1a 100644 --- a/setup.py +++ b/setup.py @@ -107,7 +107,6 @@ def run(self) -> None: "license-expression<31.0.0,>=30.1.0", "rustworkx>=0.13.0,<1.0.0", "pydantic<3.0.0,>=2.0.0", - "asteval==1.0.5", "urllib3==1.26.20" ], dependency_links=[], # keep it empty, needed for pipenv-setup diff --git a/tests/terraform/graph/variable_rendering/test_string_evaluation.py b/tests/terraform/graph/variable_rendering/test_string_evaluation.py index 8d4a32c8d9..ad98e05355 100644 --- a/tests/terraform/graph/variable_rendering/test_string_evaluation.py +++ b/tests/terraform/graph/variable_rendering/test_string_evaluation.py @@ -7,7 +7,7 @@ from checkov.terraform.graph_builder.variable_rendering.evaluate_terraform import evaluate_terraform, \ replace_string_value, \ remove_interpolation, _find_new_value_for_interpolation -from checkov.terraform.graph_builder.variable_rendering.safe_eval_functions import evaluate, get_asteval +from checkov.terraform.graph_builder.variable_rendering.safe_eval_functions import evaluate class TestTerraformEvaluation(TestCase): @@ -42,7 +42,7 @@ def test_conditional_expression(self): input_str = 'blocked == "allowed" ? True : False' expected = False self.assertEqual(expected, evaluate_terraform(input_str)) - + input_str = 'True == "true" ? True : False' expected = True self.assertEqual(expected, evaluate_terraform(input_str)) @@ -524,34 +524,10 @@ def test_try_then_merge_block(self): result = evaluate_terraform(input_str) self.assertEqual(expected, result) - def test_empty_string(self): - input_str = " " - expected = input_str - result = evaluate_terraform(input_str) - self.assertEqual(expected, result) - - input_str = "" - expected = input_str - result = evaluate_terraform(input_str) - self.assertEqual(expected, result) - - def test_dict_as_string(self): - expected = {'Statement': [ - {'Action': ['lambda:CreateFunction', 'lambda:CreateEventSourceMapping', 'dynamodb:CreateTable'], - 'Effect': 'Allow', 'Resource': '*'}], 'Version': '2012-10-17'} - input_str = ' { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:CreateFunction", "lambda:CreateEventSourceMapping", "dynamodb:CreateTable", ], "Resource": "*" } ] }' - result = evaluate_terraform(input_str) - assert result == expected - - def test_continue_stays_the_same(self): - expected = "continue" - result = evaluate_terraform("continue") - self.assertEqual(expected, result) - - @pytest.mark.parametrize( "origin_str,str_to_replace,new_value,expected", [ + ("${lookup({'a': ${local.protocol1}},\"a\",\"https\")}", '${local.protocol1}', 'local.protocol1', "'local.protocol1'"), ('${length(keys(var.identity)) > 0 ? [${var.identity}] : []}', '${var.identity}', 'var.identity', 'var.identity'), ], @@ -566,32 +542,8 @@ def test_evaluate_range_pattern() -> None: # Test range pattern assert evaluate("1-10") == "1-10" - assert evaluate("5-25")== "5-25" + assert evaluate("5-25") == "5-25" assert evaluate("10-5") == 5 # Test non-range pattern for comparison assert evaluate("1+1") == 2 - - -EVAL_DANGEROUS_INPUTS = [ - ("unicode_characters", "[{}._﹎class_﹎._﹎bases_﹎[0]._﹎subclasses_﹎()]"), - ("generators_to_bypass_protections_code", "((gen:=(gen_list[0].gi_frame.f_back.f_back.f_globals for _ in [1,]),v:=gen_list.append(gen), gen_list[0])[2] for gen_list in [[]]).send(None).send(None).get('_'+'_builtins_'+'_')['_'+'_import_'+'_']('os').system('date >> /tmp/gen-example')"), - ("system command", "__import__('os').system('rm -rf /')"), - ("read passwd", "open('/etc/passwd').read()"), - ("access builtins", "__builtins__.__import__('os').listdir('.')"), - ("exec call", "exec('print(123)')"), - ("create file", "__import__('os').popen('touch /tmp/hacked').read()"), - ("subclasses", "().__class__.__bases__[0].__subclasses__()"), - ("socket connection", "__import__('socket').socket().connect(('example.com', 80))"), - ("lambda trick", "(lambda f: f('ls'))(__import__('os').system)") -] - - -@pytest.mark.parametrize("description, input_str", EVAL_DANGEROUS_INPUTS) -def test_evaluate_malicious_code(description: str, input_str: str)-> None: - expected = input_str - result = evaluate_terraform(input_str) - assert result == expected - asteval = get_asteval() - asteval(input_str) - assert asteval.error \ No newline at end of file