-
Notifications
You must be signed in to change notification settings - Fork 29
feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
34a69a0
18715d1
59f4c61
a8d373b
468d67e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""The heuristic analyzer to check the email address of the package maintainers.""" | ||
|
||
import logging | ||
|
||
from email_validator import EmailNotValidError, ValidatedEmail, validate_email | ||
|
||
from macaron.json_tools import JsonType, json_extract | ||
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
|
||
class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers."""

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

    @staticmethod
    def _extract_addresses(raw_value: str) -> list[str]:
        """Split a metadata email field into the individual addresses it contains.

        The author/maintainer email metadata fields may carry a display name
        (``Jane Doe <jane@example.org>``) and several comma-separated entries.
        Passing such a value to the validator as-is would reject it even though
        every address in it is legitimate.

        Parameters
        ----------
        raw_value: str
            The raw content of the ``author_email`` or ``maintainer_email`` field.

        Returns
        -------
        list[str]
            The bare addresses found in the field. If nothing could be parsed,
            the raw value is returned as the only element so that it is still
            validated (and reported) by the caller.
        """
        # Imported locally so this class does not depend on a change to the module import block.
        from email.utils import getaddresses  # pylint: disable=import-outside-toplevel

        addresses = [address for _, address in getaddresses([raw_value]) if address]
        return addresses or [raw_value]

    def is_valid_email(self, email: str) -> ValidatedEmail | None:
        """Check if the email format is valid and the domain is deliverable.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        ValidatedEmail | None
            The validated email object if the email is valid, otherwise None.
            A validation failure is logged as a warning and is not propagated.
        """
        try:
            return validate_email(email, check_deliverability=True)
        except EmailNotValidError as err:
            # Lazy %-formatting: the message is only built if the record is emitted.
            logger.warning("Invalid email address: %s. Error: %s", email, err)
            return None

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis:
            ``SKIP`` when there is no package info or no email to check,
            ``FAIL`` with the first address that does not validate, or
            ``PASS`` with the details of every validated address.
        """
        package_json = pypi_package_json.package_json
        if not package_json.get("info"):
            return HeuristicResult.SKIP, {"message": "No package info available."}

        author_email = json_extract(package_json, ["info", "author_email"], str)
        maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

        if not author_email and not maintainer_email:
            return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

        validated_emails: list[JsonType] = []
        details = ("normalized", "local_part", "domain")

        for field_value in (author_email, maintainer_email):
            if not field_value:
                continue
            # A single field may hold several addresses, each optionally with a display name.
            for email in self._extract_addresses(field_value):
                email_info = self.is_valid_email(email)
                if email_info is None:
                    return HeuristicResult.FAIL, {"email": email}

                validated_emails.append({key: getattr(email_info, key) for key in details})

        return HeuristicResult.PASS, {"validated_emails": validated_emails}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""Tests for the FakeEmailAnalyzer heuristic.""" | ||
|
||
|
||
from collections.abc import Generator | ||
from unittest.mock import MagicMock, patch | ||
|
||
import pytest | ||
from email_validator import EmailNotValidError | ||
|
||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
|
||
@pytest.fixture(name="analyzer")
def analyzer_() -> FakeEmailAnalyzer:
    """Provide a newly constructed FakeEmailAnalyzer to the requesting test."""
    instance = FakeEmailAnalyzer()
    return instance
|
||
|
||
@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Provide a mock specced on PyPIPackageJsonAsset whose ``package_json`` starts out empty."""
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    # Tests overwrite this dictionary with the metadata they need.
    asset.package_json = {}
    return asset
|
||
|
||
@pytest.fixture(name="mock_validate_email")
def mock_validate_email_fixture() -> Generator[MagicMock]:
    """Replace ``validate_email`` as seen by the analyzer module and hand the mock to the test."""
    target = "macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.validate_email"
    with patch(target) as patched:
        yield patched
|
||
|
||
def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analyzer skips when both author_email and maintainer_email are None."""
    info_section = {"author_email": None, "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": info_section}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert detail["message"] == "No author or maintainer email available."
|
||
|
||
def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analyzer skips when the PyPI data has no 'info' key."""
    # An empty dictionary means there is no 'info' section at all.
    pypi_package_json_asset_mock.package_json = {}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert detail["message"] == "No package info available."
|
||
|
||
def test_analyze_fail_invalid_email(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Check that the analyzer fails when the validator rejects the author email."""
    bad_address = "invalid-email"
    # The patched validator rejects whatever it is given.
    mock_validate_email.side_effect = EmailNotValidError("Invalid email.")
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": bad_address, "maintainer_email": None}}

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert detail == {"email": bad_address}
    mock_validate_email.assert_called_once_with(bad_address, check_deliverability=True)
|
||
|
||
def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Check that the analyzer passes when only maintainer_email is set and the validator accepts it."""
    # NOTE(review): example.net is an IANA-reserved domain. validate_email is patched in this
    # test, so the address is accepted by the mock and no network lookup takes place.
    email = "maintainer@example.net"
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}}

    mock_validate_email.return_value = MagicMock(
        normalized="maintainer@example.net",
        local_part="maintainer",
        domain="example.net",
    )

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    expected_entry = {"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"}
    assert outcome == HeuristicResult.PASS
    assert detail["validated_emails"] == [expected_entry]
    mock_validate_email.assert_called_once_with(email, check_deliverability=True)
|
||
|
||
def test_analyze_pass_both_emails_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Check that the analyzer passes when author and maintainer emails are both accepted."""

    def fake_validate(email: str, check_deliverability: bool) -> MagicMock:  # pylint: disable=unused-argument
        # Build a stand-in validation result straight from the address text.
        local_part, domain = email.split("@")
        return MagicMock(normalized=email, local_part=local_part, domain=domain)

    mock_validate_email.side_effect = fake_validate
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"}
    }

    outcome, detail = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.PASS
    assert mock_validate_email.call_count == 2

    validated_emails = detail.get("validated_emails")
    assert isinstance(validated_emails, list)
    assert len(validated_emails) == 2

    author_entry = {"normalized": "author@example.com", "local_part": "author", "domain": "example.com"}
    maintainer_entry = {
        "normalized": "maintainer@example.net",
        "local_part": "maintainer",
        "domain": "example.net",
    }
    assert author_entry in validated_emails
    assert maintainer_entry in validated_emails
|
||
|
||
def test_is_valid_email_success(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
    """Check that is_valid_email hands back the validator's result object on success."""
    address = "test@example.com"
    validated = MagicMock(normalized="test@example.com", local_part="test", domain="example.com")
    mock_validate_email.return_value = validated

    outcome = analyzer.is_valid_email(address)

    assert outcome == validated
    mock_validate_email.assert_called_once_with("test@example.com", check_deliverability=True)
|
||
|
||
def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
    """Check that is_valid_email returns None when the validator raises EmailNotValidError."""
    bad_address = "invalid-email"
    mock_validate_email.side_effect = EmailNotValidError("The email address is not valid.")

    outcome = analyzer.is_valid_email(bad_address)

    assert outcome is None
    mock_validate_email.assert_called_once_with("invalid-email", check_deliverability=True)
Uh oh!
There was an error while loading. Please reload this page.