diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index eb7324455e6..c361d838680 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -898,9 +898,18 @@ def connector_indexing_task(
         )
 
         # special bulletproofing ... truncate long exception messages
-        sanitized_e = type(e)(str(e)[:1024])
-        sanitized_e.__traceback__ = e.__traceback__
-        raise sanitized_e
+        # for exception types that require more args, re-creating the exception
+        # will fail - in that case fall back to raising the original exception.
+        # NOTE: only the construction is guarded. Raising inside the `try` would
+        # be caught by our own `except Exception` and the truncated exception
+        # would never propagate.
+        try:
+            sanitized_e = type(e)(str(e)[:1024])
+            sanitized_e.__traceback__ = e.__traceback__
+        except Exception:
+            sanitized_e = e
+
+        raise sanitized_e
 
     finally:
         if lock.owned():
diff --git a/backend/onyx/background/indexing/run_indexing.py b/backend/onyx/background/indexing/run_indexing.py
index 16756368b2c..41cbeb931e0 100644
--- a/backend/onyx/background/indexing/run_indexing.py
+++ b/backend/onyx/background/indexing/run_indexing.py
@@ -29,7 +29,7 @@
 from onyx.connectors.models import IndexAttemptMetadata
 from onyx.connectors.models import TextSection
 from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
-from onyx.db.connector_credential_pair import get_last_successful_attempt_time
+from onyx.db.connector_credential_pair import get_last_successful_attempt_poll_range_end
 from onyx.db.connector_credential_pair import update_connector_credential_pair
 from onyx.db.constants import CONNECTOR_VALIDATION_ERROR_MESSAGE_PREFIX
 from onyx.db.engine import get_session_with_current_tenant
@@ -296,20 +296,19 @@ def _run_indexing(
             search_settings_status=index_attempt_start.search_settings.status,
         )
 
