Skip to content
Merged
14 changes: 14 additions & 0 deletions cloudsplaining/command/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@
multiple=True,
type=click.Choice(["CRITICAL", "HIGH", "MEDIUM", "LOW", "NONE"], case_sensitive=False),
)
@click.option(
"-t",
"--flag-trust-policies",
required=False,
default=False,
is_flag=True,
help="Flag risky trust policies in roles.",
)
def scan(
input_file: str,
exclusions_file: str,
Expand All @@ -105,6 +113,7 @@ def scan(
flag_all_risky_actions: bool,
verbosity: int,
severity: list[str],
flag_trust_policies: bool,
) -> None: # pragma: no cover
"""
Given the path to account authorization details files and the exclusions config file, scan all inline and
Expand Down Expand Up @@ -142,6 +151,7 @@ def scan(
minimize=minimize,
flag_conditional_statements=flag_conditional_statements,
flag_resource_arn_statements=flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
severity=severity,
)
html_output_file = os.path.join(output, f"iam-report-{account_name}.html")
Expand Down Expand Up @@ -207,6 +217,7 @@ def scan_account_authorization_details(
return_json_results: Literal[True],
flag_conditional_statements: bool = ...,
flag_resource_arn_statements: bool = ...,
flag_trust_policies: bool = ...,
severity: list[str] | None = ...,
) -> dict[str, Any]: ...

Expand All @@ -222,6 +233,7 @@ def scan_account_authorization_details(
return_json_results: Literal[False] = ...,
flag_conditional_statements: bool = ...,
flag_resource_arn_statements: bool = ...,
flag_trust_policies: bool = ...,
severity: list[str] | None = ...,
) -> str: ...

Expand All @@ -236,6 +248,7 @@ def scan_account_authorization_details(
return_json_results: bool = False,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
flag_trust_policies: bool = False,
severity: list[str] | None = None,
) -> str | dict[str, Any]: # pragma: no cover
"""
Expand All @@ -250,6 +263,7 @@ def scan_account_authorization_details(
exclusions=exclusions,
flag_conditional_statements=flag_conditional_statements,
flag_resource_arn_statements=flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
severity=severity,
)
results = authorization_details.results
Expand Down
14 changes: 14 additions & 0 deletions cloudsplaining/command/scan_multi_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ def _accounts(self) -> dict[str, str]:
multiple=True,
type=click.Choice(["CRITICAL", "HIGH", "MEDIUM", "LOW", "NONE"], case_sensitive=False),
)
@click.option(
"-t",
"--flag-trust-policies",
required=False,
default=False,
is_flag=True,
help="Flag risky trust policies in roles.",
)
def scan_multi_account(
config_file: str,
profile: str,
Expand All @@ -131,6 +139,7 @@ def scan_multi_account(
flag_all_risky_actions: bool,
verbosity: int,
severity: list[str],
flag_trust_policies: bool,
) -> None:
"""Scan multiple accounts via AssumeRole"""
set_log_level(verbosity)
Expand Down Expand Up @@ -160,6 +169,7 @@ def scan_multi_account(
severity=severity,
flag_conditional_statements=flag_conditional_statements,
flag_resource_arn_statements=flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
)


Expand All @@ -174,6 +184,7 @@ def scan_accounts(
severity: list[str] | None = None,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
flag_trust_policies: bool = False,
) -> None:
"""Use this method as a library to scan multiple accounts"""
# TODO: Speed improvements? Multithreading? This currently runs sequentially.
Expand All @@ -187,6 +198,7 @@ def scan_accounts(
severity=severity,
flag_conditional_statements=flag_conditional_statements,
flag_resource_arn_statements=flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
)
html_report = HTMLReport(
account_id=target_account_id,
Expand Down Expand Up @@ -233,6 +245,7 @@ def scan_account(
severity: list[str] | None = None,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
flag_trust_policies: bool = False,
) -> dict[str, dict[str, Any]]:
"""Scan a target account in one shot"""
account_authorization_details = download_account_authorization_details(
Expand All @@ -247,6 +260,7 @@ def scan_account(
severity=severity,
flag_conditional_statements=flag_conditional_statements,
flag_resource_arn_statements=flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
)
results = authorization_details.results
return results
Expand Down
134 changes: 126 additions & 8 deletions cloudsplaining/scan/assume_role_policy_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
ResourceStatement,
)
from cloudsplaining.shared.constants import SERVICE_PREFIXES_WITH_COMPUTE_ROLES
from cloudsplaining.shared.exclusions import (
DEFAULT_EXCLUSIONS,
Exclusions,
)
from cloudsplaining.shared.utils import get_account_id_from_principal

logger = logging.getLogger(__name__)

Expand All @@ -25,35 +30,80 @@ class AssumeRolePolicyDocument(ResourcePolicyDocument):
It is a specialized version of a Resource-based policy
"""

def __init__(self, policy: dict[str, Any]) -> None:
def __init__(
self,
policy: dict[str, Any],
current_account_id: str | None = None,
exclusions: Exclusions = DEFAULT_EXCLUSIONS,
) -> None:
statement_structure = policy.get("Statement", [])
self.policy = policy
self.current_account_id = current_account_id
# We would actually need to define a proper base class with a generic type for statements
self.statements: list[AssumeRoleStatement] = [] # type:ignore[assignment]
self.exclusions = exclusions
# leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list
if not isinstance(statement_structure, list): # pragma: no cover
statement_structure = [statement_structure]

for statement in statement_structure:
self.statements.append(AssumeRoleStatement(statement))
self.statements.append(AssumeRoleStatement(statement, current_account_id, exclusions))

@property
def role_assumable_by_compute_services(self) -> list[str]:
"""Determines whether or not the role is assumed from a compute service, and if so which ones."""
assumable_by_compute_services = []
for statement in self.statements:
if statement.role_assumable_by_compute_services:
assumable_by_compute_services.extend(statement.role_assumable_by_compute_services)
return assumable_by_compute_services
return [
principal
for statement in self.statements
if statement.role_assumable_by_compute_services
for principal in statement.role_assumable_by_compute_services
]

@property
def role_assumable_by_cross_account_principals(self) -> list[str]:
"""Determines whether or not the role can be assumed from principals in other accounts, and if so which ones."""
return [
principal
for statement in self.statements
if statement.role_assumable_by_cross_account_principals
for principal in statement.role_assumable_by_cross_account_principals
]

@property
def role_assumable_by_any_principal(self) -> list[str]:
"""Determines whether or not the role can be assumed by any principal (*) or any AWS account root."""
return [
principal
for statement in self.statements
if statement.role_assumable_by_any_principal
for principal in statement.role_assumable_by_any_principal
]

@property
def role_assumable_by_any_principal_with_conditions(self) -> list[str]:
"""Determines whether or not the role can be assumed by any principal (*) or any AWS account root with conditions."""
return [
principal
for statement in self.statements
if statement.role_assumable_by_any_principal_with_conditions
for principal in statement.role_assumable_by_any_principal_with_conditions
]


class AssumeRoleStatement(ResourceStatement):
"""
Statements in an AssumeRole/Trust Policy document
"""

def __init__(self, statement: dict[str, Any]) -> None:
def __init__(
self,
statement: dict[str, Any],
current_account_id: str | None = None,
exclusions: Exclusions = DEFAULT_EXCLUSIONS,
) -> None:
super().__init__(statement=statement)
self.current_account_id = current_account_id
self.exclusions = exclusions

# self.not_principal = statement.get("NotPrincipal")
if statement.get("NotPrincipal"):
Expand Down Expand Up @@ -82,10 +132,78 @@ def role_assumable_by_compute_services(self) -> list[str]:
if "sts:AssumeRole".lower() not in lowercase_actions:
return []

# Effect must be Allow
if self.effect.lower() != "allow":
return []

assumable_by_compute_services = []
for principal in self.principals:
if principal.endswith(".amazonaws.com"):
service_prefix_to_evaluate = principal.split(".")[0]
if service_prefix_to_evaluate in SERVICE_PREFIXES_WITH_COMPUTE_ROLES:
assumable_by_compute_services.append(service_prefix_to_evaluate)
return assumable_by_compute_services

@property
def role_assumable_by_cross_account_principals(self) -> list[str]:
"""Determines whether or not the role can be assumed from principals in other accounts, and if so which ones."""
# sts:AssumeRole must be there
lowercase_actions = [x.lower() for x in self.actions]
if "sts:AssumeRole".lower() not in lowercase_actions:
return []

# Effect must be Allow
if self.effect.lower() != "allow":
return []

return [
principal
for principal in self.principals
if (principal_account_id := get_account_id_from_principal(principal))
and (self.current_account_id is None or principal_account_id != self.current_account_id)
and principal_account_id not in self.exclusions.known_accounts
]

@property
def role_assumable_by_any_principal(self) -> list[str]:
"""Determines whether or not the role can be assumed by any principal (*) or any AWS account root."""
# sts:AssumeRole must be there
lowercase_actions = [x.lower() for x in self.actions]
if "sts:AssumeRole".lower() not in lowercase_actions:
return []

# Effect must be Allow
if self.effect.lower() != "allow":
return []

# Must have no conditions
if self.statement.get("Condition"):
return []

# Check if any principal is "*" or "arn:aws:iam::*:root"
any_principals = [
principal for principal in self.principals if principal == "*" or principal == "arn:aws:iam::*:root"
]
return any_principals

@property
def role_assumable_by_any_principal_with_conditions(self) -> list[str]:
"""Determines whether or not the role can be assumed by any principal (*) or any AWS account root with conditions."""
# sts:AssumeRole must be there
lowercase_actions = [x.lower() for x in self.actions]
if "sts:AssumeRole".lower() not in lowercase_actions:
return []

# Effect must be Allow
if self.effect.lower() != "allow":
return []

# Must have conditions (opposite of role_assumable_by_any_principal)
if not self.statement.get("Condition"):
return []

# Check if any principal is "*" or "arn:aws:iam::*:root"
any_principals = [
principal for principal in self.principals if principal == "*" or principal == "arn:aws:iam::*:root"
]
return any_principals
3 changes: 3 additions & 0 deletions cloudsplaining/scan/authorization_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
exclusions: Exclusions = DEFAULT_EXCLUSIONS,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
flag_trust_policies: bool = False,
severity: list[str] | None = None,
) -> None:
"""
Expand All @@ -53,6 +54,7 @@ def __init__(
self.exclusions = exclusions
self.flag_conditional_statements = flag_conditional_statements
self.flag_resource_arn_statements = flag_resource_arn_statements
self.flag_trust_policies = flag_trust_policies

self.policies = ManagedPolicyDetails(
auth_json.get("Policies", []),
Expand Down Expand Up @@ -86,6 +88,7 @@ def __init__(
exclusions,
flag_conditional_statements=flag_conditional_statements,
flag_resource_arn_statements=flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
severity=severity,
)

Expand Down
Loading