diff --git a/.github/workflows/pr-python-connector-tests.yml b/.github/workflows/pr-python-connector-tests.yml index 7b3dbf97c1e..aea2f89c473 100644 --- a/.github/workflows/pr-python-connector-tests.yml +++ b/.github/workflows/pr-python-connector-tests.yml @@ -53,6 +53,12 @@ env: # Hubspot HUBSPOT_ACCESS_TOKEN: ${{ secrets.HUBSPOT_ACCESS_TOKEN }} + # IMAP + IMAP_HOST: ${{ secrets.IMAP_HOST }} + IMAP_USERNAME: ${{ secrets.IMAP_USERNAME }} + IMAP_PASSWORD: ${{ secrets.IMAP_PASSWORD }} + IMAP_MAILBOXES: ${{ secrets.IMAP_MAILBOXES }} + # Airtable AIRTABLE_TEST_BASE_ID: ${{ secrets.AIRTABLE_TEST_BASE_ID }} AIRTABLE_TEST_TABLE_ID: ${{ secrets.AIRTABLE_TEST_TABLE_ID }} diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 39ace0d7aeb..4c26f0a710c 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -186,6 +186,8 @@ class DocumentSource(str, Enum): AIRTABLE = "airtable" HIGHSPOT = "highspot" + IMAP = "imap" + # Special case just for integration tests MOCK_CONNECTOR = "mock_connector" diff --git a/backend/onyx/connectors/imap/__init__.py b/backend/onyx/connectors/imap/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py new file mode 100644 index 00000000000..a5b8b2ef9c9 --- /dev/null +++ b/backend/onyx/connectors/imap/connector.py @@ -0,0 +1,447 @@ +import copy +import email +import imaplib +import os +import re +from datetime import datetime +from datetime import timezone +from email.message import Message +from email.utils import parseaddr +from enum import Enum +from typing import Any +from typing import cast + +import bs4 + +from onyx.access.models import ExternalAccess +from onyx.configs.constants import DocumentSource +from onyx.connectors.imap.models import EmailHeaders +from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync +from onyx.connectors.interfaces import 
CheckpointOutput +from onyx.connectors.interfaces import CredentialsConnector +from onyx.connectors.interfaces import CredentialsProviderInterface +from onyx.connectors.interfaces import SecondsSinceUnixEpoch +from onyx.connectors.models import BasicExpertInfo +from onyx.connectors.models import ConnectorCheckpoint +from onyx.connectors.models import Document +from onyx.connectors.models import TextSection +from onyx.utils.logger import setup_logger + +logger = setup_logger() + + +_DEFAULT_IMAP_PORT_NUMBER = int(os.environ.get("IMAP_PORT", 993)) +_IMAP_OKAY_STATUS = "OK" +_PAGE_SIZE = 100 + + +# An email account has a list of mailboxes. +# Each mailbox has a list of email-ids inside of it. +# +# Usage: +# To use this checkpointer, first fetch all the mailboxes. +# Then, pop a mailbox and fetch all of its email-ids. +# Then, pop each email-id and fetch its content (and parse it, etc..). +# When you have popped all email-ids for this mailbox, pop the next mailbox and repeat the above process until you're done. +# +# For initial checkpointing, set both fields to `None`. 
class ImapCheckpoint(ConnectorCheckpoint):
    """Resumable progress marker for an IMAP indexing run.

    Both fields start out as `None` (the "dummy" checkpoint). The connector
    first fills `todo_mailboxes`, then repeatedly pops one mailbox, loads its
    email-ids into `todo_email_ids`, and drains those ids one page at a time.
    """

    # Mailboxes that have not been visited yet; `None` until the first run fills it in.
    todo_mailboxes: list[str] | None = None
    # Email-ids still to be fetched for the mailbox that was popped last.
    todo_email_ids: list[str] | None = None


class LoginState(str, Enum):
    # Tracks whether `ImapConnector` currently holds an authenticated session.
    LoggedIn = "logged_in"
    LoggedOut = "logged_out"


