diff --git a/cogs/check_su_platform_authorisation.py b/cogs/check_su_platform_authorisation.py
index fc8bc0a0a..7f0583e7d 100644
--- a/cogs/check_su_platform_authorisation.py
+++ b/cogs/check_su_platform_authorisation.py
@@ -4,19 +4,18 @@
 from enum import Enum
 from typing import TYPE_CHECKING, override
 
-import aiohttp
-import bs4
 import discord
 from discord.ext import tasks
 
 from config import settings
-from utils import GLOBAL_SSL_CONTEXT, CommandChecks, TeXBotBaseCog
+from utils import CommandChecks, TeXBotBaseCog
 from utils.error_capture_decorators import (
     capture_guild_does_not_exist_error,
 )
+from utils.msl import get_su_platform_access_cookie_status, get_su_platform_organisations
 
 if TYPE_CHECKING:
-    from collections.abc import Iterable, Mapping, Sequence
+    from collections.abc import Sequence
     from collections.abc import Set as AbstractSet
     from logging import Logger
     from typing import Final
@@ -30,21 +29,6 @@
 
 logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
 
-REQUEST_HEADERS: "Final[Mapping[str, str]]" = {
-    "Cache-Control": "no-cache",
-    "Pragma": "no-cache",
-    "Expires": "0",
-}
-
-REQUEST_COOKIES: "Final[Mapping[str, str]]" = {
-    ".ASPXAUTH": settings["SU_PLATFORM_ACCESS_COOKIE"]
-}
-
-SU_PLATFORM_PROFILE_URL: "Final[str]" = "https://guildofstudents.com/profile"
-SU_PLATFORM_ORGANISATION_URL: "Final[str]" = (
-    "https://www.guildofstudents.com/organisation/admin"
-)
-
 
 class SUPlatformAccessCookieStatus(Enum):
     """Enum class defining the status of the SU Platform Access Cookie."""
@@ -72,114 +56,7 @@ class SUPlatformAccessCookieStatus(Enum):
     )
 
 
