-
Notifications
You must be signed in to change notification settings - Fork 29
feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
34a69a0
18715d1
59f4c61
a8d373b
468d67e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""The heuristic analyzer to check the email address of the package maintainers.""" | ||
|
||
import logging | ||
import re | ||
|
||
from email_validator import EmailNotValidError, ValidatedEmail, validate_email | ||
|
||
from macaron.config.defaults import defaults | ||
from macaron.errors import HeuristicAnalyzerValueError | ||
from macaron.json_tools import JsonType, json_extract | ||
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
|
||
class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    The ``author_email`` and ``maintainer_email`` metadata fields are scanned for
    email addresses, and every address found is passed to ``validate_email``.
    """

    # Extracts addresses embedded in free-form metadata such as
    # "Jane Doe <jane.doe@example.org>, other@example.org".
    # The local part accepts the characters that commonly occur in real addresses
    # (letters, digits, "_", "%", "+", "-", and single dots between segments), and
    # domain labels may contain inner hyphens. The look-behind stops a match from
    # starting in the middle of a longer token, so an address is either extracted
    # whole or not at all (never silently truncated).
    PATTERN = re.compile(
        r"""
        (?<![A-Za-z0-9._%+-])               # do not start inside a longer local part
        [A-Za-z0-9_%+-]+                    # first local-part segment
        (?:\.[A-Za-z0-9_%+-]+)*             # optional dot-separated segments
        @
        (?:                                 # one or more dot-terminated domain labels
            [A-Za-z0-9]                     #   a label starts with an alphanumeric
            (?:[A-Za-z0-9-]*[A-Za-z0-9])?   #   may contain hyphens, but not end with one
            \.
        )+
        [A-Za-z]{2,}                        # top-level domain (at least 2 letters)
        \b
        """,
        re.VERBOSE,
    )

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )
        # Whether validate_email should also run its deliverability check.
        self.check_deliverability: bool = self._load_defaults()

    def _load_defaults(self) -> bool:
        """Load the default values from defaults.ini.

        Returns
        -------
        bool
            The ``check_deliverability`` option of the ``heuristic.pypi`` section,
            or True when the section or the option is missing.
        """
        section_name = "heuristic.pypi"
        if defaults.has_section(section_name):
            section = defaults[section_name]
            return section.getboolean("check_deliverability", fallback=True)
        return True

    def get_emails(self, email_field: str) -> list[str]:
        """Extract emails from the given email field.

        Parameters
        ----------
        email_field: str
            The email field from which to extract emails.

        Returns
        -------
        list[str]
            A list of emails extracted from the email field, in order of appearance.
            The list is empty when the field contains no address.
        """
        # PATTERN has no capturing groups, so findall yields the full matches; a match
        # can neither be empty nor contain whitespace, so no further clean-up is needed.
        return self.PATTERN.findall(email_field)

    def is_valid_email(self, email: str) -> ValidatedEmail | None:
        """Check if the email is accepted by ``validate_email``.

        The deliverability check of ``validate_email`` is only requested when
        ``self.check_deliverability`` is set.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        ValidatedEmail | None
            The validated email object if the email is valid, otherwise None.
        """
        try:
            return validate_email(email, check_deliverability=self.check_deliverability)
        except EmailNotValidError as err:
            # A rejected address is an expected outcome of the heuristic, not an error.
            logger.warning("Invalid email address: %s. Error: %s", email, err)
            return None

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            SKIP when neither email field is set, FAIL as soon as a populated field
            yields no address or an address is rejected, PASS otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the package JSON has no ``info`` section or the section is empty.
        """
        package_json = pypi_package_json.package_json
        if not package_json.get("info"):
            raise HeuristicAnalyzerValueError("No package info available.")

        author_email = json_extract(package_json, ["info", "author_email"], str)
        maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

        if not author_email and not maintainer_email:
            return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

        validated_emails: list[JsonType] = []
        # Attributes of the validated email object that are reported back.
        details = ["normalized", "local_part", "domain"]

        for email_field in (author_email, maintainer_email):
            if not email_field:
                continue

            emails = self.get_emails(email_field)
            if not emails:
                # The field is populated but holds nothing that looks like an address.
                return HeuristicResult.FAIL, {"message": "no emails found in the email field"}

            for email in emails:
                email_info = self.is_valid_email(email)
                if not email_info:
                    return HeuristicResult.FAIL, {"invalid_email": email}

                validated_emails.append({key: getattr(email_info, key) for key in details})

        return HeuristicResult.PASS, {"validated_emails": validated_emails}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""Tests for the FakeEmailAnalyzer heuristic.""" | ||
