Skip to content

feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"problog >= 2.2.6,<3.0.0",
"cryptography >=44.0.0,<45.0.0",
"semgrep == 1.113.0",
"email_validator >=2.2.0,<3.0.0",
]
keywords = []
# https://pypi.org/classifiers/
Expand Down
5 changes: 5 additions & 0 deletions src/macaron/malware_analyzer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b
- **Description**: Checks if the package name is suspiciously similar to any package name in a predefined list of popular packages. The similarity check incorporates the Jaro-Winkler distance and considers keyboard layout proximity to identify potential typosquatting.
- **Rule**: Return `HeuristicResult.FAIL` if the similarity ratio between the package name and any popular package name meets or exceeds a defined threshold; otherwise, return `HeuristicResult.PASS`.
- **Dependency**: None.

11. **Fake Email**
- **Description**: Checks if the package maintainer or author has a suspicious or invalid email.
- **Rule**: Return `HeuristicResult.FAIL` if the email is invalid; otherwise, return `HeuristicResult.PASS`.
- **Dependency**: None.

### Source Code Analysis with Semgrep
**PyPI Source Code Analyzer**
- **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code.
Expand Down
3 changes: 3 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class Heuristics(str, Enum):
#: Indicates that the package source code contains suspicious code patterns.
SUSPICIOUS_PATTERNS = "suspicious_patterns"

#: Indicates that the package maintainer's email address is suspicious or invalid.
FAKE_EMAIL = "fake_email"


