
Conversation

@justin-tahara (Contributor) commented Aug 20, 2025

Description

[Provide a brief description of the changes in this PR]

How Has This Been Tested?

[Describe the tests you ran to verify your changes]

Backporting (check the box to trigger backport action)

Note: Verify that the backport action passes; otherwise, resolve the conflicts manually and tag the patches.

  • This PR should be backported (make sure to check that the backport attempt succeeds)
  • [Optional] Override Linear Check

Summary by cubic

Added backpressure to docfetching to prevent runaway queue growth and overload. Enqueueing is now capped by broker queue length and active indexing attempts.

  • Bug Fixes
    • Skip enqueue when docfetching queue length >= MAX_DOCFETCHING_QUEUE_LENGTH (default 200); see the queue-length sketch after this list.
    • Skip enqueue when active indexing attempts (NOT_STARTED/IN_PROGRESS) >= MAX_ACTIVE_DOCFETCHING_ATTEMPTS (default 64).
    • Best-effort checks: if metrics lookup fails, continue without blocking and log a warning.
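
For context, the first check reads the backlog of the Celery broker queue. With a Redis broker, each Celery queue is kept as a Redis list, so its length can be read with LLEN. A minimal sketch of what a helper like celery_get_queue_length plausibly does (this helper is illustrative, not Onyx's actual implementation, and it ignores the suffixed keys Celery uses for priority queues):

    from redis import Redis

    def get_queue_length(redis_client: Redis, queue_name: str) -> int:
        # With the Redis broker, Celery stores each queue as a Redis list
        # keyed by the queue name, so LLEN approximates the number of
        # tasks waiting to be consumed.
        return int(redis_client.llen(queue_name))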

* feat(infra): Add WAF implementation

* Addressing greptile comments

* Additional removal of unnecessary code
@justin-tahara justin-tahara requested a review from a team as a code owner August 20, 2025 01:55

vercel bot commented Aug 20, 2025

The latest updates on your projects:

    Project          Deployment  Preview  Comments  Updated (UTC)
    internal-search  Ready       Preview  Comment   Aug 20, 2025 2:01am

@justin-tahara justin-tahara marked this pull request as draft August 20, 2025 01:55
@greptile-apps (bot) left a comment


Greptile Summary

This PR implements backpressure controls to prevent unbounded growth of document indexing queues in the Onyx system. The changes add two new configuration constants: MAX_ACTIVE_DOCFETCHING_ATTEMPTS (default 64) and MAX_DOCFETCHING_QUEUE_LENGTH (default 200), both configurable via environment variables following established patterns.
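
A minimal sketch of how such env-var-backed constants are typically defined (the module location and parsing details are assumptions; only the names and defaults come from this PR):

    import os

    # Defaults from this PR; both limits can be overridden via
    # same-named environment variables.
    MAX_ACTIVE_DOCFETCHING_ATTEMPTS = int(
        os.environ.get("MAX_ACTIVE_DOCFETCHING_ATTEMPTS") or 64
    )
    MAX_DOCFETCHING_QUEUE_LENGTH = int(
        os.environ.get("MAX_DOCFETCHING_QUEUE_LENGTH") or 200
    )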

The implementation adds queue management logic in the check_for_indexing function within the docprocessing tasks. When either limit is exceeded (too many tasks queued in the Celery broker, or too many active indexing attempts in the database), the system skips creating new indexing tasks and logs a warning. This prevents resource-exhaustion scenarios where the document indexing pipeline could overwhelm Redis queues or database connections under heavy load.

The backpressure check is wrapped in exception handling to ensure it operates as a best-effort defensive measure without becoming a single point of failure. This fits into the broader Celery-based background task architecture that handles document processing across the Onyx platform, providing a critical safety valve for one of the most resource-intensive operations in the system.

PR Description Notes:

  • The PR description is a template and lacks actual details about the changes
  • No testing information is provided
  • The description should explain the backpressure implementation and its purpose

Confidence score: 4/5

  • This PR implements essential infrastructure safeguards with minimal risk of breaking existing functionality
  • Score reflects solid defensive programming practices and proper integration with existing systems
  • Pay close attention to the exception handling in the backpressure logic to ensure it doesn't mask real errors

2 files reviewed, 1 comment


Comment on lines +694 to +735
# Backpressure: prevent unbounded growth of docfetching tasks
try:
    # 1) Broker queue length cap
    r_celery = self.app.broker_connection().channel().client  # type: ignore
    current_df_queue_len = celery_get_queue_length(
        OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
    )
    if current_df_queue_len >= MAX_DOCFETCHING_QUEUE_LENGTH:
        task_logger.warning(
            "check_for_indexing - Skipping enqueue due to docfetching queue cap: "
            f"len={current_df_queue_len} cap={MAX_DOCFETCHING_QUEUE_LENGTH}"
        )
        return tasks_created

    # 2) Active attempts cap (NOT_STARTED/IN_PROGRESS)
    active_attempts_count = (
        db_session.execute(
            select(func.count(IndexAttempt.id)).where(
                IndexAttempt.status.in_(
                    [
                        IndexingStatus.NOT_STARTED,
                        IndexingStatus.IN_PROGRESS,
                    ]
                )
            )
        )
        .scalars()
        .first()
        or 0
    )
    if active_attempts_count >= MAX_ACTIVE_DOCFETCHING_ATTEMPTS:
        task_logger.warning(
            "check_for_indexing - Skipping enqueue due to active attempts cap: "
            f"active={active_attempts_count} cap={MAX_ACTIVE_DOCFETCHING_ATTEMPTS}"
        )
        return tasks_created
except Exception:
    # Best-effort backpressure; do not fail task if metrics fetch fails
    task_logger.exception("check_for_indexing - backpressure check failed")


logic: The backpressure implementation should be moved outside the per-cc_pair loop. Currently it checks limits for every connector pair, which could cause inconsistent behavior where some cc_pairs are processed but others are not within the same run.

Suggested change

Remove the block above from inside the per-cc_pair loop, and instead run the check once before any cc_pairs are processed:

    # Backpressure: prevent unbounded growth of docfetching tasks
    # Check limits once before processing any cc_pairs to ensure consistent behavior
    try:
        # 1) Broker queue length cap
        r_celery = self.app.broker_connection().channel().client  # type: ignore
        current_df_queue_len = celery_get_queue_length(
            OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
        )
        if current_df_queue_len >= MAX_DOCFETCHING_QUEUE_LENGTH:
            task_logger.warning(
                "check_for_indexing - Skipping enqueue due to docfetching queue cap: "
                f"len={current_df_queue_len} cap={MAX_DOCFETCHING_QUEUE_LENGTH}"
            )
            return tasks_created

        # 2) Active attempts cap (NOT_STARTED/IN_PROGRESS)
        with get_session_with_current_tenant() as db_session:
            active_attempts_count = (
                db_session.execute(
                    select(func.count(IndexAttempt.id)).where(
                        IndexAttempt.status.in_(
                            [
                                IndexingStatus.NOT_STARTED,
                                IndexingStatus.IN_PROGRESS,
                            ]
                        )
                    )
                )
                .scalars()
                .first()
                or 0
            )
        if active_attempts_count >= MAX_ACTIVE_DOCFETCHING_ATTEMPTS:
            task_logger.warning(
                "check_for_indexing - Skipping enqueue due to active attempts cap: "
                f"active={active_attempts_count} cap={MAX_ACTIVE_DOCFETCHING_ATTEMPTS}"
            )
            return tasks_created
    except Exception:
        # Best-effort backpressure; do not fail task if metrics fetch fails
        task_logger.exception("check_for_indexing - backpressure check failed")

@cubic-dev-ai (bot) left a comment

No issues found across 2 files