class ImapConnector(
    CredentialsConnector,
    CheckpointedConnectorWithPermSync[ImapCheckpoint],
):
    """Checkpointed connector that turns the emails of an IMAP account into documents.

    Args:
        host: Hostname of the IMAP server (connected to over SSL).
        port: Port of the IMAP server; defaults to `_DEFAULT_IMAP_PORT_NUMBER`.
        mailboxes: Mailboxes to index. When empty/`None`, every mailbox
            reported by the server is indexed.
    """

    def __init__(
        self,
        host: str,
        port: int = _DEFAULT_IMAP_PORT_NUMBER,
        mailboxes: list[str] | None = None,
    ) -> None:
        self._host = host
        self._port = port
        self._mailboxes = mailboxes
        self._credentials: dict[str, Any] | None = None
        self._mail_client: imaplib.IMAP4_SSL | None = None
        self._login_state: LoginState = LoginState.LoggedOut

    @property
    def mail_client(self) -> imaplib.IMAP4_SSL:
        """Return the active IMAP client.

        Raises:
            RuntimeError: If no client exists (credentials provider not set yet,
                or the session was logged out and not logged in again).
        """
        if not self._mail_client:
            raise RuntimeError(
                "No mail-client has been initialized; call `set_credentials_provider` first"
            )
        return self._mail_client

    @property
    def credentials(self) -> dict[str, Any]:
        """Return the stored credentials.

        Raises:
            RuntimeError: If `set_credentials_provider` has not been called.
        """
        if not self._credentials:
            raise RuntimeError(
                "Credentials have not been initialized; call `set_credentials_provider` first"
            )
        return self._credentials

    def _login(self) -> None:
        """Authenticate against the IMAP server; a no-op when already logged in.

        Raises:
            RuntimeError: If the username/password credentials are missing or
                are not strings.
        """
        if self._login_state == LoginState.LoggedIn:
            return

        def get_or_raise(name: str) -> str:
            value = self.credentials.get(name)
            if not value:
                raise RuntimeError(f"Credential item {name=} was not found")
            if not isinstance(value, str):
                # Report the type of the offending *value*, not of the key name.
                raise RuntimeError(
                    f"Credential item {name=} must be of type str, instead received {type(value)=}"
                )
            return value

        username = get_or_raise("username")
        password = get_or_raise("password")

        if self._mail_client is None:
            # `_logout` discards the client because an IMAP connection cannot be
            # reused after LOGOUT; open a fresh one for this session.
            self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port)

        self._mail_client.login(user=username, password=password)
        # Only record the session as logged-in once the server accepted the
        # credentials; otherwise a failed login would never be retried.
        self._login_state = LoginState.LoggedIn

    def _logout(self) -> None:
        """End the authenticated session; a no-op when already logged out."""
        if self._login_state == LoginState.LoggedOut:
            return

        self._login_state = LoginState.LoggedOut
        client = self.mail_client
        # LOGOUT shuts the connection down, so drop the client and let the next
        # `_login` reconnect instead of talking to a dead socket.
        self._mail_client = None
        client.logout()

    def _load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ImapCheckpoint,
        include_perm_sync: bool,
    ) -> CheckpointOutput[ImapCheckpoint]:
        """Advance the indexing run by one step and yield the documents it produced.

        One call does exactly one of the following:
          1. fills `todo_mailboxes` (first call, dummy checkpoint), or
          2. pops the next mailbox, loads its email-ids, and processes the first
             page of at most `_PAGE_SIZE` emails, or
          3. processes the next page of the current mailbox, or
          4. marks the checkpoint as finished when nothing is left.

        Args:
            start: Lower bound of the time window to index.
            end: Upper bound of the time window to index.
            checkpoint: Progress so far; it is copied, never mutated in place.
            include_perm_sync: Whether documents should carry external-access info.

        Returns:
            The updated checkpoint (as the generator's return value).

        Raises:
            RuntimeError: If no mailboxes could be found for the account.
        """
        checkpoint = cast(ImapCheckpoint, copy.deepcopy(checkpoint))
        checkpoint.has_more = True

        self._login()

        if checkpoint.todo_mailboxes is None:
            # This is the dummy checkpoint.
            # Fill it with mailboxes first.
            if self._mailboxes:
                checkpoint.todo_mailboxes = _sanitize_mailbox_names(self._mailboxes)
            else:
                fetched_mailboxes = _fetch_all_mailboxes_for_email_account(
                    mail_client=self.mail_client
                )
                if not fetched_mailboxes:
                    raise RuntimeError(
                        "Failed to find any mailboxes for this email account"
                    )
                checkpoint.todo_mailboxes = _sanitize_mailbox_names(fetched_mailboxes)

            return checkpoint

        if not checkpoint.todo_email_ids:
            if not checkpoint.todo_mailboxes:
                checkpoint.has_more = False
                return checkpoint

            mailbox = checkpoint.todo_mailboxes.pop()
            checkpoint.todo_email_ids = _fetch_email_ids_in_mailbox(
                mail_client=self.mail_client,
                mailbox=mailbox,
                start=start,
                end=end,
            )

        # NOTE(review): the remaining ids are fetched against whichever mailbox is
        # currently selected on this connection. A checkpoint resumed on a fresh
        # connection would presumably need to re-select the mailbox first - confirm.
        current_todos = checkpoint.todo_email_ids[:_PAGE_SIZE]
        checkpoint.todo_email_ids = checkpoint.todo_email_ids[_PAGE_SIZE:]

        for email_id in current_todos:
            email_msg = _fetch_email(mail_client=self.mail_client, email_id=email_id)
            if not email_msg:
                logger.warning(f"Failed to fetch message {email_id=}; skipping")
                continue

            email_headers = EmailHeaders.from_email_msg(email_msg=email_msg)

            yield _convert_email_headers_and_body_into_document(
                email_msg=email_msg,
                email_headers=email_headers,
                include_perm_sync=include_perm_sync,
            )

        return checkpoint

    # impls for BaseConnector

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Unsupported; credentials are supplied through `set_credentials_provider`."""
        raise NotImplementedError("Use `set_credentials_provider` instead")

    def validate_connector_settings(self) -> None:
        """Check that the configured credentials can log in, then log out again."""
        self._login()
        self._logout()

    # impls for CredentialsConnector

    def set_credentials_provider(
        self, credentials_provider: CredentialsProviderInterface
    ) -> None:
        """Store the provider's credentials and open the SSL connection to the server."""
        self._credentials = credentials_provider.get_credentials()
        self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port)

    # impls for CheckpointedConnector

    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ImapCheckpoint,
    ) -> CheckpointOutput[ImapCheckpoint]:
        """Run one indexing step without attaching permission info to documents."""
        return self._load_from_checkpoint(
            start=start, end=end, checkpoint=checkpoint, include_perm_sync=False
        )

    def build_dummy_checkpoint(self) -> ImapCheckpoint:
        """Return the initial checkpoint (both todo-lists unset, `has_more=True`)."""
        return ImapCheckpoint(has_more=True)

    def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint:
        """Parse and validate a serialized checkpoint."""
        return ImapCheckpoint.model_validate_json(json_data=checkpoint_json)

    # impls for CheckpointedConnectorWithPermSync

    def load_from_checkpoint_with_perm_sync(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ImapCheckpoint,
    ) -> CheckpointOutput[ImapCheckpoint]:
        """Run one indexing step and attach permission info to every document."""
        return self._load_from_checkpoint(
            start=start, end=end, checkpoint=checkpoint, include_perm_sync=True
        )
