Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions backend/onyx/connectors/teams/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import EntityFailure
from onyx.connectors.models import TextSection
from onyx.file_processing.html_utils import parse_html_page_basic
from onyx.utils.logger import setup_logger
Expand Down Expand Up @@ -191,7 +193,7 @@ def load_from_checkpoint(
)

docs = [
_collect_document_for_channel_id(
_collect_documents_for_channel(
graph_client=self.graph_client,
team=team,
channel=channel,
Expand Down Expand Up @@ -458,18 +460,17 @@ def _collect_all_channels_from_team(
return channels


def _collect_document_for_channel_id(
def _collect_documents_for_channel(
graph_client: GraphClient,
team: Team,
channel: Channel,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
) -> Iterator[Document | None]:
) -> Iterator[Document | None | ConnectorFailure]:
"""
This function yields just one singular `Document`.
This function yields an iterator of `Document`s, where each `Document` corresponds to a "thread".

The reason why this function returns an instance of `Iterator` is because
that is what `parallel_yield` expects. We want this to be lazily evaluated.
A "thread" is the conjunction of the "root" message and all of its replies.
"""

# Server-side filter conditions are not supported on the chat-messages API.
Expand All @@ -483,19 +484,34 @@ def _collect_document_for_channel_id(
page_loaded=lambda _: None
).execute_query()

thread = [
message
for message in message_collection
if _filter_message(message=message, start=start, end=end)
]
for message in message_collection:
if not _should_process_message(message=message, start=start, end=end):
continue

yield _convert_thread_to_document(
channel=channel,
thread=thread,
)
try:
replies = list(message.replies.get_all().execute_query())
thread = [message]
thread.extend(replies[::-1])

# Note:
# We convert an entire *thread* (including the root message and its replies) into one, singular `Document`.
# I.e., we don't convert each individual message and each individual reply into their own individual `Document`s.
if doc := _convert_thread_to_document(
channel=channel,
thread=thread,
):
yield doc
except Exception as e:
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id="messages",
),
failure_message=f"Retrieval of message and its replies failed; {channel.id=} {message.id}",
exception=e,
)


def _filter_message(
def _should_process_message(
message: ChatMessage,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
Expand Down
16 changes: 13 additions & 3 deletions backend/tests/daily/connectors/teams/test_teams_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,24 @@ def teams_connector(
@pytest.mark.parametrize(
"teams_connector,expected_messages",
[
[["Onyx-Testing"], set(["This is the first message in Onyx-Testing ..."])],
[
["Onyx-Testing"],
set(
[
"This is the first message in Onyx-Testing ...This is a reply!This is a second reply.Third.4th.5",
"Testing body.",
]
),
],
[
["Onyx"],
set(
[
"yeah!",
"but not least",
"Hello, world!",
"My favorite color is red.\n\xa0\nPablos favorite color is blue",
"but not leastyeah!",
"My favorite color is red.\n\xa0\nPablos favorite color is bluePika's favorite color is greenbut"
" it might also be yellow",
]
),
],
Expand Down
Loading