-class CheckSUPlatformAuthorisationBaseCog(TeXBotBaseCog):
-    """Cog class that defines the base functionality for cookie authorisation checks."""
-
-    async def _fetch_url_content_with_session(self, url: str) -> str:
-        """Fetch the HTTP content at the given URL, using a shared aiohttp session."""
-        async with (
-            aiohttp.ClientSession(
-                headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES
-            ) as http_session,
-            http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as http_response,
-        ):
-            return await http_response.text()
-
-    async def get_su_platform_access_cookie_status(self) -> SUPlatformAccessCookieStatus:
-        """Retrieve the current validity status of the SU platform access cookie."""
-        response_object: bs4.BeautifulSoup = bs4.BeautifulSoup(
-            await self._fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser"
-        )
-        page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title")
-        if not page_title or "Login" in str(page_title):
-            logger.debug("Token is invalid or expired.")
-            return SUPlatformAccessCookieStatus.INVALID
-
-        organisation_admin_url: str = (
-            f"{SU_PLATFORM_ORGANISATION_URL}/{settings['ORGANISATION_ID']}"
-        )
-        response_html: str = await self._fetch_url_content_with_session(organisation_admin_url)
-
-        if "admin tools" in response_html.lower():
-            return SUPlatformAccessCookieStatus.AUTHORISED
-
-        if "You do not have any permissions for this organisation" in response_html.lower():
-            return SUPlatformAccessCookieStatus.VALID
-
-        logger.warning(
-            "Unexpected response when checking SU platform access cookie authorisation."
-        )
-        return SUPlatformAccessCookieStatus.INVALID
-
-    async def get_su_platform_organisations(self) -> "Iterable[str]":
-        """Retrieve the MSL organisations the current SU platform cookie has access to."""
-        response_object: bs4.BeautifulSoup = bs4.BeautifulSoup(
-            await self._fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser"
-        )
-
-        page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title")
-
-        if not page_title:
-            logger.warning(
-                "Profile page returned no content when checking "
-                "SU platform access cookie's authorisation."
-            )
-            return ()
-
-        if "Login" in str(page_title):
-            logger.warning(
-                "Authentication redirected to login page. "
-                "SU platform access cookie is invalid or expired."
-            )
-            return ()
-
-        profile_section_html: bs4.Tag | bs4.NavigableString | None = response_object.find(
-            "div", {"id": "profile_main"}
-        )
-
-        if profile_section_html is None:
-            logger.warning(
-                "Couldn't find the profile section of the user "
-                "when scraping the SU platform's website HTML."
-            )
-            logger.debug("Retrieved HTML: %s", response_object.text)
-            return ()
-
-        user_name: bs4.Tag | bs4.NavigableString | int | None = profile_section_html.find("h1")
-
-        if not isinstance(user_name, bs4.Tag):
-            logger.warning(
-                "Found user profile on the SU platform but couldn't find their name."
-            )
-            logger.debug("Retrieved HTML: %s", response_object.text)
-            return ()
-
-        parsed_html: bs4.Tag | bs4.NavigableString | None = response_object.find(
-            "ul", {"id": "ulOrgs"}
-        )
-
-        if parsed_html is None or isinstance(parsed_html, bs4.NavigableString):
-            NO_ADMIN_TABLE_MESSAGE: Final[str] = (
-                f"Failed to retrieve the admin table for user: {user_name.string}. "
-                "Please check you have used the correct SU platform access token!"
-            )
-            logger.warning(NO_ADMIN_TABLE_MESSAGE)
-            return ()
-
-        organisations: Iterable[str] = [
-            list_item.get_text(strip=True) for list_item in parsed_html.find_all("li")
-        ]
-
-        logger.debug(
-            "SU platform access cookie has admin authorisation to: %s as user %s",
-            organisations,
-            user_name.text,
-        )
-
-        return organisations
-
-
-class CheckSUPlatformAuthorisationCommandCog(CheckSUPlatformAuthorisationBaseCog):
+class CheckSUPlatformAuthorisationCommandCog(TeXBotBaseCog):
     """Cog class that defines the "/check-su-platform-authorisation" command."""
 
     @discord.slash_command(  # type: ignore[no-untyped-call, misc]
@@ -200,7 +77,7 @@ async def check_su_platform_authorisation(self, ctx: "TeXBotApplicationContext")
 
         async with ctx.typing():
             su_platform_access_cookie_organisations: AbstractSet[str] = set(
-                await self.get_su_platform_organisations()
+                await get_su_platform_organisations()
             )
 
             await ctx.followup.send(
@@ -223,7 +100,7 @@ async def check_su_platform_authorisation(self, ctx: "TeXBotApplicationContext")
             )
 
 
-class CheckSUPlatformAuthorisationTaskCog(CheckSUPlatformAuthorisationBaseCog):
+class CheckSUPlatformAuthorisationTaskCog(TeXBotBaseCog):
     """Cog class defining a repeated task for checking SU platform access cookie."""
 
     @override
@@ -255,7 +132,7 @@ async def su_platform_access_cookie_check_task(self) -> None:
         logger.debug("Running SU platform access cookie check task...")
 
         su_platform_access_cookie_status: tuple[int, str] = (
-            await self.get_su_platform_access_cookie_status()
+            await get_su_platform_access_cookie_status()
         ).value
 
         logger.log(
diff --git a/cogs/make_member.py b/cogs/make_member.py
index 670d4d015..1d1b754db 100644
--- a/cogs/make_member.py
+++ b/cogs/make_member.py
@@ -4,16 +4,14 @@
 import re
 from typing import TYPE_CHECKING
 
-import aiohttp
-import bs4
 import discord
-from bs4 import BeautifulSoup
 from django.core.exceptions import ValidationError
 
 from config import settings
 from db.core.models import GroupMadeMember
 from exceptions import ApplicantRoleDoesNotExistError, GuestRoleDoesNotExistError
-from utils import GLOBAL_SSL_CONTEXT, CommandChecks, TeXBotBaseCog
+from utils import CommandChecks, TeXBotBaseCog
+from utils.msl import get_membership_count, is_student_id_member
 
 if TYPE_CHECKING:
     from collections.abc import Mapping, Sequence
@@ -154,56 +152,7 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st
             )
             return
 
-        guild_member_ids: set[str] = set()
-
-        async with (
-            aiohttp.ClientSession(
-                headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES
-            ) as http_session,
-            http_session.get(
-                url=GROUPED_MEMBERS_URL, ssl=GLOBAL_SSL_CONTEXT
-            ) as http_response,
-        ):
-            response_html: str = await http_response.text()
-
-        MEMBER_HTML_TABLE_IDS: Final[frozenset[str]] = frozenset(
-            {
-                "ctl00_Main_rptGroups_ctl05_gvMemberships",
-                "ctl00_Main_rptGroups_ctl03_gvMemberships",
-                "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl03_gvMemberships",
-                "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl05_gvMemberships",
-            }
-        )
-        table_id: str
-        for table_id in MEMBER_HTML_TABLE_IDS:
-            parsed_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
-                response_html, "html.parser"
-            ).find("table", {"id": table_id})
-
-            if parsed_html is None or isinstance(parsed_html, bs4.NavigableString):
-                continue
-
-            guild_member_ids.update(
-                row.contents[2].text
-                for row in parsed_html.find_all("tr", {"class": ["msl_row", "msl_altrow"]})
-            )
-
-        guild_member_ids.discard("")
-        guild_member_ids.discard("\n")
-        guild_member_ids.discard(" ")
-
-        if not guild_member_ids:
-            await self.command_send_error(
-                ctx,
-                error_code="E1041",
-                logging_message=OSError(
-                    "The guild member IDs could not be retrieved from "
-                    "the MEMBERS_LIST_URL."
-                ),
-            )
-            return
-
-        if group_member_id not in guild_member_ids:
+        if not await is_student_id_member(student_id=group_member_id):
             await self.command_send_error(
                 ctx,
                 message=(
@@ -276,53 +225,9 @@ async def member_count(self, ctx: "TeXBotApplicationContext") -> None:  # type:
         await ctx.defer(ephemeral=False)
 
         async with ctx.typing():
-            async with (
-                aiohttp.ClientSession(
-                    headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES
-                ) as http_session,
-                http_session.get(
-                    url=BASE_MEMBERS_URL, ssl=GLOBAL_SSL_CONTEXT
-                ) as http_response,
-            ):
-                response_html: str = await http_response.text()
-
-            member_list_div: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
-                response_html, "html.parser"
-            ).find("div", {"class": "memberlistcol"})
-
-            if member_list_div is None or isinstance(member_list_div, bs4.NavigableString):
-                await self.command_send_error(
-                    ctx=ctx,
-                    error_code="E1041",
-                    logging_message=OSError(
-                        "The member count could not be retrieved from the MEMBERS_LIST_URL."
-                    ),
-                )
-                return
-
-            if "showing 100 of" in member_list_div.text.lower():
-                member_count: str = member_list_div.text.split(" ")[3]
-                await ctx.followup.send(
-                    content=f"{self.bot.group_full_name} has {member_count} members! :tada:"
-                )
-                return
-
-            member_table: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
-                response_html, "html.parser"
-            ).find("table", {"id": "ctl00_ctl00_Main_AdminPageContent_gvMembers"})
-
-            if member_table is None or isinstance(member_table, bs4.NavigableString):
-                await self.command_send_error(
-                    ctx=ctx,
-                    error_code="E1041",
-                    logging_message=OSError(
-                        "The member count could not be retrieved from the MEMBERS_LIST_URL."
-                    ),
-                )
-                return
-
             await ctx.followup.send(
-                content=f"{self.bot.group_full_name} has {
-                    len(member_table.find_all('tr', {'class': ['msl_row', 'msl_altrow']}))
-                } members! :tada:"
+                content=(
+                    f"{self.bot.group_full_name} has "
+                    f"{await get_membership_count()} members! :tada:"
+                )
             )
