25
25
from onyx .connectors .interfaces import SecondsSinceUnixEpoch
26
26
from onyx .connectors .models import BasicExpertInfo
27
27
from onyx .connectors .models import ConnectorCheckpoint
28
+ from onyx .connectors .models import ConnectorFailure
28
29
from onyx .connectors .models import ConnectorMissingCredentialError
29
30
from onyx .connectors .models import Document
31
+ from onyx .connectors .models import EntityFailure
30
32
from onyx .connectors .models import TextSection
31
33
from onyx .file_processing .html_utils import parse_html_page_basic
32
34
from onyx .utils .logger import setup_logger
@@ -191,7 +193,7 @@ def load_from_checkpoint(
191
193
)
192
194
193
195
docs = [
194
- _collect_document_for_channel_id (
196
+ _collect_documents_for_channel (
195
197
graph_client = self .graph_client ,
196
198
team = team ,
197
199
channel = channel ,
@@ -458,18 +460,17 @@ def _collect_all_channels_from_team(
458
460
return channels
459
461
460
462
461
- def _collect_document_for_channel_id (
463
+ def _collect_documents_for_channel (
462
464
graph_client : GraphClient ,
463
465
team : Team ,
464
466
channel : Channel ,
465
467
start : SecondsSinceUnixEpoch ,
466
468
end : SecondsSinceUnixEpoch ,
467
- ) -> Iterator [Document | None ]:
469
+ ) -> Iterator [Document | None | ConnectorFailure ]:
468
470
"""
469
- This function yields just one singular `Document`.
471
+ This function yields `Document`s, where each `Document` corresponds to a "thread"; a `ConnectorFailure` is yielded instead when retrieving or converting a thread raises.
470
472
471
- The reason why this function returns an instance of `Iterator` is because
472
- that is what `parallel_yield` expects. We want this to be lazily evaluated.
473
+ A "thread" is the combination of the "root" message and all of its replies.
473
474
"""
474
475
475
476
# Server-side filter conditions are not supported on the chat-messages API.
@@ -483,19 +484,37 @@ def _collect_document_for_channel_id(
483
484
page_loaded = lambda _ : None
484
485
).execute_query ()
485
486
486
- thread = [
487
- message
488
- for message in message_collection
489
- if _filter_message (message = message , start = start , end = end )
490
- ]
487
+ for message in message_collection :
488
+ if not message .id :
489
+ continue
491
490
492
- yield _convert_thread_to_document (
493
- channel = channel ,
494
- thread = thread ,
495
- )
491
+ if not _should_process_message (message = message , start = start , end = end ):
492
+ continue
493
+
494
+ try :
495
+ replies = list (message .replies .get_all ().execute_query ())
496
+ thread = [message ]
497
+ thread .extend (replies [::- 1 ])
498
+
499
+ # Note:
500
+ # We convert an entire *thread* (including the root message and its replies) into one, singular `Document`.
501
+ # I.e., we don't convert each individual message and each individual reply into their own individual `Document`s.
502
+ if doc := _convert_thread_to_document (
503
+ channel = channel ,
504
+ thread = thread ,
505
+ ):
506
+ yield doc
507
+ except Exception as e :
508
+ yield ConnectorFailure (
509
+ failed_entity = EntityFailure (
510
+ entity_id = message .id ,
511
+ ),
512
+ failure_message = f"Retrieval of message and its replies failed; { channel .id = } { message .id } " ,
513
+ exception = e ,
514
+ )
496
515
497
516
498
- def _filter_message (
517
+ def _should_process_message (
499
518
message : ChatMessage ,
500
519
start : SecondsSinceUnixEpoch ,
501
520
end : SecondsSinceUnixEpoch ,
0 commit comments