diff --git a/codebeaver.yml b/codebeaver.yml new file mode 100644 index 00000000000..ac19b7a372c --- /dev/null +++ b/codebeaver.yml @@ -0,0 +1,2 @@ +from: pytest +# This file was generated automatically by CodeBeaver based on your repository. Learn how to customize it here: https://docs.codebeaver.ai/open-source/codebeaver-yml/ \ No newline at end of file diff --git a/tests/test__cli.py b/tests/test__cli.py new file mode 100644 index 00000000000..4a1e5c3d519 --- /dev/null +++ b/tests/test__cli.py @@ -0,0 +1,345 @@ +import argparse +import asyncio +import sys +import pytest + +from redbot.core._cli import ( + confirm, + interactive_config, + non_negative_int, + message_cache_size_int, + parse_cli_flags, + ExitCodes, +) + + +def test_confirm_yes(monkeypatch): + """Test confirm returns True for 'yes' input.""" + monkeypatch.setattr("builtins.input", lambda prompt="": "yes") + assert confirm("Test confirm") is True + +def test_confirm_no(monkeypatch): + """Test confirm returns False for 'no' input even with a default of True.""" + monkeypatch.setattr("builtins.input", lambda prompt="": "no") + assert confirm("Test confirm", default=True) is False + +def test_confirm_default(monkeypatch): + """Test confirm returns the default value when input is empty.""" + monkeypatch.setattr("builtins.input", lambda prompt="": "") + assert confirm("Test confirm", default=True) is True + +def test_confirm_invalid_then_valid(monkeypatch): + """Test confirm loops on an invalid input then accepts a valid 'y'.""" + inputs = iter(["maybe", "y"]) + monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs)) + assert confirm("Test confirm") is True + +def test_confirm_keyboard_interrupt(monkeypatch): + """Test confirm handles KeyboardInterrupt by exiting with SHUTDOWN exit code.""" + def fake_input(prompt=""): + raise KeyboardInterrupt + monkeypatch.setattr("builtins.input", fake_input) + with pytest.raises(SystemExit) as excinfo: + confirm("Test confirm") + assert excinfo.value.code == ExitCodes.SHUTDOWN + +def test_confirm_eof(monkeypatch): + """Test confirm handles EOFError by exiting with INVALID_CLI_USAGE exit code.""" + def fake_input(prompt=""): + raise EOFError + monkeypatch.setattr("builtins.input", fake_input) + with pytest.raises(SystemExit) as excinfo: + confirm("Test confirm") + assert excinfo.value.code == ExitCodes.INVALID_CLI_USAGE + +def test_non_negative_int_valid(): + """Test non_negative_int returns a valid positive integer.""" + assert non_negative_int("10") == 10 + +def test_non_negative_int_negative(): + """Test non_negative_int raises error on negative integer input.""" + with pytest.raises(argparse.ArgumentTypeError): + non_negative_int("-10") + +def test_non_negative_int_non_numeric(): + """Test non_negative_int raises error on non-numeric input.""" + with pytest.raises(argparse.ArgumentTypeError): + non_negative_int("abc") + +def test_message_cache_size_int_valid(): + """Test message_cache_size_int returns the valid integer input.""" + assert message_cache_size_int("1500") == 1500 + +def test_message_cache_size_int_too_low(): + """Test message_cache_size_int raises error when the number is less than 1000.""" + with pytest.raises(argparse.ArgumentTypeError): + message_cache_size_int("500") + +def test_parse_cli_flags_defaults(): + """Test parse_cli_flags returns expected default values with no arguments.""" + args = parse_cli_flags([]) + assert args.version is False + # Check that logging_level is calculated (using cli_level_to_log_level logic internally) + assert args.logging_level >= 0 + assert args.prefix == [] + +def test_parse_cli_flags_with_prefix_and_version(): + """Test parse_cli_flags correctly processes version and prefix arguments.""" + arg_list = ["--version", "-p", "!", "-p", "?"] + args = parse_cli_flags(arg_list) + assert args.version is True + # The prefixes are sorted in reverse order so "?" precedes "!" + assert args.prefix == ["?", "!"] + +class FakeConfigItem: + """Fake config item to simulate asynchronous setting of configuration values.""" + def __init__(self): + self.value = None + async def set(self, value): + self.value = value + +class FakeConfig: + """Fake configuration container for token and prefix.""" + def __init__(self): + self.token = FakeConfigItem() + self.prefix = FakeConfigItem() + +class FakeRed: + """Fake Red object with a _config attribute used for testing interactive configuration.""" + def __init__(self): + self._config = FakeConfig() + +@pytest.mark.asyncio +async def test_interactive_config(monkeypatch): + """Test interactive_config sets the token and prefix correctly during an interactive session.""" + token_str = "x" * 51 # valid token, length >= 50 + # Simulate inputs: token input, prefix input, then confirmation for the prefix choice + inputs = iter([token_str, "!", "y"]) + monkeypatch.setattr("builtins.input", lambda prompt="": next(inputs)) + + red = FakeRed() + result_token = await interactive_config(red, token_set=False, prefix_set=False, print_header=False) + # Verify that the token is returned and set correctly + assert result_token == token_str + assert red._config.token.value == token_str + # Verify that the prefix is set correctly (wrapped in a list) + assert red._config.prefix.value == ["!"] + +@pytest.mark.asyncio +async def test_interactive_config_with_existing_token_prefix(monkeypatch): + """Test interactive_config does not prompt and modify settings when token and prefix are already set.""" + # When token_set and prefix_set are True, no input should be requested. + monkeypatch.setattr("builtins.input", lambda prompt="": "should not be used") + red = FakeRed() + result_token = await interactive_config(red, token_set=True, prefix_set=True, print_header=False) + # Since no prompting occurs, the token remains None + assert result_token is None + assert red._config.token.value is None + assert red._config.prefix.value is None +def test_interactive_config_prefix_only(monkeypatch): + """Test interactive_config prompts for and sets the prefix when token is already set.""" + # token_set True so token prompt is skipped; prefix_set False so prefix prompt is run. + # Simulate: first input "LongPrefixName" (invalid due to length) then "n" for its confirmation, + # then a valid input "!" with confirmation "y". + responses = iter(["LongPrefixName", "n", "!", "y"]) + monkeypatch.setattr("builtins.input", lambda prompt="": next(responses)) + red = FakeRed() + # Use asyncio.run to execute the asynchronous function + token_result = __import__("asyncio").run(interactive_config(red, token_set=True, prefix_set=False, print_header=False)) + # token_result should be None because token was skipped. + assert token_result is None + assert red._config.prefix.value == ["!"] + +@pytest.mark.asyncio +async def test_interactive_config_token_invalid_then_valid(monkeypatch): + """Test interactive_config handles an invalid token input followed by a valid token input.""" + # token_set False so token prompt is active, prefix_set True so prefix prompt is skipped. + token_invalid = "short" + token_valid = "x" * 50 # exactly 50 characters: valid + responses = iter([token_invalid, token_valid]) + monkeypatch.setattr("builtins.input", lambda prompt="": next(responses)) + red = FakeRed() + result_token = await interactive_config(red, token_set=False, prefix_set=True, print_header=False) + assert result_token == token_valid + assert red._config.token.value == token_valid + +def test_parse_cli_flags_all_args(): + """Test parse_cli_flags with almost every CLI flag to ensure all arguments are parsed correctly.""" + arg_list = [ + "--version", + "--debuginfo", + "--list-instances", + "--edit", + "--edit-instance-name", "NewInstance", + "--overwrite-existing-instance", + "--edit-data-path", "/new/path", + "--copy-data", + "--owner", "123456789", + "--co-owner", "987654321", "1122334455", + "-p", "!", + "--no-prompt", + "--no-cogs", + "--load-cogs", "cog1", "cog2", + "--unload-cogs", "cog3", + "--cog-path", "/cogs/path1", "/cogs/path2", + "--dry-run", + "-v", "-v", + "--dev", + "--mentionable", + "--rpc", + "--rpc-port", "7000", + "--token", "fake_token_value_that_is_long_enough_for_testing_purposes__________", + "--no-instance", + "TestInstance", + "--team-members-are-owners", + "--disable-intent", "guilds", + "--force-rich-logging", + "--rich-traceback-extra-lines", "5", + "--rich-traceback-show-locals", + "--message-cache-size", "2000", + "--no-message-cache" + ] + args = parse_cli_flags(arg_list) + assert args.version is True + assert args.debuginfo is True + assert args.list_instances is True + assert args.edit is True + assert args.edit_instance_name == "NewInstance" + assert args.overwrite_existing_instance is True + assert args.edit_data_path == "/new/path" + assert args.copy_data is True + assert args.owner == 123456789 + assert args.co_owner == [987654321, 1122334455] + # The prefix should appear as a sorted list (with one element in this case). + assert args.prefix == ["!"] + assert args.no_prompt is True + assert args.no_cogs is True + assert args.load_cogs == ["cog1", "cog2"] + assert args.unload_cogs == ["cog3"] + assert args.cog_path == ["/cogs/path1", "/cogs/path2"] + assert args.dry_run is True + # logging_level should be increased due to two -v flags. + assert args.logging_level > 0 + assert args.dev is True + assert args.mentionable is True + assert args.rpc is True + assert args.rpc_port == 7000 + expected_token = "fake_token_value_that_is_long_enough_for_testing_purposes__________" + assert args.token == expected_token + assert args.no_instance is True + assert args.instance_name == "TestInstance" + assert args.use_team_features is True + # --disable-intent should be parsed as a list. + assert args.disable_intent == ["guilds"] + # Check rich logging flags and traceback settings. + assert args.rich_logging is True + assert args.rich_traceback_extra_lines == 5 + assert args.rich_traceback_show_locals is True + assert args.message_cache_size == 2000 + assert args.no_message_cache is True + +def test_interactive_config_prefix_startswith_slash(monkeypatch): + """Test that interactive_config reprompts when the user enters a prefix starting with '/'.""" + # Simulate inputs: prefix that starts with '/', then a valid prefix "!" and confirmation "y". + responses = iter(["/hello", "!", "y"]) + monkeypatch.setattr("builtins.input", lambda prompt="": next(responses)) + from redbot.core._cli import interactive_config + class FakeConfigItem: + async def set(self, value): + self.value = value + class FakeConfig: + def __init__(self): + self.token = FakeConfigItem() + self.prefix = FakeConfigItem() + class FakeRed: + def __init__(self): + self._config = FakeConfig() + red = FakeRed() + # token_set True so token input is skipped, prefix_set False so prefix input is required. + result = __import__("asyncio").run(interactive_config(red, token_set=True, prefix_set=False, print_header=False)) + assert result is None + assert red._config.prefix.value == ["!"] + +def test_parse_cli_flags_rich_logging_conflict(): + """Test that when both force rich logging and force disable rich logging flags are provided, the last one wins.""" + from redbot.core._cli import parse_cli_flags + # Note: since argparse processes arguments in the order given, --force-disable-rich-logging overwrites the previous setting. + args = parse_cli_flags(["--force-rich-logging", "--force-disable-rich-logging"]) + assert args.rich_logging is False + +def test_message_cache_size_int_exact_boundary(): + """Test that message_cache_size_int accepts the boundary value 1000.""" + from redbot.core._cli import message_cache_size_int + assert message_cache_size_int("1000") == 1000 + +def test_non_negative_int_maxsize_exceeded(): + """Test that non_negative_int raises an error when given a value greater than sys.maxsize.""" + from redbot.core._cli import non_negative_int +@pytest.mark.asyncio +async def test_interactive_config_prefix_keyboard_interrupt(monkeypatch): + """Test interactive_config handles KeyboardInterrupt during the prefix confirmation process.""" + # Prepare an iterator that returns a valid prefix first, then raises KeyboardInterrupt on confirmation. + responses = iter(["!", KeyboardInterrupt()]) + def fake_input(prompt=""): + value = next(responses) + if isinstance(value, BaseException): + raise value + return value + monkeypatch.setattr("builtins.input", fake_input) + from redbot.core._cli import interactive_config, ExitCodes + # Create fake config objects and a fake Red as used in interactive_config. + class FakeConfigItem: + async def set(self, value): + self.value = value + class FakeConfig: + def __init__(self): + self.token = FakeConfigItem() + self.prefix = FakeConfigItem() + class FakeRed: + def __init__(self): + self._config = FakeConfig() + fake_red = FakeRed() + import asyncio + with pytest.raises(SystemExit) as excinfo: + await interactive_config(fake_red, token_set=True, prefix_set=False, print_header=False) + assert excinfo.value.code == ExitCodes.SHUTDOWN + +@pytest.mark.asyncio +async def test_interactive_config_print_header(capsys, monkeypatch): + """Test that interactive_config prints the header when print_header is True.""" + responses = iter(["!", "y"]) + monkeypatch.setattr("builtins.input", lambda prompt="": next(responses)) + from redbot.core._cli import interactive_config + class FakeConfigItem: + async def set(self, value): + self.value = value + class FakeConfig: + def __init__(self): + self.token = FakeConfigItem() + self.prefix = FakeConfigItem() + class FakeRed: + def __init__(self): + self._config = FakeConfig() + fake_red = FakeRed() + import asyncio + await interactive_config(fake_red, token_set=True, prefix_set=False, print_header=True) + captured = capsys.readouterr().out + # Verify header message is in the captured output. + assert "Red - Discord Bot | Configuration process" in captured + assert fake_red._config.prefix.value == ["!"] + +def test_parse_cli_flags_instance_none(): + """Test that parse_cli_flags returns instance_name as None when not provided.""" + from redbot.core._cli import parse_cli_flags + args = parse_cli_flags([]) + assert args.instance_name is None + +def test_parse_cli_flags_invalid_co_owner(): + """Test that parse_cli_flags exits (via SystemExit) when an invalid co-owner value is provided.""" + from redbot.core._cli import parse_cli_flags + with pytest.raises(SystemExit): + parse_cli_flags(["--co-owner", "notanumber"]) + import sys + with pytest.raises(argparse.ArgumentTypeError): + non_negative_int(str(sys.maxsize + 1)) +# End of file. \ No newline at end of file diff --git a/tests/test__i18n.py b/tests/test__i18n.py new file mode 100644 index 00000000000..39ed940efc9 --- /dev/null +++ b/tests/test__i18n.py @@ -0,0 +1,193 @@ +import pytest +from redbot.core import _i18n + +class DummyTranslator: + """Dummy translator used for testing reload translations.""" + def __init__(self): + self.called = False + + def load_translations(self): + self.called = True + +@pytest.fixture(autouse=True) +def reset_globals(): + """Fixture to reset global state between tests.""" + # Clear translations and reset defaults + _i18n.translators.clear() + _i18n.current_locale_default = "en-US" + _i18n.current_regional_format_default = None + # Reset context variables by creating new instances + _i18n.current_locale = _i18n.ContextVar("current_locale") + _i18n.current_regional_format = _i18n.ContextVar("current_regional_format") + +def test_get_standardized_locale_name_valid(): + """Test _get_standardized_locale_name with valid locale.""" + result = _i18n._get_standardized_locale_name("en-US") + assert result == "en-US" + +def test_get_standardized_locale_name_invalid_format(): + """Test _get_standardized_locale_name with invalid locale format (missing country code).""" + with pytest.raises(ValueError, match="Invalid format - language code has to include country code"): + _i18n._get_standardized_locale_name("en") + +def test_set_global_locale(): + """Test setting global locale and verify that translations are reloaded.""" + dummy = DummyTranslator() + _i18n.translators.append(dummy) + result = _i18n.set_global_locale("fr-FR") + assert result == "fr-FR" + assert _i18n.current_locale_default == "fr-FR" + assert dummy.called is True + +def test_set_global_locale_invalid(): + """Test setting global locale with invalid locale should raise error.""" + with pytest.raises(ValueError, match="Invalid language code"): + _i18n.set_global_locale("invalid") + +def test_set_global_regional_format_valid(): + """Test setting global regional format with valid locale.""" + result = _i18n.set_global_regional_format("de-DE") + assert result == "de-DE" + assert _i18n.current_regional_format_default == "de-DE" + +def test_set_global_regional_format_none(): + """Test setting global regional format with None.""" + result = _i18n.set_global_regional_format(None) + assert result is None + assert _i18n.current_regional_format_default is None + +def test_set_contextual_locale_without_verification(): + """Test setting contextual locale without verification flag.""" + result = _i18n.set_contextual_locale("es", verify_language_code=False) + # Since verification is False, the locale is not standardized; it should be exactly what was passed. + assert result == "es" + assert _i18n.current_locale.get("default") == "es" + +def test_set_contextual_locale_with_verification_valid(): + """Test setting contextual locale with verification flag and valid locale.""" + result = _i18n.set_contextual_locale("it-IT", verify_language_code=True) + assert result == "it-IT" + assert _i18n.current_locale.get("default") == "it-IT" + +def test_set_contextual_locale_with_verification_invalid(): + """Test setting contextual locale with verification flag and invalid locale raises error.""" + with pytest.raises(ValueError): + _i18n.set_contextual_locale("it", verify_language_code=True) + +def test_set_contextual_regional_format_without_verification(): + """Test setting contextual regional format without verification flag.""" + result = _i18n.set_contextual_regional_format("pt-BR", verify_language_code=False) + assert result == "pt-BR" + assert _i18n.current_regional_format.get("default") == "pt-BR" + +def test_set_contextual_regional_format_with_verification_valid(): + """Test setting contextual regional format with verification flag and valid locale.""" + result = _i18n.set_contextual_regional_format("nl-NL", verify_language_code=True) + assert result == "nl-NL" + assert _i18n.current_regional_format.get("default") == "nl-NL" + +def test_set_contextual_regional_format_with_verification_invalid(): + """Test setting contextual regional format with verification flag and invalid locale raises error.""" + with pytest.raises(ValueError): + _i18n.set_contextual_regional_format("nl", verify_language_code=True) + +def test_reload_on_contextual_locale(): + """Test that setting a contextual locale triggers _reload_locales on all translators.""" + translator1 = DummyTranslator() + translator2 = DummyTranslator() + _i18n.translators.extend([translator1, translator2]) + _i18n.set_contextual_locale("ja-JP", verify_language_code=True) + assert translator1.called and translator2.called + +def test_reload_on_global_locale(): + """Test that setting global locale triggers _reload_locales on all translators.""" + translator1 = DummyTranslator() + translator2 = DummyTranslator() + _i18n.translators.extend([translator1, translator2]) + _i18n.set_global_locale("ko-KR") + assert translator1.called and translator2.called + +def test_set_contextual_regional_format_with_none(): + """Test that passing None to set_contextual_regional_format behaves gracefully.""" + # While type hints expect str, this test ensures that None is handled without error. + result = _i18n.set_contextual_regional_format(None) + assert result is None + assert _i18n.current_regional_format.get("default") is None +def test_get_standardized_locale_name_with_underscore_invalid(): + """Test _get_standardized_locale_name with a locale using underscore separator; should raise ValueError due to wrong formatting.""" + with pytest.raises(ValueError, match="Invalid language code. Use format: `en-US`"): + _i18n._get_standardized_locale_name("en_US") + +def test_set_global_regional_format_invalid(): + """Test setting global regional format with an invalid locale code should raise ValueError.""" + with pytest.raises(ValueError, match="Invalid language code"): + _i18n.set_global_regional_format("invalid") + +def test_set_contextual_regional_format_verify_none(): + """Test setting contextual regional format with verification on when passing None gracefully returns None.""" + result = _i18n.set_contextual_regional_format(None, verify_language_code=True) + assert result is None + assert _i18n.current_regional_format.get("default") is None +def test_get_standardized_locale_name_lowercase(): + """Test that _get_standardized_locale_name converts a lowercase region code to the correct format.""" + result = _i18n._get_standardized_locale_name("fr-fr") + assert result == "fr-FR" + +def test_context_var_default_value(): + """Test that the ContextVars return the provided fallback value when not set.""" + # Since these context variables were reset in the fixture and not set, they should return the fallback. + default_locale = _i18n.current_locale.get("default") + default_regional = _i18n.current_regional_format.get("default") + assert default_locale == "default" + assert default_regional == "default" + +def test_set_global_locale_multiple_calls(): + """Test that multiple calls to set_global_locale update the global default and trigger reload on translators each time.""" + dummy = DummyTranslator() + _i18n.translators.append(dummy) + result1 = _i18n.set_global_locale("es-ES") + assert result1 == "es-ES" + assert _i18n.current_locale_default == "es-ES" + # Reset dummy flag to test subsequent call + dummy.called = False + result2 = _i18n.set_global_locale("pt-PT") + assert result2 == "pt-PT" + assert _i18n.current_locale_default == "pt-PT" + assert dummy.called is True + +def test_set_contextual_locale_multiple_calls(): + """Test that multiple calls to set_contextual_locale update the ContextVar and trigger reload on translators each time.""" + dummy = DummyTranslator() + _i18n.translators.append(dummy) + result1 = _i18n.set_contextual_locale("ru-RU", verify_language_code=True) + assert result1 == "ru-RU" + assert _i18n.current_locale.get("default") == "ru-RU" + # Reset dummy flag to test subsequent call + dummy.called = False + result2 = _i18n.set_contextual_locale("ar-SA", verify_language_code=True) + assert result2 == "ar-SA" + assert _i18n.current_locale.get("default") == "ar-SA" + assert dummy.called is True +def test_get_standardized_locale_name_empty(): + """Test _get_standardized_locale_name with an empty string to ensure it raises a ValueError.""" + with pytest.raises(ValueError, match="Invalid language code"): + _i18n._get_standardized_locale_name("") + +class FailingTranslator: + """Dummy translator that always raises an exception when load_translations is called.""" + def load_translations(self): + raise RuntimeError("Translation failure") + +def test_set_contextual_locale_propagate_exception(): + """Test that an exception in translator.load_translations propagates when setting a contextual locale.""" + failing = FailingTranslator() + _i18n.translators.append(failing) + with pytest.raises(RuntimeError, match="Translation failure"): + _i18n.set_contextual_locale("ja-JP", verify_language_code=True) + +def test_set_global_locale_propagate_exception(): + """Test that an exception in translator.load_translations propagates when setting a global locale.""" + failing = FailingTranslator() + _i18n.translators.append(failing) + with pytest.raises(RuntimeError, match="Translation failure"): + _i18n.set_global_locale("ko-KR") \ No newline at end of file diff --git a/tests/test_tunnel.py b/tests/test_tunnel.py new file mode 100644 index 00000000000..e498785ca9c --- /dev/null +++ b/tests/test_tunnel.py @@ -0,0 +1,258 @@ +import asyncio +import pytest +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch + +import discord +from redbot.core.utils.tunnel import Tunnel + +class DummyChannel: + def __init__(self, channel_id=123): + self.id = channel_id + self.sent_messages = [] + async def send(self, content, **kwargs): + msg = MagicMock() + msg.content = content + msg.id = len(self.sent_messages) + 100 + self.sent_messages.append(msg) + return msg + +class DummyAttachment: + def __init__(self, size, height=None, content="dummy"): + self.size = size + self.height = height + self.content = content + async def to_file(self): + file = MagicMock() + file.fp = self.content + return file + +class DummyMessage: + def __init__(self, content="", author=None, channel=None, attachments=None, guild=True, message_id=1): + self.content = content + self.author = author + self.channel = channel + self.attachments = attachments if attachments is not None else [] + self.guild = guild + self.id = message_id + self.reactions = [] + async def add_reaction(self, emoji): + self.reactions.append(emoji) + +class DummyUser: + def __init__(self, id): + self.id = id + +class DummyMember: + def __init__(self, id, guild): + self.id = id + self.guild = guild + +@pytest.fixture +def dummy_sender(): + # Dummy sender as a DummyMember + guild = MagicMock() + return DummyMember(1, guild) + +@pytest.fixture +def dummy_recipient(): + return DummyUser(2) + +@pytest.fixture +def dummy_origin(): + return DummyChannel(channel_id=10) + +@pytest.fixture(autouse=True) +def reset_instances(): + # Clear the tunnel instances dictionary before each test + from redbot.core.utils.tunnel import _instances + _instances.clear() + +@pytest.mark.asyncio +async def test_singleton_tunnel(dummy_sender, dummy_origin, dummy_recipient): + """Test that creating multiple tunnels with same parameters returns the same instance.""" + t1 = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + t2 = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + assert t1 is t2 + +@pytest.mark.asyncio +async def test_members_property(dummy_sender, dummy_origin, dummy_recipient): + """Test the members property returns the sender and recipient.""" + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + members = tunnel.members + assert members[0] == dummy_sender + assert members[1] == dummy_recipient + +@pytest.mark.asyncio +@pytest.mark.xfail(reason="Bug in minutes_since calculation: uses (last_interaction - utcnow).seconds") +async def test_minutes_since(dummy_sender, dummy_origin, dummy_recipient): + """Test minutes_since property returns appropriate minutes difference.""" + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + # set last_interaction to 2 minutes ago + tunnel.last_interaction = datetime.utcnow() - timedelta(minutes=2) + minutes = tunnel.minutes_since + # because the calculation is (last_interaction - utcnow), expect a negative value around -2 minutes + assert -3 <= minutes <= -1 + +@pytest.mark.asyncio +async def test_react_close(dummy_sender, dummy_origin, dummy_recipient): + """Test react_close sends message to the correct destination and formats message.""" + # Create dummy channels where send is AsyncMock + dummy_origin.send = AsyncMock(return_value=MagicMock()) + dummy_recipient.send = AsyncMock(return_value=MagicMock()) + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + # Call react_close with sender id - destination will be recipient + await tunnel.react_close(uid=dummy_sender.id, message="Closed by {closer.id}") + dummy_recipient.send.assert_called_once() + # Call react_close with recipient id - destination will be origin + await tunnel.react_close(uid=dummy_recipient.id, message="Closed by {closer.id}") + dummy_origin.send.assert_called_once() + +@pytest.mark.asyncio +async def test_message_forwarder_text(): + """Test message_forwarder with text content splitting into pages.""" + async def fake_send(content, **kwargs): + msg = MagicMock() + msg.content = content + msg.id = 200 + len(content) + return msg + + dummy_destination = MagicMock() + dummy_destination.send = AsyncMock(side_effect=lambda content, **kwargs: fake_send(content, **kwargs)) + # Text that will be pagified (simulate multiple pages) + content = "Page1\nPage2\nPage3" + msgs = await Tunnel.message_forwarder(destination=dummy_destination, content=content) + assert isinstance(msgs, list) + assert len(msgs) >= 1 + +@pytest.mark.asyncio +async def test_message_forwarder_embed_and_files(): + """Test message_forwarder with embed and files without content.""" + dummy_destination = MagicMock() + dummy_destination.send = AsyncMock(return_value=MagicMock(id=999)) + embed = MagicMock() + files = [MagicMock()] + msgs = await Tunnel.message_forwarder(destination=dummy_destination, embed=embed, files=files) + assert msgs[0].id == 999 + +@pytest.mark.asyncio +async def test_files_from_attach(): + """Test files_from_attach returns a list of files for valid attachments and skips non-images if required.""" + # Create a dummy attachment that qualifies as image + attachment_image = DummyAttachment(size=1000, height=100) + # Create a dummy attachment that is not an image + attachment_non_image = DummyAttachment(size=1000, height=None) + # Create a dummy message with attachments; total size less than max_size + msg = DummyMessage(content="Test", attachments=[attachment_image, attachment_non_image]) + files = await Tunnel.files_from_attach(msg, images_only=True) + # Only the image attachment should be returned + assert len(files) == 1 + +@pytest.mark.asyncio +async def test_close_because_disabled(dummy_sender, dummy_origin, dummy_recipient): + """Test that close_because_disabled sends close messages to both ends.""" + dummy_origin.send = AsyncMock(return_value=MagicMock()) + dummy_recipient.send = AsyncMock(return_value=MagicMock()) + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + await tunnel.close_because_disabled("Tunnel closed.") + dummy_origin.send.assert_called_once_with("Tunnel closed.") + dummy_recipient.send.assert_called_once_with("Tunnel closed.") + +@pytest.mark.asyncio +async def test_communicate_from_sender(dummy_sender, dummy_origin, dummy_recipient): + """Test communicate method when message is from sender in the origin channel.""" + async def fake_origin_send(content, **kwargs): + await asyncio.sleep(0) + msg = MagicMock(id=300) + msg.add_reaction = AsyncMock(return_value=None) + return msg + async def fake_recipient_send(content, **kwargs): + await asyncio.sleep(0) + msg = MagicMock(id=400) + msg.add_reaction = AsyncMock(return_value=None) + return msg + dummy_origin.send = AsyncMock(side_effect=fake_origin_send) + dummy_recipient.send = AsyncMock(side_effect=fake_recipient_send) + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + # Create a dummy message from sender in origin channel + dummy_message = DummyMessage(content="Hello", author=dummy_sender, channel=dummy_origin, message_id=101) + # Patch files_from_attach to return an empty list (simulate no attachments) + with patch.object(Tunnel, "files_from_attach", return_value=[]): + ret_ids = await tunnel.communicate(message=dummy_message, topic="Topic:") + # ret_ids should be a list of two ints: [forwarded_message.id, original_message.id] + assert isinstance(ret_ids, list) + assert len(ret_ids) == 2 + +@pytest.mark.asyncio +async def test_communicate_from_recipient(dummy_sender, dummy_origin, dummy_recipient): + """Test communicate method when message is from recipient in DM (guild is None).""" + async def fake_origin_send(content, **kwargs): + await asyncio.sleep(0) + msg = MagicMock(id=500) + msg.add_reaction = AsyncMock(return_value=None) + return msg + async def fake_recipient_send(content, **kwargs): + await asyncio.sleep(0) + msg = MagicMock(id=600) + msg.add_reaction = AsyncMock(return_value=None) + return msg + dummy_origin.send = AsyncMock(side_effect=fake_origin_send) + dummy_recipient.send = AsyncMock(side_effect=fake_recipient_send) + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + # Create a dummy message from recipient in DM (guild is None) + dummy_message = DummyMessage(content="Reply", author=dummy_recipient, channel=DummyChannel(channel_id=999), guild=None, message_id=102) + with patch.object(Tunnel, "files_from_attach", return_value=[]): + ret_ids = await tunnel.communicate(message=dummy_message, topic="Re:") + assert isinstance(ret_ids, list) + assert len(ret_ids) == 2 +@pytest.mark.asyncio +async def test_files_from_attach_exceeds_size(): + """Test files_from_attach returns empty list if total attachment size exceeds max allowed.""" + # Create attachments whose total size is greater than 26214400 bytes. + att1 = DummyAttachment(size=20000000, height=100) + att2 = DummyAttachment(size=7000000, height=100) # total = 27000000 bytes > max_size + msg = DummyMessage(content="Big", attachments=[att1, att2]) + files = await Tunnel.files_from_attach(msg) + assert files == [] + +@pytest.mark.asyncio +async def test_files_from_attach_http_exception(): + """Test files_from_attach raises HTTPException for non-cached image attachment errors.""" + # Define a dummy attachment that raises discord.HTTPException with status != 415. + class DummyAttachmentFail(DummyAttachment): + async def to_file(self): + exc = discord.HTTPException(response=MagicMock(), message="fail") + exc.status = 400 + raise exc + att = DummyAttachmentFail(size=1000, height=100) + msg = DummyMessage(content="Test", attachments=[att]) + with pytest.raises(discord.HTTPException): + await Tunnel.files_from_attach(msg, images_only=True, use_cached=True) + +@pytest.mark.asyncio +async def test_communicate_invalid_sender(dummy_sender, dummy_origin, dummy_recipient): + """Test communicate returns None if message is from an invalid sender that does not match sender or recipient.""" + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + # Create a dummy message from a different author + fake_author = DummyMember(999, MagicMock()) + msg = DummyMessage(content="Ignored", author=fake_author, channel=dummy_origin, guild=True, message_id=103) + ret_ids = await tunnel.communicate(message=msg, topic="Ignore:") + assert ret_ids is None + +@pytest.mark.asyncio +async def test_communicate_skip_message_content(dummy_sender, dummy_origin, dummy_recipient): + """Test communicate with skip_message_content True to ensure only the topic is sent.""" + async def fake_send(content, **kwargs): + await asyncio.sleep(0) + msg = MagicMock(id=700) + msg.add_reaction = AsyncMock(return_value=None) + return msg + + dummy_origin.send = AsyncMock(side_effect=fake_send) + dummy_recipient.send = AsyncMock(side_effect=fake_send) + tunnel = Tunnel(sender=dummy_sender, origin=dummy_origin, recipient=dummy_recipient) + dummy_message = DummyMessage(content="Content", author=dummy_sender, channel=dummy_origin, message_id=104) + with patch.object(Tunnel, "files_from_attach", return_value=[]): + ret_ids = await tunnel.communicate(message=dummy_message, topic="Topic only", skip_message_content=True) + assert isinstance(ret_ids, list) + assert len(ret_ids) == 2 \ No newline at end of file