class HeuristicResult(str, Enum):
"""Result type indicating the outcome of a heuristic."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
from email.utils import getaddresses

from email_validator import EmailNotValidError, ValidatedEmail, validate_email

from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email addresses of the package author and maintainer.

    Every address found in the ``author_email`` and ``maintainer_email`` metadata fields
    is passed to ``email_validator.validate_email``. The heuristic fails on the first
    address that is rejected.
    """

    def __init__(self, *, check_deliverability: bool = True) -> None:
        """Initialize the analyzer.

        Parameters
        ----------
        check_deliverability: bool
            Value forwarded to ``validate_email`` as ``check_deliverability``.
            Defaults to ``True``, which is the value that was previously hard-coded.
        """
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )
        # NOTE(review): reviewers asked for check_deliverability to be configurable via
        # defaults.ini so that it can be turned off in unit tests, which should avoid
        # network calls. This attribute is the hook for that; wiring it to defaults.ini
        # is still to be done.
        self.check_deliverability = check_deliverability

    @staticmethod
    def _extract_addresses(email_field: str | None) -> list[str]:
        """Split a metadata email field into the individual addresses it contains.

        The field may hold display names and several comma-separated entries
        (e.g. ``"Jane Doe <jane@example.org>, joe@example.org"``), so it is parsed
        as an address list instead of being treated as one address.

        Parameters
        ----------
        email_field: str | None
            The raw value of the metadata field.

        Returns
        -------
        list[str]
            The addresses found in the field. If the field is not empty but no address
            could be parsed from it, the raw value is returned as the only element so
            that it is still validated (and rejected) by the caller.
        """
        if not email_field:
            return []
        addresses = [address for _, address in getaddresses([email_field]) if address]
        return addresses or [email_field]

    def is_valid_email(self, email: str) -> ValidatedEmail | None:
        """Validate a single email address with ``email_validator``.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        ValidatedEmail | None
            The validated email object if the email is valid, otherwise None.
        """
        try:
            return validate_email(email, check_deliverability=self.check_deliverability)
        except EmailNotValidError as err:
            # A rejected address is an expected outcome of this heuristic, so it is
            # logged and reported as None rather than propagated.
            logger.warning("Invalid email address: %s. Error: %s", email, err)
            return None

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            ``SKIP`` when there is no package info or no email field to inspect,
            ``FAIL`` together with the first rejected address, or ``PASS`` together
            with the details of every validated address.
        """
        package_json = pypi_package_json.package_json
        if not package_json.get("info", {}):
            return HeuristicResult.SKIP, {"message": "No package info available."}

        author_email = json_extract(package_json, ["info", "author_email"], str)
        maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

        if not author_email and not maintainer_email:
            return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

        validated_emails: list[JsonType] = []
        details = ("normalized", "local_part", "domain")

        for email_field in (author_email, maintainer_email):
            for address in self._extract_addresses(email_field):
                email_info = self.is_valid_email(address)
                if not email_info:
                    return HeuristicResult.FAIL, {"email": address}

                validated_emails.append({key: getattr(email_info, key) for key in details})

        return HeuristicResult.PASS, {"validated_emails": validated_emails}
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/gradle.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Gradle class which inherits BaseBuildTool.
Expand Down Expand Up @@ -122,7 +122,7 @@ def get_dep_analyzer(self) -> CycloneDxGradle:
raise DependencyAnalyzerError("No default dependency analyzer is found.")
if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_gradle")):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)

tool_name, tool_version = tuple(
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/maven.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Maven class which inherits BaseBuildTool.
Expand Down Expand Up @@ -116,7 +116,7 @@ def get_dep_analyzer(self) -> CycloneDxMaven:
raise DependencyAnalyzerError("No default dependency analyzer is found.")
if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_maven")):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_maven')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_maven')} is not valid.",
)

tool_name, tool_version = tuple(
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/pip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Pip class which inherits BaseBuildTool.
Expand Down Expand Up @@ -88,7 +88,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer:
tool_name = "cyclonedx_py"
if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)
return CycloneDxPython(
resources_path=global_config.resources_path,
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/poetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Poetry class which inherits BaseBuildTool.
Expand Down Expand Up @@ -126,7 +126,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer:
tool_name = "cyclonedx_py"
if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)
return CycloneDxPython(
resources_path=global_config.resources_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer
Expand Down Expand Up @@ -358,6 +359,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
WheelAbsenceAnalyzer,
AnomalousVersionAnalyzer,
TyposquattingPresenceAnalyzer,
FakeEmailAnalyzer,
]

# name used to query the result of all problog rules, so it can be accessed outside the model.
Expand Down Expand Up @@ -425,13 +427,18 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
failed({Heuristics.ONE_RELEASE.value}),
failed({Heuristics.ANOMALOUS_VERSION.value}).

% Package released recently with a maintainer email address that is not valid.
{Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :-
quickUndetailed,
failed({Heuristics.FAKE_EMAIL.value}).
% ----- Evaluation -----

% Aggregate result
{problog_result_access} :- trigger(malware_high_confidence_1).
{problog_result_access} :- trigger(malware_high_confidence_2).
{problog_result_access} :- trigger(malware_high_confidence_3).
{problog_result_access} :- trigger(malware_high_confidence_4).
{problog_result_access} :- trigger(malware_medium_confidence_3).
{problog_result_access} :- trigger(malware_medium_confidence_2).
{problog_result_access} :- trigger(malware_medium_confidence_1).
query({problog_result_access}).
Expand Down
142 changes: 142 additions & 0 deletions tests/malware_analyzer/pypi/test_fake_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for the FakeEmailAnalyzer heuristic."""


from collections.abc import Generator
from unittest.mock import MagicMock, patch

import pytest
from email_validator import EmailNotValidError

from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset


@pytest.fixture(name="analyzer")
def analyzer_() -> FakeEmailAnalyzer:
    """Build the FakeEmailAnalyzer instance handed to the tests as ``analyzer``."""
    heuristic_analyzer = FakeEmailAnalyzer()
    return heuristic_analyzer


@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Build a PyPIPackageJsonAsset stand-in whose ``package_json`` starts out empty."""
    # Keyword arguments given to MagicMock are set as attributes on the new mock.
    return MagicMock(spec=PyPIPackageJsonAsset, package_json={})


@pytest.fixture(name="mock_validate_email")
def mock_validate_email_fixture() -> Generator[MagicMock]:
    """Replace ``validate_email`` in the fake_email module with a mock for the duration of a test."""
    patcher = patch("macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.validate_email")
    mocked_validate = patcher.start()
    try:
        yield mocked_validate
    finally:
        # Undo the patch even if the test body raised.
        patcher.stop()


def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the heuristic is skipped when both email fields are None."""
    metadata = {"info": {"author_email": None, "maintainer_email": None}}
    pypi_package_json_asset_mock.package_json = metadata

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert detail["message"] == "No author or maintainer email available."