-        last_successful_index_time = (
+        last_successful_index_poll_range_end = (
             ctx.earliest_index_time
             if ctx.from_beginning
-            else get_last_successful_attempt_time(
-                connector_id=ctx.connector_id,
-                credential_id=ctx.credential_id,
+            else get_last_successful_attempt_poll_range_end(
+                cc_pair_id=ctx.cc_pair_id,
                 earliest_index=ctx.earliest_index_time,
                 search_settings=index_attempt_start.search_settings,
                 db_session=db_session_temp,
             )
         )
-        if last_successful_index_time > POLL_CONNECTOR_OFFSET:
+        if last_successful_index_poll_range_end > POLL_CONNECTOR_OFFSET:
             window_start = datetime.fromtimestamp(
-                last_successful_index_time, tz=timezone.utc
+                last_successful_index_poll_range_end, tz=timezone.utc
             ) - timedelta(minutes=POLL_CONNECTOR_OFFSET)
         else:
             # don't go into "negative" time if we've never indexed before
diff --git a/backend/onyx/db/connector_credential_pair.py b/backend/onyx/db/connector_credential_pair.py
index 75b13da8640..788fc672a20 100644
--- a/backend/onyx/db/connector_credential_pair.py
+++ b/backend/onyx/db/connector_credential_pair.py
@@ -26,7 +26,6 @@
 from onyx.db.models import Credential
 from onyx.db.models import IndexAttempt
 from onyx.db.models import IndexingStatus
-from onyx.db.models import IndexModelStatus
 from onyx.db.models import SearchSettings
 from onyx.db.models import User
 from onyx.db.models import User__UserGroup
@@ -283,50 +282,45 @@ def get_connector_credential_pairs_for_source(
     return list(db_session.scalars(stmt).unique().all())
 
 
-def get_last_successful_attempt_time(
-    connector_id: int,
-    credential_id: int,
+def get_last_successful_attempt_poll_range_end(
+    cc_pair_id: int,
     earliest_index: float,
     search_settings: SearchSettings,
     db_session: Session,
 ) -> float:
-    """Gets the timestamp of the last successful index run stored in
-    the CC Pair row in the database"""
-    if search_settings.status == IndexModelStatus.PRESENT:
-        connector_credential_pair = get_connector_credential_pair(
-            db_session=db_session,
-            connector_id=connector_id,
-            credential_id=credential_id,
-        )
-        if (
-            connector_credential_pair is None
-            or connector_credential_pair.last_successful_index_time is None
-        ):
-            return earliest_index
+    """Gets the latest `poll_range_end` (as a POSIX timestamp) among the successful
+    index attempts of the given cc_pair for the given search settings.
 
-        return connector_credential_pair.last_successful_index_time.timestamp()
+    This can be used to determine the next "start" time for a new index attempt.
+    Returns `earliest_index` if there is no successful attempt with a `poll_range_end`.
 
-    # For Secondary Index we don't keep track of the latest success, so have to calculate it live
-    attempt = (
+    Note that the attempt's `time_started` is not a substitute - it gets set
+    separately and is similar but not exactly the same as the `poll_range_end`.
+    """
+    latest_successful_index_attempt = (
         db_session.query(IndexAttempt)
         .join(
             ConnectorCredentialPair,
             IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id,
         )
         .filter(
-            ConnectorCredentialPair.connector_id == connector_id,
-            ConnectorCredentialPair.credential_id == credential_id,
+            ConnectorCredentialPair.id == cc_pair_id,
             IndexAttempt.search_settings_id == search_settings.id,
             IndexAttempt.status == IndexingStatus.SUCCESS,
+            # skip attempts without a `poll_range_end`: with a DESC ordering NULLs
+            # sort first on PostgreSQL and would otherwise hide the real latest value
+            IndexAttempt.poll_range_end.is_not(None),
         )
-        .order_by(IndexAttempt.time_started.desc())
+        .order_by(IndexAttempt.poll_range_end.desc())
         .first()
     )
-
-    if not attempt or not attempt.time_started:
+    if (
+        not latest_successful_index_attempt
+        or not latest_successful_index_attempt.poll_range_end
+    ):
         return earliest_index
 
-    return attempt.time_started.timestamp()
+    return latest_successful_index_attempt.poll_range_end.timestamp()
 
 
 """Updates"""
diff --git a/backend/onyx/server/documents/models.py b/backend/onyx/server/documents/models.py
index 365094b35fb..debe94b190e 100644
--- a/backend/onyx/server/documents/models.py
+++ b/backend/onyx/server/documents/models.py
@@ -167,6 +167,8 @@ class IndexAttemptSnapshot(BaseModel):
     full_exception_trace: str | None
     time_started: str | None
     time_updated: str
+    poll_range_start: datetime | None = None
+    poll_range_end: datetime | None = None
 
     @classmethod
     def from_index_attempt_db_model(
@@ -188,6 +190,8 @@ def from_index_attempt_db_model(
                 else None
             ),
             time_updated=index_attempt.time_updated.isoformat(),
+            poll_range_start=index_attempt.poll_range_start,
+            poll_range_end=index_attempt.poll_range_end,
         )
 
 
diff --git
a/backend/tests/integration/tests/indexing/test_polling.py b/backend/tests/integration/tests/indexing/test_polling.py
new file mode 100644
index 00000000000..7f4ee43887d
--- /dev/null
+++ b/backend/tests/integration/tests/indexing/test_polling.py
@@ -0,0 +1,158 @@
+import uuid
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+
+import httpx
+
+from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET
+from onyx.configs.constants import DocumentSource
+from onyx.connectors.mock_connector.connector import MockConnectorCheckpoint
+from onyx.connectors.models import InputType
+from onyx.db.enums import IndexingStatus
+from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_HOST
+from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_PORT
+from tests.integration.common_utils.managers.cc_pair import CCPairManager
+from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager
+from tests.integration.common_utils.test_document_utils import create_test_document
+from tests.integration.common_utils.test_models import DATestUser
+
+
+def _setup_mock_connector(
+    mock_server_client: httpx.Client,
+    admin_user: DATestUser,
+) -> None:
+    """Registers two identical successful responses with the mock connector server.
+
+    Each response contains a single test document, a checkpoint with
+    `has_more=False` and no failures. `admin_user` is currently unused.
+    """
+    test_doc = create_test_document()
+    successful_response = {
+        "documents": [test_doc.model_dump(mode="json")],
+        "checkpoint": MockConnectorCheckpoint(has_more=False).model_dump(mode="json"),
+        "failures": [],
+    }
+    response = mock_server_client.post(
+        "/set-behavior",
+        json=[successful_response, successful_response],  # For two attempts
+    )
+    assert response.status_code == 200
+
+
+def test_poll_connector_time_ranges(
+    mock_server_client: httpx.Client,
+    admin_user: DATestUser,
+) -> None:
+    """
+    Tests that poll connectors correctly set their poll_range_start and poll_range_end
+    across multiple indexing attempts.
+    """
+    # Set up mock server behavior - a simple successful response
+    _setup_mock_connector(mock_server_client, admin_user)
+
+    # Create a CC Pair for the mock connector with POLL input type
+    cc_pair_name = f"mock-poll-time-range-{uuid.uuid4()}"
+    cc_pair = CCPairManager.create_from_scratch(
+        name=cc_pair_name,
+        source=DocumentSource.MOCK_CONNECTOR,
+        input_type=InputType.POLL,
+        connector_specific_config={
+            "mock_server_host": MOCK_CONNECTOR_SERVER_HOST,
+            "mock_server_port": MOCK_CONNECTOR_SERVER_PORT,
+        },
+        user_performing_action=admin_user,
+        refresh_freq=3,  # refresh often to ensure the second attempt actually runs
+    )
+
+    # --- First Indexing Attempt ---
+    time_before_first_attempt = datetime.now(timezone.utc)
+    first_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
+        cc_pair_id=cc_pair.id,
+        user_performing_action=admin_user,
+    )
+    IndexAttemptManager.wait_for_index_attempt_completion(
+        index_attempt_id=first_index_attempt.id,
+        cc_pair_id=cc_pair.id,
+        user_performing_action=admin_user,
+    )
+    time_after_first_attempt = datetime.now(timezone.utc)
+
+    # Fetch and validate the first attempt
+    completed_first_attempt = IndexAttemptManager.get_index_attempt_by_id(
+        index_attempt_id=first_index_attempt.id,
+        cc_pair_id=cc_pair.id,
+        user_performing_action=admin_user,
+    )
+    assert completed_first_attempt.status == IndexingStatus.SUCCESS
+    assert completed_first_attempt.poll_range_start is not None
+    assert completed_first_attempt.poll_range_end is not None
+
+    # For the first run (no prior successful attempts), poll_range_start should be epoch (0)
+    expected_first_start = datetime.fromtimestamp(0, tz=timezone.utc)
+    assert completed_first_attempt.poll_range_start == expected_first_start
+
+    # `poll_range_end` should be sometime in between the time the attempt
+    # started and the time it finished.
+    # no way to have a more precise assertion here since the `poll_range_end`
+    # can really be set anytime in that range and be "correct"
+    assert (
+        time_before_first_attempt
+        <= completed_first_attempt.poll_range_end
+        <= time_after_first_attempt
+    )
+
+    first_attempt_poll_end = completed_first_attempt.poll_range_end
+
+    # --- Second Indexing Attempt ---
+    # Trigger another run manually (since automatic refresh might be too slow for test)
+    # In a real scenario, the scheduler would wait for the refresh frequency.
+    # Here we manually trigger a new run.
+    _setup_mock_connector(mock_server_client, admin_user)
+
+    # Capture the lower bound BEFORE triggering the run: nothing guarantees that
+    # the new attempt (and its `poll_range_end`) is only created after `run_once`
+    # returns, so taking the timestamp afterwards could make the final range
+    # assertion flaky.
+    time_before_second_attempt = datetime.now(timezone.utc)
+    CCPairManager.run_once(
+        cc_pair, from_beginning=False, user_performing_action=admin_user
+    )
+    second_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
+        cc_pair_id=cc_pair.id,
+        index_attempts_to_ignore=[first_index_attempt.id],
+        user_performing_action=admin_user,
+    )
+    IndexAttemptManager.wait_for_index_attempt_completion(
+        index_attempt_id=second_index_attempt.id,
+        cc_pair_id=cc_pair.id,
+        user_performing_action=admin_user,
+    )
+    time_after_second_attempt = datetime.now(timezone.utc)
+
+    # Fetch and validate the second attempt
+    completed_second_attempt = IndexAttemptManager.get_index_attempt_by_id(
+        index_attempt_id=second_index_attempt.id,
+        cc_pair_id=cc_pair.id,
+        user_performing_action=admin_user,
+    )
+    assert completed_second_attempt.status == IndexingStatus.SUCCESS
+    assert completed_second_attempt.poll_range_start is not None
+    assert completed_second_attempt.poll_range_end is not None
+
+    # For the second run, poll_range_start should be the previous successful attempt's
+    # poll_range_end minus the POLL_CONNECTOR_OFFSET
+    expected_second_start = first_attempt_poll_end - timedelta(
+        minutes=POLL_CONNECTOR_OFFSET
+    )
+    assert completed_second_attempt.poll_range_start == expected_second_start
+
+    # `poll_range_end` should be sometime in between the time the attempt
+    # started and the time it finished.
+    # again, no way to have a more precise assertion here since the `poll_range_end`
+    # can really be set anytime in that range and be "correct"
+    assert (
+        time_before_second_attempt
+        <= completed_second_attempt.poll_range_end
+        <= time_after_second_attempt
+    )