Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ spdx-tools = ">=0.8.0,<0.9.0"
license-expression = ">=30.1.0,<31.0.0"
rustworkx = ">=0.13.0,<1.0.0"
pydantic = ">=2.0.0,<3.0.0"
asteval = "==1.0.5"
bc-detect-secrets = "==1.5.44"
urllib3 = "==1.26.20"
bc-python-hcl2 = "==0.4.3"
Expand Down
9 changes: 4 additions & 5 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
# exclude "']" one the right side of the compare via (?!']), this can happen with a base64 encoded string
COMPARE_REGEX = re.compile(r"^(?P<a>.+?)\s*(?P<operator>==|!=|>=|>|<=|<|&&|\|\|)\s*(?P<b>(?!']).+)$")
COMPARE_OPERATORS = (" == ", " != ", " < ", " <= ", " > ", " >= ", " && ", " || ")
REMOVE_TRAILING_COMMAS = re.compile(r',(\s*[}\]])')

CHECKOV_RENDER_MAX_LEN = force_int(os.getenv("CHECKOV_RENDER_MAX_LEN", "10000"))

Expand Down Expand Up @@ -85,10 +84,7 @@ def _eval_merge_as_list(eval_value: Any) -> Any:

def _try_evaluate(input_str: Union[str, bool]) -> Any:
try:
result = evaluate(input_str) # type:ignore[arg-type]
if result is None:
raise Exception(f"Can't evaluate {input_str}")
return result
return evaluate(input_str) # type:ignore[arg-type]
except Exception:
try:
return evaluate(f'"{input_str}"')
Expand All @@ -101,12 +97,7 @@ def _try_evaluate(input_str: Union[str, bool]) -> Any:
return json.loads(input_str)
return input_str
except Exception:
try:
# Remove trailing commas before } or ]
input_str_no_trailing = REMOVE_TRAILING_COMMAS.sub(r'\1', input_str) # type:ignore[arg-type]
return json.loads(input_str_no_trailing)
except Exception:
return input_str
return input_str


def replace_string_value(original_str: Any, str_to_replace: str, replaced_value: str, keep_origin: bool = True) -> Any:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from functools import reduce
from math import ceil, floor, log
from typing import Union, Any, Dict, Callable, List, Optional
from asteval import Interpreter

from checkov.terraform.parser_functions import tonumber, FUNCTION_FAILED, create_map, tobool, tostring

Expand Down Expand Up @@ -356,21 +355,13 @@ def terraform_try(*args: Any) -> Any:
SAFE_EVAL_DICT["formatdate"] = formatdate


def get_asteval() -> Interpreter:
# asteval provides a safer environment for evaluating expressions by restricting the operations to a secure subset, significantly reducing the risk of executing malicious code.
return Interpreter(
symtable=SAFE_EVAL_DICT,
use_numpy=False,
minimal=True
)


def evaluate(input_str: str) -> Any:
"""
Safely evaluate a Terraform-like function expression using a predefined function map.
Falls back gracefully if evaluation fails.
"""
if not input_str or input_str == "...":
count_underscores = input_str.count("__")
# We are operating under the assumption that the function name will start and end with "__", ensuring that we have at least two of them
if count_underscores >= 2:
logging.debug(f"got a substring with double underscore, which is not allowed. origin string: {input_str}")
return input_str
if input_str == "...":
# don't create an Ellipsis object
return input_str
if input_str.startswith("try"):
Expand All @@ -381,19 +372,11 @@ def evaluate(input_str: str) -> Any:
input_str = f"{TRY_STR_REPLACEMENT}{input_str[3:]}"
if input_str == "continue":
return input_str
asteval = get_asteval()
log_level = os.getenv("LOG_LEVEL")
should_log_asteval_errors = log_level == "DEBUG"
if RANGE_PATTERN.match(input_str):
temp_eval = asteval(input_str, show_errors=should_log_asteval_errors)
temp_eval = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec
evaluated = input_str if temp_eval < 0 else temp_eval
else:
evaluated = asteval(input_str, show_errors=should_log_asteval_errors)

if asteval.error:
error_messages = [err.get_error() for err in asteval.error]
raise ValueError(f"Safe evaluation error: {error_messages}")

evaluated = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec
return evaluated if not isinstance(evaluated, str) else remove_unicode_null(evaluated)


Expand Down
3 changes: 0 additions & 3 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,3 @@ ignore_missing_imports = True

[mypy-checkov.*]
follow_imports = skip

[mypy-asteval.*]
ignore_missing_imports = True
3 changes: 2 additions & 1 deletion performance_tests/test_checkov_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
'repo_name': 'terraform-aws-components',
'threshold': {
"Darwin": 19.0,
"Linux": 13.0,
"Linux": 11.0,
"Windows": 15.0,
}
},
Expand Down Expand Up @@ -114,5 +114,6 @@ def run_kubernetes_scan():
runner_registry = RunnerRegistry(banner, runner_filter, k8_runner())
reports = runner_registry.run(root_folder=test_files_dir)
assert len(reports) > 0

