Skip to content

feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"problog >= 2.2.6,<3.0.0",
"cryptography >=44.0.0,<45.0.0",
"semgrep == 1.113.0",
"email-validator >=2.2.0,<3.0.0",
]
keywords = []
# https://pypi.org/classifiers/
Expand Down
3 changes: 3 additions & 0 deletions src/macaron/config/defaults.ini
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ cost = 1.0
# The path to the file that contains the list of popular packages.
popular_packages_path =

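# A boolean value that determines whether the Fake Email heuristic also checks that each author/maintainer email
# address is deliverable (DNS-based check on the email domain), in addition to validating its syntax.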
check_deliverability = True

# ==== The following sections are for source code analysis using Semgrep ====
# rulesets: a reference to a 'ruleset' in this section refers to a Semgrep .yaml file containing one or more rules.
# rules: a reference to a 'rule' in this section refers to an individual rule ID, specified by the '- id:' field in
Expand Down
5 changes: 5 additions & 0 deletions src/macaron/malware_analyzer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b
- **Description**: Checks if the package name is suspiciously similar to any package name in a predefined list of popular packages. The similarity check incorporates the Jaro-Winkler distance and considers keyboard layout proximity to identify potential typosquatting.
- **Rule**: Return `HeuristicResult.FAIL` if the similarity ratio between the package name and any popular package name meets or exceeds a defined threshold; otherwise, return `HeuristicResult.PASS`.
- **Dependency**: None.

11. **Fake Email**
- **Description**: Checks if the package maintainer or author has a suspicious or invalid email.
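- **Rule**: Return `HeuristicResult.SKIP` if neither an author nor a maintainer email is provided. Return `HeuristicResult.FAIL` if no email address can be extracted from a provided email field, or if any extracted address is invalid; otherwise, return `HeuristicResult.PASS`.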
- **Dependency**: None.

### Source Code Analysis with Semgrep
**PyPI Source Code Analyzer**
- **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code.
Expand Down
3 changes: 3 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class Heuristics(str, Enum):
#: Indicates that the package source code contains suspicious code patterns.
SUSPICIOUS_PATTERNS = "suspicious_patterns"

#: Indicates that the package maintainer's email address is suspicious or invalid.
FAKE_EMAIL = "fake_email"


class HeuristicResult(str, Enum):
"""Result type indicating the outcome of a heuristic."""
Expand Down
128 changes: 128 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
import re

from email_validator import EmailNotValidError, ValidatedEmail, validate_email

from macaron.config.defaults import defaults
from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
"""Analyze the email address of the package maintainers."""

# Extracts bare addresses from free-form metadata such as "Name <user@host.tld>, Other <o@host.tld>".
# The local part may contain "_", "%", "+" and "-" and domain labels may contain "-"; a stricter
# pattern would either miss such addresses entirely or silently truncate them (e.g. turning
# "john-doe@x.org" into "doe@x.org"). Full syntax validation is left to the email validator.
PATTERN = re.compile(
    r"""(?<![A-Za-z0-9._%+-])   # do not start in the middle of a longer local part
    [A-Za-z0-9_%+-]+            # first local-part segment
    (?:\.[A-Za-z0-9_%+-]+)*     # optional dot-separated local-part segments
    @
    [A-Za-z0-9-]+               # first domain label
    (?:\.[A-Za-z0-9-]+)*        # optional sub-domain labels
    \.[A-Za-z]{2,}              # top-level domain (at least 2 letters)
    \b""",
    re.VERBOSE,
)

def __init__(self) -> None:
    """Register the heuristic and read the deliverability setting from the defaults."""
    super().__init__(name="fake_email_analyzer", heuristic=Heuristics.FAKE_EMAIL, depends_on=None)
    # Whether validation should also check deliverability, as configured in defaults.ini.
    self.check_deliverability: bool = self._load_defaults()

def _load_defaults(self) -> bool:
    """Read the ``check_deliverability`` option from the ``heuristic.pypi`` section of the defaults.

    Returns
    -------
    bool
        The configured value, or True when the section or the option is absent.
    """
    section_name = "heuristic.pypi"
    if not defaults.has_section(section_name):
        return True
    return defaults[section_name].getboolean("check_deliverability", fallback=True)

def get_emails(self, email_field: str) -> list[str]:
    """Collect every email address that ``PATTERN`` finds in a metadata email field.

    Parameters
    ----------
    email_field: str
        The raw field content, which may mix display names and one or more addresses.

    Returns
    -------
    list[str]
        The matched addresses in order of appearance; empty when nothing matches.
    """
    candidates = (found.strip() for found in self.PATTERN.findall(email_field))
    return [address for address in candidates if address]

def is_valid_email(self, email: str) -> ValidatedEmail | None:
    """Validate a single email address with ``email_validator``.

    Deliverability is only checked when ``self.check_deliverability`` is true.

    Parameters
    ----------
    email: str
        The email address to validate.

    Returns
    -------
    ValidatedEmail | None
        The validation result for an accepted address, or None when the address is rejected.
    """
    try:
        return validate_email(email, check_deliverability=self.check_deliverability)
    except EmailNotValidError as err:
        # A rejected address is an expected outcome here, so report it and signal failure with None.
        logger.warning(f"Invalid email address: {email}. Error: {err}")
        return None

def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Analyze the author and maintainer email fields of the package.

The result is SKIP when neither field has a value, FAIL as soon as a populated field yields
no extractable address or contains an address that does not validate, and PASS otherwise.

Parameters
----------
pypi_package_json: PyPIPackageJsonAsset
The PyPI package JSON asset object.

Returns
-------
tuple[HeuristicResult, dict[str, JsonType]]:
The result and related information collected during the analysis.

Raises
------
HeuristicAnalyzerValueError
If the package JSON has no (or an empty) "info" entry.
"""
package_json = pypi_package_json.package_json
if not package_json.get("info", {}):
raise HeuristicAnalyzerValueError("No package info available.")

# NOTE(review): presumably json_extract yields None when the key is missing or not a str; the
# checks below only rely on the value being falsy in that case.
author_email = json_extract(package_json, ["info", "author_email"], str)
Copy link
Member

@art1f1c3R art1f1c3R Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see here we're extracting the email fields and then running the is_valid_email function directly on what is present in that field. Have you tested if this works on fields where the email is not the only text present? I've got some PyPI JSON data from Django where author_email looks like this:

"author_email": "Django Software Foundation <foundation@djangoproject.com>"

So the text includes more than just the email. Here's another example from the black package:

"author_email": "Łukasz Langa <lukasz@langa.pl>"

It may also be a string of multiple emails as well, like the ultralytics package:

"author_email": "Glenn Jocher <glenn.jocher@ultralytics.com>, Jing Qiu <jing.qiu@ultralytics.com>"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

# Nothing to judge when both fields are absent or empty.
if not author_email and not maintainer_email:
return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

validated_emails: list[JsonType] = []
# Attributes copied from each validated email object into the reported details.
details = ["normalized", "local_part", "domain"]

for email_field in [author_email, maintainer_email]:
if email_field:
# A field may hold display names and several addresses; extract the bare addresses first.
emails = self.get_emails(email_field)
if not emails:
return HeuristicResult.FAIL, {"message": "no emails found in the email field"}

for email in emails:
email_info = self.is_valid_email(email)
# Stop at the first rejected address and report it.
if not email_info:
return HeuristicResult.FAIL, {"invalid_email": email}

validated_emails.append({key: getattr(email_info, key) for key in details})

return HeuristicResult.PASS, {"validated_emails": validated_emails}
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/gradle.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Gradle class which inherits BaseBuildTool.
Expand Down Expand Up @@ -122,7 +122,7 @@ def get_dep_analyzer(self) -> CycloneDxGradle:
raise DependencyAnalyzerError("No default dependency analyzer is found.")
if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_gradle")):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)

tool_name, tool_version = tuple(
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/maven.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Maven class which inherits BaseBuildTool.
Expand Down Expand Up @@ -116,7 +116,7 @@ def get_dep_analyzer(self) -> CycloneDxMaven:
raise DependencyAnalyzerError("No default dependency analyzer is found.")
if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_maven")):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_maven')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_maven')} is not valid.",
)

tool_name, tool_version = tuple(
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/pip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Pip class which inherits BaseBuildTool.
Expand Down Expand Up @@ -88,7 +88,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer:
tool_name = "cyclonedx_py"
if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)
return CycloneDxPython(
resources_path=global_config.resources_path,
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/poetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Poetry class which inherits BaseBuildTool.
Expand Down Expand Up @@ -126,7 +126,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer:
tool_name = "cyclonedx_py"
if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)
return CycloneDxPython(
resources_path=global_config.resources_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer
Expand Down Expand Up @@ -358,6 +359,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
WheelAbsenceAnalyzer,
AnomalousVersionAnalyzer,
TyposquattingPresenceAnalyzer,
FakeEmailAnalyzer,
]

# name used to query the result of all problog rules, so it can be accessed outside the model.
Expand Down Expand Up @@ -425,13 +427,18 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
failed({Heuristics.ONE_RELEASE.value}),
failed({Heuristics.ANOMALOUS_VERSION.value}).

% Package released recently with a maintainer email address that is not valid.
{Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :-
quickUndetailed,
failed({Heuristics.FAKE_EMAIL.value}).
% ----- Evaluation -----

% Aggregate result
{problog_result_access} :- trigger(malware_high_confidence_1).
{problog_result_access} :- trigger(malware_high_confidence_2).
{problog_result_access} :- trigger(malware_high_confidence_3).
{problog_result_access} :- trigger(malware_high_confidence_4).
{problog_result_access} :- trigger(malware_medium_confidence_3).
{problog_result_access} :- trigger(malware_medium_confidence_2).
{problog_result_access} :- trigger(malware_medium_confidence_1).
query({problog_result_access}).
Expand Down
133 changes: 133 additions & 0 deletions tests/malware_analyzer/pypi/test_fake_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for the FakeEmailAnalyzer heuristic."""


from unittest.mock import MagicMock

import pytest

from macaron.errors import HeuristicAnalyzerValueError
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset


@pytest.fixture(name="analyzer")
def analyzer_() -> FakeEmailAnalyzer:
    """Provide a newly constructed FakeEmailAnalyzer to each test."""
    heuristic = FakeEmailAnalyzer()
    return heuristic


@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_() -> MagicMock:
    """Provide a PyPIPackageJsonAsset stand-in whose package JSON starts out empty."""
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    # Tests overwrite this attribute with the metadata they need.
    asset.package_json = {}
    return asset


def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the heuristic is skipped when both email fields are None."""
    metadata = {"info": {"author_email": None, "maintainer_email": None}}
    pypi_package_json_asset_mock.package_json = metadata

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert detail["message"] == "No author or maintainer email available."


def test_analyze_raises_error_for_missing_info_key(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Check that analysing package JSON without an 'info' key raises HeuristicAnalyzerValueError."""
    # An empty document has no 'info' key at all.
    pypi_package_json_asset_mock.package_json = {}

    with pytest.raises(HeuristicAnalyzerValueError) as raised:
        analyzer.analyze(pypi_package_json_asset_mock)

    assert "No package info available." in str(raised.value)


def test_analyze_fail_no_email_found_in_field(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Check that a populated field with no extractable address makes the heuristic fail."""
    metadata = {"info": {"author_email": "not an email", "maintainer_email": None}}
    pypi_package_json_asset_mock.package_json = metadata

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert detail == {"message": "no emails found in the email field"}


def test_analyze_fail_invalid_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that an address lacking a top-level domain is not extracted, so the heuristic fails."""
    malformed = "user@example"
    metadata = {"info": {"author_email": malformed, "maintainer_email": None}}
    pypi_package_json_asset_mock.package_json = metadata

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    # The address never reaches validation: extraction already finds nothing in the field.
    assert outcome == HeuristicResult.FAIL
    assert detail == {"message": "no emails found in the email field"}


def test_analyze_pass_only_maintainer_email_valid(
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
"""Test the analyzer passes if only a valid maintainer_email is present and deliverability is not checked.

When the analyzer is configured to check deliverability, the same address is instead expected
to be reported as invalid.
"""
email = "maintainer@example.net"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is example.net supposed to be a valid domain?

Copy link
Member Author

@AmineRaouane AmineRaouane Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and no. The domain is technically valid, but it’s reserved by IANA and not intended for real-world use. So to ensure the email is actually usable by a real user, Should I add a list of reserved domains and TLDs and check against them before proceeding with validation or acceptance, or should I keep it as it is?

Copy link
Member

@art1f1c3R art1f1c3R Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the email validator packages has a test environment flag?

test_environment=False: If True, DNS-based deliverability checks are disabled and test and **.test domain names are permitted (see below). You can also set email_validator.TEST_ENVIRONMENT to True to turn it on for all calls by default.

Would that maybe be better in a unit test? If a user does use those IANA reserved domains shouldn't it fail if we use check_deliverability as they aren't set up to receive emails?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but I tried to test both cases when deliverability is enabled and when it's not. Setting test_environment=False actually makes .test and similar test domains valid, so it's similar to the case when check_deliverability=False.

pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}}
result, info = analyzer.analyze(pypi_package_json_asset_mock)

# Which branch runs depends on the analyzer's configured check_deliverability value.
# NOTE(review): with deliverability enabled this presumably needs a DNS lookup - confirm the
# test environment allows it.
if analyzer.check_deliverability:
assert result == HeuristicResult.FAIL
assert info == {"invalid_email": email}
return

# Syntax-only validation: the address is accepted and its parts are reported.
assert result == HeuristicResult.PASS
assert info["validated_emails"] == [
{"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"}
]


def test_analyze_pass_both_emails_valid(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer passes if both emails are valid and deliverability is not checked."""
    # Pin the setting instead of branching on the configured default: the default enables the
    # deliverability check, which would make this "pass" test depend on DNS and assert FAIL.
    analyzer.check_deliverability = False

    author_email = "example@gmail.com"
    author_local_part, author_domain = author_email.split("@")
    maintainer_email = "maintainer@example.net"
    maintainer_local_part, maintainer_domain = maintainer_email.split("@")

    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": author_email, "maintainer_email": maintainer_email}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)

    assert result == HeuristicResult.PASS

    validated_emails = info.get("validated_emails")
    assert isinstance(validated_emails, list)
    assert len(validated_emails) == 2
    assert {"normalized": author_email, "local_part": author_local_part, "domain": author_domain} in validated_emails
    assert {
        "normalized": maintainer_email,
        "local_part": maintainer_local_part,
        "domain": maintainer_domain,
    } in validated_emails


def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer) -> None:
    """Check that is_valid_email yields None for a string that is not an email address."""
    assert analyzer.is_valid_email("invalid-email") is None


def test_get_emails(analyzer: FakeEmailAnalyzer) -> None:
    """Check address extraction for a mixed field, a field without addresses, and an empty field."""
    cases: list[tuple[str, list[str]]] = [
        ("test@example.com, another test <another@example.org>", ["test@example.com", "another@example.org"]),
        ("this is not an email", []),
        ("", []),
    ]
    for email_field, expected in cases:
        assert analyzer.get_emails(email_field) == expected
Loading