diff --git a/backend/ee/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/ee/onyx/background/celery/tasks/doc_permission_syncing/tasks.py
index df012f7fba0..72a940436db 100644
--- a/backend/ee/onyx/background/celery/tasks/doc_permission_syncing/tasks.py
+++ b/backend/ee/onyx/background/celery/tasks/doc_permission_syncing/tasks.py
@@ -30,6 +30,7 @@
 from onyx.background.celery.celery_redis import celery_get_queue_length
 from onyx.background.celery.celery_redis import celery_get_queued_task_ids
 from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
+from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
 from onyx.configs.app_configs import JOB_TIMEOUT
 from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
@@ -73,6 +74,7 @@
 from onyx.utils.logger import setup_logger
 from onyx.utils.telemetry import optional_telemetry
 from onyx.utils.telemetry import RecordType
+from shared_configs.configs import MULTI_TENANT
 
 logger = setup_logger()
 
@@ -87,6 +89,34 @@
 LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15
 
 
+def _get_fence_validation_block_expiration() -> int:
+    """Return the TTL, in seconds, for the fence validation block signal.
+
+    The base TTL is 300 seconds. It is returned unchanged unless MULTI_TENANT
+    is set, in which case it is scaled by the runtime beat multiplier
+    (CLOUD_BEAT_MULTIPLIER_DEFAULT if the multiplier cannot be read).
+
+    The result is never below 1 second: Redis rejects a non-positive EX value.
+    """
+    base_expiration = 300  # seconds
+
+    if not MULTI_TENANT:
+        return base_expiration
+
+    try:
+        # NOTE(review): this diff adds no OnyxRuntime import to this module;
+        # confirm it is already imported, since a NameError is caught below too.
+        beat_multiplier = OnyxRuntime.get_beat_multiplier()
+    except Exception:
+        # Best effort: a failed lookup must not break the calling beat task,
+        # but it should not go unnoticed either.
+        logger.exception("Failed to read beat multiplier; using the default")
+        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
+
+    # A fractional multiplier can truncate the product to 0.
+    return max(1, int(base_expiration * beat_multiplier))
+
+
 """Jobs / utils for kicking off doc permissions sync tasks."""
 
 
@@ -194,7 +224,11 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
                     "Exception while validating permission sync fences"
                 )
 
-            r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=300)
+            r.set(
+                OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES,
+                1,
+                ex=_get_fence_validation_block_expiration(),
+            )
 
         # use a lookup table to find active fences. We still have to verify the fence
         # exists since it is an optimization and not the source of truth.
diff --git a/backend/ee/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/ee/onyx/background/celery/tasks/external_group_syncing/tasks.py
index b7b7da05445..fd71165f75c 100644
--- a/backend/ee/onyx/background/celery/tasks/external_group_syncing/tasks.py
+++ b/backend/ee/onyx/background/celery/tasks/external_group_syncing/tasks.py
@@ -30,6 +30,7 @@
 from onyx.background.celery.apps.app_base import task_logger
 from onyx.background.celery.celery_redis import celery_find_task
 from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
+from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
 from onyx.background.error_logging import emit_background_error
 from onyx.configs.app_configs import JOB_TIMEOUT
 from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT
@@ -57,9 +58,11 @@
 )
 from onyx.redis.redis_pool import get_redis_client
 from onyx.redis.redis_pool import get_redis_replica_client
+from onyx.server.runtime.onyx_runtime import OnyxRuntime
 from onyx.server.utils import make_short_id
 from onyx.utils.logger import format_error_for_logging
 from onyx.utils.logger import setup_logger
+from shared_configs.configs import MULTI_TENANT
 
 logger = setup_logger()
 
@@ -67,6 +70,32 @@
 _EXTERNAL_GROUP_BATCH_SIZE = 100
 
 
+def _get_fence_validation_block_expiration() -> int:
+    """Return the TTL, in seconds, for the fence validation block signal.
+
+    The base TTL is 300 seconds. It is returned unchanged unless MULTI_TENANT
+    is set, in which case it is scaled by the runtime beat multiplier
+    (CLOUD_BEAT_MULTIPLIER_DEFAULT if the multiplier cannot be read).
+
+    The result is never below 1 second: Redis rejects a non-positive EX value.
+    """
+    base_expiration = 300  # seconds
+
+    if not MULTI_TENANT:
+        return base_expiration
+
+    try:
+        beat_multiplier = OnyxRuntime.get_beat_multiplier()
+    except Exception:
+        # Best effort: a failed lookup must not break the calling beat task,
+        # but it should not go unnoticed either.
+        logger.exception("Failed to read beat multiplier; using the default")
+        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
+
+    # A fractional multiplier can truncate the product to 0.
+    return max(1, int(base_expiration * beat_multiplier))
+
+
 def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
     """Returns boolean indicating if external group sync is due."""
 
@@ -194,7 +223,11 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
                     "Exception while validating external group sync fences"
                 )
 
-            r.set(OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES, 1, ex=300)
+            r.set(
+                OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES,
+                1,
+                ex=_get_fence_validation_block_expiration(),
+            )
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index 99d8a35eb9f..b07a0a2133e 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -24,6 +24,7 @@
 from onyx.background.celery.apps.app_base import task_logger
 from onyx.background.celery.celery_utils import httpx_init_vespa_pool
 from onyx.background.celery.memory_monitoring import emit_process_memory