# IMAP date literals require English month abbreviations; `strftime("%b")`
# would follow the process locale instead.
_IMAP_MONTH_ABBREVIATIONS = (
    "Jan",
    "Feb",
    "Mar",
    "Apr",
    "May",
    "Jun",
    "Jul",
    "Aug",
    "Sep",
    "Oct",
    "Nov",
    "Dec",
)


def _format_imap_date(timestamp: SecondsSinceUnixEpoch, extra_days: int = 0) -> str:
    """Render a UTC timestamp as an IMAP date (`DD-Mon-YYYY`), optionally shifted by whole days."""
    from datetime import timedelta

    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc) + timedelta(
        days=extra_days
    )
    month = _IMAP_MONTH_ABBREVIATIONS[moment.month - 1]
    return f"{moment.day:02d}-{month}-{moment.year:04d}"


def _fetch_email_ids_in_mailbox(
    mail_client: imaplib.IMAP4_SSL,
    mailbox: str,
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
) -> list[str]:
    """Select `mailbox` (read-only) and return the ids of the emails dated within `[start, end]`.

    IMAP SEARCH only compares calendar dates, and `BEFORE` excludes the given
    date. The upper bound is therefore pushed to the day *after* `end`, so that
    emails received on the `end` day itself are included. As a consequence the
    result may contain emails from later on that same day.

    Args:
        mail_client: A logged-in IMAP client.
        mailbox: Name of the mailbox to select, already quoted for IMAP.
        start: Lower bound of the time window.
        end: Upper bound of the time window.

    Returns:
        The matching email-ids, decoded to `str`; empty when nothing matches.

    Raises:
        RuntimeError: If the mailbox cannot be selected or the search fails.
    """
    status, _ids = mail_client.select(mailbox=mailbox, readonly=True)
    if status != _IMAP_OKAY_STATUS:
        raise RuntimeError(f"Failed to select {mailbox=}")

    start_str = _format_imap_date(start)
    end_str = _format_imap_date(end, extra_days=1)
    search_criteria = f'(SINCE "{start_str}" BEFORE "{end_str}")'

    status, email_ids_byte_array = mail_client.search(None, search_criteria)

    if status != _IMAP_OKAY_STATUS or not email_ids_byte_array:
        raise RuntimeError(f"Failed to fetch email ids; {status=}")

    email_ids = email_ids_byte_array[0]
    if not email_ids:
        # The response slot can be `None`/empty when no message matched.
        return []

    return [email_id.decode() for email_id in email_ids.split()]


