diff --git a/pyproject.toml b/pyproject.toml index 74705364b..86cb0323f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "problog >= 2.2.6,<3.0.0", "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", + "email-validator >=2.2.0,<3.0.0", ] keywords = [] # https://pypi.org/classifiers/ diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini index 0c31aaca7..0d08cc2e7 100644 --- a/src/macaron/config/defaults.ini +++ b/src/macaron/config/defaults.ini @@ -612,6 +612,9 @@ cost = 1.0 # The path to the file that contains the list of popular packages. popular_packages_path = +# A boolean value that determines whether to check the deliverability of the email address. +check_deliverability = True + # ==== The following sections are for source code analysis using Semgrep ==== # rulesets: a reference to a 'ruleset' in this section refers to a Semgrep .yaml file containing one or more rules. # rules: a reference to a 'rule' in this section refers to an individual rule ID, specified by the '- id:' field in diff --git a/src/macaron/malware_analyzer/README.md b/src/macaron/malware_analyzer/README.md index 7aeda9417..d3acff68d 100644 --- a/src/macaron/malware_analyzer/README.md +++ b/src/macaron/malware_analyzer/README.md @@ -56,6 +56,11 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b - **Description**: Checks if the package name is suspiciously similar to any package name in a predefined list of popular packages. The similarity check incorporates the Jaro-Winkler distance and considers keyboard layout proximity to identify potential typosquatting. - **Rule**: Return `HeuristicResult.FAIL` if the similarity ratio between the package name and any popular package name meets or exceeds a defined threshold; otherwise, return `HeuristicResult.PASS`. - **Dependency**: None. + +11. **Fake Email** + - **Description**: Checks if the package maintainer or author has a suspicious or invalid email. 
+ - **Rule**: Return `HeuristicResult.FAIL` if the email is invalid; otherwise, return `HeuristicResult.PASS`. + - **Dependency**: None. ### Source Code Analysis with Semgrep **PyPI Source Code Analyzer** - **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code. diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py index eebce5764..c37f763a5 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py @@ -43,6 +43,9 @@ class Heuristics(str, Enum): #: Indicates that the package source code contains suspicious code patterns. SUSPICIOUS_PATTERNS = "suspicious_patterns" + #: Indicates that the package maintainer's email address is suspicious or invalid. + FAKE_EMAIL = "fake_email" + class HeuristicResult(str, Enum): """Result type indicating the outcome of a heuristic.""" diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py new file mode 100644 index 000000000..ff0509f68 --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py @@ -0,0 +1,128 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
import re

from email_validator import EmailNotValidError, ValidatedEmail, validate_email

