25
25
from onyx .connectors .interfaces import SecondsSinceUnixEpoch
26
26
from onyx .connectors .models import BasicExpertInfo
27
27
from onyx .connectors .models import ConnectorCheckpoint
28
+ from onyx .connectors .models import ConnectorFailure
28
29
from onyx .connectors .models import ConnectorMissingCredentialError
29
30
from onyx .connectors .models import Document
31
+ from onyx .connectors .models import EntityFailure
30
32
from onyx .connectors .models import TextSection
31
33
from onyx .file_processing .html_utils import parse_html_page_basic
32
34
from onyx .utils .logger import setup_logger
@@ -191,7 +193,7 @@ def load_from_checkpoint(
191
193
)
192
194
193
195
docs = [
194
- _collect_document_for_channel_id (
196
+ _collect_documents_for_channel (
195
197
graph_client = self .graph_client ,
196
198
team = team ,
197
199
channel = channel ,
@@ -458,18 +460,17 @@ def _collect_all_channels_from_team(
458
460
return channels
459
461
460
462
461
- def _collect_document_for_channel_id (
463
+ def _collect_documents_for_channel (
462
464
graph_client : GraphClient ,
463
465
team : Team ,
464
466
channel : Channel ,
465
467
start : SecondsSinceUnixEpoch ,
466
468
end : SecondsSinceUnixEpoch ,
467
- ) -> Iterator [Document | None ]:
469
+ ) -> Iterator [Document | None | ConnectorFailure ]:
468
470
"""
469
- This function yields just one singular `Document`.
471
+ This function yields `Document`s, where each `Document` corresponds to a "thread"; a `ConnectorFailure` is yielded instead when retrieving or converting a thread raises.
470
472
471
- The reason why this function returns an instance of `Iterator` is because
472
- that is what `parallel_yield` expects. We want this to be lazily evaluated.
473
+ A "thread" is the combination of the "root" message and all of its replies.
473
474
"""
474
475
475
476
# Server-side filter conditions are not supported on the chat-messages API.
@@ -487,18 +488,27 @@ def _collect_document_for_channel_id(
487
488
if not _should_process_message (message = message , start = start , end = end ):
488
489
continue
489
490
490
- replies = list (message .replies .get_all ().execute_query ())
491
- thread = [message ]
492
- thread .extend (replies [::- 1 ])
493
-
494
- # Note:
495
- # We convert an entire *thread* (including the root message and its replies) into one, singular `Document`.
496
- # I.e., we don't convert each individual message and each individual reply into their own individual `Document`s.
497
- if doc := _convert_thread_to_document (
498
- channel = channel ,
499
- thread = thread ,
500
- ):
501
- yield doc
491
+ try :
492
+ replies = list (message .replies .get_all ().execute_query ())
493
+ thread = [message ]
494
+ thread .extend (replies [::- 1 ])
495
+
496
+ # Note:
497
+ # We convert an entire *thread* (including the root message and its replies) into one, singular `Document`.
498
+ # I.e., we don't convert each individual message and each individual reply into their own individual `Document`s.
499
+ if doc := _convert_thread_to_document (
500
+ channel = channel ,
501
+ thread = thread ,
502
+ ):
503
+ yield doc
504
+ except Exception as e :
505
+ yield ConnectorFailure (
506
+ failed_entity = EntityFailure (
507
+ entity_id = "messages" ,
508
+ ),
509
+ failure_message = f"Retrieval of message and its replies failed; { channel .id = } { message .id } " ,
510
+ exception = e ,
511
+ )
502
512
503
513
504
514
def _should_process_message (
0 commit comments