Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
Expand Down Expand Up @@ -73,6 +74,7 @@
from onyx.utils.logger import setup_logger
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
from shared_configs.configs import MULTI_TENANT

logger = setup_logger()

Expand All @@ -87,6 +89,24 @@
LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15


def _get_fence_validation_block_expiration() -> int:
    """Return the TTL, in seconds, for the fence validation block signal.

    The base TTL is 300 seconds. Outside MULTI_TENANT mode it is returned
    unchanged. In MULTI_TENANT mode it is multiplied by the runtime beat
    multiplier; if the multiplier cannot be read,
    CLOUD_BEAT_MULTIPLIER_DEFAULT is used instead.

    Returns:
        A whole number of seconds, never less than 1.
    """
    base_expiration = 300  # seconds

    if not MULTI_TENANT:
        return base_expiration

    try:
        beat_multiplier = OnyxRuntime.get_beat_multiplier()
    except Exception:
        # Best effort: a failed lookup must not break the calling beat task,
        # but record it so a persistent failure does not go unnoticed.
        logger.exception(
            "Failed to read beat multiplier for fence validation block "
            "expiration; falling back to default"
        )
        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT

    # int() truncates toward zero, so a small multiplier could produce 0.
    # The result is used as a Redis `ex=` value, which must be positive.
    return max(1, int(base_expiration * beat_multiplier))


"""Jobs / utils for kicking off doc permissions sync tasks."""


Expand Down Expand Up @@ -194,7 +214,11 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
"Exception while validating permission sync fences"
)

r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=300)
r.set(
OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES,
1,
ex=_get_fence_validation_block_expiration(),
)

# use a lookup table to find active fences. We still have to verify the fence
# exists since it is an optimization and not the source of truth.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.error_logging import emit_background_error
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT
Expand Down Expand Up @@ -57,16 +58,36 @@
)
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import format_error_for_logging
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT

logger = setup_logger()


_EXTERNAL_GROUP_BATCH_SIZE = 100


def _get_fence_validation_block_expiration() -> int:
    """Return the TTL, in seconds, for the fence validation block signal.

    The base TTL is 300 seconds. Outside MULTI_TENANT mode it is returned
    unchanged. In MULTI_TENANT mode it is multiplied by the runtime beat
    multiplier; if the multiplier cannot be read,
    CLOUD_BEAT_MULTIPLIER_DEFAULT is used instead.

    Returns:
        A whole number of seconds, never less than 1.
    """
    base_expiration = 300  # seconds

    if not MULTI_TENANT:
        return base_expiration

    try:
        beat_multiplier = OnyxRuntime.get_beat_multiplier()
    except Exception:
        # Best effort: a failed lookup must not break the calling beat task,
        # but record it so a persistent failure does not go unnoticed.
        logger.exception(
            "Failed to read beat multiplier for fence validation block "
            "expiration; falling back to default"
        )
        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT

    # int() truncates toward zero, so a small multiplier could produce 0.
    # The result is used as a Redis `ex=` value, which must be positive.
    return max(1, int(base_expiration * beat_multiplier))


def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
"""Returns boolean indicating if external group sync is due."""

Expand Down Expand Up @@ -194,7 +215,11 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
"Exception while validating external group sync fences"
)

r.set(OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES, 1, ex=300)
r.set(
OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES,
1,
ex=_get_fence_validation_block_expiration(),
)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
Expand Down
25 changes: 24 additions & 1 deletion backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.memory_monitoring import emit_process_memory
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids
from onyx.background.celery.tasks.indexing.utils import IndexingCallback
from onyx.background.celery.tasks.indexing.utils import is_in_repeated_error_state
Expand Down Expand Up @@ -86,6 +87,24 @@
logger = setup_logger()