from macaron.config.defaults import defaults
from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    The ``author_email`` and ``maintainer_email`` metadata fields are free-form text that
    may hold several addresses mixed with display names (e.g. ``Jane <jane@example.org>``).
    Candidate addresses are first extracted with ``PATTERN`` and each candidate is then
    handed to ``email_validator.validate_email`` for the actual validation.
    """

    # Extraction only: this pattern locates address-like tokens inside free-form text, the
    # real validation is delegated to ``validate_email``. It deliberately accepts the
    # characters that are common in real-world addresses ("_", "%", "+", "-" in the local
    # part and hyphens inside domain labels); a stricter pattern silently truncates
    # "first-last@example.com" to "last@example.com" and finds nothing at all in
    # "user@my-company.com", which would flag legitimate maintainers as fake.
    PATTERN = re.compile(
        r"""(?<![A-Za-z0-9._%+-])                       # do not start in the middle of an address token
            [A-Za-z0-9_%+-]+                            # first local-part segment
            (?:\.[A-Za-z0-9_%+-]+)*                     # optional ".segment" repeats
            @
            [A-Za-z0-9]+(?:-+[A-Za-z0-9]+)*             # domain label, hyphens only between alphanumerics
            (?:\.[A-Za-z0-9]+(?:-+[A-Za-z0-9]+)*)*      # optional further labels (sub-domains)
            \.[A-Za-z]{2,}                              # top-level domain (at least 2 letters)
            \b""",
        re.VERBOSE,
    )

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )
        # Whether validate_email should also check that the domain can receive mail.
        self.check_deliverability: bool = self._load_defaults()

    def _load_defaults(self) -> bool:
        """Load the default values from defaults.ini.

        Returns
        -------
        bool
            The value of ``check_deliverability`` in the ``heuristic.pypi`` section, or
            ``True`` when the section or the option is missing.
        """
        section_name = "heuristic.pypi"
        if defaults.has_section(section_name):
            section = defaults[section_name]
            return section.getboolean("check_deliverability", fallback=True)
        return True

    def get_emails(self, email_field: str) -> list[str]:
        """Extract emails from the given email field.

        Parameters
        ----------
        email_field: str
            The email field from which to extract emails.

        Returns
        -------
        list[str]
            A list of emails extracted from the email field, in order of appearance.
            The list is empty when the field contains no address-like token.
        """
        # PATTERN has no capturing groups, so findall() yields the full matches. A match
        # can never contain whitespace, hence no stripping or filtering is needed.
        return self.PATTERN.findall(email_field)

    def is_valid_email(self, email: str) -> ValidatedEmail | None:
        """Check if the email format is valid and, if enabled, that the domain is deliverable.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        ValidatedEmail | None
            The validated email object if the email is valid, otherwise None.
        """
        try:
            return validate_email(email, check_deliverability=self.check_deliverability)
        except EmailNotValidError as err:
            # An invalid address is an expected outcome for this heuristic, not an error
            # of the analysis itself, so it is logged and reported to the caller as None.
            logger.warning("Invalid email address: %s. Error: %s", email, err)
            return None

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            ``SKIP`` when neither email field is populated, ``FAIL`` as soon as a populated
            field contains no extractable address or an address that does not validate,
            and ``PASS`` with the details of every validated address otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the package JSON has no ``info`` entry.
        """
        package_json = pypi_package_json.package_json
        if not package_json.get("info"):
            raise HeuristicAnalyzerValueError("No package info available.")

        author_email = json_extract(package_json, ["info", "author_email"], str)
        maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

        # Only fields that are actually populated take part in the analysis.
        populated_fields = [field for field in (author_email, maintainer_email) if field]
        if not populated_fields:
            return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

        details = ["normalized", "local_part", "domain"]
        validated_emails: list[JsonType] = []

        for email_field in populated_fields:
            emails = self.get_emails(email_field)
            if not emails:
                # The field has content, but nothing in it looks like an email address.
                return HeuristicResult.FAIL, {"message": "no emails found in the email field"}

            for email in emails:
                email_info = self.is_valid_email(email)
                if not email_info:
                    return HeuristicResult.FAIL, {"invalid_email": email}

                validated_emails.append({key: getattr(email_info, key) for key in details})

        return HeuristicResult.PASS, {"validated_emails": validated_emails}
