-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix window_start #4689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Fix window_start #4689
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
150 changes: 150 additions & 0 deletions
150
backend/tests/integration/tests/indexing/test_polling.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
import uuid | ||
from datetime import datetime | ||
from datetime import timedelta | ||
from datetime import timezone | ||
|
||
import httpx | ||
|
||
from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET | ||
from onyx.configs.constants import DocumentSource | ||
from onyx.connectors.mock_connector.connector import MockConnectorCheckpoint | ||
from onyx.connectors.models import InputType | ||
from onyx.db.enums import IndexingStatus | ||
from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_HOST | ||
from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_PORT | ||
from tests.integration.common_utils.managers.cc_pair import CCPairManager | ||
from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager | ||
from tests.integration.common_utils.test_document_utils import create_test_document | ||
from tests.integration.common_utils.test_models import DATestUser | ||
|
||
|
||
def _setup_mock_connector(
    mock_server_client: httpx.Client,
    admin_user: DATestUser,
) -> None:
    """Queue two identical successful responses on the mock connector server.

    Each queued response carries a single test document, a checkpoint built
    with ``has_more=False`` and an empty failure list. The payload is POSTed
    to ``/set-behavior`` and the call is asserted to return HTTP 200.

    Args:
        mock_server_client: HTTP client used to reach the mock server.
        admin_user: Accepted for signature parity; not used by this helper.
    """
    document_payload = create_test_document().model_dump(mode="json")
    checkpoint_payload = MockConnectorCheckpoint(has_more=False).model_dump(
        mode="json"
    )
    behavior = {
        "documents": [document_payload],
        "checkpoint": checkpoint_payload,
        "failures": [],
    }

    # One queued entry per expected indexing attempt (two in total).
    set_behavior_result = mock_server_client.post(
        "/set-behavior",
        json=[behavior] * 2,
    )
    assert set_behavior_result.status_code == 200
|
||
|
||
def test_poll_connector_time_ranges(
    mock_server_client: httpx.Client,
    admin_user: DATestUser,
) -> None:
    """
    Tests that poll connectors correctly set their poll_range_start and poll_range_end
    across multiple indexing attempts.

    Expectations checked:
      * first attempt: `poll_range_start` is the Unix epoch and `poll_range_end`
        falls inside the wall-clock window observed by the test.
      * second attempt: `poll_range_start` equals the first attempt's
        `poll_range_end` minus `POLL_CONNECTOR_OFFSET` minutes, and
        `poll_range_end` again falls inside the observed window.

    The lower bound of each observed window is captured *before* the action
    that can cause the attempt to start (CC pair creation / manual trigger).
    Capturing it afterwards would let a quickly-started attempt record a
    `poll_range_end` earlier than the bound and fail the test spuriously.
    """
    # Set up mock server behavior - a simple successful response
    _setup_mock_connector(mock_server_client, admin_user)

    # --- First Indexing Attempt ---
    # Take the lower bound before the CC pair exists, so that no attempt for it
    # can possibly have started earlier than this timestamp.
    time_before_first_attempt = datetime.now(timezone.utc)

    # Create a CC Pair for the mock connector with POLL input type
    cc_pair_name = f"mock-poll-time-range-{uuid.uuid4()}"
    cc_pair = CCPairManager.create_from_scratch(
        name=cc_pair_name,
        source=DocumentSource.MOCK_CONNECTOR,
        input_type=InputType.POLL,
        connector_specific_config={
            "mock_server_host": MOCK_CONNECTOR_SERVER_HOST,
            "mock_server_port": MOCK_CONNECTOR_SERVER_PORT,
        },
        user_performing_action=admin_user,
        refresh_freq=3,  # refresh often to ensure the second attempt actually runs
    )

    first_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    IndexAttemptManager.wait_for_index_attempt_completion(
        index_attempt_id=first_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    time_after_first_attempt = datetime.now(timezone.utc)

    # Fetch and validate the first attempt
    completed_first_attempt = IndexAttemptManager.get_index_attempt_by_id(
        index_attempt_id=first_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    assert completed_first_attempt.status == IndexingStatus.SUCCESS
    assert completed_first_attempt.poll_range_start is not None
    assert completed_first_attempt.poll_range_end is not None

    # For the first run (no prior successful attempts), poll_range_start should be epoch (0)
    expected_first_start = datetime.fromtimestamp(0, tz=timezone.utc)
    assert completed_first_attempt.poll_range_start == expected_first_start

    # `poll_range_end` should be sometime in between the time the attempt
    # started and the time it finished.
    # no way to have a more precise assertion here since the `poll_range_end`
    # can really be set anytime in that range and be "correct"
    assert (
        time_before_first_attempt
        <= completed_first_attempt.poll_range_end
        <= time_after_first_attempt
    )

    first_attempt_poll_end = completed_first_attempt.poll_range_end

    # --- Second Indexing Attempt ---
    # Take the lower bound before anything that could start the second run,
    # for the same reason as above.
    time_before_second_attempt = datetime.now(timezone.utc)

    # Trigger another run manually (since automatic refresh might be too slow for test)
    # In a real scenario, the scheduler would wait for the refresh frequency.
    # Here we manually trigger a new run.
    _setup_mock_connector(mock_server_client, admin_user)
    CCPairManager.run_once(
        cc_pair, from_beginning=False, user_performing_action=admin_user
    )

    second_index_attempt = IndexAttemptManager.wait_for_index_attempt_start(
        cc_pair_id=cc_pair.id,
        index_attempts_to_ignore=[first_index_attempt.id],
        user_performing_action=admin_user,
    )
    IndexAttemptManager.wait_for_index_attempt_completion(
        index_attempt_id=second_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    time_after_second_attempt = datetime.now(timezone.utc)

    # Fetch and validate the second attempt
    completed_second_attempt = IndexAttemptManager.get_index_attempt_by_id(
        index_attempt_id=second_index_attempt.id,
        cc_pair_id=cc_pair.id,
        user_performing_action=admin_user,
    )
    assert completed_second_attempt.status == IndexingStatus.SUCCESS
    assert completed_second_attempt.poll_range_start is not None
    assert completed_second_attempt.poll_range_end is not None

    # For the second run, poll_range_start should be the previous successful attempt's
    # poll_range_end minus the POLL_CONNECTOR_OFFSET
    expected_second_start = first_attempt_poll_end - timedelta(
        minutes=POLL_CONNECTOR_OFFSET
    )
    assert completed_second_attempt.poll_range_start == expected_second_start

    # `poll_range_end` should be sometime in between the time the attempt
    # started and the time it finished.
    # again, no way to have a more precise assertion here since the `poll_range_end`
    # can really be set anytime in that range and be "correct"
    assert (
        time_before_second_attempt
        <= completed_second_attempt.poll_range_end
        <= time_after_second_attempt
    )
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this previously raised an exception for exception types that needed more args