Skip to content

Commit e46a4fa

Browse files
WevesZhipengHe
authored andcommitted
Fix window_start (onyx-dot-app#4689)
* Fix window_start * Add comment
1 parent d3ef83b commit e46a4fa

File tree

5 files changed

+183
-36
lines changed

5 files changed

+183
-36
lines changed

backend/onyx/background/celery/tasks/indexing/tasks.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -898,9 +898,14 @@ def connector_indexing_task(
898898
)
899899

900900
# special bulletproofing ... truncate long exception messages
901-
sanitized_e = type(e)(str(e)[:1024])
902-
sanitized_e.__traceback__ = e.__traceback__
903-
raise sanitized_e
901+
# for exception types that require more args, this will fail
902+
# thus the try/except
903+
try:
904+
sanitized_e = type(e)(str(e)[:1024])
905+
sanitized_e.__traceback__ = e.__traceback__
906+
raise sanitized_e
907+
except Exception:
908+
raise e
904909

905910
finally:
906911
if lock.owned():

backend/onyx/background/indexing/run_indexing.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from onyx.connectors.models import IndexAttemptMetadata
3030
from onyx.connectors.models import TextSection
3131
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
32-
from onyx.db.connector_credential_pair import get_last_successful_attempt_time
32+
from onyx.db.connector_credential_pair import get_last_successful_attempt_poll_range_end
3333
from onyx.db.connector_credential_pair import update_connector_credential_pair
3434
from onyx.db.constants import CONNECTOR_VALIDATION_ERROR_MESSAGE_PREFIX
3535
from onyx.db.engine import get_session_with_current_tenant
@@ -296,20 +296,19 @@ def _run_indexing(
296296
search_settings_status=index_attempt_start.search_settings.status,
297297
)
298298

299-
last_successful_index_time = (
299+
last_successful_index_poll_range_end = (
300300
ctx.earliest_index_time
301301
if ctx.from_beginning
302-
else get_last_successful_attempt_time(
303-
connector_id=ctx.connector_id,
304-
credential_id=ctx.credential_id,
302+
else get_last_successful_attempt_poll_range_end(
303+
cc_pair_id=ctx.cc_pair_id,
305304
earliest_index=ctx.earliest_index_time,
306305
search_settings=index_attempt_start.search_settings,
307306
db_session=db_session_temp,
308307
)
309308
)
310-
if last_successful_index_time > POLL_CONNECTOR_OFFSET:
309+
if last_successful_index_poll_range_end > POLL_CONNECTOR_OFFSET:
311310
window_start = datetime.fromtimestamp(
312-
last_successful_index_time, tz=timezone.utc
311+
last_successful_index_poll_range_end, tz=timezone.utc
313312
) - timedelta(minutes=POLL_CONNECTOR_OFFSET)
314313
else:
315314
# don't go into "negative" time if we've never indexed before

backend/onyx/db/connector_credential_pair.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from onyx.db.models import Credential
2727
from onyx.db.models import IndexAttempt
2828
from onyx.db.models import IndexingStatus
29-
from onyx.db.models import IndexModelStatus
3029
from onyx.db.models import SearchSettings
3130
from onyx.db.models import User
3231
from onyx.db.models import User__UserGroup
@@ -283,50 +282,40 @@ def get_connector_credential_pairs_for_source(
283282
return list(db_session.scalars(stmt).unique().all())
284283

285284

286-
def get_last_successful_attempt_time(
287-
connector_id: int,
288-
credential_id: int,
285+
def get_last_successful_attempt_poll_range_end(
286+
cc_pair_id: int,
289287
earliest_index: float,
290288
search_settings: SearchSettings,
291289
db_session: Session,
292290
) -> float:
293-
"""Gets the timestamp of the last successful index run stored in
294-
the CC Pair row in the database"""
295-
if search_settings.status == IndexModelStatus.PRESENT:
296-
connector_credential_pair = get_connector_credential_pair(
297-
db_session=db_session,
298-
connector_id=connector_id,
299-
credential_id=credential_id,
300-
)
301-
if (
302-
connector_credential_pair is None
303-
or connector_credential_pair.last_successful_index_time is None
304-
):
305-
return earliest_index
291+
"""Used to get the latest `poll_range_end` for a given connector and credential.
306292
307-
return connector_credential_pair.last_successful_index_time.timestamp()
293+
This can be used to determine the next "start" time for a new index attempt.
308294
309-
# For Secondary Index we don't keep track of the latest success, so have to calculate it live
310-
attempt = (
295+
Note that the attempts time_started is not necessarily correct - that gets set
296+
separately and is similar but not exactly the same as the `poll_range_end`.
297+
"""
298+
latest_successful_index_attempt = (
311299
db_session.query(IndexAttempt)
312300
.join(
313301
ConnectorCredentialPair,
314302
IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id,
315303
)
316304
.filter(
317-
ConnectorCredentialPair.connector_id == connector_id,
318-
ConnectorCredentialPair.credential_id == credential_id,
305+
ConnectorCredentialPair.id == cc_pair_id,
319306
IndexAttempt.search_settings_id == search_settings.id,
320307
IndexAttempt.status == IndexingStatus.SUCCESS,
321308
)
322-
.order_by(IndexAttempt.time_started.desc())
309+
.order_by(IndexAttempt.poll_range_end.desc())
323310
.first()
324311
)
325-
326-
if not attempt or not attempt.time_started:
312+
if (
313+
not latest_successful_index_attempt
314+
or not latest_successful_index_attempt.poll_range_end
315+
):
327316
return earliest_index
328317

329-
return attempt.time_started.timestamp()
318+
return latest_successful_index_attempt.poll_range_end.timestamp()
330319

331320

332321
"""Updates"""

backend/onyx/server/documents/models.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ class IndexAttemptSnapshot(BaseModel):
163163
full_exception_trace: str | None
164164
time_started: str | None
165165
time_updated: str
166+
poll_range_start: datetime | None = None
167+
poll_range_end: datetime | None = None
166168

167169
@classmethod
168170
def from_index_attempt_db_model(
@@ -184,6 +186,8 @@ def from_index_attempt_db_model(
184186
else None
185187
),
186188
time_updated=index_attempt.time_updated.isoformat(),
189+
poll_range_start=index_attempt.poll_range_start,
190+
poll_range_end=index_attempt.poll_range_end,
187191
)
188192

189193

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import uuid
2+
from datetime import datetime
3+
from datetime import timedelta
4+
from datetime import timezone
5+
6+
import httpx
7+
8+
from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET
9+
from onyx.configs.constants import DocumentSource
10+
from onyx.connectors.mock_connector.connector import MockConnectorCheckpoint
11+
from onyx.connectors.models import InputType
12+
from onyx.db.enums import IndexingStatus
13+
from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_HOST
14+
from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_PORT
15+
from tests.integration.common_utils.managers.cc_pair import CCPairManager
16+
from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager
17+
from tests.integration.common_utils.test_document_utils import create_test_document
18+
from tests.integration.common_utils.test_models import DATestUser
19+
20+
21+
def _setup_mock_connector(
22+
mock_server_client: httpx.Client,
23+
admin_user: DATestUser,
24+
) -> None:
25+
test_doc = create_test_document()
26+
successful_response = {
27+
"documents": [test_doc.model_dump(mode="json")],
28+
"checkpoint": MockConnectorCheckpoint(has_more=False).model_dump(mode="json"),
29+
"failures": [],
30+
}
31+
response = mock_server_client.post(
32+
"/set-behavior",
33+
json=[successful_response, successful_response], # For two attempts
34+
)
35+
assert response.status_code == 200
36+
37+
38+
def test_poll_connector_time_ranges(
39+
mock_server_client: httpx.Client,
40+
admin_user: DATestUser,
41+
) -> None:
42+
"""
43+
Tests that poll connectors correctly set their poll_range_start and poll_range_end
44+
across multiple indexing attempts.
45+
"""
46+
# Set up mock server behavior - a simple successful response
47+
_setup_mock_connector(mock_server_client, admin_user)
48+
49+
# Create a CC Pair for the mock connector with POLL input type
50+
cc_pair_name = f"mock-poll-time-range-{uuid.uuid4()}"
51+
cc_pair = CCPairManager.create_from_scratch(
52+
name=cc_pair_name,
53+
source=DocumentSource.MOCK_CONNECTOR,
54+
input_type=InputType.POLL,
55+
connector_specific_config={
56+
"mock_server_host": MOCK_CONNECTOR_SERVER_HOST,
57+
"mock_server_port": MOCK_CONNECTOR_SERVER_PORT,
58+
},
59+
user_performing_action=admin_user,
60+
refresh_freq=3, # refresh often to ensure the second attempt actually runs
61+
)
62+
63+
# --- First Indexing Attempt ---
64+
time_before_first_attempt = datetime.now(timezone.utc)
65+
first_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
66+
cc_pair_id=cc_pair.id,
67+
user_performing_action=admin_user,
68+
)
69+
IndexAttemptManager.wait_for_index_attempt_completion(
70+
index_attempt_id=first_index_attempt.id,
71+
cc_pair_id=cc_pair.id,
72+
user_performing_action=admin_user,
73+
)
74+
time_after_first_attempt = datetime.now(timezone.utc)
75+
76+
# Fetch and validate the first attempt
77+
completed_first_attempt = IndexAttemptManager.get_index_attempt_by_id(
78+
index_attempt_id=first_index_attempt.id,
79+
cc_pair_id=cc_pair.id,
80+
user_performing_action=admin_user,
81+
)
82+
assert completed_first_attempt.status == IndexingStatus.SUCCESS
83+
assert completed_first_attempt.poll_range_start is not None
84+
assert completed_first_attempt.poll_range_end is not None
85+
86+
# For the first run (no prior successful attempts), poll_range_start should be epoch (0)
87+
expected_first_start = datetime.fromtimestamp(0, tz=timezone.utc)
88+
assert completed_first_attempt.poll_range_start == expected_first_start
89+
90+
# `poll_range_end` should be sometime in between the time the attempt
91+
# started and the time it finished.
92+
# no way to have a more precise assertion here since the `poll_range_end`
93+
# can really be set anytime in that range and be "correct"
94+
assert (
95+
time_before_first_attempt
96+
<= completed_first_attempt.poll_range_end
97+
<= time_after_first_attempt
98+
)
99+
100+
first_attempt_poll_end = completed_first_attempt.poll_range_end
101+
102+
# --- Second Indexing Attempt ---
103+
# Trigger another run manually (since automatic refresh might be too slow for test)
104+
# Ensure there's a slight delay so the poll window moves
105+
# In a real scenario, the scheduler would wait for the refresh frequency.
106+
# Here we manually trigger a new run.
107+
_setup_mock_connector(mock_server_client, admin_user)
108+
CCPairManager.run_once(
109+
cc_pair, from_beginning=False, user_performing_action=admin_user
110+
)
111+
112+
time_before_second_attempt = datetime.now(timezone.utc)
113+
second_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
114+
cc_pair_id=cc_pair.id,
115+
index_attempts_to_ignore=[first_index_attempt.id],
116+
user_performing_action=admin_user,
117+
)
118+
IndexAttemptManager.wait_for_index_attempt_completion(
119+
index_attempt_id=second_index_attempt.id,
120+
cc_pair_id=cc_pair.id,
121+
user_performing_action=admin_user,
122+
)
123+
time_after_second_attempt = datetime.now(timezone.utc)
124+
125+
# Fetch and validate the second attempt
126+
completed_second_attempt = IndexAttemptManager.get_index_attempt_by_id(
127+
index_attempt_id=second_index_attempt.id,
128+
cc_pair_id=cc_pair.id,
129+
user_performing_action=admin_user,
130+
)
131+
assert completed_second_attempt.status == IndexingStatus.SUCCESS
132+
assert completed_second_attempt.poll_range_start is not None
133+
assert completed_second_attempt.poll_range_end is not None
134+
135+
# For the second run, poll_range_start should be the previous successful attempt's
136+
# poll_range_end minus the POLL_CONNECTOR_OFFSET
137+
expected_second_start = first_attempt_poll_end - timedelta(
138+
minutes=POLL_CONNECTOR_OFFSET
139+
)
140+
assert completed_second_attempt.poll_range_start == expected_second_start
141+
142+
# `poll_range_end` should be sometime in between the time the attempt
143+
# started and the time it finished.
144+
# again, no way to have a more precise assertion here since the `poll_range_end`
145+
# can really be set anytime in that range and be "correct"
146+
assert (
147+
time_before_second_attempt
148+
<= completed_second_attempt.poll_range_end
149+
<= time_after_second_attempt
150+
)

0 commit comments

Comments
 (0)