fix(infra): Fix Docfetching #5219
Conversation
Greptile Summary
This PR implements backpressure controls to prevent unbounded growth of document indexing queues in the Onyx system. The changes add two new configuration constants, MAX_ACTIVE_DOCFETCHING_ATTEMPTS (default 64) and MAX_DOCFETCHING_QUEUE_LENGTH (default 200), both configurable via environment variables following established patterns.
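As a rough illustration only, the constants would presumably be defined along these lines; the module location and the environment variable names are assumptions based on the described pattern, not taken from the diff:

import os

# Hypothetical sketch following the usual env-override pattern; names are assumed
MAX_ACTIVE_DOCFETCHING_ATTEMPTS = int(
    os.environ.get("MAX_ACTIVE_DOCFETCHING_ATTEMPTS") or 64
)
MAX_DOCFETCHING_QUEUE_LENGTH = int(
    os.environ.get("MAX_DOCFETCHING_QUEUE_LENGTH") or 200
)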
The implementation adds queue management logic in the check_for_indexing function within the docprocessing tasks. When either limit is exceeded, because too many tasks are queued in the Celery broker or too many indexing attempts are active in the database, the system skips creating new indexing tasks and logs a warning. This prevents resource exhaustion scenarios where the document indexing pipeline could overwhelm Redis queues or database connections under heavy load.
The backpressure check is wrapped in exception handling to ensure it operates as a best-effort defensive measure without becoming a single point of failure. This fits into the broader Celery-based background task architecture that handles document processing across the Onyx platform, providing a critical safety valve for one of the most resource-intensive operations in the system.
PR Description Notes:
- The PR description is a template and lacks actual details about the changes
- No testing information is provided
- The description should explain the backpressure implementation and its purpose
Confidence score: 4/5
- This PR implements essential infrastructure safeguards with minimal risk of breaking existing functionality
- Score reflects solid defensive programming practices and proper integration with existing systems
- Pay close attention to the exception handling in the backpressure logic to ensure it doesn't mask real errors
2 files reviewed, 1 comment
# Backpressure: prevent unbounded growth of docfetching tasks
try:
    # 1) Broker queue length cap
    r_celery = self.app.broker_connection().channel().client  # type: ignore
    current_df_queue_len = celery_get_queue_length(
        OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
    )
    if current_df_queue_len >= MAX_DOCFETCHING_QUEUE_LENGTH:
        task_logger.warning(
            "check_for_indexing - Skipping enqueue due to docfetching queue cap: "
            f"len={current_df_queue_len} cap={MAX_DOCFETCHING_QUEUE_LENGTH}"
        )
        return tasks_created

    # 2) Active attempts cap (NOT_STARTED/IN_PROGRESS)
    active_attempts_count = (
        db_session.execute(
            select(func.count(IndexAttempt.id)).where(
                IndexAttempt.status.in_(
                    [
                        IndexingStatus.NOT_STARTED,
                        IndexingStatus.IN_PROGRESS,
                    ]
                )
            )
        )
        .scalars()
        .first()
        or 0
    )
    if active_attempts_count >= MAX_ACTIVE_DOCFETCHING_ATTEMPTS:
        task_logger.warning(
            "check_for_indexing - Skipping enqueue due to active attempts cap: "
            f"active={active_attempts_count} cap={MAX_ACTIVE_DOCFETCHING_ATTEMPTS}"
        )
        return tasks_created
except Exception:
    # Best-effort backpressure; do not fail task if metrics fetch fails
    task_logger.exception(
        "check_for_indexing - backpressure check failed"
    )
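On the summary's note about not masking real errors: one option would be to narrow the bare except to the infrastructure failures this check is meant to tolerate. A minimal sketch of that alternative, assuming the standard redis-py and SQLAlchemy exception hierarchies cover the helpers used above:

from redis.exceptions import RedisError
from sqlalchemy.exc import SQLAlchemyError

try:
    ...  # queue-length and active-attempt checks, unchanged from the diff above
except (RedisError, SQLAlchemyError):
    # Still best-effort, but only broker/database failures are swallowed
    task_logger.exception("check_for_indexing - backpressure check failed")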
logic: The backpressure implementation should be moved outside the per-cc_pair loop. Currently it checks limits for every connector pair, which could cause inconsistent behavior where some cc_pairs are processed but others are not within the same run.
Suggested change:

# Backpressure: prevent unbounded growth of docfetching tasks
# Check limits once before processing any cc_pairs to ensure consistent behavior
try:
    # 1) Broker queue length cap
    r_celery = self.app.broker_connection().channel().client  # type: ignore
    current_df_queue_len = celery_get_queue_length(
        OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
    )
    if current_df_queue_len >= MAX_DOCFETCHING_QUEUE_LENGTH:
        task_logger.warning(
            "check_for_indexing - Skipping enqueue due to docfetching queue cap: "
            f"len={current_df_queue_len} cap={MAX_DOCFETCHING_QUEUE_LENGTH}"
        )
        return tasks_created

    # 2) Active attempts cap (NOT_STARTED/IN_PROGRESS)
    with get_session_with_current_tenant() as db_session:
        active_attempts_count = (
            db_session.execute(
                select(func.count(IndexAttempt.id)).where(
                    IndexAttempt.status.in_(
                        [
                            IndexingStatus.NOT_STARTED,
                            IndexingStatus.IN_PROGRESS,
                        ]
                    )
                )
            )
            .scalars()
            .first()
            or 0
        )
    if active_attempts_count >= MAX_ACTIVE_DOCFETCHING_ATTEMPTS:
        task_logger.warning(
            "check_for_indexing - Skipping enqueue due to active attempts cap: "
            f"active={active_attempts_count} cap={MAX_ACTIVE_DOCFETCHING_ATTEMPTS}"
        )
        return tasks_created
except Exception:
    # Best-effort backpressure; do not fail task if metrics fetch fails
    task_logger.exception(
        "check_for_indexing - backpressure check failed"
    )
No issues found across 2 files
Description
[Provide a brief description of the changes in this PR]
How Has This Been Tested?
[Describe the tests you ran to verify your changes]
Backporting (check the box to trigger backport action)
Note: check that the backport action passes; otherwise, resolve the conflicts manually and tag the patches.
Summary by cubic
Added backpressure to docfetching to prevent runaway queue growth and overload. Enqueueing is now capped by broker queue length and active indexing attempts.