+from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
 from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids
 from onyx.background.celery.tasks.indexing.utils import IndexingCallback
 from onyx.background.celery.tasks.indexing.utils import is_in_repeated_error_state
@@ -86,6 +87,34 @@
 logger = setup_logger()
 
 
+def _get_fence_validation_block_expiration() -> int:
+    """Return the TTL, in seconds, for the fence validation block signal.
+
+    The base TTL is 60 seconds. It is returned unchanged unless MULTI_TENANT
+    is set, in which case it is scaled by the runtime beat multiplier
+    (CLOUD_BEAT_MULTIPLIER_DEFAULT if the multiplier cannot be read).
+
+    The result is never below 1 second: Redis rejects a non-positive EX value.
+    """
+    base_expiration = 60  # seconds
+
+    # NOTE(review): this diff imports neither MULTI_TENANT nor OnyxRuntime
+    # into this module; confirm both are already imported here.
+    if not MULTI_TENANT:
+        return base_expiration
+
+    try:
+        beat_multiplier = OnyxRuntime.get_beat_multiplier()
+    except Exception:
+        # Best effort: a failed lookup must not break the calling beat task,
+        # but it should not go unnoticed either.
+        logger.exception("Failed to read beat multiplier; using the default")
+        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
+
+    # A fractional multiplier can truncate the product to 0.
+    return max(1, int(base_expiration * beat_multiplier))
+
+
 class IndexingWatchdogTerminalStatus(str, Enum):
     """The different statuses the watchdog can finish with.
 
@@ -630,7 +659,11 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
             except Exception:
                 task_logger.exception("Exception while validating indexing fences")
 
-            redis_client.set(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES, 1, ex=60)
+            redis_client.set(
+                OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES,
+                1,
+                ex=_get_fence_validation_block_expiration(),
+            )
 
         # 3/3: FINALIZE
         lock_beat.reacquire()
diff --git a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py
index 42915f3ac00..855a390832b 100644
--- a/backend/onyx/background/celery/tasks/pruning/tasks.py
+++ b/backend/onyx/background/celery/tasks/pruning/tasks.py
@@ -21,6 +21,7 @@
 from onyx.background.celery.celery_redis import celery_get_queued_task_ids
 from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
 from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
+from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
 from onyx.background.celery.tasks.indexing.utils import IndexingCallbackBase
 from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
 from onyx.configs.app_configs import JOB_TIMEOUT
@@ -55,15 +56,55 @@
 from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload
 from onyx.redis.redis_pool import get_redis_client
 from onyx.redis.redis_pool import get_redis_replica_client
+from onyx.server.runtime.onyx_runtime import OnyxRuntime
 from onyx.server.utils import make_short_id
 from onyx.utils.logger import format_error_for_logging
 from onyx.utils.logger import LoggerContextVars
 from onyx.utils.logger import pruning_ctx
 from onyx.utils.logger import setup_logger
+from shared_configs.configs import MULTI_TENANT
 
 logger = setup_logger()
 
 
+def _scale_block_expiration(base_expiration: int) -> int:
+    """Scale a block-signal TTL by the beat multiplier in MULTI_TENANT mode.
+
+    Args:
+        base_expiration: TTL in seconds used as-is outside MULTI_TENANT mode.
+
+    Returns:
+        base_expiration unchanged when MULTI_TENANT is false; otherwise
+        base_expiration times the runtime beat multiplier
+        (CLOUD_BEAT_MULTIPLIER_DEFAULT if the multiplier cannot be read),
+        truncated to an int and never below 1 second, because Redis rejects
+        a non-positive EX value.
+    """
+    if not MULTI_TENANT:
+        return base_expiration
+
+    try:
+        beat_multiplier = OnyxRuntime.get_beat_multiplier()
+    except Exception:
+        # Best effort: a failed lookup must not break the calling beat task,
+        # but it should not go unnoticed either.
+        logger.exception("Failed to read beat multiplier; using the default")
+        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
+
+    # A fractional multiplier can truncate the product to 0.
+    return max(1, int(base_expiration * beat_multiplier))
+
+
+def _get_pruning_block_expiration() -> int:
+    """Return the pruning block signal TTL in seconds (base: 3600, one hour)."""
+    return _scale_block_expiration(3600)
+
+
+def _get_fence_validation_block_expiration() -> int:
+    """Return the fence validation block signal TTL in seconds (base: 300)."""
+    return _scale_block_expiration(300)
+
+
 class PruneCallback(IndexingCallbackBase):
     def progress(self, tag: str, amount: int) -> None:
         self.redis_connector.prune.set_active()
@@ -162,7 +203,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
                     task_logger.info(
                         f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}"
                     )
-            r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=3600)
+            r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=_get_pruning_block_expiration())
 
         # we want to run this less frequently than the overall task
         lock_beat.reacquire()
@@ -175,7 +216,11 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
             except Exception:
                 task_logger.exception("Exception while validating pruning fences")
 
-            r.set(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES, 1, ex=300)
+            r.set(
+                OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES,
+                1,
+                ex=_get_fence_validation_block_expiration(),
+            )
 
         # use a lookup table to find active fences. We still have to verify the fence
         # exists since it is an optimization and not the source of truth.