@@ -122,7 +122,7 @@ def get_dep_analyzer(self) -> CycloneDxGradle: raise DependencyAnalyzerError("No default dependency analyzer is found.") if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_gradle")): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.", ) tool_name, tool_version = tuple( diff --git a/src/macaron/slsa_analyzer/build_tool/maven.py b/src/macaron/slsa_analyzer/build_tool/maven.py index 69323ad9c..e6c11c13e 100644 --- a/src/macaron/slsa_analyzer/build_tool/maven.py +++ b/src/macaron/slsa_analyzer/build_tool/maven.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Maven class which inherits BaseBuildTool. @@ -116,7 +116,7 @@ def get_dep_analyzer(self) -> CycloneDxMaven: raise DependencyAnalyzerError("No default dependency analyzer is found.") if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_maven")): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_maven')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_maven')} is not valid.", ) tool_name, tool_version = tuple( diff --git a/src/macaron/slsa_analyzer/build_tool/pip.py b/src/macaron/slsa_analyzer/build_tool/pip.py index 5abf0c0ba..c0e970ab9 100644 --- a/src/macaron/slsa_analyzer/build_tool/pip.py +++ b/src/macaron/slsa_analyzer/build_tool/pip.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. 
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Pip class which inherits BaseBuildTool. @@ -88,7 +88,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer: tool_name = "cyclonedx_py" if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.", ) return CycloneDxPython( resources_path=global_config.resources_path, diff --git a/src/macaron/slsa_analyzer/build_tool/poetry.py b/src/macaron/slsa_analyzer/build_tool/poetry.py index eeb54216b..54e3899f1 100644 --- a/src/macaron/slsa_analyzer/build_tool/poetry.py +++ b/src/macaron/slsa_analyzer/build_tool/poetry.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Poetry class which inherits BaseBuildTool. 
@@ -126,7 +126,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer: tool_name = "cyclonedx_py" if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.", ) return CycloneDxPython( resources_path=global_config.resources_path, diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index 8514a458d..646f7acc3 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -20,6 +20,7 @@ from macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer +from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer @@ -358,6 +359,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: WheelAbsenceAnalyzer, AnomalousVersionAnalyzer, TyposquattingPresenceAnalyzer, + FakeEmailAnalyzer, ] # name used to query the result of all problog rules, so it can be accessed outside the model. @@ -425,6 +427,10 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: failed({Heuristics.ONE_RELEASE.value}), failed({Heuristics.ANOMALOUS_VERSION.value}). 
+ % Package released recently with a maintainer email address that is not valid. + {Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :- + quickUndetailed, + failed({Heuristics.FAKE_EMAIL.value}). % ----- Evaluation ----- % Aggregate result @@ -432,6 +438,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: {problog_result_access} :- trigger(malware_high_confidence_2). {problog_result_access} :- trigger(malware_high_confidence_3). {problog_result_access} :- trigger(malware_high_confidence_4). + {problog_result_access} :- trigger(malware_medium_confidence_3). {problog_result_access} :- trigger(malware_medium_confidence_2). {problog_result_access} :- trigger(malware_medium_confidence_1). query({problog_result_access}). diff --git a/tests/malware_analyzer/pypi/test_fake_email.py b/tests/malware_analyzer/pypi/test_fake_email.py new file mode 100644 index 000000000..f51fb4110 --- /dev/null +++ b/tests/malware_analyzer/pypi/test_fake_email.py @@ -0,0 +1,133 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""Tests for the FakeEmailAnalyzer heuristic."""


from unittest.mock import MagicMock

import pytest

from macaron.errors import HeuristicAnalyzerValueError
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset


@pytest.fixture(name="analyzer")
def analyzer_() -> FakeEmailAnalyzer:
    """Pytest fixture to create a FakeEmailAnalyzer instance."""
    return FakeEmailAnalyzer()


@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_() -> MagicMock:
    """Pytest fixture for a mock PyPIPackageJsonAsset."""
    mock_asset = MagicMock(spec=PyPIPackageJsonAsset)
    mock_asset.package_json = {}
    return mock_asset


def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer skips if no author_email or maintainer_email is present."""
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": None}}
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.SKIP
    assert info["message"] == "No author or maintainer email available."


def test_analyze_raises_error_for_missing_info_key(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Test the analyzer raises an error if the 'info' key is missing in the PyPI data."""
    pypi_package_json_asset_mock.package_json = {}  # No 'info' key
    with pytest.raises(HeuristicAnalyzerValueError) as exc_info:
        analyzer.analyze(pypi_package_json_asset_mock)
    assert "No package info available." in str(exc_info.value)


def test_analyze_fail_no_email_found_in_field(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Test the analyzer fails if an email field does not contain a parsable email address."""
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": "not an email", "maintainer_email": None}}
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.FAIL
    assert info == {"message": "no emails found in the email field"}


def test_analyze_fail_invalid_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test analyzer fails if the email field contains an invalid email format."""
    # No top-level domain, so the extraction step finds no address at all in the field.
    invalid_email = "user@example"
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": invalid_email, "maintainer_email": None}}

    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.FAIL
    assert info == {"message": "no emails found in the email field"}


def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Test the analyzer passes if only a valid maintainer_email is present and deliverability is not checked."""
    # Disable the deliverability check explicitly so the outcome does not depend on the
    # configured default or on DNS being reachable from the test environment.
    analyzer.check_deliverability = False

    email = "maintainer@example.net"
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}}
    result, info = analyzer.analyze(pypi_package_json_asset_mock)

    assert result == HeuristicResult.PASS
    assert info["validated_emails"] == [
        {"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"}
    ]


def test_analyze_pass_both_emails_valid(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer passes if both emails are valid and deliverability is not checked."""
    # Disable the deliverability check explicitly so the outcome does not depend on the
    # configured default or on DNS being reachable from the test environment.
    analyzer.check_deliverability = False

    author_email = "example@gmail.com"
    author_local_part, author_domain = author_email.split("@")
    maintainer_email = "maintainer@example.net"
    maintainer_local_part, maintainer_domain = maintainer_email.split("@")

    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": author_email, "maintainer_email": maintainer_email}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)

    assert result == HeuristicResult.PASS

    validated_emails = info.get("validated_emails")
    assert isinstance(validated_emails, list)
    assert len(validated_emails) == 2
    assert {"normalized": author_email, "local_part": author_local_part, "domain": author_domain} in validated_emails
    assert {
        "normalized": maintainer_email,
        "local_part": maintainer_local_part,
        "domain": maintainer_domain,
    } in validated_emails


def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer) -> None:
    """Test is_valid_email returns None on failure."""
    result = analyzer.is_valid_email("invalid-email")
    assert result is None


def test_get_emails(analyzer: FakeEmailAnalyzer) -> None:
    """Test the get_emails method."""
    # The second address is wrapped in a display-name form, as commonly found in PyPI metadata.
    email_field = "test@example.com, another test <another@example.org>"
    expected = ["test@example.com", "another@example.org"]
    assert analyzer.get_emails(email_field) == expected

    email_field_no_email = "this is not an email"
    assert analyzer.get_emails(email_field_no_email) == []

    email_field_empty = ""
    assert analyzer.get_emails(email_field_empty) == []