def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the heuristic is skipped when the package JSON has no 'info' key."""
    empty_metadata: dict = {}  # Deliberately lacks the 'info' key.
    pypi_package_json_asset_mock.package_json = empty_metadata

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert detail["message"] == "No package info available."


def test_analyze_fail_invalid_email(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Check that a rejected author email fails the heuristic and is reported back."""
    bad_address = "invalid-email"
    mock_validate_email.side_effect = EmailNotValidError("Invalid email.")
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": bad_address, "maintainer_email": None}}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert detail == {"email": bad_address}
    mock_validate_email.assert_called_once_with(bad_address, check_deliverability=True)


def test_analyze_pass_only_maintainer_email_valid(
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
"""Test analyzer passes when only maintainer_email is present and valid."""
email = "maintainer@example.net"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is example.net supposed to be a valid domain?

Copy link
Member Author

@AmineRaouane AmineRaouane Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and no. The domain is technically valid, but it’s reserved by IANA and not intended for real-world use. So, to ensure the email is actually usable by a real user, should I add a list of reserved domains and TLDs and check against them before proceeding with validation or acceptance, or should I keep it as it is?

Copy link
Member

@art1f1c3R art1f1c3R Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the email validator package has a test environment flag?

test_environment=False: If True, DNS-based deliverability checks are disabled and test and **.test domain names are permitted (see below). You can also set email_validator.TEST_ENVIRONMENT to True to turn it on for all calls by default.

Would that maybe be better in a unit test? If a user does use those IANA reserved domains shouldn't it fail if we use check_deliverability as they aren't set up to receive emails?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but I tried to test both cases: when deliverability is enabled and when it's not. Setting test_environment=True actually makes .test and similar test domains valid, so it's similar to the case when check_deliverability=False.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. check_deliverability=True will mean it'll make a network connection right? This could cause some issues with running the unit tests offline. Would it be possible to either mock a network response, or otherwise put this in an integration test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just switch it here and it will follow the other path, so it won’t trigger the network connection.

pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}}

mock_email_info = MagicMock()
mock_email_info.normalized = "maintainer@example.net"
mock_email_info.local_part = "maintainer"
mock_email_info.domain = "example.net"
mock_validate_email.return_value = mock_email_info

result, info = analyzer.analyze(pypi_package_json_asset_mock)
assert result == HeuristicResult.PASS
assert info["validated_emails"] == [
{"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"}
]
mock_validate_email.assert_called_once_with(email, check_deliverability=True)


def test_analyze_pass_both_emails_valid(
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
"""Test the analyzer passes when both emails are present and valid."""

def side_effect(email: str, check_deliverability: bool) -> MagicMock: # pylint: disable=unused-argument
local_part, domain = email.split("@")
mock_email_info = MagicMock()
mock_email_info.normalized = email
mock_email_info.local_part = local_part
mock_email_info.domain = domain
return mock_email_info

mock_validate_email.side_effect = side_effect

pypi_package_json_asset_mock.package_json = {
"info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"}
}
result, info = analyzer.analyze(pypi_package_json_asset_mock)
assert result == HeuristicResult.PASS
assert mock_validate_email.call_count == 2

validated_emails = info.get("validated_emails")
assert isinstance(validated_emails, list)
assert len(validated_emails) == 2
assert {"normalized": "author@example.com", "local_part": "author", "domain": "example.com"} in validated_emails
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question, if example.com is supposed to be a valid domain.

assert {
"normalized": "maintainer@example.net",
"local_part": "maintainer",
"domain": "example.net",
} in validated_emails


def test_is_valid_email_success(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
    """Check that is_valid_email hands back the object produced by validate_email."""
    address = "test@example.com"
    expected_info = MagicMock(normalized=address, local_part="test", domain="example.com")
    mock_validate_email.return_value = expected_info

    outcome = analyzer.is_valid_email(address)

    assert outcome == expected_info
    mock_validate_email.assert_called_once_with("test@example.com", check_deliverability=True)


def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
    """Check that is_valid_email yields None when validate_email raises EmailNotValidError."""
    rejected_address = "invalid-email"
    mock_validate_email.side_effect = EmailNotValidError("The email address is not valid.")

    outcome = analyzer.is_valid_email(rejected_address)

    assert outcome is None
    mock_validate_email.assert_called_once_with("invalid-email", check_deliverability=True)
Loading