|
||
|
||
from unittest.mock import MagicMock | ||
|
||
import pytest | ||
|
||
from macaron.errors import HeuristicAnalyzerValueError | ||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
|
||
@pytest.fixture(name="analyzer")
def analyzer_() -> FakeEmailAnalyzer:
    """Build the FakeEmailAnalyzer instance handed to the tests."""
    fake_email_analyzer = FakeEmailAnalyzer()
    return fake_email_analyzer
|
||
|
||
@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_() -> MagicMock:
    """Build a PyPIPackageJsonAsset mock whose package JSON starts out empty."""
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    # Individual tests overwrite this with the metadata they need.
    asset.package_json = {}
    return asset
|
||
|
||
def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analysis is skipped when neither email field holds a value."""
    info_section = {"author_email": None, "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": info_section}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert detail["message"] == "No author or maintainer email available."
|
||
|
||
def test_analyze_raises_error_for_missing_info_key(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Check that package JSON without an 'info' key makes the analyzer raise."""
    # An empty document: the 'info' key is absent altogether.
    pypi_package_json_asset_mock.package_json = {}

    with pytest.raises(HeuristicAnalyzerValueError) as raised:
        analyzer.analyze(pypi_package_json_asset_mock)

    error_text = str(raised.value)
    assert "No package info available." in error_text
|
||
|
||
def test_analyze_fail_no_email_found_in_field(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock
) -> None:
    """Check that a populated email field without any parsable address fails the heuristic."""
    info_section = {"author_email": "not an email", "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": info_section}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert detail == {"message": "no emails found in the email field"}
|
||
|
||
def test_analyze_fail_invalid_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that an address without a top-level domain is not extracted and fails the heuristic."""
    malformed = "user@example"
    info_section = {"author_email": malformed, "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": info_section}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    # The value never reaches validation: extraction already finds no address in it.
    assert detail == {"message": "no emails found in the email field"}
|
||
|
||
def test_analyze_pass_only_maintainer_email_valid( | ||
analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock | ||
) -> None: | ||
"""Test the analyzer passes if only a valid maintainer_email is present and deliverability is not checked.""" | ||
email = "maintainer@example.net" | ||
is
Yes and no. The domain is technically valid, but it's reserved by IANA and not intended for real-world use. So, to ensure the email is actually usable by a real user, should I add a list of reserved domains and TLDs and check against them before proceeding with validation or acceptance, or should I keep it as it is?
I see the email-validator package has a test environment flag. Would that maybe be better in a unit test? If a user does use those IANA-reserved domains, shouldn't it fail if we use
Yeah, but I tried to test both cases when
||
pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}} | ||
result, info = analyzer.analyze(pypi_package_json_asset_mock) | ||
|
||
if analyzer.check_deliverability: | ||
assert result == HeuristicResult.FAIL | ||
assert info == {"invalid_email": email} | ||
return | ||
|
||
assert result == HeuristicResult.PASS | ||
assert info["validated_emails"] == [ | ||
{"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"} | ||
] | ||
|
||
|
||
def test_analyze_pass_both_emails_valid(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer passes if both emails are valid and deliverability is not checked."""
    # Pin the setting instead of branching on whatever the loaded defaults say: the
    # PASS path is then always the one under test, and no deliverability lookup is
    # requested from validate_email.
    analyzer.check_deliverability = False

    author_email = "example@gmail.com"
    author_local_part, author_domain = author_email.split("@")
    maintainer_email = "maintainer@example.net"
    maintainer_local_part, maintainer_domain = maintainer_email.split("@")

    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": author_email, "maintainer_email": maintainer_email}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)

    assert result == HeuristicResult.PASS

    validated_emails = info.get("validated_emails")
    assert isinstance(validated_emails, list)
    assert len(validated_emails) == 2
    assert {"normalized": author_email, "local_part": author_local_part, "domain": author_domain} in validated_emails
    assert {
        "normalized": maintainer_email,
        "local_part": maintainer_local_part,
        "domain": maintainer_domain,
    } in validated_emails
|
||
|
||
def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer) -> None:
    """Check that is_valid_email yields None for a string that is not an email address."""
    validated = analyzer.is_valid_email("invalid-email")
    assert validated is None
|
||
|
||
def test_get_emails(analyzer: FakeEmailAnalyzer) -> None:
    """Check get_emails on a multi-address field, a field without addresses, and an empty field."""
    # Each case pairs a raw field value with the addresses expected to be extracted from it.
    cases = [
        (
            "test@example.com, another test <another@example.org>",
            ["test@example.com", "another@example.org"],
        ),
        ("this is not an email", []),
        ("", []),
    ]

    for email_field, expected in cases:
        assert analyzer.get_emails(email_field) == expected
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see here we're extracting the email fields and then running the `is_valid_email` function directly on what is present in that field. Have you tested if this works on fields where the email is not the only text present? I've got some PyPI JSON data from Django where `author_email` looks like this:
So the text includes more than just the email. Here's another example from the black package:
It may also be a string of multiple emails, like the ultralytics package:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done