Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,9 +898,14 @@ def connector_indexing_task(
)

# special bulletproofing ... truncate long exception messages
sanitized_e = type(e)(str(e)[:1024])
sanitized_e.__traceback__ = e.__traceback__
raise sanitized_e
# for exception types that require more args, this will fail
# thus the try/except
try:
sanitized_e = type(e)(str(e)[:1024])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This previously raised a new exception when the original exception type's constructor required more than one argument.

sanitized_e.__traceback__ = e.__traceback__
raise sanitized_e
except Exception:
raise e

finally:
if lock.owned():
Expand Down
13 changes: 6 additions & 7 deletions backend/onyx/background/indexing/run_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from onyx.connectors.models import IndexAttemptMetadata
from onyx.connectors.models import TextSection
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import get_last_successful_attempt_time
from onyx.db.connector_credential_pair import get_last_successful_attempt_poll_range_end
from onyx.db.connector_credential_pair import update_connector_credential_pair
from onyx.db.constants import CONNECTOR_VALIDATION_ERROR_MESSAGE_PREFIX
from onyx.db.engine import get_session_with_current_tenant
Expand Down Expand Up @@ -296,20 +296,19 @@ def _run_indexing(
search_settings_status=index_attempt_start.search_settings.status,
)

last_successful_index_time = (
last_successful_index_poll_range_end = (
ctx.earliest_index_time
if ctx.from_beginning
else get_last_successful_attempt_time(
connector_id=ctx.connector_id,
credential_id=ctx.credential_id,
else get_last_successful_attempt_poll_range_end(
cc_pair_id=ctx.cc_pair_id,
earliest_index=ctx.earliest_index_time,
search_settings=index_attempt_start.search_settings,
db_session=db_session_temp,
)
)
if last_successful_index_time > POLL_CONNECTOR_OFFSET:
if last_successful_index_poll_range_end > POLL_CONNECTOR_OFFSET:
window_start = datetime.fromtimestamp(
last_successful_index_time, tz=timezone.utc
last_successful_index_poll_range_end, tz=timezone.utc
) - timedelta(minutes=POLL_CONNECTOR_OFFSET)
else:
# don't go into "negative" time if we've never indexed before
Expand Down
41 changes: 15 additions & 26 deletions backend/onyx/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from onyx.db.models import Credential
from onyx.db.models import IndexAttempt
from onyx.db.models import IndexingStatus
from onyx.db.models import IndexModelStatus
from onyx.db.models import SearchSettings
from onyx.db.models import User
from onyx.db.models import User__UserGroup
Expand Down Expand Up @@ -283,50 +282,40 @@ def get_connector_credential_pairs_for_source(
return list(db_session.scalars(stmt).unique().all())


def get_last_successful_attempt_time(
connector_id: int,
credential_id: int,
def get_last_successful_attempt_poll_range_end(
cc_pair_id: int,
earliest_index: float,
search_settings: SearchSettings,
db_session: Session,
) -> float:
"""Gets the timestamp of the last successful index run stored in
the CC Pair row in the database"""
if search_settings.status == IndexModelStatus.PRESENT:
connector_credential_pair = get_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
if (
connector_credential_pair is None
or connector_credential_pair.last_successful_index_time is None
):
return earliest_index
"""Used to get the latest `poll_range_end` for a given connector and credential.

return connector_credential_pair.last_successful_index_time.timestamp()
This can be used to determine the next "start" time for a new index attempt.

# For Secondary Index we don't keep track of the latest success, so have to calculate it live
attempt = (
Note that the attempts time_started is not necessarily correct - that gets set
separately and is similar but not exactly the same as the `poll_range_end`.
"""
latest_successful_index_attempt = (
db_session.query(IndexAttempt)
.join(
ConnectorCredentialPair,
IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id,
)
.filter(
ConnectorCredentialPair.connector_id == connector_id,
ConnectorCredentialPair.credential_id == credential_id,
ConnectorCredentialPair.id == cc_pair_id,
IndexAttempt.search_settings_id == search_settings.id,
IndexAttempt.status == IndexingStatus.SUCCESS,
)
.order_by(IndexAttempt.time_started.desc())
.order_by(IndexAttempt.poll_range_end.desc())
.first()
)

if not attempt or not attempt.time_started:
if (
not latest_successful_index_attempt
or not latest_successful_index_attempt.poll_range_end
):
return earliest_index

return attempt.time_started.timestamp()
return latest_successful_index_attempt.poll_range_end.timestamp()


"""Updates"""
Expand Down
4 changes: 4 additions & 0 deletions backend/onyx/server/documents/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class IndexAttemptSnapshot(BaseModel):
full_exception_trace: str | None
time_started: str | None
time_updated: str
poll_range_start: datetime | None = None
poll_range_end: datetime | None = None

@classmethod
def from_index_attempt_db_model(
Expand All @@ -188,6 +190,8 @@ def from_index_attempt_db_model(
else None
),
time_updated=index_attempt.time_updated.isoformat(),
poll_range_start=index_attempt.poll_range_start,
poll_range_end=index_attempt.poll_range_end,
)


Expand Down
150 changes: 150 additions & 0 deletions backend/tests/integration/tests/indexing/test_polling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import uuid
from datetime import datetime
from datetime import timedelta
from datetime import timezone

import httpx

from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET
from onyx.configs.constants import DocumentSource
from onyx.connectors.mock_connector.connector import MockConnectorCheckpoint
from onyx.connectors.models import InputType
from onyx.db.enums import IndexingStatus
from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_HOST
from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_PORT
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager
from tests.integration.common_utils.test_document_utils import create_test_document
from tests.integration.common_utils.test_models import DATestUser


def _setup_mock_connector(
    mock_server_client: httpx.Client,
    admin_user: DATestUser,
) -> None:
    """Queue two identical successful responses on the mock connector server.

    Each queued response carries a single test document, a checkpoint with
    `has_more=False`, and an empty failure list. The behavior is registered
    by POSTing to the mock server's `/set-behavior` endpoint, and the helper
    asserts that the server answered with HTTP 200.

    `admin_user` is accepted but not used by this helper.
    """
    document_payload = create_test_document().model_dump(mode="json")
    checkpoint_payload = MockConnectorCheckpoint(has_more=False).model_dump(
        mode="json"
    )
    success_payload = {
        "documents": [document_payload],
        "checkpoint": checkpoint_payload,
        "failures": [],
    }

    # The same payload is queued twice: For two attempts
    queued_behavior = [success_payload, success_payload]
    set_behavior_response = mock_server_client.post(
        "/set-behavior",
        json=queued_behavior,
    )
    assert set_behavior_response.status_code == 200


def test_poll_connector_time_ranges(
    mock_server_client: httpx.Client,
    admin_user: DATestUser,
) -> None:
    """
    Tests that poll connectors correctly set their poll_range_start and poll_range_end
    across multiple indexing attempts.

    Flow:
      1. Create a POLL cc-pair backed by the mock connector and wait for the
         first attempt to succeed. Its `poll_range_start` must be the epoch and
         its `poll_range_end` must fall inside the wall-clock window observed
         around the attempt.
      2. Trigger a second attempt. Its `poll_range_start` must equal the first
         attempt's `poll_range_end` minus `POLL_CONNECTOR_OFFSET` minutes, and
         its `poll_range_end` must again fall inside the observed window.
    """
    # Set up mock server behavior - a simple successful response
    _setup_mock_connector(mock_server_client, admin_user)

    # Create a CC Pair for the mock connector with POLL input type
    cc_pair_name = f"mock-poll-time-range-{uuid.uuid4()}"
    cc_pair = CCPairManager.create_from_scratch(
        name=cc_pair_name,
        source=DocumentSource.MOCK_CONNECTOR,
        input_type=InputType.POLL,
        connector_specific_config={
            "mock_server_host": MOCK_CONNECTOR_SERVER_HOST,
            "mock_server_port": MOCK_CONNECTOR_SERVER_PORT,
        },
        user_performing_action=admin_user,
        refresh_freq=3,  # refresh often to ensure the second attempt actually runs
    )

    # --- First Indexing Attempt ---
    time_before_first_attempt = datetime.now(timezone.utc)
    first_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    IndexAttemptManager.wait_for_index_attempt_completion(
        index_attempt_id=first_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    time_after_first_attempt = datetime.now(timezone.utc)

    # Fetch and validate the first attempt
    completed_first_attempt = IndexAttemptManager.get_index_attempt_by_id(
        index_attempt_id=first_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    assert completed_first_attempt.status == IndexingStatus.SUCCESS
    assert completed_first_attempt.poll_range_start is not None
    assert completed_first_attempt.poll_range_end is not None

    # For the first run (no prior successful attempts), poll_range_start should be epoch (0)
    expected_first_start = datetime.fromtimestamp(0, tz=timezone.utc)
    assert completed_first_attempt.poll_range_start == expected_first_start

    # `poll_range_end` should be sometime in between the time the attempt
    # started and the time it finished.
    # no way to have a more precise assertion here since the `poll_range_end`
    # can really be set anytime in that range and be "correct"
    assert (
        time_before_first_attempt
        <= completed_first_attempt.poll_range_end
        <= time_after_first_attempt
    )

    first_attempt_poll_end = completed_first_attempt.poll_range_end

    # --- Second Indexing Attempt ---
    # Record the lower bound BEFORE the second attempt is triggered. If it were
    # sampled after `run_once`, the attempt could already have started (and
    # fixed its `poll_range_end`) before the timestamp was taken, which would
    # make the range assertion at the end of this test fail spuriously.
    time_before_second_attempt = datetime.now(timezone.utc)

    # Trigger another run manually (since automatic refresh might be too slow for test)
    # In a real scenario, the scheduler would wait for the refresh frequency.
    # Here we manually trigger a new run.
    _setup_mock_connector(mock_server_client, admin_user)
    CCPairManager.run_once(
        cc_pair, from_beginning=False, user_performing_action=admin_user
    )

    second_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
        cc_pair_id=cc_pair.id,
        index_attempts_to_ignore=[first_index_attempt.id],
        user_performing_action=admin_user,
    )
    IndexAttemptManager.wait_for_index_attempt_completion(
        index_attempt_id=second_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    time_after_second_attempt = datetime.now(timezone.utc)

    # Fetch and validate the second attempt
    completed_second_attempt = IndexAttemptManager.get_index_attempt_by_id(
        index_attempt_id=second_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    assert completed_second_attempt.status == IndexingStatus.SUCCESS
    assert completed_second_attempt.poll_range_start is not None
    assert completed_second_attempt.poll_range_end is not None

    # For the second run, poll_range_start should be the previous successful attempt's
    # poll_range_end minus the POLL_CONNECTOR_OFFSET
    expected_second_start = first_attempt_poll_end - timedelta(
        minutes=POLL_CONNECTOR_OFFSET
    )
    assert completed_second_attempt.poll_range_start == expected_second_start

    # `poll_range_end` should be sometime in between the time the attempt
    # was triggered and the time it finished.
    # again, no way to have a more precise assertion here since the `poll_range_end`
    # can really be set anytime in that range and be "correct"
    assert (
        time_before_second_attempt
        <= completed_second_attempt.poll_range_end
        <= time_after_second_attempt
    )