def _get_fence_validation_block_expiration() -> int:
    """Return the TTL, in seconds, for the fence validation block signal.

    The base TTL is 60 seconds. Outside MULTI_TENANT mode it is returned
    unchanged. In MULTI_TENANT mode it is multiplied by the runtime beat
    multiplier; if the multiplier cannot be read,
    CLOUD_BEAT_MULTIPLIER_DEFAULT is used instead.

    Returns:
        A whole number of seconds, never less than 1.
    """
    base_expiration = 60  # seconds

    if not MULTI_TENANT:
        return base_expiration

    try:
        beat_multiplier = OnyxRuntime.get_beat_multiplier()
    except Exception:
        # Best effort: a failed lookup must not break the calling beat task,
        # but record it so a persistent failure does not go unnoticed.
        logger.exception(
            "Failed to read beat multiplier for fence validation block "
            "expiration; falling back to default"
        )
        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT

    # int() truncates toward zero, so a small multiplier could produce 0.
    # The result is used as a Redis `ex=` value, which must be positive.
    return max(1, int(base_expiration * beat_multiplier))


class IndexingWatchdogTerminalStatus(str, Enum):
"""The different statuses the watchdog can finish with.

Expand Down Expand Up @@ -630,7 +649,11 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
except Exception:
task_logger.exception("Exception while validating indexing fences")

redis_client.set(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES, 1, ex=60)
redis_client.set(
OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES,
1,
ex=_get_fence_validation_block_expiration(),
)

# 3/3: FINALIZE
lock_beat.reacquire()
Expand Down
47 changes: 45 additions & 2 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.celery.tasks.indexing.utils import IndexingCallbackBase
from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
from onyx.configs.app_configs import JOB_TIMEOUT
Expand Down Expand Up @@ -55,15 +56,53 @@
from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import format_error_for_logging
from onyx.utils.logger import LoggerContextVars
from onyx.utils.logger import pruning_ctx
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT

logger = setup_logger()


def _scale_block_expiration(base_expiration: int) -> int:
    """Scale a block-signal TTL by the beat multiplier in MULTI_TENANT mode.

    Args:
        base_expiration: The unscaled TTL in seconds.

    Returns:
        ``base_expiration`` unchanged when MULTI_TENANT is false. Otherwise
        the base multiplied by the runtime beat multiplier (or by
        CLOUD_BEAT_MULTIPLIER_DEFAULT if the multiplier cannot be read),
        truncated to a whole number of seconds and never less than 1.
    """
    if not MULTI_TENANT:
        return base_expiration

    try:
        beat_multiplier = OnyxRuntime.get_beat_multiplier()
    except Exception:
        # Best effort: a failed lookup must not break the calling beat task,
        # but record it so a persistent failure does not go unnoticed.
        logger.exception(
            "Failed to read beat multiplier for block expiration; "
            "falling back to default"
        )
        beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT

    # int() truncates toward zero, so a small multiplier could produce 0.
    # The result is used as a Redis `ex=` value, which must be positive.
    return max(1, int(base_expiration * beat_multiplier))


def _get_pruning_block_expiration() -> int:
    """Return the TTL, in seconds, for the pruning block signal.

    The base TTL is 3600 seconds (1 hour); it is multiplied by the beat
    multiplier only in MULTI_TENANT mode.
    """
    return _scale_block_expiration(3600)


def _get_fence_validation_block_expiration() -> int:
    """Return the TTL, in seconds, for the fence validation block signal.

    The base TTL is 300 seconds; it is multiplied by the beat multiplier
    only in MULTI_TENANT mode.
    """
    return _scale_block_expiration(300)


class PruneCallback(IndexingCallbackBase):
def progress(self, tag: str, amount: int) -> None:
self.redis_connector.prune.set_active()
Expand Down Expand Up @@ -162,7 +201,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
task_logger.info(
f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}"
)
r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=3600)
r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=_get_pruning_block_expiration())

# we want to run this less frequently than the overall task
lock_beat.reacquire()
Expand All @@ -175,7 +214,11 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
except Exception:
task_logger.exception("Exception while validating pruning fences")

r.set(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES, 1, ex=300)
r.set(
OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES,
1,
ex=_get_fence_validation_block_expiration(),
)

# use a lookup table to find active fences. We still have to verify the fence
# exists since it is an optimization and not the source of truth.
Expand Down
Loading