diff --git a/backend/onyx/connectors/confluence/connector.py b/backend/onyx/connectors/confluence/connector.py index 4407fdd557d..de220b890f2 100644 --- a/backend/onyx/connectors/confluence/connector.py +++ b/backend/onyx/connectors/confluence/connector.py @@ -265,6 +265,7 @@ def _convert_page_to_document( # Extract basic page information page_id = page["id"] page_title = page["title"] + logger.info(f"Converting page {page_title} to document") page_url = build_confluence_document_id( self.wiki_base, page["_links"]["webui"], self.is_cloud ) @@ -396,7 +397,9 @@ def _fetch_page_attachments( ) continue - logger.info(f"Processing attachment: {attachment['title']}") + logger.info( + f"Processing attachment: {attachment['title']} attached to page {page['title']}" + ) # Attempt to get textual content or image summarization: object_url = build_confluence_document_id( @@ -458,7 +461,9 @@ def _fetch_document_batches( - Attempt to convert it with convert_attachment_to_content(...) - If successful, create a new Section with the extracted text or summary. """ - doc_count = 0 + + # number of documents/errors yielded + yield_count = 0 checkpoint = copy.deepcopy(checkpoint) prev_doc_ids = checkpoint.last_seen_doc_ids @@ -474,12 +479,17 @@ def _fetch_document_batches( expand=",".join(_PAGE_EXPANSION_FIELDS), limit=2 * self.batch_size, ): + # create checkpoint after enough documents have been processed + if yield_count >= self.batch_size: + return checkpoint + if page["id"] in prev_doc_ids: # There are a few seconds of fuzziness in the request, # so we skip if we saw this page on the last run continue # Build doc from page doc_or_failure = self._convert_page_to_document(page) + yield_count += 1 if isinstance(doc_or_failure, ConnectorFailure): yield doc_or_failure @@ -497,14 +507,10 @@ def _fetch_document_batches( continue # yield completed document - doc_count += 1 + checkpoint.last_seen_doc_ids.append(page["id"]) yield doc_or_failure - # create checkpoint after enough documents have been processed - if doc_count >= self.batch_size: - return checkpoint - checkpoint.has_more = False return checkpoint diff --git a/backend/tests/unit/onyx/connectors/confluence/test_confluence_checkpointing.py b/backend/tests/unit/onyx/connectors/confluence/test_confluence_checkpointing.py index 30f4e041938..c5fc5d19d97 100644 --- a/backend/tests/unit/onyx/connectors/confluence/test_confluence_checkpointing.py +++ b/backend/tests/unit/onyx/connectors/confluence/test_confluence_checkpointing.py @@ -148,7 +148,7 @@ def test_load_from_checkpoint_happy_path( assert confluence_client is not None, "bad test setup" paginated_cql_mock = cast(MagicMock, confluence_client.paginated_cql_retrieval) paginated_cql_mock.side_effect = [ - [mock_page1, mock_page2], + [mock_page1, mock_page2, mock_page3], [], # comments [], # attachments [], # comments @@ -366,22 +366,16 @@ def test_checkpoint_progress( confluence_connector, 0, end_time ) - assert len(outputs) == 2 + assert len(outputs) == 1 first_checkpoint = outputs[0].next_checkpoint - last_checkpoint = outputs[-1].next_checkpoint assert first_checkpoint == ConfluenceCheckpoint( last_updated=later_timestamp.timestamp(), - has_more=True, + has_more=False, last_seen_doc_ids=["1", "2"], ) - # Verify checkpoint contains both document IDs and latest timestamp - assert last_checkpoint == ConfluenceCheckpoint( - last_updated=later_timestamp.timestamp(), has_more=False, last_seen_doc_ids=[] - ) - assert len(outputs[0].items) == 2 assert isinstance(outputs[0].items[0], Document) assert outputs[0].items[0].semantic_identifier == "Page 1" @@ -404,11 +398,12 @@ def test_checkpoint_progress( ] # Use the checkpoint from first run + first_checkpoint.has_more = True outputs_with_checkpoint = load_everything_from_checkpoint_connector_from_checkpoint( confluence_connector, 0, end_time, first_checkpoint ) - # Verify no documents were processed since they were in last_seen_doc_ids + # Verify only the new page was processed since the others were in last_seen_doc_ids assert len(outputs_with_checkpoint) == 1 assert len(outputs_with_checkpoint[0].items) == 1 assert isinstance(outputs_with_checkpoint[0].items[0], Document)