Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion backend/onyx/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,21 @@ def on_setup_logging(
task_logger.setLevel(loglevel)
task_logger.propagate = False

# Hide celery task received and succeeded/failed messages
# hide celery task received spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received"
strategy.logger.setLevel(logging.WARNING)

# hide celery task succeeded/failed spam by default (set_task_finished_log_level can override this)
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None"
trace.logger.setLevel(logging.WARNING)


def set_task_finished_log_level(logLevel: int) -> None:
    """Override the task-finished log level chosen in on_setup_logging.

    on_setup_logging sets celery's trace logger to WARNING, which hides the
    "Task ...[id] succeeded in ...s" messages. Call this afterwards to pick a
    different level: the task timings are interesting in the cloud, but they
    can be spammy for self hosted deployments.

    Args:
        logLevel: the logging level (e.g. logging.INFO) to apply to the
            logger that emits the task succeeded/failed messages.
    """
    task_finished_logger = trace.logger
    task_finished_logger.setLevel(logLevel)


class TenantContextFilter(logging.Filter):

"""Logging filter to inject tenant ID into the logger's name."""
Expand Down
5 changes: 5 additions & 0 deletions backend/onyx/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import multiprocessing
from typing import Any
from typing import cast
Expand Down Expand Up @@ -194,6 +195,10 @@ def on_setup_logging(
) -> None:
app_base.on_setup_logging(loglevel, logfile, format, colorize, **kwargs)

# this can be spammy, so just enable it in the cloud for now
if MULTI_TENANT:
app_base.set_task_finished_log_level(logging.INFO)


class HubPeriodicTask(bootsteps.StartStopStep):
"""Regularly reacquires the primary worker lock outside of the task queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class TaskDependencyError(RuntimeError):
trail=False,
bind=True,
)
def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None:
def check_for_connector_deletion_task(
self: Task, *, tenant_id: str | None
) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -45,7 +47,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

# collect cc_pair_ids
cc_pair_ids: list[int] = []
Expand Down Expand Up @@ -81,6 +83,8 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
if lock_beat.owned():
lock_beat.release()

return True


def try_generate_document_cc_pair_cleanup_tasks(
app: Celery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None:
def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -102,7 +102,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

# get all cc pairs that need to be synced
cc_pair_ids_to_sync: list[int] = []
Expand Down Expand Up @@ -131,6 +131,8 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None
if lock_beat.owned():
lock_beat.release()

return True


def try_creating_permissions_sync_task(
app: Celery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat: RedisLock = r.lock(
Expand All @@ -105,7 +105,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down Expand Up @@ -149,6 +149,8 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned():
lock_beat.release()

return True


def try_creating_external_group_sync_task(
app: Celery,
Expand Down
8 changes: 5 additions & 3 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
soft_time_limit=JOB_TIMEOUT,
bind=True,
)
def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)

lock_beat = r.lock(
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down Expand Up @@ -127,6 +127,8 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
if lock_beat.owned():
lock_beat.release()

return True


def try_creating_prune_generator_task(
celery_app: Celery,
Expand Down
6 changes: 3 additions & 3 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
trail=False,
bind=True,
)
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
"""Runs periodically to check if any document needs syncing.
Generates sets of tasks for Celery if syncing is needed."""
time_start = time.monotonic()
Expand All @@ -103,7 +103,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return
return None

with get_session_with_tenant(tenant_id) as db_session:
try_generate_stale_document_sync_tasks(
Expand Down Expand Up @@ -166,7 +166,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None:

time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
return
return True


def try_generate_stale_document_sync_tasks(
Expand Down
Loading