From aec30498c8de0fe451994c2bf7d1f396dde25c0d Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 3 Jul 2025 10:32:32 -0700 Subject: [PATCH 01/31] Implement fetching; still need to work on document parsing --- backend/onyx/configs/constants.py | 2 + backend/onyx/connectors/imap/__init__.py | 0 backend/onyx/connectors/imap/connector.py | 192 ++++++++++++++++++++++ backend/onyx/connectors/imap/models.py | 68 ++++++++ 4 files changed, 262 insertions(+) create mode 100644 backend/onyx/connectors/imap/__init__.py create mode 100644 backend/onyx/connectors/imap/connector.py create mode 100644 backend/onyx/connectors/imap/models.py diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 39ace0d7aeb..4c26f0a710c 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -186,6 +186,8 @@ class DocumentSource(str, Enum): AIRTABLE = "airtable" HIGHSPOT = "highspot" + IMAP = "imap" + # Special case just for integration tests MOCK_CONNECTOR = "mock_connector" diff --git a/backend/onyx/connectors/imap/__init__.py b/backend/onyx/connectors/imap/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py new file mode 100644 index 00000000000..df1f3051a0a --- /dev/null +++ b/backend/onyx/connectors/imap/connector.py @@ -0,0 +1,192 @@ +import email +import imaplib +from email.message import Message +from email.utils import parseaddr +from typing import Any + +from onyx.access.models import ExternalAccess +from onyx.configs.constants import DocumentSource +from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider +from onyx.connectors.imap.models import EmailHeaders +from onyx.connectors.interfaces import CheckpointedConnector +from onyx.connectors.interfaces import CheckpointOutput +from onyx.connectors.interfaces import CredentialsConnector +from onyx.connectors.interfaces import CredentialsProviderInterface +from onyx.connectors.interfaces import SecondsSinceUnixEpoch +from onyx.connectors.models import BasicExpertInfo +from onyx.connectors.models import ConnectorCheckpoint +from onyx.connectors.models import Document +from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector + + +DEFAULT_IMAP_PORT_NUMBER = 993 +IMAP_OKAY_STATUS = "OK" + + +class ImapCheckpoint(ConnectorCheckpoint): ... + + +class ImapConnector( + CheckpointedConnector[ImapCheckpoint], + CredentialsConnector, +): + def __init__( + self, + host: str, + port: int = DEFAULT_IMAP_PORT_NUMBER, + ) -> None: + self._host = host + self._port = port + self._username: str | None = None + self._password: str | None = None + self._mail_client = imaplib.IMAP4_SSL(host=host, port=port) + + # impls for BaseConnector + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + raise NotImplementedError("Use `set_credentials_provider` instead") + + def validate_connector_settings(self) -> None: + raise NotImplementedError + + # impls for CredentialsConnector + + def set_credentials_provider( + self, credentials_provider: CredentialsProviderInterface + ) -> None: + credentials = credentials_provider.get_credentials() + + def get_or_raise(name: str) -> str: + value = credentials.get(name) + if not value: + raise RuntimeError(f"Credential item {name=} was not found") + if not isinstance(value, str): + raise RuntimeError( + f"Credential item {name=} must be of type str, instead received {type(name)=}" + ) + return value + + username = get_or_raise("username") + password = get_or_raise("password") + + self._mail_client.login(user=username, password=password) + + # impls for CheckpointedConnector + + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ImapCheckpoint, + ) -> CheckpointOutput[ImapCheckpoint]: + status, _ids = self._mail_client.select("Inbox", readonly=True) + if status != IMAP_OKAY_STATUS: + raise RuntimeError + + status, email_ids = self._mail_client.search(None, "ALL") + + if status != IMAP_OKAY_STATUS or not email_ids: + raise RuntimeError + + for email_id in email_ids[0].split(): + status, msg_data = self._mail_client.fetch( + message_set=email_id, message_parts="(RFC822)" + ) + if status != IMAP_OKAY_STATUS or not msg_data: + continue + data = msg_data[0] + if not isinstance(data, tuple): + continue + _data, raw_email = data + + email_msg = email.message_from_bytes(raw_email) + email_headers = EmailHeaders.from_email_msg(email_msg=email_msg) + + yield _convert_email_headers_and_body_into_document( + email_msg=email_msg, email_headers=email_headers + ) + + return ImapCheckpoint(has_more=False) + + def build_dummy_checkpoint(self) -> ImapCheckpoint: + return ImapCheckpoint(has_more=True) + + def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: + raise NotImplementedError + + +def _convert_email_headers_and_body_into_document( + email_msg: Message, + email_headers: EmailHeaders, +) -> Document: + _sender_name, sender_addr = parseaddr(addr=email_headers.sender) + recipient_name, recipient_addr = parseaddr(addr=email_headers.recipient) + + semantic_identifier = ( + f"{sender_addr} to {recipient_addr} about {email_headers.subject}" + ) + + return Document( + id=semantic_identifier, + semantic_identifier=semantic_identifier, + metadata={}, + source=DocumentSource.IMAP, + sections=[], + primary_owners=[ + BasicExpertInfo( + display_name=recipient_name, + email=recipient_addr, + ) + ], + external_access=ExternalAccess( + external_user_emails=set([recipient_addr]), + external_user_group_ids=set(), + is_public=False, + ), + ) + + # _sender_name, sender_addr = parseaddr(email_headers.sender) + # _recipient_name, recipient_addr = parseaddr(email_headers.recipient) + # plain_text_body = "" + # for part in msg.walk(): + # email_headers = EmailHeaders.from_email_msg() + # ctype = part.get_content_type() + # cdisp = str(part.get('Content-Disposition')) + # if ctype == 'text/plain' and 'attachment' not in cdisp: + # try: + # plain_text_body = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8') + # except (UnicodeDecodeError, AttributeError): + # plain_text_body = part.get_payload(decode=True).decode('latin-1', errors='ignore') + # break + + +if __name__ == "__main__": + import os + import time + + username = os.environ.get("IMAP_USERNAME") + password = os.environ.get("IMAP_PASSWORD") + oauth2_token = os.environ.get("IMAP_OAUTH2_TOKEN") + + imap_connector = ImapConnector( + host="imap.fastmail.com", + ) + + imap_connector.set_credentials_provider( + OnyxStaticCredentialsProvider( + tenant_id=None, + connector_name=DocumentSource.IMAP, + credential_json={ + "username": username, + "password": password, + "oauth2_token": oauth2_token, + }, + ) + ) + + for doc in load_all_docs_from_checkpoint_connector( + connector=imap_connector, + start=0, + end=time.time(), + ): + print(doc) + ... diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py new file mode 100644 index 00000000000..cee76a37131 --- /dev/null +++ b/backend/onyx/connectors/imap/models.py @@ -0,0 +1,68 @@ +import email +from datetime import datetime +from email.message import Message + +from pydantic import BaseModel + + +_SUBJECT_HEADER = "subject" +_FROM_HEADER = "from" +_TO_HEADER = "to" +_DATE_HEADER = "date" +_ENCODING_HEADER = "Content-Transfer-Encoding" +_CONTENT_TYPE_HEADER = "Content-Type" +_DEFAULT_ENCODING = "utf-8" + + +class EmailHeaders(BaseModel): + """ + Model for email headers extracted from IMAP messages. + """ + + subject: str + sender: str + recipient: str + date: datetime + + @classmethod + def from_email_msg(cls, email_msg: Message) -> "EmailHeaders": + def _decode(header: str, default: str | None = None) -> str | None: + value = email_msg.get(header, default) + if not value: + return None + + decoded_value, _encoding = email.header.decode_header(value)[0] + + if isinstance(decoded_value, bytes): + return decoded_value.decode() + elif isinstance(decoded_value, str): + return decoded_value + else: + return None + + def _parse_date(date_str: str | None) -> datetime | None: + if not date_str: + return None + try: + return email.utils.parsedate_to_datetime(date_str) + except (TypeError, ValueError): + return None + + # It's possible for the subject line to not exist or be an empty string. + subject = _decode(header=_SUBJECT_HEADER) or "Unknown Subject" + from_ = _decode(header=_FROM_HEADER) + to = _decode(header=_TO_HEADER) + date_str = _decode(header=_DATE_HEADER) + date = _parse_date(date_str) + content_type = _decode(header=_CONTENT_TYPE_HEADER) + _encoding = _decode(header=_ENCODING_HEADER, default=_DEFAULT_ENCODING) + + return cls.model_validate( + { + "subject": subject, + "sender": from_, + "recipient": to, + "date": date, + "content_type": content_type, + } + ) From c02ab710f1c9139c7c3eb4b314fa80a2fb226e2d Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Thu, 3 Jul 2025 15:40:00 -0700 Subject: [PATCH 02/31] Add basic skeleton of parsing email bodies --- backend/onyx/connectors/imap/connector.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index df1f3051a0a..28273f02b37 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -16,6 +16,8 @@ from onyx.connectors.models import BasicExpertInfo from onyx.connectors.models import ConnectorCheckpoint from onyx.connectors.models import Document +from onyx.connectors.models import ImageSection +from onyx.connectors.models import TextSection from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector @@ -125,12 +127,14 @@ def _convert_email_headers_and_body_into_document( f"{sender_addr} to {recipient_addr} about {email_headers.subject}" ) + sections = _parse_email_body(email_msg=email_msg, email_headers=email_headers) + return Document( id=semantic_identifier, semantic_identifier=semantic_identifier, metadata={}, source=DocumentSource.IMAP, - sections=[], + sections=sections, primary_owners=[ BasicExpertInfo( display_name=recipient_name, @@ -144,6 +148,11 @@ def _convert_email_headers_and_body_into_document( ), ) + +def _parse_email_body( + email_msg: Message, + email_headers: EmailHeaders, +) -> list[TextSection | ImageSection]: # _sender_name, sender_addr = parseaddr(email_headers.sender) # _recipient_name, recipient_addr = parseaddr(email_headers.recipient) # plain_text_body = "" @@ -158,6 +167,8 @@ def _convert_email_headers_and_body_into_document( # plain_text_body = part.get_payload(decode=True).decode('latin-1', errors='ignore') # break + raise NotImplementedError + if __name__ == "__main__": import os From dc04f85c4504618431c51b3e9ecdfe6f5362d849 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Fri, 4 Jul 2025 19:24:56 -0700 Subject: [PATCH 03/31] Add id field --- backend/onyx/connectors/imap/models.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index cee76a37131..abfe579904c 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -11,6 +11,7 @@ _DATE_HEADER = "date" _ENCODING_HEADER = "Content-Transfer-Encoding" _CONTENT_TYPE_HEADER = "Content-Type" +_MESSAGE_ID_HEADER = "Message-ID" _DEFAULT_ENCODING = "utf-8" @@ -19,6 +20,7 @@ class EmailHeaders(BaseModel): Model for email headers extracted from IMAP messages. """ + id: str subject: str sender: str recipient: str @@ -49,6 +51,7 @@ def _parse_date(date_str: str | None) -> datetime | None: return None # It's possible for the subject line to not exist or be an empty string. + message_id = _decode(header=_MESSAGE_ID_HEADER) subject = _decode(header=_SUBJECT_HEADER) or "Unknown Subject" from_ = _decode(header=_FROM_HEADER) to = _decode(header=_TO_HEADER) @@ -59,6 +62,7 @@ def _parse_date(date_str: str | None) -> datetime | None: return cls.model_validate( { + "id": message_id, "subject": subject, "sender": from_, "recipient": to, From 9590face9caaba19f0642425bcb79e7baa54417f Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Fri, 4 Jul 2025 19:25:21 -0700 Subject: [PATCH 04/31] Add email body parsing --- backend/onyx/connectors/imap/connector.py | 68 ++++++++++++++--------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 28273f02b37..e5dbaed4f5b 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -4,6 +4,8 @@ from email.utils import parseaddr from typing import Any +import bs4 + from onyx.access.models import ExternalAccess from onyx.configs.constants import DocumentSource from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider @@ -16,10 +18,12 @@ from onyx.connectors.models import BasicExpertInfo from onyx.connectors.models import ConnectorCheckpoint from onyx.connectors.models import Document -from onyx.connectors.models import ImageSection from onyx.connectors.models import TextSection +from onyx.utils.logger import setup_logger from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector +logger = setup_logger() + DEFAULT_IMAP_PORT_NUMBER = 993 IMAP_OKAY_STATUS = "OK" @@ -122,19 +126,16 @@ def _convert_email_headers_and_body_into_document( ) -> Document: _sender_name, sender_addr = parseaddr(addr=email_headers.sender) recipient_name, recipient_addr = parseaddr(addr=email_headers.recipient) - - semantic_identifier = ( - f"{sender_addr} to {recipient_addr} about {email_headers.subject}" - ) - - sections = _parse_email_body(email_msg=email_msg, email_headers=email_headers) + title = f"{sender_addr} to {recipient_addr} about {email_headers.subject}" + email_body = _parse_email_body(email_msg=email_msg, email_headers=email_headers) return Document( - id=semantic_identifier, - semantic_identifier=semantic_identifier, + id=email_headers.id, + title=title, + semantic_identifier=email_headers.subject, metadata={}, source=DocumentSource.IMAP, - sections=sections, + sections=[TextSection(text=email_body)], primary_owners=[ BasicExpertInfo( display_name=recipient_name, @@ -152,22 +153,37 @@ def _convert_email_headers_and_body_into_document( def _parse_email_body( email_msg: Message, email_headers: EmailHeaders, -) -> list[TextSection | ImageSection]: - # _sender_name, sender_addr = parseaddr(email_headers.sender) - # _recipient_name, recipient_addr = parseaddr(email_headers.recipient) - # plain_text_body = "" - # for part in msg.walk(): - # email_headers = EmailHeaders.from_email_msg() - # ctype = part.get_content_type() - # cdisp = str(part.get('Content-Disposition')) - # if ctype == 'text/plain' and 'attachment' not in cdisp: - # try: - # plain_text_body = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8') - # except (UnicodeDecodeError, AttributeError): - # plain_text_body = part.get_payload(decode=True).decode('latin-1', errors='ignore') - # break - - raise NotImplementedError +) -> str: + body = None + for part in email_msg.walk(): + if part.is_multipart(): + continue + + charset = part.get_content_charset() or "utf-8" + + try: + raw_payload = part.get_payload(decode=True) + if not isinstance(raw_payload, bytes): + logger.warn( + "Payload section from email was expected to be an array of bytes, instead got " + f"{type(raw_payload)=}, {raw_payload=}" + ) + continue + body = raw_payload.decode(charset) + break + except (UnicodeDecodeError, LookupError) as e: + print(f"Warning: Could not decode part with charset {charset}. Error: {e}") + continue + + if not body: + logger.warn( + f"Email with {email_headers.id=} has an empty body; returning an empty string" + ) + return "" + + soup = bs4.BeautifulSoup(markup=body, features="html.parser") + + return "".join(str_section for str_section in soup.stripped_strings) if __name__ == "__main__": From b4ebe544b033e80efc9043cb6b15e69c222897e0 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 09:02:50 -0700 Subject: [PATCH 05/31] Implement checkpointed imap-connector --- backend/onyx/connectors/imap/connector.py | 98 ++++++++++++++++------- 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index e5dbaed4f5b..94f2a206dc3 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -1,8 +1,10 @@ +import copy import email import imaplib from email.message import Message from email.utils import parseaddr from typing import Any +from typing import cast import bs4 @@ -20,16 +22,17 @@ from onyx.connectors.models import Document from onyx.connectors.models import TextSection from onyx.utils.logger import setup_logger -from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector logger = setup_logger() -DEFAULT_IMAP_PORT_NUMBER = 993 -IMAP_OKAY_STATUS = "OK" +_DEFAULT_IMAP_PORT_NUMBER = 993 +_IMAP_OKAY_STATUS = "OK" +_PAGE_SIZE = 100 -class ImapCheckpoint(ConnectorCheckpoint): ... +class ImapCheckpoint(ConnectorCheckpoint): + todo_email_ids: list[str] = [] class ImapConnector( @@ -39,7 +42,7 @@ class ImapConnector( def __init__( self, host: str, - port: int = DEFAULT_IMAP_PORT_NUMBER, + port: int = _DEFAULT_IMAP_PORT_NUMBER, ) -> None: self._host = host self._port = port @@ -84,34 +87,30 @@ def load_from_checkpoint( end: SecondsSinceUnixEpoch, checkpoint: ImapCheckpoint, ) -> CheckpointOutput[ImapCheckpoint]: - status, _ids = self._mail_client.select("Inbox", readonly=True) - if status != IMAP_OKAY_STATUS: - raise RuntimeError - - status, email_ids = self._mail_client.search(None, "ALL") - - if status != IMAP_OKAY_STATUS or not email_ids: - raise RuntimeError + if not checkpoint.todo_email_ids: + # This is the first time running this connector. + # Populate the todo_email_ids and return a checkpoint. + email_ids = _fetch_email_ids(mail_client=self._mail_client) + return ImapCheckpoint(has_more=True, todo_email_ids=email_ids) + + current_todos = cast( + list, copy.deepcopy(checkpoint.todo_email_ids[:_PAGE_SIZE]) + ) + future_todos = cast(list, copy.deepcopy(checkpoint.todo_email_ids[_PAGE_SIZE:])) - for email_id in email_ids[0].split(): - status, msg_data = self._mail_client.fetch( - message_set=email_id, message_parts="(RFC822)" - ) - if status != IMAP_OKAY_STATUS or not msg_data: + for email_id in current_todos: + email_msg = _fetch_email(mail_client=self._mail_client, email_id=email_id) + if not email_msg: + logger.warn(f"Failed to fetch message {email_id=}; skipping") continue - data = msg_data[0] - if not isinstance(data, tuple): - continue - _data, raw_email = data - email_msg = email.message_from_bytes(raw_email) email_headers = EmailHeaders.from_email_msg(email_msg=email_msg) yield _convert_email_headers_and_body_into_document( email_msg=email_msg, email_headers=email_headers ) - return ImapCheckpoint(has_more=False) + return ImapCheckpoint(has_more=bool(future_todos), todo_email_ids=future_todos) def build_dummy_checkpoint(self) -> ImapCheckpoint: return ImapCheckpoint(has_more=True) @@ -120,6 +119,36 @@ def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: raise NotImplementedError +def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | None: + status, msg_data = mail_client.fetch(message_set=email_id, message_parts="(RFC822)") + if status != _IMAP_OKAY_STATUS or not msg_data: + return None + + data = msg_data[0] + if not isinstance(data, tuple): + raise RuntimeError( + f"Message data should be a tuple; instead got a {type(data)=} {data=}" + ) + + _other, raw_email = data + return email.message_from_bytes(raw_email) + + +def _fetch_email_ids(mail_client: imaplib.IMAP4_SSL) -> list[str]: + status, _ids = mail_client.select("Inbox", readonly=True) + if status != _IMAP_OKAY_STATUS: + raise RuntimeError + + status, email_ids_byte_array = mail_client.search(None, "ALL") + + if status != _IMAP_OKAY_STATUS or not email_ids_byte_array: + raise RuntimeError(f"Failed to fetch email ids; {status=}") + + email_ids: bytes = email_ids_byte_array[0] + + return [email_id.decode() for email_id in email_ids.split()] + + def _convert_email_headers_and_body_into_document( email_msg: Message, email_headers: EmailHeaders, @@ -210,10 +239,17 @@ def _parse_email_body( ) ) - for doc in load_all_docs_from_checkpoint_connector( - connector=imap_connector, - start=0, - end=time.time(), - ): - print(doc) - ... + checkpoint = imap_connector.build_dummy_checkpoint() + while True: + for doc in imap_connector.load_from_checkpoint( + start=0.0, end=time.time(), checkpoint=checkpoint + ): + print(doc) + + # for doc in load_all_docs_from_checkpoint_connector( + # connector=imap_connector, + # start=0, + # end=time.time(), + # ): + # print(doc) + # ... From 4f7feddfae5a75e1fe0c6805c86001ab0efe28ea Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 09:57:36 -0700 Subject: [PATCH 06/31] Add testing logic for basic iteration --- backend/onyx/connectors/imap/connector.py | 37 ++++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 94f2a206dc3..7bcef68cba4 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -10,7 +10,6 @@ from onyx.access.models import ExternalAccess from onyx.configs.constants import DocumentSource -from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider from onyx.connectors.imap.models import EmailHeaders from onyx.connectors.interfaces import CheckpointedConnector from onyx.connectors.interfaces import CheckpointOutput @@ -218,6 +217,8 @@ def _parse_email_body( if __name__ == "__main__": import os import time + from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector + from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider username = os.environ.get("IMAP_USERNAME") password = os.environ.get("IMAP_PASSWORD") @@ -239,17 +240,23 @@ def _parse_email_body( ) ) - checkpoint = imap_connector.build_dummy_checkpoint() - while True: - for doc in imap_connector.load_from_checkpoint( - start=0.0, end=time.time(), checkpoint=checkpoint - ): - print(doc) - - # for doc in load_all_docs_from_checkpoint_connector( - # connector=imap_connector, - # start=0, - # end=time.time(), - # ): - # print(doc) - # ... + # # Manual iteration through the checkpointing logic. + # # Uncomment to add breakpoints and step through manually. + # checkpoint = imap_connector.build_dummy_checkpoint() + # while True: + # gen = imap_connector.load_from_checkpoint( + # start=0.0, end=time.time(), checkpoint=checkpoint + # ) + # while True: + # try: + # doc = next(gen) + # except StopIteration as e: + # checkpoint = cast(ImapCheckpoint, copy.deepcopy(e.value)) + # break + + for doc in load_all_docs_from_checkpoint_connector( + connector=imap_connector, + start=0, + end=time.time(), + ): + print(doc) From 87fcb7b74534cca78a317abefdcfd28321393af2 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 13:47:07 -0700 Subject: [PATCH 07/31] Add logic to get different header if "to" isn't present - possible in mailing-list workflows --- backend/onyx/connectors/imap/models.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index abfe579904c..5802a7791c3 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -8,6 +8,9 @@ _SUBJECT_HEADER = "subject" _FROM_HEADER = "from" _TO_HEADER = "to" +_DELIVERED_TO_HEADER = ( + "Delivered-To" # Used in mailing lists instead of the "to" header. +) _DATE_HEADER = "date" _ENCODING_HEADER = "Content-Transfer-Encoding" _CONTENT_TYPE_HEADER = "Content-Type" @@ -55,6 +58,8 @@ def _parse_date(date_str: str | None) -> datetime | None: subject = _decode(header=_SUBJECT_HEADER) or "Unknown Subject" from_ = _decode(header=_FROM_HEADER) to = _decode(header=_TO_HEADER) + if not to: + to = _decode(header=_DELIVERED_TO_HEADER) date_str = _decode(header=_DATE_HEADER) date = _parse_date(date_str) content_type = _decode(header=_CONTENT_TYPE_HEADER) From f8b8c387475fdbf65114bb2a6de6afa591895d2e Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 13:55:39 -0700 Subject: [PATCH 08/31] Add ability to index specific mailboxes --- backend/onyx/connectors/imap/connector.py | 149 ++++++++++++++++++---- 1 file changed, 122 insertions(+), 27 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 7bcef68cba4..a7458142cdd 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -1,6 +1,7 @@ import copy import email import imaplib +import re from email.message import Message from email.utils import parseaddr from typing import Any @@ -30,8 +31,19 @@ _PAGE_SIZE = 100 +# An email has a list of mailboxes. +# Each mailbox has a list of email-ids inside of it. +# +# Usage: +# To use this checkpointer, first fetch all the mailboxes. +# Then, pop a mailbox and fetch all of its email-ids. +# Then, pop each email-id and fetch its content (and parse it, etc..). +# When you have popped all email-ids for this mailbox, pop the next mailbox and repeat the above process until you're done. +# +# For initial checkpointing, set both fields to `None`. class ImapCheckpoint(ConnectorCheckpoint): - todo_email_ids: list[str] = [] + todo_mailboxes: list[str] | None = None + todo_email_ids: list[str] | None = None class ImapConnector( @@ -42,19 +54,26 @@ def __init__( self, host: str, port: int = _DEFAULT_IMAP_PORT_NUMBER, + mailboxes: list[str] = [], ) -> None: self._host = host self._port = port self._username: str | None = None self._password: str | None = None + self._mailboxes = mailboxes self._mail_client = imaplib.IMAP4_SSL(host=host, port=port) # impls for BaseConnector + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: raise NotImplementedError("Use `set_credentials_provider` instead") def validate_connector_settings(self) -> None: - raise NotImplementedError + if not self._username or not self._password: + raise RuntimeError( + "Credentials not yet set; call `set_credentials_provider` prior to calling this function" + ) + self._mail_client.login(user=self._username, password=self._password) # impls for CredentialsConnector @@ -75,7 +94,6 @@ def get_or_raise(name: str) -> str: username = get_or_raise("username") password = get_or_raise("password") - self._mail_client.login(user=username, password=password) # impls for CheckpointedConnector @@ -86,16 +104,40 @@ def load_from_checkpoint( end: SecondsSinceUnixEpoch, checkpoint: ImapCheckpoint, ) -> CheckpointOutput[ImapCheckpoint]: + checkpoint = cast(ImapCheckpoint, copy.deepcopy(checkpoint)) + checkpoint.has_more = True + + if checkpoint.todo_mailboxes is None: + # This is the dummy checkpoint. + # Fill it with mailboxes first. + if self._mailboxes: + checkpoint.todo_mailboxes = self._mailboxes + else: + fetched_mailboxes = _fetch_all_mailboxes_for_email_account( + mail_client=self._mail_client + ) + if not fetched_mailboxes: + raise RuntimeError( + "Failed to find any mailboxes for this email account" + ) + checkpoint.todo_mailboxes = fetched_mailboxes + + return checkpoint + if not checkpoint.todo_email_ids: - # This is the first time running this connector. - # Populate the todo_email_ids and return a checkpoint. - email_ids = _fetch_email_ids(mail_client=self._mail_client) - return ImapCheckpoint(has_more=True, todo_email_ids=email_ids) + if not checkpoint.todo_mailboxes: + checkpoint.has_more = False + return checkpoint + + mailbox = checkpoint.todo_mailboxes.pop(0) + checkpoint.todo_email_ids = _fetch_email_ids_in_mailbox( + mail_client=self._mail_client, mailbox=mailbox + ) current_todos = cast( list, copy.deepcopy(checkpoint.todo_email_ids[:_PAGE_SIZE]) ) - future_todos = cast(list, copy.deepcopy(checkpoint.todo_email_ids[_PAGE_SIZE:])) + checkpoint.todo_email_ids = checkpoint.todo_email_ids[_PAGE_SIZE:] for email_id in current_todos: email_msg = _fetch_email(mail_client=self._mail_client, email_id=email_id) @@ -109,34 +151,62 @@ def load_from_checkpoint( email_msg=email_msg, email_headers=email_headers ) - return ImapCheckpoint(has_more=bool(future_todos), todo_email_ids=future_todos) + return checkpoint def build_dummy_checkpoint(self) -> ImapCheckpoint: return ImapCheckpoint(has_more=True) def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: - raise NotImplementedError + return ImapCheckpoint.model_validate_json(json_data=checkpoint_json) -def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | None: - status, msg_data = mail_client.fetch(message_set=email_id, message_parts="(RFC822)") - if status != _IMAP_OKAY_STATUS or not msg_data: - return None +def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]: + """ + Fetches all the mailboxes for this email account. - data = msg_data[0] - if not isinstance(data, tuple): - raise RuntimeError( - f"Message data should be a tuple; instead got a {type(data)=} {data=}" - ) + Returns a list of mailboxes. If the query fails, returns `None`. + """ - _other, raw_email = data - return email.message_from_bytes(raw_email) + status, mailboxes_data = mail_client.list(directory="*", pattern="*") + if status != _IMAP_OKAY_STATUS: + raise RuntimeError(f"Failed to fetch mailboxes; {status=}") + + mailboxes = [] + + for mailboxes_raw in mailboxes_data: + if isinstance(mailboxes_raw, bytes): + mailboxes_str = mailboxes_raw.decode() + elif isinstance(mailboxes_raw, str): + mailboxes_str = mailboxes_raw + else: + logger.warn( + f"Expected the mailbox data to be of type str, instead got {type(mailboxes_raw)=} {mailboxes_raw}; skipping" + ) + continue + match = re.match(r'\([^)]*\)\s+"([^"]+)"\s+"?(.+?)"?$', mailboxes_str) + if not match: + logger.warn( + f"Invalid mailbox-data formatting structure: {mailboxes_str=}; skipping" + ) + continue -def _fetch_email_ids(mail_client: imaplib.IMAP4_SSL) -> list[str]: - status, _ids = mail_client.select("Inbox", readonly=True) + mailbox = match.group(2) + + # In order to comply with the IMAP protocol, all identifiers should be enclosed by double-quotes. + mailbox = f'"{mailbox}"' + + mailboxes.append(mailbox) + + return mailboxes + + +def _fetch_email_ids_in_mailbox( + mail_client: imaplib.IMAP4_SSL, mailbox: str +) -> list[str]: + status, _ids = mail_client.select(mailbox=mailbox, readonly=True) if status != _IMAP_OKAY_STATUS: - raise RuntimeError + raise RuntimeError(f"Failed to select {mailbox=}") status, email_ids_byte_array = mail_client.search(None, "ALL") @@ -148,6 +218,21 @@ def _fetch_email_ids(mail_client: imaplib.IMAP4_SSL) -> list[str]: return [email_id.decode() for email_id in email_ids.split()] +def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | None: + status, msg_data = mail_client.fetch(message_set=email_id, message_parts="(RFC822)") + if status != _IMAP_OKAY_STATUS or not msg_data: + return None + + data = msg_data[0] + if not isinstance(data, tuple): + raise RuntimeError( + f"Message data should be a tuple; instead got a {type(data)=} {data=}" + ) + + _other, raw_email = data + return email.message_from_bytes(raw_email) + + def _convert_email_headers_and_body_into_document( email_msg: Message, email_headers: EmailHeaders, @@ -220,12 +305,23 @@ def _parse_email_body( from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider + host = os.environ.get("IMAP_HOST") + mailboxes_str = os.environ.get("IMAP_MAILBOXES") username = os.environ.get("IMAP_USERNAME") password = os.environ.get("IMAP_PASSWORD") - oauth2_token = os.environ.get("IMAP_OAUTH2_TOKEN") + + mailboxes = ( + [mailbox.strip() for mailbox in mailboxes_str.split(",")] + if mailboxes_str + else [] + ) + + if not host: + raise RuntimeError("`IMAP_HOST` must be set") imap_connector = ImapConnector( - host="imap.fastmail.com", + host=host, + mailboxes=mailboxes, ) imap_connector.set_credentials_provider( @@ -235,7 +331,6 @@ def _parse_email_body( credential_json={ "username": username, "password": password, - "oauth2_token": oauth2_token, }, ) ) From 49cf7252e3503fd1407e91e8c4fda47b1ce0cf3f Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 13:58:46 -0700 Subject: [PATCH 09/31] Add breaking when indexing has been fully exhausted --- backend/onyx/connectors/imap/connector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index a7458142cdd..920bdc694c1 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -349,6 +349,9 @@ def _parse_email_body( # checkpoint = cast(ImapCheckpoint, copy.deepcopy(e.value)) # break + # if not checkpoint.has_more: + # break + for doc in load_all_docs_from_checkpoint_connector( connector=imap_connector, start=0, From 85de22b0d8bfad6f5e4d57ca6d30d804d8af7c04 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 14:05:25 -0700 Subject: [PATCH 10/31] Sanitize all mailbox names + add space between stripped strings after parsing --- backend/onyx/connectors/imap/connector.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 920bdc694c1..2fb22f88a40 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -111,7 +111,7 @@ def load_from_checkpoint( # This is the dummy checkpoint. # Fill it with mailboxes first. if self._mailboxes: - checkpoint.todo_mailboxes = self._mailboxes + checkpoint.todo_mailboxes = _sanitize_mailbox_names(self._mailboxes) else: fetched_mailboxes = _fetch_all_mailboxes_for_email_account( mail_client=self._mail_client @@ -120,7 +120,7 @@ def load_from_checkpoint( raise RuntimeError( "Failed to find any mailboxes for this email account" ) - checkpoint.todo_mailboxes = fetched_mailboxes + checkpoint.todo_mailboxes = _sanitize_mailbox_names(fetched_mailboxes) return checkpoint @@ -161,12 +161,6 @@ def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]: - """ - Fetches all the mailboxes for this email account. - - Returns a list of mailboxes. If the query fails, returns `None`. - """ - status, mailboxes_data = mail_client.list(directory="*", pattern="*") if status != _IMAP_OKAY_STATUS: raise RuntimeError(f"Failed to fetch mailboxes; {status=}") @@ -192,10 +186,6 @@ def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> li continue mailbox = match.group(2) - - # In order to comply with the IMAP protocol, all identifiers should be enclosed by double-quotes. - mailbox = f'"{mailbox}"' - mailboxes.append(mailbox) return mailboxes @@ -296,7 +286,11 @@ def _parse_email_body( soup = bs4.BeautifulSoup(markup=body, features="html.parser") - return "".join(str_section for str_section in soup.stripped_strings) + return " ".join(str_section for str_section in soup.stripped_strings) + + +def _sanitize_mailbox_names(mailboxes: list[str]) -> list[str]: + return [f'"{mailbox}"' for mailbox in mailboxes if mailbox] if __name__ == "__main__": From 892668756111cda16e03336de584ad6ce8db125b Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 14:54:56 -0700 Subject: [PATCH 11/31] Add multi-recipient parsing --- backend/onyx/connectors/imap/connector.py | 44 +++++++++++++++++------ backend/onyx/connectors/imap/models.py | 4 +-- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 2fb22f88a40..1360151a9d3 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -227,10 +227,17 @@ def _convert_email_headers_and_body_into_document( email_msg: Message, email_headers: EmailHeaders, ) -> Document: - _sender_name, sender_addr = parseaddr(addr=email_headers.sender) - recipient_name, recipient_addr = parseaddr(addr=email_headers.recipient) - title = f"{sender_addr} to {recipient_addr} about {email_headers.subject}" + _sender_name, sender_addr = _parse_singular_addr(raw_header=email_headers.sender) + parsed_recipients = _parse_addrs(raw_header=email_headers.recipients) + + recipient_emails = set(addr for _name, addr in parsed_recipients) + + title = f"{sender_addr} to {recipient_emails} about {email_headers.subject}" email_body = _parse_email_body(email_msg=email_msg, email_headers=email_headers) + primary_owners = [ + BasicExpertInfo(display_name=recipient_name, email=recipient_addr) + for recipient_name, recipient_addr in parsed_recipients + ] return Document( id=email_headers.id, @@ -239,14 +246,9 @@ def _convert_email_headers_and_body_into_document( metadata={}, source=DocumentSource.IMAP, sections=[TextSection(text=email_body)], - primary_owners=[ - BasicExpertInfo( - display_name=recipient_name, - email=recipient_addr, - ) - ], + primary_owners=primary_owners, external_access=ExternalAccess( - external_user_emails=set([recipient_addr]), + external_user_emails=recipient_emails, external_user_group_ids=set(), is_public=False, ), @@ -293,6 +295,27 @@ def _sanitize_mailbox_names(mailboxes: list[str]) -> list[str]: return [f'"{mailbox}"' for mailbox in mailboxes if mailbox] +def _parse_addrs(raw_header: str) -> list[tuple[str, str]]: + addrs = raw_header.split(",") + name_addr_pairs = [parseaddr(addr=addr, strict=True) for addr in addrs if addr] + return [(name, addr) for name, addr in name_addr_pairs if addr] + + +def _parse_singular_addr(raw_header: str) -> tuple[str, str]: + addrs = _parse_addrs(raw_header=raw_header) + if not addrs: + raise RuntimeError( + f"Parsing email header resulted in no addresses being found; {raw_header=}" + ) + elif len(addrs) >= 2: + raise RuntimeError( + f"Expected a singular address, but instead got multiple; {raw_header=} {addrs=}" + ) + + [(name, addr)] = addrs + return name, addr + + if __name__ == "__main__": import os import time @@ -342,7 +365,6 @@ def _sanitize_mailbox_names(mailboxes: list[str]) -> list[str]: # except StopIteration as e: # checkpoint = cast(ImapCheckpoint, copy.deepcopy(e.value)) # break - # if not checkpoint.has_more: # break diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index 5802a7791c3..8569d7d2f70 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -26,7 +26,7 @@ class EmailHeaders(BaseModel): id: str subject: str sender: str - recipient: str + recipients: str date: datetime @classmethod @@ -70,7 +70,7 @@ def _parse_date(date_str: str | None) -> datetime | None: "id": message_id, "subject": subject, "sender": from_, - "recipient": to, + "recipients": to, "date": date, "content_type": content_type, } From 46940a7b65fc8a5d6829fc794e0d63ac8edcce1f Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 15:18:12 -0700 Subject: [PATCH 12/31] Change around semantic-identifier and title --- backend/onyx/connectors/imap/connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 1360151a9d3..8dfc13eb9bf 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -241,8 +241,8 @@ def _convert_email_headers_and_body_into_document( return Document( id=email_headers.id, - title=title, - semantic_identifier=email_headers.subject, + title=email_headers.subject, + semantic_identifier=title, metadata={}, source=DocumentSource.IMAP, sections=[TextSection(text=email_body)], From 5a39f2bd504af04a0e0e55a17083af0737a352fb Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 15:18:45 -0700 Subject: [PATCH 13/31] Add imap tests --- backend/tests/daily/connectors/imap/models.py | 15 ++++ .../connectors/imap/test_imap_connector.py | 76 +++++++++++++++++++ .../tests/daily/connectors/teams/models.py | 1 - 3 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 backend/tests/daily/connectors/imap/models.py create mode 100644 backend/tests/daily/connectors/imap/test_imap_connector.py diff --git a/backend/tests/daily/connectors/imap/models.py b/backend/tests/daily/connectors/imap/models.py new file mode 100644 index 00000000000..0027536e521 --- /dev/null +++ b/backend/tests/daily/connectors/imap/models.py @@ -0,0 +1,15 @@ +from pydantic import BaseModel + +from onyx.connectors.models import Document + + +class EmailDoc(BaseModel): + subject: str + + @classmethod + def from_doc(cls, document: Document) -> "EmailDoc": + assert document.title # Acceptable since this will only be used in tests. + + return cls( + subject=document.title, + ) diff --git a/backend/tests/daily/connectors/imap/test_imap_connector.py b/backend/tests/daily/connectors/imap/test_imap_connector.py new file mode 100644 index 00000000000..28a07fbba8e --- /dev/null +++ b/backend/tests/daily/connectors/imap/test_imap_connector.py @@ -0,0 +1,76 @@ +import os +import time + +import pytest + +from onyx.configs.constants import DocumentSource +from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider +from onyx.connectors.imap.connector import ImapConnector +from tests.daily.connectors.imap.models import EmailDoc +from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector +from tests.daily.connectors.utils import to_documents + + +@pytest.fixture +def imap_connector() -> ImapConnector: + host = os.environ.get("IMAP_HOST") + mailboxes_str = os.environ.get("IMAP_MAILBOXES") + username = os.environ.get("IMAP_USERNAME") + password = os.environ.get("IMAP_PASSWORD") + + assert host + mailboxes = ( + [mailbox.strip() for mailbox in mailboxes_str.split(",") if mailbox] + if mailboxes_str + else [] + ) + + imap_connector = ImapConnector( + host=host, + mailboxes=mailboxes, + ) + imap_connector.set_credentials_provider( + OnyxStaticCredentialsProvider( + tenant_id=None, + connector_name=DocumentSource.IMAP, + credential_json={ + "username": username, + "password": password, + }, + ) + ) + + return imap_connector + + +@pytest.mark.parametrize( + "expected_email_docs", + [ + [ + EmailDoc( + subject="Testing", + ), + EmailDoc( + subject="Hello world", + ), + ] + ], +) +def test_imap_connector( + imap_connector: ImapConnector, + expected_email_docs: list[EmailDoc], +) -> None: + actual_email_docs = [ + EmailDoc.from_doc(document=document) + for document in to_documents( + iterator=iter( + load_all_docs_from_checkpoint_connector( + connector=imap_connector, + start=0, + end=time.time(), + ) + ) + ) + ] + + assert actual_email_docs == expected_email_docs diff --git a/backend/tests/daily/connectors/teams/models.py b/backend/tests/daily/connectors/teams/models.py index d7f99995179..a1bfd8481c1 100644 --- a/backend/tests/daily/connectors/teams/models.py +++ b/backend/tests/daily/connectors/teams/models.py @@ -10,7 +10,6 @@ class TeamsThread(BaseModel): @classmethod def from_doc(cls, document: Document) -> "TeamsThread": - assert ( document.external_access ), f"ExternalAccess should always be available, instead got {document=}" From 2526e43d40711254c3006a4d52e4acb789ebfd62 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 15:36:23 -0700 Subject: [PATCH 14/31] Add recipients and content assertions to tests --- backend/tests/daily/connectors/imap/models.py | 11 ++++++++++- .../daily/connectors/imap/test_imap_connector.py | 4 ++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/backend/tests/daily/connectors/imap/models.py b/backend/tests/daily/connectors/imap/models.py index 0027536e521..42e1c547659 100644 --- a/backend/tests/daily/connectors/imap/models.py +++ b/backend/tests/daily/connectors/imap/models.py @@ -1,15 +1,24 @@ from pydantic import BaseModel from onyx.connectors.models import Document +from tests.daily.connectors.utils import to_text_sections class EmailDoc(BaseModel): subject: str + recipients: set[str] + body: str @classmethod def from_doc(cls, document: Document) -> "EmailDoc": - assert document.title # Acceptable since this will only be used in tests. + # Acceptable to perform assertions since this class is only used in tests. + assert document.title + assert document.external_access + + body = " ".join(to_text_sections(iterator=iter(document.sections))) return cls( subject=document.title, + recipients=document.external_access.external_user_emails, + body=body, ) diff --git a/backend/tests/daily/connectors/imap/test_imap_connector.py b/backend/tests/daily/connectors/imap/test_imap_connector.py index 28a07fbba8e..d9602509c66 100644 --- a/backend/tests/daily/connectors/imap/test_imap_connector.py +++ b/backend/tests/daily/connectors/imap/test_imap_connector.py @@ -49,9 +49,13 @@ def imap_connector() -> ImapConnector: [ EmailDoc( subject="Testing", + recipients=set(["admin@onyx-test.com"]), + body="Hello, testing.", ), EmailDoc( subject="Hello world", + recipients=set(["admin@onyx-test.com", "r@rabh.io"]), + body='Hello world, this is an email that contains multiple "To" recipients.', ), ] ], From 2cd07e718bf9402dd3da497e2d42d5c9f319e104 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 15:41:40 -0700 Subject: [PATCH 15/31] Add envvars to github actions workflow file --- .github/workflows/pr-python-connector-tests.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/pr-python-connector-tests.yml b/.github/workflows/pr-python-connector-tests.yml index 7b3dbf97c1e..aea2f89c473 100644 --- a/.github/workflows/pr-python-connector-tests.yml +++ b/.github/workflows/pr-python-connector-tests.yml @@ -53,6 +53,12 @@ env: # Hubspot HUBSPOT_ACCESS_TOKEN: ${{ secrets.HUBSPOT_ACCESS_TOKEN }} + # IMAP + IMAP_HOST: ${{ secrets.IMAP_HOST }} + IMAP_USERNAME: ${{ secrets.IMAP_USERNAME }} + IMAP_PASSWORD: ${{ secrets.IMAP_PASSWORD }} + IMAP_MAILBOXES: ${{ secrets.IMAP_MAILBOXES }} + # Airtable AIRTABLE_TEST_BASE_ID: ${{ secrets.AIRTABLE_TEST_BASE_ID }} AIRTABLE_TEST_TABLE_ID: ${{ secrets.AIRTABLE_TEST_TABLE_ID }} From 434d9a3862146fe307d3ad4800aa3dc03f442a21 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 18:18:00 -0700 Subject: [PATCH 16/31] Remove encoding header --- backend/onyx/connectors/imap/models.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index 8569d7d2f70..c5a335ffaee 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -12,10 +12,8 @@ "Delivered-To" # Used in mailing lists instead of the "to" header. ) _DATE_HEADER = "date" -_ENCODING_HEADER = "Content-Transfer-Encoding" _CONTENT_TYPE_HEADER = "Content-Type" _MESSAGE_ID_HEADER = "Message-ID" -_DEFAULT_ENCODING = "utf-8" class EmailHeaders(BaseModel): @@ -63,7 +61,6 @@ def _parse_date(date_str: str | None) -> datetime | None: date_str = _decode(header=_DATE_HEADER) date = _parse_date(date_str) content_type = _decode(header=_CONTENT_TYPE_HEADER) - _encoding = _decode(header=_ENCODING_HEADER, default=_DEFAULT_ENCODING) return cls.model_validate( { From 48392a0f771b81a26d96969cb7e6bd592fd6978f Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 18:26:32 -0700 Subject: [PATCH 17/31] Update logic to not immediately establish connection upon init of `ImapConnector` --- backend/onyx/connectors/imap/connector.py | 25 +++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 8dfc13eb9bf..9423f9885f5 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -58,10 +58,18 @@ def __init__( ) -> None: self._host = host self._port = port + self._mailboxes = mailboxes self._username: str | None = None self._password: str | None = None - self._mailboxes = mailboxes - self._mail_client = imaplib.IMAP4_SSL(host=host, port=port) + self._mail_client: imaplib.IMAP4_SSL | None = None + + @property + def mail_client(self) -> imaplib.IMAP4_SSL: + if not self._mail_client: + raise RuntimeError( + "No mail-client has been initialized; call `set_credentials_provider` first" + ) + return self._mail_client # impls for BaseConnector @@ -71,9 +79,10 @@ def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None def validate_connector_settings(self) -> None: if not self._username or not self._password: raise RuntimeError( - "Credentials not yet set; call `set_credentials_provider` prior to calling this function" + "Credentials not yet set; call `set_credentials_provider` first" ) - self._mail_client.login(user=self._username, password=self._password) + + self.mail_client.login(user=self._username, password=self._password) # impls for CredentialsConnector @@ -94,7 +103,7 @@ def get_or_raise(name: str) -> str: username = get_or_raise("username") password = get_or_raise("password") - self._mail_client.login(user=username, password=password) + self.mail_client.login(user=username, password=password) # impls for CheckpointedConnector @@ -114,7 +123,7 @@ def load_from_checkpoint( checkpoint.todo_mailboxes = _sanitize_mailbox_names(self._mailboxes) else: fetched_mailboxes = _fetch_all_mailboxes_for_email_account( - mail_client=self._mail_client + mail_client=self.mail_client ) if not fetched_mailboxes: raise RuntimeError( @@ -131,7 +140,7 @@ def load_from_checkpoint( mailbox = checkpoint.todo_mailboxes.pop(0) checkpoint.todo_email_ids = _fetch_email_ids_in_mailbox( - mail_client=self._mail_client, mailbox=mailbox + mail_client=self.mail_client, mailbox=mailbox ) current_todos = cast( @@ -140,7 +149,7 @@ def load_from_checkpoint( checkpoint.todo_email_ids = checkpoint.todo_email_ids[_PAGE_SIZE:] for email_id in current_todos: - email_msg = _fetch_email(mail_client=self._mail_client, email_id=email_id) + email_msg = _fetch_email(mail_client=self.mail_client, email_id=email_id) if not email_msg: logger.warn(f"Failed to fetch message {email_id=}; skipping") continue From eb579050c34da5824504f004fc86d278d6649913 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 18:49:36 -0700 Subject: [PATCH 18/31] Add start and end datetime filtering + edit when connection is established / how login is done --- backend/onyx/connectors/imap/connector.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 9423f9885f5..c9b35839803 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -2,6 +2,8 @@ import email import imaplib import re +from datetime import datetime +from datetime import timezone from email.message import Message from email.utils import parseaddr from typing import Any @@ -83,6 +85,7 @@ def validate_connector_settings(self) -> None: ) self.mail_client.login(user=self._username, password=self._password) + self.mail_client.logout() # impls for CredentialsConnector @@ -103,6 +106,8 @@ def get_or_raise(name: str) -> str: username = get_or_raise("username") password = get_or_raise("password") + + self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port) self.mail_client.login(user=username, password=password) # impls for CheckpointedConnector @@ -140,7 +145,10 @@ def load_from_checkpoint( mailbox = checkpoint.todo_mailboxes.pop(0) checkpoint.todo_email_ids = _fetch_email_ids_in_mailbox( - mail_client=self.mail_client, mailbox=mailbox + mail_client=self.mail_client, + mailbox=mailbox, + start=start, + end=end, ) current_todos = cast( @@ -201,13 +209,20 @@ def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> li def _fetch_email_ids_in_mailbox( - mail_client: imaplib.IMAP4_SSL, mailbox: str + mail_client: imaplib.IMAP4_SSL, + mailbox: str, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, ) -> list[str]: status, _ids = mail_client.select(mailbox=mailbox, readonly=True) if status != _IMAP_OKAY_STATUS: raise RuntimeError(f"Failed to select {mailbox=}") - status, email_ids_byte_array = mail_client.search(None, "ALL") + start_str = datetime.fromtimestamp(start, tz=timezone.utc).strftime("%d-%b-%Y") + end_str = datetime.fromtimestamp(end, tz=timezone.utc).strftime("%d-%b-%Y") + search_criteria = f'(SINCE "{start_str}" BEFORE "{end_str}")' + + status, email_ids_byte_array = mail_client.search(None, search_criteria) if status != _IMAP_OKAY_STATUS or not email_ids_byte_array: raise RuntimeError(f"Failed to fetch email ids; {status=}") From 9e4a414ff7fd658944bd9560f2b2f0b6e53a3868 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 18:51:38 -0700 Subject: [PATCH 19/31] Remove content-type header --- backend/onyx/connectors/imap/models.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index c5a335ffaee..7d9f3c43c41 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -12,7 +12,6 @@ "Delivered-To" # Used in mailing lists instead of the "to" header. ) _DATE_HEADER = "date" -_CONTENT_TYPE_HEADER = "Content-Type" _MESSAGE_ID_HEADER = "Message-ID" @@ -60,7 +59,6 @@ def _parse_date(date_str: str | None) -> datetime | None: to = _decode(header=_DELIVERED_TO_HEADER) date_str = _decode(header=_DATE_HEADER) date = _parse_date(date_str) - content_type = _decode(header=_CONTENT_TYPE_HEADER) return cls.model_validate( { @@ -69,6 +67,5 @@ def _parse_date(date_str: str | None) -> datetime | None: "sender": from_, "recipients": to, "date": date, - "content_type": content_type, } ) From bfed97a10be30aa2cb644d3b93b267ca4662dc21 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Mon, 7 Jul 2025 18:55:55 -0700 Subject: [PATCH 20/31] Add note about guards --- backend/onyx/connectors/imap/models.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index 7d9f3c43c41..704f6229a77 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -50,16 +50,18 @@ def _parse_date(date_str: str | None) -> datetime | None: except (TypeError, ValueError): return None - # It's possible for the subject line to not exist or be an empty string. message_id = _decode(header=_MESSAGE_ID_HEADER) + # It's possible for the subject line to not exist or be an empty string. subject = _decode(header=_SUBJECT_HEADER) or "Unknown Subject" from_ = _decode(header=_FROM_HEADER) to = _decode(header=_TO_HEADER) if not to: to = _decode(header=_DELIVERED_TO_HEADER) date_str = _decode(header=_DATE_HEADER) - date = _parse_date(date_str) + date = _parse_date(date_str=date_str) + # If any of the above are `None`, model validation will fail. + # Therefore, no guards (i.e.: `if
is None: raise RuntimeError(..)`) were written. return cls.model_validate( { "id": message_id, From 0966668c645d45d3900c4260d6b6113d901cf78b Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 11:26:55 -0700 Subject: [PATCH 21/31] Change default parameters to be `None` instead of `[]` --- backend/onyx/connectors/imap/connector.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index c9b35839803..fb746d4fb10 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -56,7 +56,7 @@ def __init__( self, host: str, port: int = _DEFAULT_IMAP_PORT_NUMBER, - mailboxes: list[str] = [], + mailboxes: list[str] | None = None, ) -> None: self._host = host self._port = port @@ -316,6 +316,10 @@ def _parse_email_body( def _sanitize_mailbox_names(mailboxes: list[str]) -> list[str]: + """ + Mailboxes with special characters in them must be enclosed by double-quotes, as per the IMAP protocol. + Just to be safe, we wrap *all* mailboxes with double-quotes. + """ return [f'"{mailbox}"' for mailbox in mailboxes if mailbox] From 72292267ae32b7de4ae45ddbbb1d9d80a9af7a0c Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 12:44:05 -0700 Subject: [PATCH 22/31] Address comment on PR --- backend/onyx/connectors/imap/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index fb746d4fb10..e2180157b81 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -143,7 +143,7 @@ def load_from_checkpoint( checkpoint.has_more = False return checkpoint - mailbox = checkpoint.todo_mailboxes.pop(0) + mailbox = checkpoint.todo_mailboxes.pop() checkpoint.todo_email_ids = _fetch_email_ids_in_mailbox( mail_client=self.mail_client, mailbox=mailbox, From 3337b056c4a1423a83b474af4498cfcb9ebe2b12 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 13:02:34 -0700 Subject: [PATCH 23/31] Implement more PR suggestions --- backend/onyx/connectors/imap/connector.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index e2180157b81..31cb38346e9 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -254,9 +254,6 @@ def _convert_email_headers_and_body_into_document( _sender_name, sender_addr = _parse_singular_addr(raw_header=email_headers.sender) parsed_recipients = _parse_addrs(raw_header=email_headers.recipients) - recipient_emails = set(addr for _name, addr in parsed_recipients) - - title = f"{sender_addr} to {recipient_emails} about {email_headers.subject}" email_body = _parse_email_body(email_msg=email_msg, email_headers=email_headers) primary_owners = [ BasicExpertInfo(display_name=recipient_name, email=recipient_addr) @@ -266,13 +263,13 @@ def _convert_email_headers_and_body_into_document( return Document( id=email_headers.id, title=email_headers.subject, - semantic_identifier=title, + semantic_identifier=email_headers.subject, metadata={}, source=DocumentSource.IMAP, sections=[TextSection(text=email_body)], primary_owners=primary_owners, external_access=ExternalAccess( - external_user_emails=recipient_emails, + external_user_emails=set(addr for _name, addr in parsed_recipients), external_user_group_ids=set(), is_public=False, ), From 66382781692d81c5102401d664beaab0a148bcaf Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 13:03:35 -0700 Subject: [PATCH 24/31] More PR suggestions --- backend/onyx/connectors/imap/connector.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 31cb38346e9..500bc92808d 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -337,8 +337,7 @@ def _parse_singular_addr(raw_header: str) -> tuple[str, str]: f"Expected a singular address, but instead got multiple; {raw_header=} {addrs=}" ) - [(name, addr)] = addrs - return name, addr + return addrs[0] if __name__ == "__main__": From 529f75e2b0836a63f5a747c8d338d1574cc93fe6 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 13:15:18 -0700 Subject: [PATCH 25/31] Implement more PR suggestions --- backend/onyx/connectors/imap/connector.py | 65 +++++++++++------------ 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 500bc92808d..5839b0530b4 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -61,8 +61,7 @@ def __init__( self._host = host self._port = port self._mailboxes = mailboxes - self._username: str | None = None - self._password: str | None = None + self._credentials: dict[str, Any] | None = None self._mail_client: imaplib.IMAP4_SSL | None = None @property @@ -79,12 +78,16 @@ def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None raise NotImplementedError("Use `set_credentials_provider` instead") def validate_connector_settings(self) -> None: - if not self._username or not self._password: + if not self._credentials: raise RuntimeError( - "Credentials not yet set; call `set_credentials_provider` first" + "`self._credentials` not set; call `set_credentials_provider` first" ) - self.mail_client.login(user=self._username, password=self._password) + username, password = _get_username_and_password_from_credentials_dict( + credentials=self._credentials + ) + + self.mail_client.login(user=username, password=password) self.mail_client.logout() # impls for CredentialsConnector @@ -92,20 +95,11 @@ def validate_connector_settings(self) -> None: def set_credentials_provider( self, credentials_provider: CredentialsProviderInterface ) -> None: - credentials = credentials_provider.get_credentials() - - def get_or_raise(name: str) -> str: - value = credentials.get(name) - if not value: - raise RuntimeError(f"Credential item {name=} was not found") - if not isinstance(value, str): - raise RuntimeError( - f"Credential item {name=} must be of type str, instead received {type(name)=}" - ) - return value + self._credentials = credentials_provider.get_credentials() - username = get_or_raise("username") - password = get_or_raise("password") + username, password = _get_username_and_password_from_credentials_dict( + credentials=self._credentials + ) self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port) self.mail_client.login(user=username, password=password) @@ -177,6 +171,25 @@ def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: return ImapCheckpoint.model_validate_json(json_data=checkpoint_json) +def _get_username_and_password_from_credentials_dict( + credentials: dict[str, Any], +) -> tuple[str, str]: + def get_or_raise(name: str) -> str: + value = credentials.get(name) + if not value: + raise RuntimeError(f"Credential item {name=} was not found") + if not isinstance(value, str): + raise RuntimeError( + f"Credential item {name=} must be of type str, instead received {type(name)=}" + ) + return value + + username = get_or_raise("username") + password = get_or_raise("password") + + return username, password + + def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]: status, mailboxes_data = mail_client.list(directory="*", pattern="*") if status != _IMAP_OKAY_STATUS: @@ -376,22 +389,6 @@ def _parse_singular_addr(raw_header: str) -> tuple[str, str]: ) ) - # # Manual iteration through the checkpointing logic. - # # Uncomment to add breakpoints and step through manually. - # checkpoint = imap_connector.build_dummy_checkpoint() - # while True: - # gen = imap_connector.load_from_checkpoint( - # start=0.0, end=time.time(), checkpoint=checkpoint - # ) - # while True: - # try: - # doc = next(gen) - # except StopIteration as e: - # checkpoint = cast(ImapCheckpoint, copy.deepcopy(e.value)) - # break - # if not checkpoint.has_more: - # break - for doc in load_all_docs_from_checkpoint_connector( connector=imap_connector, start=0, From f266de48b6957aa9f68d7701ef641d0fbd61b58d Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 13:31:07 -0700 Subject: [PATCH 26/31] Change up login/logout flow (PR suggestion) --- backend/onyx/connectors/imap/connector.py | 80 +++++++++++++---------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 5839b0530b4..5edd4c9429b 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -6,6 +6,7 @@ from datetime import timezone from email.message import Message from email.utils import parseaddr +from enum import Enum from typing import Any from typing import cast @@ -48,6 +49,11 @@ class ImapCheckpoint(ConnectorCheckpoint): todo_email_ids: list[str] | None = None +class LoginState(str, Enum): + LoggedIn = "logged_in" + LoggedOut = "logged_out" + + class ImapConnector( CheckpointedConnector[ImapCheckpoint], CredentialsConnector, @@ -63,6 +69,7 @@ def __init__( self._mailboxes = mailboxes self._credentials: dict[str, Any] | None = None self._mail_client: imaplib.IMAP4_SSL | None = None + self._login_state: LoginState = LoginState.LoggedOut @property def mail_client(self) -> imaplib.IMAP4_SSL: @@ -72,37 +79,57 @@ def mail_client(self) -> imaplib.IMAP4_SSL: ) return self._mail_client - # impls for BaseConnector - - def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: - raise NotImplementedError("Use `set_credentials_provider` instead") - - def validate_connector_settings(self) -> None: + @property + def credentials(self) -> dict[str, Any]: if not self._credentials: raise RuntimeError( - "`self._credentials` not set; call `set_credentials_provider` first" + "Credentials have not been initialized; call `set_credentials_provider` first" ) + return self._credentials + + def _login(self) -> None: + def get_or_raise(name: str) -> str: + value = self.credentials.get(name) + if not value: + raise RuntimeError(f"Credential item {name=} was not found") + if not isinstance(value, str): + raise RuntimeError( + f"Credential item {name=} must be of type str, instead received {type(name)=}" + ) + return value - username, password = _get_username_and_password_from_credentials_dict( - credentials=self._credentials - ) + if self._login_state == LoginState.LoggedIn: + return + + username = get_or_raise("username") + password = get_or_raise("password") + self._login_state = LoginState.LoggedIn self.mail_client.login(user=username, password=password) + + def _logout(self) -> None: + if self._login_state == LoginState.LoggedOut: + return + + self._login_state = LoginState.LoggedOut self.mail_client.logout() + # impls for BaseConnector + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + raise NotImplementedError("Use `set_credentials_provider` instead") + + def validate_connector_settings(self) -> None: + self._login() + self._logout() + # impls for CredentialsConnector def set_credentials_provider( self, credentials_provider: CredentialsProviderInterface ) -> None: self._credentials = credentials_provider.get_credentials() - - username, password = _get_username_and_password_from_credentials_dict( - credentials=self._credentials - ) - self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port) - self.mail_client.login(user=username, password=password) # impls for CheckpointedConnector @@ -115,6 +142,8 @@ def load_from_checkpoint( checkpoint = cast(ImapCheckpoint, copy.deepcopy(checkpoint)) checkpoint.has_more = True + self._login() + if checkpoint.todo_mailboxes is None: # This is the dummy checkpoint. # Fill it with mailboxes first. @@ -171,25 +200,6 @@ def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: return ImapCheckpoint.model_validate_json(json_data=checkpoint_json) -def _get_username_and_password_from_credentials_dict( - credentials: dict[str, Any], -) -> tuple[str, str]: - def get_or_raise(name: str) -> str: - value = credentials.get(name) - if not value: - raise RuntimeError(f"Credential item {name=} was not found") - if not isinstance(value, str): - raise RuntimeError( - f"Credential item {name=} must be of type str, instead received {type(name)=}" - ) - return value - - username = get_or_raise("username") - password = get_or_raise("password") - - return username, password - - def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]: status, mailboxes_data = mail_client.list(directory="*", pattern="*") if status != _IMAP_OKAY_STATUS: From 94a8c0c59277813da76f21c3ab9426de9f3a9c1e Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 13:33:00 -0700 Subject: [PATCH 27/31] Move port number to be envvar --- backend/onyx/connectors/imap/connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 5edd4c9429b..ab2004de2e6 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -1,6 +1,7 @@ import copy import email import imaplib +import os import re from datetime import datetime from datetime import timezone @@ -29,7 +30,7 @@ logger = setup_logger() -_DEFAULT_IMAP_PORT_NUMBER = 993 +_DEFAULT_IMAP_PORT_NUMBER = int(os.environ.get("IMAP_PORT", 993)) _IMAP_OKAY_STATUS = "OK" _PAGE_SIZE = 100 @@ -364,7 +365,6 @@ def _parse_singular_addr(raw_header: str) -> tuple[str, str]: if __name__ == "__main__": - import os import time from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider From 72b19a4489a5cd0cf1406f113880948b30154ce5 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 13:34:44 -0700 Subject: [PATCH 28/31] Make globals variants in enum instead (PR suggestion) --- backend/onyx/connectors/imap/models.py | 30 ++++++++++++++------------ 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/backend/onyx/connectors/imap/models.py b/backend/onyx/connectors/imap/models.py index 704f6229a77..d07ee9bd202 100644 --- a/backend/onyx/connectors/imap/models.py +++ b/backend/onyx/connectors/imap/models.py @@ -1,18 +1,20 @@ import email from datetime import datetime from email.message import Message +from enum import Enum from pydantic import BaseModel -_SUBJECT_HEADER = "subject" -_FROM_HEADER = "from" -_TO_HEADER = "to" -_DELIVERED_TO_HEADER = ( - "Delivered-To" # Used in mailing lists instead of the "to" header. -) -_DATE_HEADER = "date" -_MESSAGE_ID_HEADER = "Message-ID" +class Header(str, Enum): + SUBJECT_HEADER = "subject" + FROM_HEADER = "from" + TO_HEADER = "to" + DELIVERED_TO_HEADER = ( + "Delivered-To" # Used in mailing lists instead of the "to" header. + ) + DATE_HEADER = "date" + MESSAGE_ID_HEADER = "Message-ID" class EmailHeaders(BaseModel): @@ -50,14 +52,14 @@ def _parse_date(date_str: str | None) -> datetime | None: except (TypeError, ValueError): return None - message_id = _decode(header=_MESSAGE_ID_HEADER) + message_id = _decode(header=Header.MESSAGE_ID_HEADER) # It's possible for the subject line to not exist or be an empty string. - subject = _decode(header=_SUBJECT_HEADER) or "Unknown Subject" - from_ = _decode(header=_FROM_HEADER) - to = _decode(header=_TO_HEADER) + subject = _decode(header=Header.SUBJECT_HEADER) or "Unknown Subject" + from_ = _decode(header=Header.FROM_HEADER) + to = _decode(header=Header.TO_HEADER) if not to: - to = _decode(header=_DELIVERED_TO_HEADER) - date_str = _decode(header=_DATE_HEADER) + to = _decode(header=Header.DELIVERED_TO_HEADER) + date_str = _decode(header=Header.DATE_HEADER) date = _parse_date(date_str=date_str) # If any of the above are `None`, model validation will fail. From 975758e526d5963f578d51b0928de20d5eeb64dc Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 15:26:42 -0700 Subject: [PATCH 29/31] Fix more documentation related suggestions on PR --- backend/onyx/connectors/imap/connector.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index ab2004de2e6..93c513babd4 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -219,6 +219,13 @@ def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> li ) continue + # The mailbox LIST response output can be found here: + # https://www.rfc-editor.org/rfc/rfc3501.html#section-7.2.2 + # + # The general format is: + # `() ` + # + # The below regex matches on that pattern; from there, we select the 3rd match (index 2), which is the mailbox-name. match = re.match(r'\([^)]*\)\s+"([^"]+)"\s+"?(.+?)"?$', mailboxes_str) if not match: logger.warn( @@ -267,7 +274,7 @@ def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | Non f"Message data should be a tuple; instead got a {type(data)=} {data=}" ) - _other, raw_email = data + _metadata, raw_email = data return email.message_from_bytes(raw_email) @@ -307,6 +314,8 @@ def _parse_email_body( body = None for part in email_msg.walk(): if part.is_multipart(): + # Multipart parts are *containers* for other parts, not the actual content itself. + # Therefore, we skip until we find the individual parts instead. continue charset = part.get_content_charset() or "utf-8" From 0f4c60b9907052e382bd9e8b95af42f48f51aaab Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 15:36:03 -0700 Subject: [PATCH 30/31] Have the imap connector implement `CheckpointedConnectorWithPermSync` instead --- backend/onyx/connectors/imap/connector.py | 87 +++++++++++++++-------- 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/backend/onyx/connectors/imap/connector.py b/backend/onyx/connectors/imap/connector.py index 93c513babd4..a5b8b2ef9c9 100644 --- a/backend/onyx/connectors/imap/connector.py +++ b/backend/onyx/connectors/imap/connector.py @@ -16,7 +16,7 @@ from onyx.access.models import ExternalAccess from onyx.configs.constants import DocumentSource from onyx.connectors.imap.models import EmailHeaders -from onyx.connectors.interfaces import CheckpointedConnector +from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync from onyx.connectors.interfaces import CheckpointOutput from onyx.connectors.interfaces import CredentialsConnector from onyx.connectors.interfaces import CredentialsProviderInterface @@ -56,8 +56,8 @@ class LoginState(str, Enum): class ImapConnector( - CheckpointedConnector[ImapCheckpoint], CredentialsConnector, + CheckpointedConnectorWithPermSync[ImapCheckpoint], ): def __init__( self, @@ -115,30 +115,12 @@ def _logout(self) -> None: self._login_state = LoginState.LoggedOut self.mail_client.logout() - # impls for BaseConnector - - def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: - raise NotImplementedError("Use `set_credentials_provider` instead") - - def validate_connector_settings(self) -> None: - self._login() - self._logout() - - # impls for CredentialsConnector - - def set_credentials_provider( - self, credentials_provider: CredentialsProviderInterface - ) -> None: - self._credentials = credentials_provider.get_credentials() - self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port) - - # impls for CheckpointedConnector - - def load_from_checkpoint( + def _load_from_checkpoint( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch, checkpoint: ImapCheckpoint, + include_perm_sync: bool, ) -> CheckpointOutput[ImapCheckpoint]: checkpoint = cast(ImapCheckpoint, copy.deepcopy(checkpoint)) checkpoint.has_more = True @@ -189,17 +171,60 @@ def load_from_checkpoint( email_headers = EmailHeaders.from_email_msg(email_msg=email_msg) yield _convert_email_headers_and_body_into_document( - email_msg=email_msg, email_headers=email_headers + email_msg=email_msg, + email_headers=email_headers, + include_perm_sync=include_perm_sync, ) return checkpoint + # impls for BaseConnector + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + raise NotImplementedError("Use `set_credentials_provider` instead") + + def validate_connector_settings(self) -> None: + self._login() + self._logout() + + # impls for CredentialsConnector + + def set_credentials_provider( + self, credentials_provider: CredentialsProviderInterface + ) -> None: + self._credentials = credentials_provider.get_credentials() + self._mail_client = imaplib.IMAP4_SSL(host=self._host, port=self._port) + + # impls for CheckpointedConnector + + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ImapCheckpoint, + ) -> CheckpointOutput[ImapCheckpoint]: + return self._load_from_checkpoint( + start=start, end=end, checkpoint=checkpoint, include_perm_sync=False + ) + def build_dummy_checkpoint(self) -> ImapCheckpoint: return ImapCheckpoint(has_more=True) def validate_checkpoint_json(self, checkpoint_json: str) -> ImapCheckpoint: return ImapCheckpoint.model_validate_json(json_data=checkpoint_json) + # impls for CheckpointedConnectorWithPermSync + + def load_from_checkpoint_with_perm_sync( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ImapCheckpoint, + ) -> CheckpointOutput[ImapCheckpoint]: + return self._load_from_checkpoint( + start=start, end=end, checkpoint=checkpoint, include_perm_sync=True + ) + def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]: status, mailboxes_data = mail_client.list(directory="*", pattern="*") @@ -281,6 +306,7 @@ def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | Non def _convert_email_headers_and_body_into_document( email_msg: Message, email_headers: EmailHeaders, + include_perm_sync: bool, ) -> Document: _sender_name, sender_addr = _parse_singular_addr(raw_header=email_headers.sender) parsed_recipients = _parse_addrs(raw_header=email_headers.recipients) @@ -290,6 +316,15 @@ def _convert_email_headers_and_body_into_document( BasicExpertInfo(display_name=recipient_name, email=recipient_addr) for recipient_name, recipient_addr in parsed_recipients ] + external_access = ( + ExternalAccess( + external_user_emails=set(addr for _name, addr in parsed_recipients), + external_user_group_ids=set(), + is_public=False, + ) + if include_perm_sync + else None + ) return Document( id=email_headers.id, @@ -299,11 +334,7 @@ def _convert_email_headers_and_body_into_document( source=DocumentSource.IMAP, sections=[TextSection(text=email_body)], primary_owners=primary_owners, - external_access=ExternalAccess( - external_user_emails=set(addr for _name, addr in parsed_recipients), - external_user_group_ids=set(), - is_public=False, - ), + external_access=external_access, ) From b3e0a068b5048b0a8fcecd7db155b7dbf2d5c6f2 Mon Sep 17 00:00:00 2001 From: Raunak Bhagat Date: Tue, 8 Jul 2025 15:46:08 -0700 Subject: [PATCH 31/31] Add helper for loading all docs with permission syncing --- .../connectors/imap/test_imap_connector.py | 6 ++- backend/tests/daily/connectors/utils.py | 38 ++++++++++++++++--- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/backend/tests/daily/connectors/imap/test_imap_connector.py b/backend/tests/daily/connectors/imap/test_imap_connector.py index d9602509c66..ec74c9043a8 100644 --- a/backend/tests/daily/connectors/imap/test_imap_connector.py +++ b/backend/tests/daily/connectors/imap/test_imap_connector.py @@ -7,7 +7,9 @@ from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider from onyx.connectors.imap.connector import ImapConnector from tests.daily.connectors.imap.models import EmailDoc -from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector +from tests.daily.connectors.utils import ( + load_all_docs_from_checkpoint_connector_with_perm_sync, +) from tests.daily.connectors.utils import to_documents @@ -68,7 +70,7 @@ def test_imap_connector( EmailDoc.from_doc(document=document) for document in to_documents( iterator=iter( - load_all_docs_from_checkpoint_connector( + load_all_docs_from_checkpoint_connector_with_perm_sync( connector=imap_connector, start=0, end=time.time(), diff --git a/backend/tests/daily/connectors/utils.py b/backend/tests/daily/connectors/utils.py index 768fe8d1fd7..388d567580d 100644 --- a/backend/tests/daily/connectors/utils.py +++ b/backend/tests/daily/connectors/utils.py @@ -1,3 +1,4 @@ +from collections.abc import Callable from collections.abc import Iterator from typing import cast from typing import TypeVar @@ -5,6 +6,7 @@ from onyx.connectors.connector_runner import CheckpointOutputWrapper from onyx.connectors.interfaces import CheckpointedConnector from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync +from onyx.connectors.interfaces import CheckpointOutput from onyx.connectors.interfaces import SecondsSinceUnixEpoch from onyx.connectors.models import ConnectorCheckpoint from onyx.connectors.models import ConnectorFailure @@ -15,21 +17,19 @@ _ITERATION_LIMIT = 100_000 CT = TypeVar("CT", bound=ConnectorCheckpoint) +LoadFunction = Callable[[CT], CheckpointOutput[CT]] -def load_all_docs_from_checkpoint_connector( +def _load_all_docs( connector: CheckpointedConnector[CT], - start: SecondsSinceUnixEpoch, - end: SecondsSinceUnixEpoch, + load: LoadFunction, ) -> list[Document]: num_iterations = 0 checkpoint = cast(CT, connector.build_dummy_checkpoint()) documents: list[Document] = [] while checkpoint.has_more: - doc_batch_generator = CheckpointOutputWrapper[CT]()( - connector.load_from_checkpoint(start, end, checkpoint) - ) + doc_batch_generator = CheckpointOutputWrapper[CT]()(load(checkpoint)) for document, failure, next_checkpoint in doc_batch_generator: if failure is not None: raise RuntimeError(f"Failed to load documents: {failure}") @@ -45,6 +45,32 @@ def load_all_docs_from_checkpoint_connector( return documents +def load_all_docs_from_checkpoint_connector_with_perm_sync( + connector: CheckpointedConnectorWithPermSync[CT], + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, +) -> list[Document]: + return _load_all_docs( + connector=connector, + load=lambda checkpoint: connector.load_from_checkpoint_with_perm_sync( + start=start, end=end, checkpoint=checkpoint + ), + ) + + +def load_all_docs_from_checkpoint_connector( + connector: CheckpointedConnector[CT], + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, +) -> list[Document]: + return _load_all_docs( + connector=connector, + load=lambda checkpoint: connector.load_from_checkpoint( + start=start, end=end, checkpoint=checkpoint + ), + ) + + def load_everything_from_checkpoint_connector( connector: CheckpointedConnector[CT], start: SecondsSinceUnixEpoch,