benchmark(run_kubernetes_scan)
assert benchmark.stats.stats.mean <= repo_threshold + (DEVIATION_PERCENT / 100) * repo_threshold
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def run(self) -> None:
"license-expression<31.0.0,>=30.1.0",
"rustworkx>=0.13.0,<1.0.0",
"pydantic<3.0.0,>=2.0.0",
"asteval==1.0.5",
"urllib3==1.26.20"
],
dependency_links=[], # keep it empty, needed for pipenv-setup
Expand Down
56 changes: 4 additions & 52 deletions tests/terraform/graph/variable_rendering/test_string_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from checkov.terraform.graph_builder.variable_rendering.evaluate_terraform import evaluate_terraform, \
replace_string_value, \
remove_interpolation, _find_new_value_for_interpolation
from checkov.terraform.graph_builder.variable_rendering.safe_eval_functions import evaluate, get_asteval
from checkov.terraform.graph_builder.variable_rendering.safe_eval_functions import evaluate


class TestTerraformEvaluation(TestCase):
Expand Down Expand Up @@ -42,7 +42,7 @@ def test_conditional_expression(self):
input_str = 'blocked == "allowed" ? True : False'
expected = False
self.assertEqual(expected, evaluate_terraform(input_str))

input_str = 'True == "true" ? True : False'
expected = True
self.assertEqual(expected, evaluate_terraform(input_str))
Expand Down Expand Up @@ -524,34 +524,10 @@ def test_try_then_merge_block(self):
result = evaluate_terraform(input_str)
self.assertEqual(expected, result)

def test_empty_string(self):
input_str = " "
expected = input_str
result = evaluate_terraform(input_str)
self.assertEqual(expected, result)

input_str = ""
expected = input_str
result = evaluate_terraform(input_str)
self.assertEqual(expected, result)

def test_dict_as_string(self):
expected = {'Statement': [
{'Action': ['lambda:CreateFunction', 'lambda:CreateEventSourceMapping', 'dynamodb:CreateTable'],
'Effect': 'Allow', 'Resource': '*'}], 'Version': '2012-10-17'}
input_str = ' { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:CreateFunction", "lambda:CreateEventSourceMapping", "dynamodb:CreateTable", ], "Resource": "*" } ] }'
result = evaluate_terraform(input_str)
assert result == expected

def test_continue_stays_the_same(self):
expected = "continue"
result = evaluate_terraform("continue")
self.assertEqual(expected, result)


@pytest.mark.parametrize(
"origin_str,str_to_replace,new_value,expected",
[

("${lookup({'a': ${local.protocol1}},\"a\",\"https\")}", '${local.protocol1}', 'local.protocol1', "'local.protocol1'"),
('${length(keys(var.identity)) > 0 ? [${var.identity}] : []}', '${var.identity}', 'var.identity', 'var.identity'),
],
Expand All @@ -566,32 +542,8 @@ def test_evaluate_range_pattern() -> None:

# Test range pattern
assert evaluate("1-10") == "1-10"
assert evaluate("5-25")== "5-25"
assert evaluate("5-25") == "5-25"
assert evaluate("10-5") == 5

# Test non-range pattern for comparison
assert evaluate("1+1") == 2


EVAL_DANGEROUS_INPUTS = [
("unicode_characters", "[{}._﹎class_﹎._﹎bases_﹎[0]._﹎subclasses_﹎()]"),
("generators_to_bypass_protections_code", "((gen:=(gen_list[0].gi_frame.f_back.f_back.f_globals for _ in [1,]),v:=gen_list.append(gen), gen_list[0])[2] for gen_list in [[]]).send(None).send(None).get('_'+'_builtins_'+'_')['_'+'_import_'+'_']('os').system('date >> /tmp/gen-example')"),
("system command", "__import__('os').system('rm -rf /')"),
("read passwd", "open('/etc/passwd').read()"),
("access builtins", "__builtins__.__import__('os').listdir('.')"),
("exec call", "exec('print(123)')"),
("create file", "__import__('os').popen('touch /tmp/hacked').read()"),
("subclasses", "().__class__.__bases__[0].__subclasses__()"),
("socket connection", "__import__('socket').socket().connect(('example.com', 80))"),
("lambda trick", "(lambda f: f('ls'))(__import__('os').system)")
]


@pytest.mark.parametrize("description, input_str", EVAL_DANGEROUS_INPUTS)
def test_evaluate_malicious_code(description: str, input_str: str)-> None:
expected = input_str
result = evaluate_terraform(input_str)
assert result == expected
asteval = get_asteval()
asteval(input_str)
assert asteval.error
Loading