32 | 32 | from onyx.background.indexing.checkpointing_utils import (
33 | 33 |     get_index_attempts_with_old_checkpoints,
34 | 34 | )
   | 35 | +from onyx.background.indexing.index_attempt_utils import cleanup_index_attempts
   | 36 | +from onyx.background.indexing.index_attempt_utils import get_old_index_attempts
35 | 37 | from onyx.configs.app_configs import MANAGED_VESPA
36 | 38 | from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH
37 | 39 | from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
109 | 111 | # Heartbeat timeout: if no heartbeat received for 30 minutes, consider it dead
110 | 112 | # This should be much longer than INDEXING_WORKER_HEARTBEAT_INTERVAL (30s)
111 | 113 | HEARTBEAT_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
    | 114 | +INDEX_ATTEMPT_BATCH_SIZE = 500
112 | 115 |
113 | 116 |
114 | 117 | def _get_fence_validation_block_expiration() -> int:
@@ -987,6 +990,95 @@ def cleanup_checkpoint_task(
987 | 990 |     )
988 | 991 |
989 | 992 |
| 993 | +# primary
| 994 | +@shared_task(
| 995 | +    name=OnyxCeleryTask.CHECK_FOR_INDEX_ATTEMPT_CLEANUP,
| 996 | +    soft_time_limit=300,
| 997 | +    bind=True,
| 998 | +)
| 999 | +def check_for_index_attempt_cleanup(self: Task, *, tenant_id: str) -> None:
| 1000 | +    """Clean up index attempts that are older than 7 days."""
| 1001 | +    locked = False
| 1002 | +    redis_client = get_redis_client(tenant_id=tenant_id)
| 1003 | +    lock: RedisLock = redis_client.lock(
| 1004 | +        OnyxRedisLocks.CHECK_INDEX_ATTEMPT_CLEANUP_BEAT_LOCK,
| 1005 | +        timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
| 1006 | +    )
| 1007 | +
| 1008 | +    # these tasks should never overlap
| 1009 | +    if not lock.acquire(blocking=False):
| 1010 | +        task_logger.info(
| 1011 | +            f"check_for_index_attempt_cleanup - Lock not acquired: tenant={tenant_id}"
| 1012 | +        )
| 1013 | +        return None
| 1014 | +
| 1015 | +    try:
| 1016 | +        locked = True
| 1017 | +        batch_size = INDEX_ATTEMPT_BATCH_SIZE
| 1018 | +        with get_session_with_current_tenant() as db_session:
| 1019 | +            old_attempts = get_old_index_attempts(db_session)
| 1020 | +            # Dispatch in batches: on the first run the table may hold a large
| 1021 | +            # backlog of attempts that were never deleted; after that, the
| 1022 | +            # count per run should be much smaller.
| 1023 | +            if len(old_attempts) == 0:
| 1024 | +                task_logger.info(
| 1025 | +                    "check_for_index_attempt_cleanup - No index attempts to clean up"
| 1026 | +                )
| 1027 | +                return
| 1028 | +
| 1029 | +            for i in range(0, len(old_attempts), batch_size):
| 1030 | +                batch = old_attempts[i : i + batch_size]
| 1031 | +                task_logger.info(
| 1032 | +                    f"check_for_index_attempt_cleanup - Cleaning up {len(batch)} index attempts"
| 1033 | +                )
| 1034 | +                self.app.send_task(
| 1035 | +                    OnyxCeleryTask.CLEANUP_INDEX_ATTEMPT,
| 1036 | +                    kwargs={
| 1037 | +                        "index_attempt_ids": [attempt.id for attempt in batch],
| 1038 | +                        "tenant_id": tenant_id,
| 1039 | +                    },
| 1040 | +                    queue=OnyxCeleryQueues.INDEX_ATTEMPT_CLEANUP,
| 1041 | +                    priority=OnyxCeleryPriority.MEDIUM,
| 1042 | +                )
| 1043 | +    except Exception:
| 1044 | +        task_logger.exception("Unexpected exception during index attempt cleanup check")
| 1045 | +        return None
| 1046 | +    finally:
| 1047 | +        if locked:
| 1048 | +            if lock.owned():
| 1049 | +                lock.release()
| 1050 | +            else:
| 1051 | +                task_logger.error(
| 1052 | +                    "check_for_index_attempt_cleanup - Lock not owned on completion: "
| 1053 | +                    f"tenant={tenant_id}"
| 1054 | +                )
| 1055 | +
| 1056 | +
| 1057 | +# light worker
| 1058 | +@shared_task(
| 1059 | +    name=OnyxCeleryTask.CLEANUP_INDEX_ATTEMPT,
| 1060 | +    bind=True,
| 1061 | +)
| 1062 | +def cleanup_index_attempt_task(
| 1063 | +    self: Task, *, index_attempt_ids: list[int], tenant_id: str
| 1064 | +) -> None:
| 1065 | +    """Clean up a batch of index attempts."""
| 1066 | +    start = time.monotonic()
| 1067 | +
| 1068 | +    try:
| 1069 | +        with get_session_with_current_tenant() as db_session:
| 1070 | +            cleanup_index_attempts(db_session, index_attempt_ids)
| 1071 | +
| 1072 | +    finally:
| 1073 | +        elapsed = time.monotonic() - start
| 1074 | +
| 1075 | +        task_logger.info(
| 1076 | +            f"cleanup_index_attempt_task completed: tenant_id={tenant_id} "
| 1077 | +            f"index_attempt_ids={index_attempt_ids} "
| 1078 | +            f"elapsed={elapsed:.2f}s"
| 1079 | +        )
| 1080 | +
| 1081 | +
990 | 1082 | class DocumentProcessingBatch(BaseModel):
991 | 1083 |     """Data structure for a document processing batch."""
992 | 1084 |
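
The bodies of the two helpers imported at the top of this diff (`get_old_index_attempts` and `cleanup_index_attempts` from `onyx.background.indexing.index_attempt_utils`) are not shown in this hunk. A minimal sketch of what they might look like, assuming a SQLAlchemy `IndexAttempt` model with a `time_created` column and the 7-day cutoff from the task docstring; the model location, field names, and cutoff are assumptions, not the actual implementation:

```python
# Hypothetical sketch only -- the real helper bodies are not in this diff.
from datetime import datetime
from datetime import timedelta
from datetime import timezone

from sqlalchemy import delete
from sqlalchemy import select
from sqlalchemy.orm import Session

from onyx.db.models import IndexAttempt  # assumed model location

_RETENTION = timedelta(days=7)  # assumed from the "older than 7 days" docstring


def get_old_index_attempts(db_session: Session) -> list[IndexAttempt]:
    """Return all index attempts created before the retention cutoff."""
    cutoff = datetime.now(timezone.utc) - _RETENTION
    return list(
        db_session.scalars(
            select(IndexAttempt).where(IndexAttempt.time_created < cutoff)
        ).all()
    )


def cleanup_index_attempts(db_session: Session, index_attempt_ids: list[int]) -> None:
    """Bulk-delete the given index attempts in one statement and commit."""
    db_session.execute(
        delete(IndexAttempt).where(IndexAttempt.id.in_(index_attempt_ids))
    )
    db_session.commit()
```

One design note: the dispatcher only ever reads `attempt.id` from the returned objects, so loading full ORM rows is wasteful on a large backlog; selecting just the id column would be a reasonable refinement.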
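For the beat task to actually fire, it also needs a schedule entry, and a light worker has to consume the new `INDEX_ATTEMPT_CLEANUP` queue. A sketch of what the schedule entry might look like, following the dict-based shape Onyx uses for its other beat tasks; the cadence and `expires` values here are assumptions, since the wiring is outside this diff:

```python
# Hypothetical beat-schedule entry; the real registration lives elsewhere.
from datetime import timedelta

from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask

check_for_index_attempt_cleanup_entry = {
    "name": "check-for-index-attempt-cleanup",
    "task": OnyxCeleryTask.CHECK_FOR_INDEX_ATTEMPT_CLEANUP,
    "schedule": timedelta(hours=1),  # assumed cadence
    "options": {
        "priority": OnyxCeleryPriority.MEDIUM,
        "expires": 60,  # assumed: drop the tick if no worker picks it up quickly
    },
}
```

The `CHECK_INDEX_ATTEMPT_CLEANUP_BEAT_LOCK` Redis lock in the task body already guards against overlapping runs, so the schedule entry itself does not need to serialize executions.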