From 1955c1d67bd9d1c54a264e3c8d8b62a291acb663 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 30 Dec 2024 21:53:00 -0800 Subject: [PATCH 1/2] re-enable celery task execution logging in primary worker --- backend/onyx/background/celery/apps/app_base.py | 12 +++++++++++- backend/onyx/background/celery/apps/primary.py | 5 +++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 7827fd054b3..07286745337 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -414,11 +414,21 @@ def on_setup_logging( task_logger.setLevel(loglevel) task_logger.propagate = False - # Hide celery task received and succeeded/failed messages + # hide celery task received spam + # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received" strategy.logger.setLevel(logging.WARNING) + + # uncomment this to hide celery task succeeded/failed spam + # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None" trace.logger.setLevel(logging.WARNING) +def set_task_finished_log_level(logLevel: int) -> None: + """call this to override the setLevel in on_setup_logging. We are interested + in the task timings in the cloud but it can be spammy for self hosted.""" + trace.logger.setLevel(logLevel) + + class TenantContextFilter(logging.Filter): """Logging filter to inject tenant ID into the logger's name.""" diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 5a3b61552f9..267d51ade7c 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -1,3 +1,4 @@ +import logging import multiprocessing from typing import Any from typing import cast @@ -194,6 +195,10 @@ def on_setup_logging( ) -> None: app_base.on_setup_logging(loglevel, logfile, format, colorize, **kwargs) + # this can be spammy, so just enable it in the cloud for now + if MULTI_TENANT: + app_base.set_task_finished_log_level(logging.INFO) + class HubPeriodicTask(bootsteps.StartStopStep): """Regularly reacquires the primary worker lock outside of the task queue. From b43a8e48c63387460387f318aa7195cc018d499d Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Tue, 31 Dec 2024 00:10:33 -0800 Subject: [PATCH 2/2] add some return types to distinguish when the task is actually performing work --- .../background/celery/tasks/connector_deletion/tasks.py | 8 ++++++-- .../celery/tasks/doc_permission_syncing/tasks.py | 6 ++++-- .../celery/tasks/external_group_syncing/tasks.py | 6 ++++-- backend/onyx/background/celery/tasks/pruning/tasks.py | 8 +++++--- backend/onyx/background/celery/tasks/vespa/tasks.py | 6 +++--- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 39ed7d25d05..799644adb17 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -34,7 +34,9 @@ class TaskDependencyError(RuntimeError): trail=False, bind=True, ) -def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None: +def check_for_connector_deletion_task( + self: Task, *, tenant_id: str | None +) -> bool | None: r = get_redis_client(tenant_id=tenant_id) lock_beat: RedisLock = r.lock( @@ -45,7 +47,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N try: # these tasks should never overlap if not lock_beat.acquire(blocking=False): - return + return None # collect cc_pair_ids cc_pair_ids: list[int] = [] @@ -81,6 +83,8 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N if lock_beat.owned(): lock_beat.release() + return True + def try_generate_document_cc_pair_cleanup_tasks( app: Celery, diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index ddf1f958079..faa9240f508 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -91,7 +91,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b soft_time_limit=JOB_TIMEOUT, bind=True, ) -def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None: +def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool | None: r = get_redis_client(tenant_id=tenant_id) lock_beat: RedisLock = r.lock( @@ -102,7 +102,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None try: # these tasks should never overlap if not lock_beat.acquire(blocking=False): - return + return None # get all cc pairs that need to be synced cc_pair_ids_to_sync: list[int] = [] @@ -131,6 +131,8 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None if lock_beat.owned(): lock_beat.release() + return True + def try_creating_permissions_sync_task( app: Celery, diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py index ce66d318bf3..bad23c120ff 100644 --- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py @@ -94,7 +94,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool: soft_time_limit=JOB_TIMEOUT, bind=True, ) -def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None: +def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None: r = get_redis_client(tenant_id=tenant_id) lock_beat: RedisLock = r.lock( @@ -105,7 +105,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None: try: # these tasks should never overlap if not lock_beat.acquire(blocking=False): - return + return None cc_pair_ids_to_sync: list[int] = [] with get_session_with_tenant(tenant_id) as db_session: @@ -149,6 +149,8 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None: if lock_beat.owned(): lock_beat.release() + return True + def try_creating_external_group_sync_task( app: Celery, diff --git a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py index 46b985abfe0..920bc44cdf6 100644 --- a/backend/onyx/background/celery/tasks/pruning/tasks.py +++ b/backend/onyx/background/celery/tasks/pruning/tasks.py @@ -81,10 +81,10 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool: soft_time_limit=JOB_TIMEOUT, bind=True, ) -def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: +def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None: r = get_redis_client(tenant_id=tenant_id) - lock_beat = r.lock( + lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK, timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, ) @@ -92,7 +92,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: try: # these tasks should never overlap if not lock_beat.acquire(blocking=False): - return + return None cc_pair_ids: list[int] = [] with get_session_with_tenant(tenant_id) as db_session: @@ -127,6 +127,8 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: if lock_beat.owned(): lock_beat.release() + return True + def try_creating_prune_generator_task( celery_app: Celery, diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index bf943b8fbd1..508f90bef41 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -88,7 +88,7 @@ trail=False, bind=True, ) -def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: +def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None: """Runs periodically to check if any document needs syncing. Generates sets of tasks for Celery if syncing is needed.""" time_start = time.monotonic() @@ -103,7 +103,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: try: # these tasks should never overlap if not lock_beat.acquire(blocking=False): - return + return None with get_session_with_tenant(tenant_id) as db_session: try_generate_stale_document_sync_tasks( @@ -166,7 +166,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: time_elapsed = time.monotonic() - time_start task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}") - return + return True def try_generate_stale_document_sync_tasks(