diff --git a/utils/msl/__init__.py b/utils/msl/__init__.py
new file mode 100644
index 000000000..c7c8bd213
--- /dev/null
+++ b/utils/msl/__init__.py
@@ -0,0 +1,17 @@
+"""MSL utility classes & functions provided for use across the whole of the project."""
+
+from typing import TYPE_CHECKING
+
+from .authorisation import get_su_platform_access_cookie_status, get_su_platform_organisations
+from .memberships import get_full_membership_list, get_membership_count, is_student_id_member
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+__all__: "Sequence[str]" = (
+    "get_full_membership_list",
+    "get_membership_count",
+    "get_su_platform_access_cookie_status",
+    "get_su_platform_organisations",
+    "is_student_id_member",
+)
diff --git a/utils/msl/authorisation.py b/utils/msl/authorisation.py
new file mode 100644
index 000000000..eeb63b469
--- /dev/null
+++ b/utils/msl/authorisation.py
@@ -0,0 +1,152 @@
+"""Module for authorisation checks."""
+
+import logging
+from typing import TYPE_CHECKING
+
+import aiohttp
+import bs4
+
+from config import settings
+from utils import GLOBAL_SSL_CONTEXT
+
+from .core import BASE_COOKIES, BASE_HEADERS
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Sequence
+    from logging import Logger
+    from typing import Final
+
+    from cogs.check_su_platform_authorisation import SUPlatformAccessCookieStatus
+
+
+__all__: "Sequence[str]" = (
+    "get_su_platform_access_cookie_status",
+    "get_su_platform_organisations",
+)
+
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+SU_PLATFORM_PROFILE_URL: "Final[str]" = "https://guildofstudents.com/profile"
+SU_PLATFORM_ORGANISATION_URL: "Final[str]" = (
+    "https://www.guildofstudents.com/organisation/admin"
+)
+
+
+async def _fetch_url_content_with_session(url: str) -> str:
+    """Fetch the response text of the given URL, sending the MSL headers & cookies."""
+    async with (
+        aiohttp.ClientSession(headers=BASE_HEADERS, cookies=BASE_COOKIES) as http_session,
+        http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as http_response,
+    ):
+        return await http_response.text()
+
+
+async def get_su_platform_access_cookie_status() -> "SUPlatformAccessCookieStatus":
+    """Retrieve the current validity status of the SU platform access cookie.
+
+    The profile page is fetched first: a missing title or a login page means `INVALID`.
+    The organisation admin page then decides between `AUTHORISED` & `VALID`;
+    any other response is logged as a warning & reported as `INVALID`.
+    """
+    # NOTE: Imported here, not at module level, because the cog module imports
+    # `utils.msl` while it is still being initialised (a circular import).
+    from cogs.check_su_platform_authorisation import SUPlatformAccessCookieStatus
+
+    response_object: bs4.BeautifulSoup = bs4.BeautifulSoup(
+        await _fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser"
+    )
+    page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title")
+    if not page_title or "Login" in str(page_title):
+        logger.debug("Token is invalid or expired.")
+        return SUPlatformAccessCookieStatus.INVALID
+
+    organisation_admin_url: str = (
+        f"{SU_PLATFORM_ORGANISATION_URL}/{settings['ORGANISATION_ID']}"
+    )
+    response_html: str = await _fetch_url_content_with_session(organisation_admin_url)
+
+    # NOTE: The page is lowercased before matching, so every marker must be lowercase.
+    lowercase_response_html: str = response_html.lower()
+
+    if "admin tools" in lowercase_response_html:
+        return SUPlatformAccessCookieStatus.AUTHORISED
+
+    if "you do not have any permissions for this organisation" in lowercase_response_html:
+        return SUPlatformAccessCookieStatus.VALID
+
+    logger.warning(
+        "Unexpected response when checking SU platform access cookie authorisation."
+    )
+    return SUPlatformAccessCookieStatus.INVALID
+
+
+async def get_su_platform_organisations() -> "Iterable[str]":
+    """Retrieve the MSL organisations the current SU platform cookie has access to.
+
+    An empty tuple is returned (after logging a warning) whenever the profile page,
+    the user's name or the organisations list cannot be found in the scraped HTML.
+    """
+    response_object: bs4.BeautifulSoup = bs4.BeautifulSoup(
+        await _fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser"
+    )
+
+    page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title")
+
+    if not page_title:
+        logger.warning(
+            "Profile page returned no content when checking "
+            "SU platform access cookie's authorisation."
+        )
+        return ()
+
+    if "Login" in str(page_title):
+        logger.warning(
+            "Authentication redirected to login page. "
+            "SU platform access cookie is invalid or expired."
+        )
+        return ()
+
+    profile_section_html: bs4.Tag | bs4.NavigableString | None = response_object.find(
+        "div", {"id": "profile_main"}
+    )
+
+    if profile_section_html is None:
+        logger.warning(
+            "Couldn't find the profile section of the user "
+            "when scraping the SU platform's website HTML."
+        )
+        logger.debug("Retrieved HTML: %s", response_object.text)
+        return ()
+
+    user_name: bs4.Tag | bs4.NavigableString | int | None = profile_section_html.find("h1")
+
+    if not isinstance(user_name, bs4.Tag):
+        logger.warning("Found user profile on the SU platform but couldn't find their name.")
+        logger.debug("Retrieved HTML: %s", response_object.text)
+        return ()
+
+    parsed_html: bs4.Tag | bs4.NavigableString | None = response_object.find(
+        "ul", {"id": "ulOrgs"}
+    )
+
+    if parsed_html is None or isinstance(parsed_html, bs4.NavigableString):
+        NO_ADMIN_TABLE_MESSAGE: Final[str] = (
+            f"Failed to retrieve the admin table for user: {user_name.string}. "
+            "Please check you have used the correct SU platform access token!"
+        )
+        logger.warning(NO_ADMIN_TABLE_MESSAGE)
+        return ()
+
+    organisations: Iterable[str] = [
+        list_item.get_text(strip=True) for list_item in parsed_html.find_all("li")
+    ]
+
+    logger.debug(
+        "SU platform access cookie has admin authorisation to: %s as user %s",
+        organisations,
+        user_name.text,
+    )
+
+    return organisations
diff --git a/utils/msl/core.py b/utils/msl/core.py
new file mode 100644
index 000000000..09308aa22
--- /dev/null
+++ b/utils/msl/core.py
@@ -0,0 +1,71 @@
+"""Functions to enable interaction with MSL based SU websites."""
+
+import logging
+from typing import TYPE_CHECKING
+
+import aiohttp
+from bs4 import BeautifulSoup
+
+from config import settings
+from utils import GLOBAL_SSL_CONTEXT
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping, Sequence
+    from http.cookies import Morsel
+    from logging import Logger
+    from typing import Final
+
+__all__: "Sequence[str]" = ()
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+BASE_HEADERS: "Final[Mapping[str, str]]" = {
+    "Cache-Control": "no-cache",
+    "Pragma": "no-cache",
+    "Expires": "0",
+}
+
+BASE_COOKIES: "Final[Mapping[str, str]]" = {
+    ".ASPXAUTH": settings["SU_PLATFORM_ACCESS_COOKIE"],
+}
+
+ORGANISATION_ID: "Final[str]" = settings["ORGANISATION_ID"]
+
+ORGANISATION_ADMIN_URL: "Final[str]" = (
+    f"https://www.guildofstudents.com/organisation/admin/{ORGANISATION_ID}/"
+)
+
+
+async def get_msl_context(url: str) -> tuple[dict[str, str], dict[str, str]]:
+    """Get the form data & cookies needed to make a follow-up request to MSL.
+
+    The page at `url` is fetched using the base headers & cookies. The returned tuple
+    holds the value of every `<input>` that has both a name & a value, followed by
+    the cookies set by the response (always including the `.ASPXAUTH` cookie).
+    """
+    http_session: aiohttp.ClientSession = aiohttp.ClientSession(
+        headers=BASE_HEADERS,
+        cookies=BASE_COOKIES,
+    )
+    data_fields: dict[str, str] = {}
+    cookies: dict[str, str] = {}
+    async with http_session, http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as field_data:
+        data_response = BeautifulSoup(
+            markup=await field_data.text(),
+            features="html.parser",
+        )
+
+        for field in data_response.find_all(name="input"):
+            if field.get("name") and field.get("value"):
+                data_fields[field.get("name")] = field.get("value")
+
+        for cookie in field_data.cookies:
+            cookie_morsel: Morsel[str] | None = field_data.cookies.get(cookie)
+            if cookie_morsel is not None:
+                cookies[cookie] = cookie_morsel.value
+        # NOTE: Reuses the cookie configured in `BASE_COOKIES`, so that every MSL request
+        # authenticates with the same `SU_PLATFORM_ACCESS_COOKIE` setting.
+        cookies[".ASPXAUTH"] = BASE_COOKIES[".ASPXAUTH"]
+
+    return data_fields, cookies
diff --git a/utils/msl/memberships.py b/utils/msl/memberships.py
new file mode 100644
index 000000000..076914cfa
--- /dev/null
+++ b/utils/msl/memberships.py
@@ -0,0 +1,122 @@
+"""Module for checking membership status."""
+
+import logging
+from typing import TYPE_CHECKING
+
+import aiohttp
+import bs4
+from bs4 import BeautifulSoup
+
+from utils import GLOBAL_SSL_CONTEXT
+
+from .core import BASE_COOKIES, BASE_HEADERS, ORGANISATION_ID
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+    from logging import Logger
+    from typing import Final
+
+__all__: "Sequence[str]" = (
+    "get_full_membership_list",
+    "get_membership_count",
+    "is_student_id_member",
+)
+
+MEMBERS_LIST_URL: "Final[str]" = (
+    f"https://guildofstudents.com/organisation/memberlist/{ORGANISATION_ID}/?sort=groups"
+)
+
+# NOTE: Holds the rows found by the most recent successful scrape.
+membership_list_cache: set[tuple[str, str]] = set()
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+async def get_full_membership_list() -> set[tuple[str, str]]:
+    """Scrape the MSL members list & return one tuple per member row.
+
+    Each tuple holds the stripped text of the first two cells of a table row;
+    `is_student_id_member()` treats the second element as the student ID.
+    An empty set is returned (after logging a warning) if either membership table
+    cannot be found. A successful scrape also replaces `membership_list_cache`.
+    """
+    async with (
+        aiohttp.ClientSession(headers=BASE_HEADERS, cookies=BASE_COOKIES) as http_session,
+        http_session.get(url=MEMBERS_LIST_URL, ssl=GLOBAL_SSL_CONTEXT) as http_response,
+    ):
+        response_html: str = await http_response.text()
+
+    parsed_html: BeautifulSoup = BeautifulSoup(markup=response_html, features="html.parser")
+
+    standard_members_table: bs4.Tag | bs4.NavigableString | None = parsed_html.find(
+        name="table",
+        attrs={"id": "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl03_gvMemberships"},
+    )
+
+    all_members_table: bs4.Tag | bs4.NavigableString | None = parsed_html.find(
+        name="table",
+        attrs={"id": "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl05_gvMemberships"},
+    )
+
+    if standard_members_table is None or all_members_table is None:
+        logger.warning("One or both of the membership tables could not be found!")
+        logger.debug(response_html)
+        return set()
+
+    if isinstance(standard_members_table, bs4.NavigableString) or isinstance(
+        all_members_table, bs4.NavigableString
+    ):
+        logger.warning(
+            "Both membership tables were found but one or both are the wrong format!",
+        )
+        logger.debug(standard_members_table)
+        logger.debug(all_members_table)
+        return set()
+
+    # NOTE: The first `<tr>` of each table is skipped (assumed to be the header row).
+    # Slicing is used so that a table with no rows at all cannot raise an `IndexError`.
+    member_rows: list[bs4.Tag] = [
+        *standard_members_table.find_all(name="tr")[1:],
+        *all_members_table.find_all(name="tr")[1:],
+    ]
+
+    member_list: set[tuple[str, str]] = set()
+    member_row: bs4.Tag
+    for member_row in member_rows:
+        member_cells: list[bs4.Tag] = member_row.find_all(name="td")
+        if len(member_cells) < 2:
+            # A row without both cells cannot be read as a member, so it is skipped.
+            continue
+
+        # NOTE: This will not properly handle external members who do not have an ID...
+        # There does not appear to be a solution to this
+        # other than simply checking manually.
+        member_list.add((member_cells[0].text.strip(), member_cells[1].text.strip()))
+
+    membership_list_cache.clear()
+    membership_list_cache.update(member_list)
+
+    return member_list
+
+
+async def is_student_id_member(student_id: str | int) -> bool:
+    """Check if the student ID is a member of the society.
+
+    The cached membership list is checked first; the list is only scraped again
+    when the given ID is not found in the cache.
+    """
+    all_ids: set[str] = {str(member[1]) for member in membership_list_cache}
+
+    if str(student_id) in all_ids:
+        return True
+
+    logger.debug("Student ID %s not found in cache, fetching updated list.", student_id)
+
+    new_ids: set[str] = {str(member[1]) for member in await get_full_membership_list()}
+
+    return str(student_id) in new_ids
+
+
+async def get_membership_count() -> int:
+    """Return the number of members found by freshly scraping the membership list."""
+    return len(await get_full_membership_list())