def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | None:
    """Fetch and parse one email (full RFC822 content) from the selected mailbox.

    Returns:
        The parsed message, or `None` when the server reports a failure or has
        no data for `email_id`.

    Raises:
        RuntimeError: If the server answers with data of an unexpected shape.
    """
    status, msg_data = mail_client.fetch(message_set=email_id, message_parts="(RFC822)")
    if status != _IMAP_OKAY_STATUS or not msg_data:
        return None

    data = msg_data[0]
    if data is None:
        # An "OK" answer without content: let the caller skip this id instead
        # of aborting the whole run.
        return None
    if not isinstance(data, tuple):
        raise RuntimeError(
            f"Message data should be a tuple; instead got a {type(data)=} {data=}"
        )

    _metadata, raw_email = data
    return email.message_from_bytes(raw_email)
+ primary_owners = [ + BasicExpertInfo(display_name=recipient_name, email=recipient_addr) + for recipient_name, recipient_addr in parsed_recipients + ] + external_access = ( + ExternalAccess( + external_user_emails=set(addr for _name, addr in parsed_recipients), + external_user_group_ids=set(), + is_public=False, + ) + if include_perm_sync + else None + ) + + return Document( + id=email_headers.id, + title=email_headers.subject, + semantic_identifier=email_headers.subject, + metadata={}, + source=DocumentSource.IMAP, + sections=[TextSection(text=email_body)], + primary_owners=primary_owners, + external_access=external_access, + ) + + +def _parse_email_body( + email_msg: Message, + email_headers: EmailHeaders, +) -> str: + body = None + for part in email_msg.walk(): + if part.is_multipart(): + # Multipart parts are *containers* for other parts, not the actual content itself. + # Therefore, we skip until we find the individual parts instead. + continue + + charset = part.get_content_charset() or "utf-8" + + try: + raw_payload = part.get_payload(decode=True) + if not isinstance(raw_payload, bytes): + logger.warn( + "Payload section from email was expected to be an array of bytes, instead got " + f"{type(raw_payload)=}, {raw_payload=}" + ) + continue + body = raw_payload.decode(charset) + break + except (UnicodeDecodeError, LookupError) as e: + print(f"Warning: Could not decode part with charset {charset}. Error: {e}") + continue + + if not body: + logger.warn( + f"Email with {email_headers.id=} has an empty body; returning an empty string" + ) + return "" + + soup = bs4.BeautifulSoup(markup=body, features="html.parser") + + return " ".join(str_section for str_section in soup.stripped_strings) + + +def _sanitize_mailbox_names(mailboxes: list[str]) -> list[str]: + """ + Mailboxes with special characters in them must be enclosed by double-quotes, as per the IMAP protocol. + Just to be safe, we wrap *all* mailboxes with double-quotes. 
+ """ + return [f'"{mailbox}"' for mailbox in mailboxes if mailbox] + + +def _parse_addrs(raw_header: str) -> list[tuple[str, str]]: + addrs = raw_header.split(",") + name_addr_pairs = [parseaddr(addr=addr, strict=True) for addr in addrs if addr] + return [(name, addr) for name, addr in name_addr_pairs if addr] + + +def _parse_singular_addr(raw_header: str) -> tuple[str, str]: + addrs = _parse_addrs(raw_header=raw_header) + if not addrs: + raise RuntimeError( + f"Parsing email header resulted in no addresses being found; {raw_header=}" + ) + elif len(addrs) >= 2: + raise RuntimeError( + f"Expected a singular address, but instead got multiple; {raw_header=} {addrs=}" + ) + + return addrs[0] + + +if __name__ == "__main__": + import time + from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector + from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider + + host = os.environ.get("IMAP_HOST") + mailboxes_str = os.environ.get("IMAP_MAILBOXES") + username = os.environ.get("IMAP_USERNAME") + password = os.environ.get("IMAP_PASSWORD") + + mailboxes = ( + [mailbox.strip() for mailbox in mailboxes_str.split(",")] + if mailboxes_str + else [] + ) + + if not host: + raise RuntimeError("`IMAP_HOST` must be set") + + imap_connector = ImapConnector( + host=host, + mailboxes=mailboxes, + ) + + imap_connector.set_credentials_provider( + OnyxStaticCredentialsProvider( + tenant_id=None, + connector_name=DocumentSource.IMAP, + credential_json={ + "username": username, + "password": password, + }, + ) + ) + + for doc in load_all_docs_from_checkpoint_connector( + connector=imap_connector, + start=0, + end=time.time(), + ): + print(doc) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py new file mode 100644 index 00000000000..d07ee9bd202 --- /dev/null +++ b/backend/onyx/connectors/imap/models.py @@ -0,0 +1,75 @@ +import email +from datetime import datetime +from email.message import Message +from enum 
class Header(str, Enum):
    # Names of the email headers read by `EmailHeaders.from_email_msg`.
    SUBJECT_HEADER = "subject"
    FROM_HEADER = "from"
    TO_HEADER = "to"
    DELIVERED_TO_HEADER = (
        "Delivered-To"  # Used in mailing lists instead of the "to" header.
    )
    DATE_HEADER = "date"
    MESSAGE_ID_HEADER = "Message-ID"


class EmailHeaders(BaseModel):
    """
    Model for email headers extracted from IMAP messages.
    """

    id: str
    subject: str
    sender: str
    recipients: str
    date: datetime

    @classmethod
    def from_email_msg(cls, email_msg: Message) -> "EmailHeaders":
        """Build the model from the headers of a parsed email.

        The subject falls back to "Unknown Subject", and the recipients fall
        back to the `Delivered-To` header when `To` is absent. Any other
        missing or unparsable header makes model validation fail.

        Args:
            email_msg: The parsed email.

        Returns:
            The validated headers.
        """
        # Imported explicitly: `import email` alone does not guarantee that the
        # `email.header` / `email.utils` submodules are loaded.
        from email.errors import HeaderParseError
        from email.header import decode_header
        from email.header import make_header
        from email.utils import parsedate_to_datetime

        def _decode(header: str, default: str | None = None) -> str | None:
            value = email_msg.get(header, default)
            if not value:
                return None

            try:
                # A header may consist of several chunks, each with its own
                # charset (e.g. an encoded display name followed by a plain
                # `<address>`). Decode and join *all* of them; keeping only the
                # first chunk would drop the rest of the header.
                decoded_value = str(make_header(decode_header(value)))
            except (HeaderParseError, LookupError, UnicodeDecodeError):
                # Malformed encoding or unknown charset: keep the raw text
                # rather than losing the header altogether.
                decoded_value = str(value)

            return decoded_value or None

        def _parse_date(date_str: str | None) -> datetime | None:
            if not date_str:
                return None
            try:
                return parsedate_to_datetime(date_str)
            except (TypeError, ValueError):
                return None

        message_id = _decode(header=Header.MESSAGE_ID_HEADER)
        # It's possible for the subject line to not exist or be an empty string.
        subject = _decode(header=Header.SUBJECT_HEADER) or "Unknown Subject"
        from_ = _decode(header=Header.FROM_HEADER)
        to = _decode(header=Header.TO_HEADER)
        if not to:
            to = _decode(header=Header.DELIVERED_TO_HEADER)
        date_str = _decode(header=Header.DATE_HEADER)
        date = _parse_date(date_str=date_str)

        # If any of the above are `None`, model validation will fail.
        # Therefore, no explicit `None` guards were written.
        return cls.model_validate(
            {
                "id": message_id,
                "subject": subject,
                "sender": from_,
                "recipients": to,
                "date": date,
            }
        )
@pytest.mark.parametrize(
    "expected_email_docs",
    [
        [
            EmailDoc(
                subject="Testing",
                recipients={"admin@onyx-test.com"},
                body="Hello, testing.",
            ),
            EmailDoc(
                subject="Hello world",
                recipients={"admin@onyx-test.com", "r@rabh.io"},
                body='Hello world, this is an email that contains multiple "To" recipients.',
            ),
        ]
    ],
)
def test_imap_connector(
    imap_connector: ImapConnector,
    expected_email_docs: list[EmailDoc],
) -> None:
    """Index the whole test account (with perm sync) and compare against the expected emails."""
    loaded = load_all_docs_from_checkpoint_connector_with_perm_sync(
        connector=imap_connector,
        start=0,
        end=time.time(),
    )

    actual_email_docs = []
    for document in to_documents(iterator=iter(loaded)):
        actual_email_docs.append(EmailDoc.from_doc(document=document))

    assert actual_email_docs == expected_email_docs
Callable from collections.abc import Iterator from typing import cast from typing import TypeVar @@ -5,6 +6,7 @@ from onyx.connectors.connector_runner import CheckpointOutputWrapper from onyx.connectors.interfaces import CheckpointedConnector from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync +from onyx.connectors.interfaces import CheckpointOutput from onyx.connectors.interfaces import SecondsSinceUnixEpoch from onyx.connectors.models import ConnectorCheckpoint from onyx.connectors.models import ConnectorFailure @@ -15,21 +17,19 @@ _ITERATION_LIMIT = 100_000 CT = TypeVar("CT", bound=ConnectorCheckpoint) +LoadFunction = Callable[[CT], CheckpointOutput[CT]] -def load_all_docs_from_checkpoint_connector( +def _load_all_docs( connector: CheckpointedConnector[CT], - start: SecondsSinceUnixEpoch, - end: SecondsSinceUnixEpoch, + load: LoadFunction, ) -> list[Document]: num_iterations = 0 checkpoint = cast(CT, connector.build_dummy_checkpoint()) documents: list[Document] = [] while checkpoint.has_more: - doc_batch_generator = CheckpointOutputWrapper[CT]()( - connector.load_from_checkpoint(start, end, checkpoint) - ) + doc_batch_generator = CheckpointOutputWrapper[CT]()(load(checkpoint)) for document, failure, next_checkpoint in doc_batch_generator: if failure is not None: raise RuntimeError(f"Failed to load documents: {failure}") @@ -45,6 +45,32 @@ def load_all_docs_from_checkpoint_connector( return documents +def load_all_docs_from_checkpoint_connector_with_perm_sync( + connector: CheckpointedConnectorWithPermSync[CT], + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, +) -> list[Document]: + return _load_all_docs( + connector=connector, + load=lambda checkpoint: connector.load_from_checkpoint_with_perm_sync( + start=start, end=end, checkpoint=checkpoint + ), + ) + + +def load_all_docs_from_checkpoint_connector( + connector: CheckpointedConnector[CT], + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, +) -> list[Document]: + 
return _load_all_docs( + connector=connector, + load=lambda checkpoint: connector.load_from_checkpoint( + start=start, end=end, checkpoint=checkpoint + ), + ) + + def load_everything_from_checkpoint_connector( connector: CheckpointedConnector[CT], start: SecondsSinceUnixEpoch,