diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py
index d2cc42e18e2..9262b632dc2 100644
--- a/backend/onyx/background/celery/apps/indexing.py
+++ b/backend/onyx/background/celery/apps/indexing.py
@@ -60,7 +60,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
 
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
-    SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=sender.concurrency)
+
+    # rkuo: been seeing transient connection exceptions here, so upping the connection count
+    # from just concurrency/concurrency to concurrency/concurrency*2
+    SqlEngine.init_engine(
+        pool_size=sender.concurrency, max_overflow=sender.concurrency * 2
+    )
 
     app_base.wait_for_redis(sender, **kwargs)
     app_base.wait_for_db(sender, **kwargs)
diff --git a/backend/onyx/background/celery/configs/indexing.py b/backend/onyx/background/celery/configs/indexing.py
index dcfb2c17037..244a07f7a25 100644
--- a/backend/onyx/background/celery/configs/indexing.py
+++ b/backend/onyx/background/celery/configs/indexing.py
@@ -19,7 +19,10 @@
 # Indexing worker specific ... this lets us track the transition to STARTED in redis
 # We don't currently rely on this but it has the potential to be useful and
 # indexing tasks are not high volume
-task_track_started = True
+
+# we don't turn this on yet because celery occasionally runs tasks more than once
+# which means a duplicate run might change the task state unexpectedly
+# task_track_started = True
 
 worker_concurrency = CELERY_WORKER_INDEXING_CONCURRENCY
 worker_pool = "threads"
diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py
index d00ceefcf7d..a1bb14ca787 100644
--- a/backend/onyx/background/celery/tasks/beat_schedule.py
+++ b/backend/onyx/background/celery/tasks/beat_schedule.py
@@ -4,6 +4,12 @@
 from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryTask
 
+# choosing 15 minutes because it roughly gives us enough time to process many tasks
+# we might be able to reduce this greatly if we can run a unified
+# loop across all tenants rather than tasks per tenant
+
+BEAT_EXPIRES_DEFAULT = 15 * 60  # 15 minutes (in seconds)
+
 # we set expires because it isn't necessary to queue up these tasks
 # it's only important that they run relatively regularly
 tasks_to_schedule = [
@@ -13,7 +19,7 @@
         "schedule": timedelta(seconds=20),
         "options": {
             "priority": OnyxCeleryPriority.HIGH,
-            "expires": 60,
+            "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
     {
@@ -22,7 +28,7 @@
         "schedule": timedelta(seconds=20),
         "options": {
             "priority": OnyxCeleryPriority.HIGH,
-            "expires": 60,
+            "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
     {
@@ -31,7 +37,7 @@
         "schedule": timedelta(seconds=15),
         "options": {
             "priority": OnyxCeleryPriority.HIGH,
-            "expires": 60,
+            "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
     {
@@ -40,7 +46,7 @@
         "schedule": timedelta(seconds=15),
         "options": {
             "priority": OnyxCeleryPriority.HIGH,
-            "expires": 60,
+            "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
     {
@@ -49,7 +55,7 @@
         "schedule": timedelta(seconds=3600),
         "options": {
             "priority": OnyxCeleryPriority.LOWEST,
-            "expires": 60,
+            "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
     {
@@ -58,7 +64,7 @@
         "schedule": timedelta(seconds=5),
         "options": {
             "priority": OnyxCeleryPriority.HIGH,
-            "expires": 60,
+            "expires": BEAT_EXPIRES_DEFAULT,
         },
     },
     {
@@ -67,7 +73,7 @@
         "schedule": timedelta(seconds=30),
         "options": {
"priority": OnyxCeleryPriority.HIGH, - "expires": 60, + "expires": BEAT_EXPIRES_DEFAULT, }, }, { @@ -76,7 +82,7 @@ "schedule": timedelta(seconds=20), "options": { "priority": OnyxCeleryPriority.HIGH, - "expires": 60, + "expires": BEAT_EXPIRES_DEFAULT, }, }, ] diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 2c65995ef9d..ddf1f958079 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -1,6 +1,8 @@ +import time from datetime import datetime from datetime import timedelta from datetime import timezone +from time import sleep from uuid import uuid4 from celery import Celery @@ -18,6 +20,7 @@ from onyx.background.celery.apps.app_base import task_logger from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT +from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from onyx.configs.constants import DocumentSource @@ -91,7 +94,7 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> None: r = get_redis_client(tenant_id=tenant_id) - lock_beat = r.lock( + lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_CONNECTOR_DOC_PERMISSIONS_SYNC_BEAT_LOCK, timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, ) @@ -219,6 +222,43 @@ def connector_permission_sync_generator_task( r = get_redis_client(tenant_id=tenant_id) + # this wait is needed to avoid a race condition where + # the primary worker sends the task and it is immediately executed + # before the primary worker can finalize the fence + start = time.monotonic() + while True: + if time.monotonic() - start > CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT: + raise ValueError( + f"connector_permission_sync_generator_task - timed out waiting for fence to be ready: " + f"fence={redis_connector.permissions.fence_key}" + ) + + if not redis_connector.permissions.fenced: # The fence must exist + raise ValueError( + f"connector_permission_sync_generator_task - fence not found: " + f"fence={redis_connector.permissions.fence_key}" + ) + + payload = redis_connector.permissions.payload # The payload must exist + if not payload: + raise ValueError( + "connector_permission_sync_generator_task: payload invalid or not found" + ) + + if payload.celery_task_id is None: + logger.info( + f"connector_permission_sync_generator_task - Waiting for fence: " + f"fence={redis_connector.permissions.fence_key}" + ) + sleep(1) + continue + + logger.info( + f"connector_permission_sync_generator_task - Fence found, continuing...: " + f"fence={redis_connector.permissions.fence_key}" + ) + break + lock: RedisLock = r.lock( OnyxRedisLocks.CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX + f"_{redis_connector.id}", @@ -254,8 +294,11 @@ def connector_permission_sync_generator_task( if not payload: raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}") - payload.started = datetime.now(timezone.utc) - redis_connector.permissions.set_fence(payload) + new_payload = RedisConnectorPermissionSyncPayload( + started=datetime.now(timezone.utc), + celery_task_id=payload.celery_task_id, + ) + redis_connector.permissions.set_fence(new_payload) document_external_accesses: 
diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py
index 204a3a63ce4..ce66d318bf3 100644
--- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py
+++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py
@@ -97,7 +97,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
 def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
     r = get_redis_client(tenant_id=tenant_id)
 
-    lock_beat = r.lock(
+    lock_beat: RedisLock = r.lock(
         OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
         timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
     )
@@ -162,7 +162,7 @@ def try_creating_external_group_sync_task(
 
     LOCK_TIMEOUT = 30
 
-    lock = r.lock(
+    lock: RedisLock = r.lock(
         DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_generate_external_group_sync_tasks",
         timeout=LOCK_TIMEOUT,
     )
diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index a9ce4274c38..c6c9b05476d 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -1,3 +1,5 @@
+import os
+import sys
 import time
 from datetime import datetime
 from datetime import timezone
@@ -23,6 +25,7 @@
 from onyx.background.indexing.run_indexing import run_indexing_entrypoint
 from onyx.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
 from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
+from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
 from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
 from onyx.configs.constants import DocumentSource
@@ -71,14 +74,18 @@
 
 
 class IndexingCallback(IndexingHeartbeatInterface):
+    PARENT_CHECK_INTERVAL = 60
+
     def __init__(
         self,
+        parent_pid: int,
         stop_key: str,
         generator_progress_key: str,
         redis_lock: RedisLock,
         redis_client: Redis,
     ):
         super().__init__()
+        self.parent_pid = parent_pid
         self.redis_lock: RedisLock = redis_lock
         self.stop_key: str = stop_key
         self.generator_progress_key: str = generator_progress_key
@@ -89,12 +96,31 @@ def __init__(
         self.last_tag: str = "IndexingCallback.__init__"
         self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
 
+        self.last_parent_check = time.monotonic()
+
     def should_stop(self) -> bool:
         if self.redis_client.exists(self.stop_key):
             return True
+
         return False
 
     def progress(self, tag: str, amount: int) -> None:
+        # rkuo: this shouldn't be necessary yet because we spawn the process this runs inside
+        # with daemon = True. It seems likely some indexing tasks will need to spawn other processes eventually
+        # so leave this code in until we're ready to test it.
+
+        # if self.parent_pid:
+        #     # check if the parent pid is alive so we aren't running as a zombie
+        #     now = time.monotonic()
+        #     if now - self.last_parent_check > IndexingCallback.PARENT_CHECK_INTERVAL:
+        #         try:
+        #             # this is unintuitive, but it checks if the parent pid is still running
+        #             os.kill(self.parent_pid, 0)
+        #         except Exception:
+        #             logger.exception("IndexingCallback - parent pid check exceptioned")
+        #             raise
+        #         self.last_parent_check = now
+
         try:
             self.redis_lock.reacquire()
             self.last_tag = tag
@@ -772,7 +798,6 @@ def connector_indexing_proxy_task(
         return
     task_logger.info(
-        f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
         f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
     )
@@ -789,23 +814,26 @@
 
         # if the job is done, clean up and break
         if job.done():
-            if job.status == "error":
-                ignore_exitcode = False
-
-                exit_code: int | None = None
-                if job.process:
-                    exit_code = job.process.exitcode
-
-                # seeing odd behavior where spawned tasks usually return exit code 1 in the cloud,
-                # even though logging clearly indicates that they completed successfully
-                # to work around this, we ignore the job error state if the completion signal is OK
-                status_int = redis_connector_index.get_completion()
-                if status_int:
-                    status_enum = HTTPStatus(status_int)
-                    if status_enum == HTTPStatus.OK:
-                        ignore_exitcode = True
-
-                if ignore_exitcode:
+            try:
+                if job.status == "error":
+                    ignore_exitcode = False
+
+                    exit_code: int | None = None
+                    if job.process:
+                        exit_code = job.process.exitcode
+
+                    # seeing odd behavior where spawned tasks usually return exit code 1 in the cloud,
+                    # even though logging clearly indicates successful completion
+                    # to work around this, we ignore the job error state if the completion signal is OK
+                    status_int = redis_connector_index.get_completion()
+                    if status_int:
+                        status_enum = HTTPStatus(status_int)
+                        if status_enum == HTTPStatus.OK:
+                            ignore_exitcode = True
+
+                    if not ignore_exitcode:
+                        raise RuntimeError("Spawned task exceptioned.")
+
                     task_logger.warning(
                         "Indexing watchdog - spawned task has non-zero exit code "
                         "but completion signal is OK. Continuing...: "
                         f"attempt={index_attempt_id} "
                         f"tenant={tenant_id} "
                         f"cc_pair={cc_pair_id} "
                         f"search_settings={search_settings_id} "
                         f"exit_code={exit_code}"
                     )
@@ -815,18 +843,21 @@
-                else:
-                    task_logger.error(
-                        "Indexing watchdog - spawned task exceptioned: "
-                        f"attempt={index_attempt_id} "
-                        f"tenant={tenant_id} "
-                        f"cc_pair={cc_pair_id} "
-                        f"search_settings={search_settings_id} "
-                        f"exit_code={exit_code} "
-                        f"error={job.exception()}"
-                    )
+            except Exception:
+                task_logger.error(
+                    "Indexing watchdog - spawned task exceptioned: "
+                    f"attempt={index_attempt_id} "
+                    f"tenant={tenant_id} "
+                    f"cc_pair={cc_pair_id} "
+                    f"search_settings={search_settings_id} "
+                    f"exit_code={exit_code} "
+                    f"error={job.exception()}"
+                )
+
+                raise
+            finally:
+                job.release()
 
-            job.release()
             break
 
         # if a termination signal is detected, clean up and break
@@ -911,7 +942,7 @@ def connector_indexing_task_wrapper(
             tenant_id,
             is_ee,
         )
-    except:
+    except Exception:
         logger.exception(
             f"connector_indexing_task exceptioned: "
             f"tenant={tenant_id} "
@@ -919,7 +950,14 @@ def connector_indexing_task_wrapper(
             f"cc_pair={cc_pair_id} "
             f"search_settings={search_settings_id}"
         )
-        raise
+
+        # There is a cloud-related bug outside of our code
+        # where spawned tasks return with an exit code of 1.
+        # Unfortunately, exceptions also return with an exit code of 1,
+        # so just raising an exception isn't informative.
+        # Exiting with 255 makes it possible to distinguish between normal exits
+        # and exceptions.
+        sys.exit(255)
 
     return result
 
@@ -991,7 +1029,17 @@
             f"fence={redis_connector.stop.fence_key}"
         )
+    # this wait is needed to avoid a race condition where
+    # the primary worker sends the task and it is immediately executed
+    # before the primary worker can finalize the fence
+    start = time.monotonic()
     while True:
+        if time.monotonic() - start > CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT:
+            raise ValueError(
+                f"connector_indexing_task - timed out waiting for fence to be ready: "
+                f"fence={redis_connector_index.fence_key}"
+            )
+
         if not redis_connector_index.fenced:  # The fence must exist
             raise ValueError(
                 f"connector_indexing_task - fence not found: fence={redis_connector_index.fence_key}"
             )
@@ -1032,7 +1080,9 @@
     if not acquired:
         logger.warning(
             f"Indexing task already running, exiting...: "
-            f"index_attempt={index_attempt_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
+            f"index_attempt={index_attempt_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
         )
         return None
 
@@ -1068,6 +1118,7 @@
 
     # define a callback class
     callback = IndexingCallback(
+        os.getppid(),
         redis_connector.stop.fence_key,
         redis_connector_index.generator_progress_key,
         lock,
@@ -1101,8 +1152,19 @@
             f"search_settings={search_settings_id}"
         )
         if attempt_found:
-            with get_session_with_tenant(tenant_id) as db_session:
-                mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))
+            try:
+                with get_session_with_tenant(tenant_id) as db_session:
+                    mark_attempt_failed(
+                        index_attempt_id, db_session, failure_reason=str(e)
+                    )
+            except Exception:
+                logger.exception(
+                    "Indexing watchdog - transient exception looking up index attempt: "
+                    f"attempt={index_attempt_id} "
+                    f"tenant={tenant_id} "
+                    f"cc_pair={cc_pair_id} "
+                    f"search_settings={search_settings_id}"
+                )
 
         raise e
     finally:
diff --git a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py
index d4cca6e1cdf..46b985abfe0 100644
--- a/backend/onyx/background/celery/tasks/pruning/tasks.py
+++ b/backend/onyx/background/celery/tasks/pruning/tasks.py
@@ -283,6 +283,7 @@
         )
 
         callback = IndexingCallback(
+            0,
             redis_connector.stop.fence_key,
             redis_connector.prune.generator_progress_key,
             lock,
diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py
index 7892dafafd7..738b088fbcc 100644
--- a/backend/onyx/configs/constants.py
+++ b/backend/onyx/configs/constants.py
@@ -83,6 +83,9 @@
 # if we can get callbacks as object bytes download, we could lower this a lot.
 CELERY_INDEXING_LOCK_TIMEOUT = 3 * 60 * 60  # 60 min
 
+# how long a task should wait for its associated fence to be ready
+CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT = 5 * 60  # 5 min
+
 # needs to be long enough to cover the maximum time it takes to download an object
 # if we can get callbacks as object bytes download, we could lower this a lot.
 CELERY_PRUNING_LOCK_TIMEOUT = 300  # 5 min
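
Notes on the patterns above. The code blocks below are illustrative sketches only; they are not part of the patch.

The pool change in apps/indexing.py raises the ceiling on Postgres connections per indexing worker. Assuming SqlEngine.init_engine forwards these kwargs to SQLAlchemy's create_engine (an assumption, the wrapper itself is not shown in this diff), the effect is roughly:

```python
from sqlalchemy import create_engine

# Illustrative sketch only: "concurrency" stands in for sender.concurrency and the
# DSN is a placeholder; the real engine setup lives in SqlEngine.init_engine.
concurrency = 8
engine = create_engine(
    "postgresql+psycopg2://onyx:password@localhost:5432/onyx",  # placeholder DSN
    pool_size=concurrency,         # connections kept open in the pool
    max_overflow=concurrency * 2,  # extra short-lived connections under burst load
)
# Worst case the worker can now hold pool_size + max_overflow = 3 * concurrency
# connections, up from 2 * concurrency before this change.
```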
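BEAT_EXPIRES_DEFAULT loosens the old hard-coded 60-second expiry on beat tasks. In Celery, the "options" dict of a beat entry is passed through to apply_async, so "expires" bounds how long a queued-but-unstarted run stays valid. A minimal sketch of how such an entry is typically wired up (assumed wiring; the names are illustrative and the Onyx beat setup itself is not part of this diff):

```python
from datetime import timedelta

from celery import Celery

BEAT_EXPIRES_DEFAULT = 15 * 60  # seconds

app = Celery("sketch", broker="redis://localhost:6379/0")  # placeholder broker

app.conf.beat_schedule = {
    "check-for-indexing": {            # illustrative entry name
        "task": "check_for_indexing",  # illustrative task name
        "schedule": timedelta(seconds=15),
        "options": {
            "priority": 1,
            # a run that has not started within 15 minutes is discarded instead
            # of piling up behind a busy worker
            "expires": BEAT_EXPIRES_DEFAULT,
        },
    },
}
```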
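The fence-wait loops added to connector_permission_sync_generator_task and connector_indexing_task share one shape: fail immediately if the fence is missing or the payload is invalid, poll while the payload has no celery_task_id yet, and give up after CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT so a worker can never hang on a fence the primary worker failed to finalize. A generic sketch of that shape (the helper name and callables are hypothetical, not part of the Onyx codebase):

```python
import time
from typing import Any, Callable

CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT = 5 * 60  # seconds, mirrors constants.py


def wait_for_fence_ready(
    is_fenced: Callable[[], bool],   # e.g. lambda: redis_connector.permissions.fenced
    get_payload: Callable[[], Any],  # e.g. lambda: redis_connector.permissions.payload
    timeout: float = CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT,
    poll_interval: float = 1.0,
) -> Any:
    """Hypothetical helper: block until the fence payload carries a celery_task_id."""
    start = time.monotonic()
    while True:
        if time.monotonic() - start > timeout:
            raise TimeoutError("timed out waiting for fence to be ready")

        if not is_fenced():
            raise ValueError("fence not found")

        payload = get_payload()
        if not payload:
            raise ValueError("payload invalid or not found")

        if getattr(payload, "celery_task_id", None) is None:
            time.sleep(poll_interval)
            continue

        return payload
```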
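The commented-out block in IndexingCallback.progress relies on the POSIX convention that os.kill(pid, 0) delivers no signal but still validates the pid, so a spawned child can notice that its parent has died and stop running as an orphan. A standalone sketch of that check (the function name is illustrative; behavior differs on Windows):

```python
import os


def parent_is_alive(parent_pid: int) -> bool:
    """Best-effort liveness check for a parent process on POSIX systems."""
    try:
        # signal 0 performs the existence/permission check without sending anything
        os.kill(parent_pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True   # process exists but is owned by another user
    except OSError:
        return False
    return True
```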
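The watchdog change and the sys.exit(255) in connector_indexing_task_wrapper work together: exit code 1 from a spawned task is ambiguous in the cloud (it has been observed even after successful runs), so the wrapper converts exceptions to exit code 255 and the watchdog trusts the completion signal written to Redis over the raw exit status. Restated as a small decision helper (hypothetical, mirroring the try-block above rather than defining new behavior):

```python
from http import HTTPStatus


def spawned_task_failed(job_status: str, completion_status: int | None) -> bool:
    """Return True when the watchdog should treat the spawned task as failed."""
    if job_status != "error":
        return False

    # a completion signal of HTTP 200 OK overrides a suspicious exit code
    if completion_status is not None and HTTPStatus(completion_status) == HTTPStatus.OK:
        return False

    return True
```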