From bf8100c535816d06dac032186184c5418b75859a Mon Sep 17 00:00:00 2001 From: Weves Date: Tue, 7 Jan 2025 13:33:56 -0800 Subject: [PATCH 1/7] Initial scaffolding for metrics --- .../onyx/background/celery/apps/monitoring.py | 95 +++++++++++++++++++ .../background/celery/configs/monitoring.py | 21 ++++ .../background/celery/tasks/beat_schedule.py | 11 +++ .../celery/tasks/monitoring/tasks.py | 82 ++++++++++++++++ .../celery/versioned_apps/monitoring.py | 15 +++ backend/onyx/configs/constants.py | 10 ++ backend/supervisord.conf | 12 +++ 7 files changed, 246 insertions(+) create mode 100644 backend/onyx/background/celery/apps/monitoring.py create mode 100644 backend/onyx/background/celery/configs/monitoring.py create mode 100644 backend/onyx/background/celery/tasks/monitoring/tasks.py create mode 100644 backend/onyx/background/celery/versioned_apps/monitoring.py diff --git a/backend/onyx/background/celery/apps/monitoring.py b/backend/onyx/background/celery/apps/monitoring.py new file mode 100644 index 00000000000..b0113b457f9 --- /dev/null +++ b/backend/onyx/background/celery/apps/monitoring.py @@ -0,0 +1,95 @@ +import multiprocessing +from typing import Any + +from celery import Celery +from celery import signals +from celery import Task +from celery.signals import celeryd_init +from celery.signals import worker_init +from celery.signals import worker_ready +from celery.signals import worker_shutdown + +import onyx.background.celery.apps.app_base as app_base +from onyx.configs.constants import POSTGRES_CELERY_WORKER_MONITORING_APP_NAME +from onyx.db.engine import SqlEngine +from onyx.utils.logger import setup_logger +from shared_configs.configs import MULTI_TENANT + + +logger = setup_logger() + +celery_app = Celery(__name__) +celery_app.config_from_object("onyx.background.celery.configs.monitoring") + + +@signals.task_prerun.connect +def on_task_prerun( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple | None = None, + kwargs: dict | None = None, + **kwds: Any, +) -> None: + app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds) + + +@signals.task_postrun.connect +def on_task_postrun( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple | None = None, + kwargs: dict | None = None, + retval: Any | None = None, + state: str | None = None, + **kwds: Any, +) -> None: + app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds) + + +@celeryd_init.connect +def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: + app_base.on_celeryd_init(sender, conf, **kwargs) + + +@worker_init.connect +def on_worker_init(sender: Any, **kwargs: Any) -> None: + logger.info("worker_init signal received.") + logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + + SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_MONITORING_APP_NAME) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) + + app_base.wait_for_redis(sender, **kwargs) + app_base.wait_for_db(sender, **kwargs) + + # Less startup checks in multi-tenant case + if MULTI_TENANT: + return + + app_base.on_secondary_worker_init(sender, **kwargs) + + +@worker_ready.connect +def on_worker_ready(sender: Any, **kwargs: Any) -> None: + app_base.on_worker_ready(sender, **kwargs) + + +@worker_shutdown.connect +def on_worker_shutdown(sender: Any, **kwargs: Any) -> None: + app_base.on_worker_shutdown(sender, **kwargs) + + +@signals.setup_logging.connect +def on_setup_logging( + loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any +) -> None: + app_base.on_setup_logging(loglevel, logfile, format, colorize, **kwargs) + + +celery_app.autodiscover_tasks( + [ + "onyx.background.celery.tasks.monitoring", + ] +) diff --git a/backend/onyx/background/celery/configs/monitoring.py b/backend/onyx/background/celery/configs/monitoring.py new file mode 100644 index 00000000000..5116fbd8c0a --- /dev/null +++ b/backend/onyx/background/celery/configs/monitoring.py @@ -0,0 +1,21 @@ +import onyx.background.celery.configs.base as shared_config + +broker_url = shared_config.broker_url +broker_connection_retry_on_startup = shared_config.broker_connection_retry_on_startup +broker_pool_limit = shared_config.broker_pool_limit +broker_transport_options = shared_config.broker_transport_options + +redis_socket_keepalive = shared_config.redis_socket_keepalive +redis_retry_on_timeout = shared_config.redis_retry_on_timeout +redis_backend_health_check_interval = shared_config.redis_backend_health_check_interval + +result_backend = shared_config.result_backend +result_expires = shared_config.result_expires # 86400 seconds is the default + +task_default_priority = shared_config.task_default_priority +task_acks_late = shared_config.task_acks_late + +# Monitoring worker specific settings +worker_concurrency = 1 # Single worker is sufficient for monitoring +worker_pool = "threads" +worker_prefetch_multiplier = 1 diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 8b08e77877c..f6026b5fd6d 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -3,6 +3,7 @@ from onyx.configs.app_configs import LLM_MODEL_UPDATE_API_URL from onyx.configs.constants import OnyxCeleryPriority +from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask # choosing 15 minutes because it roughly gives us enough time to process many tasks @@ -68,6 +69,16 @@ "expires": BEAT_EXPIRES_DEFAULT, }, }, + { + "name": "monitor-background-processes", + "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, + "schedule": timedelta(seconds=60), + "options": { + "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, + "queue": OnyxCeleryQueues.MONITORING, + }, + }, { "name": "check-for-doc-permissions-sync", "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py new file mode 100644 index 00000000000..f3066403c4f --- /dev/null +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -0,0 +1,82 @@ +from datetime import datetime +from datetime import timezone + +from celery import shared_task +from celery import Task +from redis import Redis + +from onyx.background.celery.apps.app_base import task_logger +from onyx.background.celery.tasks.vespa.tasks import celery_get_queue_length +from onyx.configs.app_configs import JOB_TIMEOUT +from onyx.configs.constants import OnyxCeleryQueues +from onyx.configs.constants import OnyxCeleryTask +from onyx.utils.telemetry import optional_telemetry +from onyx.utils.telemetry import RecordType + + +def _collect_queue_metrics(r_celery: Redis) -> dict[str, int]: + """Collect metrics about queue lengths for different Celery queues""" + return { + "celery": celery_get_queue_length("celery", r_celery), + "indexing": celery_get_queue_length( + OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery + ), + "sync": celery_get_queue_length(OnyxCeleryQueues.VESPA_METADATA_SYNC, r_celery), + "deletion": celery_get_queue_length( + OnyxCeleryQueues.CONNECTOR_DELETION, r_celery + ), + "pruning": celery_get_queue_length( + OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery + ), + "permissions_sync": celery_get_queue_length( + OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery + ), + "external_group_sync": celery_get_queue_length( + OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery + ), + "permissions_upsert": celery_get_queue_length( + OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery + ), + } + + +@shared_task( + name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, + soft_time_limit=JOB_TIMEOUT, + queue="monitoring", + bind=True, +) +def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: + """Collect and emit metrics about background processes. + This task runs periodically to gather metrics about: + - Queue lengths for different Celery queues + - Worker status and task counts + - Memory usage + - Task latencies + """ + task_logger.info("Starting background process monitoring") + + try: + # Get Redis client for Celery broker + r_celery = self.app.broker_connection().channel().client # type: ignore + + # Collect queue metrics + queue_metrics = _collect_queue_metrics(r_celery) + task_logger.info(f"Queue metrics: {queue_metrics}") + + # Emit metrics via telemetry + metrics = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "queues": queue_metrics, + } + + optional_telemetry( + record_type=RecordType.USAGE, + data={"background_metrics": metrics}, + ) + + task_logger.info("Successfully emitted background process metrics") + + except Exception as e: + task_logger.exception("Error collecting background process metrics") + raise e diff --git a/backend/onyx/background/celery/versioned_apps/monitoring.py b/backend/onyx/background/celery/versioned_apps/monitoring.py new file mode 100644 index 00000000000..29093d1ba3a --- /dev/null +++ b/backend/onyx/background/celery/versioned_apps/monitoring.py @@ -0,0 +1,15 @@ +"""Factory stub for running celery worker / celery beat.""" +from celery import Celery + +from onyx.utils.variable_functionality import set_is_ee_based_on_env_variable + +set_is_ee_based_on_env_variable() + + +def get_app() -> Celery: + from onyx.background.celery.apps.monitoring import celery_app + + return celery_app + + +app = get_app() diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 5a6ba4c6eda..3fee9ae3a8c 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -47,6 +47,12 @@ POSTGRES_CELERY_WORKER_LIGHT_APP_NAME = "celery_worker_light" POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy" POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing" +POSTGRES_CELERY_WORKER_MONITORING_APP_NAME = "celery_worker_monitoring" +POSTGRES_CELERY_BEAT_APP_NAME = "celery_beat" +POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME = "celery_worker_primary" +POSTGRES_CELERY_WORKER_LIGHT_APP_NAME = "celery_worker_light" +POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy" +POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing" POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME = "celery_worker_indexing_child" POSTGRES_PERMISSIONS_APP_NAME = "permissions" POSTGRES_UNKNOWN_APP_NAME = "unknown" @@ -260,6 +266,9 @@ class OnyxCeleryQueues: # Indexing queue CONNECTOR_INDEXING = "connector_indexing" + # Monitoring queue + MONITORING = "monitoring" + class OnyxRedisLocks: PRIMARY_WORKER = "da_lock:primary_worker" @@ -308,6 +317,7 @@ class OnyxCeleryTask: CHECK_FOR_EXTERNAL_GROUP_SYNC = "check_for_external_group_sync" CHECK_FOR_LLM_MODEL_UPDATE = "check_for_llm_model_update" MONITOR_VESPA_SYNC = "monitor_vespa_sync" + MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes" KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task" CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = ( "connector_permission_sync_generator_task" diff --git a/backend/supervisord.conf b/backend/supervisord.conf index 1a17f5d17d0..78d5679bae7 100644 --- a/backend/supervisord.conf +++ b/backend/supervisord.conf @@ -65,6 +65,18 @@ autorestart=true startsecs=10 stopasgroup=true +[program:celery_worker_monitoring] +command=celery -A onyx.background.celery.versioned_apps.monitoring worker + --loglevel=INFO + --hostname=monitoring@%%n + -Q monitoring +stdout_logfile=/var/log/celery_worker_monitoring.log +stdout_logfile_maxbytes=16MB +redirect_stderr=true +autorestart=true +startsecs=10 +stopasgroup=true + # Job scheduler for periodic tasks [program:celery_beat] command=celery -A onyx.background.celery.versioned_apps.beat beat From 77088745b35693402755c100b8298ce7222e12e4 Mon Sep 17 00:00:00 2001 From: Weves Date: Tue, 7 Jan 2025 18:06:46 -0800 Subject: [PATCH 2/7] iterate --- .../celery/tasks/monitoring/tasks.py | 295 +++++++++++++++--- 1 file changed, 256 insertions(+), 39 deletions(-) diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index f3066403c4f..80b86b833a4 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -1,44 +1,261 @@ +import json +from collections.abc import Callable from datetime import datetime +from datetime import timedelta from datetime import timezone +from typing import Any from celery import shared_task from celery import Task +from pydantic import BaseModel from redis import Redis +from sqlalchemy import select +from sqlalchemy import text +from sqlalchemy.orm import Session from onyx.background.celery.apps.app_base import task_logger from onyx.background.celery.tasks.vespa.tasks import celery_get_queue_length from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.db.engine import get_session_with_tenant +from onyx.db.enums import IndexingStatus +from onyx.db.models import ConnectorCredentialPair +from onyx.db.models import IndexAttempt +from onyx.redis.redis_pool import get_redis_client from onyx.utils.telemetry import optional_telemetry from onyx.utils.telemetry import RecordType -def _collect_queue_metrics(r_celery: Redis) -> dict[str, int]: +_CONNECTOR_INDEX_ATTEMPT_KEY_FMT = ( + "monitoring_connector_index_attempt:{cc_pair_id}:{index_attempt_id}" +) + + +def _mark_metric_as_emitted(redis_client: Redis, key: str) -> None: + """Mark a metric as having been emitted by setting a Redis key with expiration""" + redis_client.set(key, "1", ex=3600) # Expire after 1 hour + + +def _has_metric_been_emitted(redis_client: Redis, key: str) -> bool: + """Check if a metric has been emitted by checking for existence of Redis key""" + return bool(redis_client.exists(key)) + + +class Metric(BaseModel): + key: str + name: str + value: Any + tags: dict[str, str] + + def log(self) -> None: + """Log the metric in a standardized format""" + task_logger.info( + json.dumps( + { + "metric": self.name, + "key": self.key, + "value": self.value, + "tags": self.tags, + } + ) + ) + + def emit(self) -> None: + optional_telemetry( + record_type=RecordType.USAGE, + data={ + "metric": self.name, + "key": self.key, + "value": self.value, + "tags": self.tags, + }, + ) + + +def _collect_queue_metrics(r_celery: Redis) -> list[Metric]: """Collect metrics about queue lengths for different Celery queues""" - return { - "celery": celery_get_queue_length("celery", r_celery), - "indexing": celery_get_queue_length( - OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery - ), - "sync": celery_get_queue_length(OnyxCeleryQueues.VESPA_METADATA_SYNC, r_celery), - "deletion": celery_get_queue_length( - OnyxCeleryQueues.CONNECTOR_DELETION, r_celery - ), - "pruning": celery_get_queue_length( - OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery - ), - "permissions_sync": celery_get_queue_length( - OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, r_celery - ), - "external_group_sync": celery_get_queue_length( - OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery - ), - "permissions_upsert": celery_get_queue_length( - OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery - ), + metrics = [] + queue_mappings = { + "celery": "celery", + "indexing": OnyxCeleryQueues.CONNECTOR_INDEXING, + "sync": OnyxCeleryQueues.VESPA_METADATA_SYNC, + "deletion": OnyxCeleryQueues.CONNECTOR_DELETION, + "pruning": OnyxCeleryQueues.CONNECTOR_PRUNING, + "permissions_sync": OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, + "external_group_sync": OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, + "permissions_upsert": OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, } + for name, queue in queue_mappings.items(): + metrics.append( + Metric( + key="celery_queue_length", + name=name, + value=celery_get_queue_length(queue, r_celery), + tags={"queue": name}, + ) + ) + + return metrics + + +def _collect_connector_metrics( + db_session: Session, redis_client: Redis +) -> list[dict[str, Any]]: + """Collect metrics about connector runs from the past hour""" + one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + + # Get all connector credential pairs + cc_pairs = db_session.scalars(select(ConnectorCredentialPair)).all() + + metrics = [] + for cc_pair in cc_pairs: + base_tags = { + "source": cc_pair.connector.source, + "connector_id": cc_pair.connector.id, + } + + # Get most recent attempt in the last hour + recent_attempts = ( + db_session.query(IndexAttempt) + .filter( + IndexAttempt.connector_credential_pair_id == cc_pair.id, + IndexAttempt.time_created >= one_hour_ago, + ) + .order_by(IndexAttempt.time_created.desc()) + .limit(2) + .all() + ) + recent_attempt = recent_attempts[0] if recent_attempts else None + second_most_recent_attempt = ( + recent_attempts[1] if len(recent_attempts) > 1 else None + ) + + # if no metric to emit, skip + if not recent_attempt or not recent_attempt.time_started: + continue + + # check if we already emitted a metric for this index attempt + if redis_client.exists( + _CONNECTOR_INDEX_ATTEMPT_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=recent_attempt.id, + ) + ): + continue + + # Connector start latency + # first run case - we should start as soon as it's created + if not second_most_recent_attempt: + desired_start_time = cc_pair.connector.time_created + else: + if not cc_pair.connector.refresh_freq: + task_logger.error( + "Found non-initial index attempt for connector " + "without refresh_freq. This should never happen." + ) + continue + + desired_start_time = second_most_recent_attempt.time_updated + timedelta( + seconds=cc_pair.connector.refresh_freq + ) + + start_latency = ( + recent_attempt.time_started - desired_start_time + ).total_seconds() + + metrics.append( + { + "metric": "connector_start_latency", + "value": start_latency, + "tags": { + **base_tags, + "index_attempt_id": recent_attempt.id, + }, + } + ) + + # Connector run success/failure + if recent_attempt.status in [ + IndexingStatus.SUCCESS, + IndexingStatus.FAILED, + IndexingStatus.CANCELED, + ]: + metrics.append( + { + "metric": "connector_run_succeeded", + "value": ( + 1 if recent_attempt.status == IndexingStatus.SUCCESS else 0 + ), + "tags": { + **base_tags, + "index_attempt_id": recent_attempt.id, + }, + } + ) + + return metrics + + +def _collect_sync_metrics(db_session: Session) -> list[dict[str, Any]]: + """Collect metrics about document set and group syncing speed""" + # Get average sync speed for document sets + doc_sync_speed = db_session.execute( + text( + """ + SELECT + AVG(EXTRACT(EPOCH FROM (time_completed - time_started))) as avg_duration, + COUNT(*) as doc_count + FROM document_set_sync_attempt + WHERE time_completed IS NOT NULL + AND time_started >= NOW() - INTERVAL '1 hour' + """ + ) + ).first() + + # Get average sync speed for group syncs + group_sync_speed = db_session.execute( + text( + """ + SELECT + AVG(EXTRACT(EPOCH FROM (time_completed - time_started))) as avg_duration, + COUNT(*) as group_count + FROM external_group_sync_attempt + WHERE time_completed IS NOT NULL + AND time_started >= NOW() - INTERVAL '1 hour' + """ + ) + ).first() + + metrics = [] + + if doc_sync_speed and doc_sync_speed.doc_count > 0: + metrics.append( + { + "metric": "syncing_speed", + "value": doc_sync_speed.avg_duration, + "tags": { + "sync_type": "document_set", + "count": doc_sync_speed.doc_count, + }, + } + ) + + if group_sync_speed and group_sync_speed.group_count > 0: + metrics.append( + { + "metric": "syncing_speed", + "value": group_sync_speed.avg_duration, + "tags": { + "sync_type": "external_group", + "count": group_sync_speed.group_count, + }, + } + ) + + return metrics + @shared_task( name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, @@ -50,32 +267,32 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: """Collect and emit metrics about background processes. This task runs periodically to gather metrics about: - Queue lengths for different Celery queues + - Connector run metrics (start latency, success rate) + - Syncing speed metrics - Worker status and task counts - - Memory usage - - Task latencies """ task_logger.info("Starting background process monitoring") try: # Get Redis client for Celery broker r_celery = self.app.broker_connection().channel().client # type: ignore + r_std = get_redis_client() - # Collect queue metrics - queue_metrics = _collect_queue_metrics(r_celery) - task_logger.info(f"Queue metrics: {queue_metrics}") - - # Emit metrics via telemetry - metrics = { - "timestamp": datetime.now(timezone.utc).isoformat(), - "queues": queue_metrics, - } - - optional_telemetry( - record_type=RecordType.USAGE, - data={"background_metrics": metrics}, - ) + # Define metric collection functions and their dependencies + metric_functions: list[Callable[[], list[Metric]]] = [ + lambda: _collect_queue_metrics(r_celery), + lambda: _collect_connector_metrics(db_session, r_std), + lambda: _collect_sync_metrics(db_session), + ] + # Collect and log each metric + with get_session_with_tenant(tenant_id) as db_session: + for metric_fn in metric_functions: + metrics = metric_fn() + for metric in metrics: + metric.log() + metric.emit() - task_logger.info("Successfully emitted background process metrics") + task_logger.info("Successfully collected background process metrics") except Exception as e: task_logger.exception("Error collecting background process metrics") From 498d353829e1c892670f991fb6d3c2055db9e814 Mon Sep 17 00:00:00 2001 From: Weves Date: Thu, 9 Jan 2025 15:07:26 -0800 Subject: [PATCH 3/7] more --- .../celery/tasks/monitoring/tasks.py | 162 ++++++------------ 1 file changed, 54 insertions(+), 108 deletions(-) diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index 80b86b833a4..edaa55220cd 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -10,7 +10,6 @@ from pydantic import BaseModel from redis import Redis from sqlalchemy import select -from sqlalchemy import text from sqlalchemy.orm import Session from onyx.background.celery.apps.app_base import task_logger @@ -32,48 +31,44 @@ ) -def _mark_metric_as_emitted(redis_client: Redis, key: str) -> None: +def _mark_metric_as_emitted(redis_std: Redis, key: str) -> None: """Mark a metric as having been emitted by setting a Redis key with expiration""" - redis_client.set(key, "1", ex=3600) # Expire after 1 hour + redis_std.set(key, "1", ex=24 * 60 * 60) # Expire after 1 day -def _has_metric_been_emitted(redis_client: Redis, key: str) -> bool: +def _has_metric_been_emitted(redis_std: Redis, key: str) -> bool: """Check if a metric has been emitted by checking for existence of Redis key""" - return bool(redis_client.exists(key)) + return bool(redis_std.exists(key)) class Metric(BaseModel): - key: str + key: str | None # only required if we need to store that we have emitted this metric name: str value: Any tags: dict[str, str] def log(self) -> None: """Log the metric in a standardized format""" - task_logger.info( - json.dumps( - { - "metric": self.name, - "key": self.key, - "value": self.value, - "tags": self.tags, - } - ) - ) + data = { + "metric": self.name, + "value": self.value, + "tags": self.tags, + } + task_logger.info(json.dumps(data)) def emit(self) -> None: + data = { + "metric": self.name, + "value": self.value, + "tags": self.tags, + } optional_telemetry( record_type=RecordType.USAGE, - data={ - "metric": self.name, - "key": self.key, - "value": self.value, - "tags": self.tags, - }, + data=data, ) -def _collect_queue_metrics(r_celery: Redis) -> list[Metric]: +def _collect_queue_metrics(redis_celery: Redis) -> list[Metric]: """Collect metrics about queue lengths for different Celery queues""" metrics = [] queue_mappings = { @@ -90,9 +85,9 @@ def _collect_queue_metrics(r_celery: Redis) -> list[Metric]: for name, queue in queue_mappings.items(): metrics.append( Metric( - key="celery_queue_length", + key=None, name=name, - value=celery_get_queue_length(queue, r_celery), + value=celery_get_queue_length(queue, redis_celery), tags={"queue": name}, ) ) @@ -100,9 +95,7 @@ def _collect_queue_metrics(r_celery: Redis) -> list[Metric]: return metrics -def _collect_connector_metrics( - db_session: Session, redis_client: Redis -) -> list[dict[str, Any]]: +def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: """Collect metrics about connector runs from the past hour""" one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) @@ -113,7 +106,7 @@ def _collect_connector_metrics( for cc_pair in cc_pairs: base_tags = { "source": cc_pair.connector.source, - "connector_id": cc_pair.connector.id, + "connector_id": str(cc_pair.connector.id), } # Get most recent attempt in the last hour @@ -137,12 +130,16 @@ def _collect_connector_metrics( continue # check if we already emitted a metric for this index attempt - if redis_client.exists( - _CONNECTOR_INDEX_ATTEMPT_KEY_FMT.format( - cc_pair_id=cc_pair.id, - index_attempt_id=recent_attempt.id, + metric_key = _CONNECTOR_INDEX_ATTEMPT_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=recent_attempt.id, + ) + if _has_metric_been_emitted(redis_std, metric_key): + task_logger.info( + f"Skipping metric for connector {cc_pair.connector.id} " + f"index attempt {recent_attempt.id} because it has already been " + "emitted" ) - ): continue # Connector start latency @@ -166,14 +163,15 @@ def _collect_connector_metrics( ).total_seconds() metrics.append( - { - "metric": "connector_start_latency", - "value": start_latency, - "tags": { + Metric( + key=metric_key, + name="connector_start_latency", + value=start_latency, + tags={ **base_tags, - "index_attempt_id": recent_attempt.id, + "index_attempt_id": str(recent_attempt.id), }, - } + ) ) # Connector run success/failure @@ -183,78 +181,24 @@ def _collect_connector_metrics( IndexingStatus.CANCELED, ]: metrics.append( - { - "metric": "connector_run_succeeded", - "value": ( - 1 if recent_attempt.status == IndexingStatus.SUCCESS else 0 - ), - "tags": { + Metric( + key=metric_key, + name="connector_run_succeeded", + value=(1 if recent_attempt.status == IndexingStatus.SUCCESS else 0), + tags={ **base_tags, - "index_attempt_id": recent_attempt.id, + "index_attempt_id": str(recent_attempt.id), }, - } + ) ) return metrics -def _collect_sync_metrics(db_session: Session) -> list[dict[str, Any]]: +def _collect_sync_metrics(db_session: Session) -> list[Metric]: """Collect metrics about document set and group syncing speed""" - # Get average sync speed for document sets - doc_sync_speed = db_session.execute( - text( - """ - SELECT - AVG(EXTRACT(EPOCH FROM (time_completed - time_started))) as avg_duration, - COUNT(*) as doc_count - FROM document_set_sync_attempt - WHERE time_completed IS NOT NULL - AND time_started >= NOW() - INTERVAL '1 hour' - """ - ) - ).first() - - # Get average sync speed for group syncs - group_sync_speed = db_session.execute( - text( - """ - SELECT - AVG(EXTRACT(EPOCH FROM (time_completed - time_started))) as avg_duration, - COUNT(*) as group_count - FROM external_group_sync_attempt - WHERE time_completed IS NOT NULL - AND time_started >= NOW() - INTERVAL '1 hour' - """ - ) - ).first() - - metrics = [] - - if doc_sync_speed and doc_sync_speed.doc_count > 0: - metrics.append( - { - "metric": "syncing_speed", - "value": doc_sync_speed.avg_duration, - "tags": { - "sync_type": "document_set", - "count": doc_sync_speed.doc_count, - }, - } - ) - - if group_sync_speed and group_sync_speed.group_count > 0: - metrics.append( - { - "metric": "syncing_speed", - "value": group_sync_speed.avg_duration, - "tags": { - "sync_type": "external_group", - "count": group_sync_speed.group_count, - }, - } - ) - - return metrics + # TODO: Implement this + return [] @shared_task( @@ -275,13 +219,13 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: try: # Get Redis client for Celery broker - r_celery = self.app.broker_connection().channel().client # type: ignore - r_std = get_redis_client() + redis_celery = self.app.broker_connection().channel().client # type: ignore + redis_std = get_redis_client(tenant_id=tenant_id) # Define metric collection functions and their dependencies metric_functions: list[Callable[[], list[Metric]]] = [ - lambda: _collect_queue_metrics(r_celery), - lambda: _collect_connector_metrics(db_session, r_std), + lambda: _collect_queue_metrics(redis_celery), + lambda: _collect_connector_metrics(db_session, redis_std), lambda: _collect_sync_metrics(db_session), ] # Collect and log each metric @@ -291,6 +235,8 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: for metric in metrics: metric.log() metric.emit() + if metric.key: + _mark_metric_as_emitted(redis_std, metric.key) task_logger.info("Successfully collected background process metrics") From 2920ae01f47e01b1a6ced4666d7f746bc90d32b8 Mon Sep 17 00:00:00 2001 From: Weves Date: Sun, 12 Jan 2025 16:02:12 -0800 Subject: [PATCH 4/7] More metrics + SyncRecord concept --- .../versions/97dbb53fa8c8_add_syncrecord.py | 56 ++++ ...7bf7_add_time_updated_to_usergroup_and_.py | 41 +++ .../background/celery/tasks/vespa/tasks.py | 64 +++- backend/ee/onyx/db/user_group.py | 6 +- .../celery/tasks/connector_deletion/tasks.py | 15 + .../celery/tasks/monitoring/tasks.py | 296 +++++++++++++----- .../background/celery/tasks/vespa/tasks.py | 76 ++++- backend/onyx/db/document_set.py | 3 +- backend/onyx/db/enums.py | 21 +- backend/onyx/db/models.py | 37 ++- backend/onyx/db/sync_record.py | 114 +++++++ backend/onyx/utils/telemetry.py | 1 + 12 files changed, 639 insertions(+), 91 deletions(-) create mode 100644 backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py create mode 100644 backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py create mode 100644 backend/onyx/db/sync_record.py diff --git a/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py b/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py new file mode 100644 index 00000000000..871d8032867 --- /dev/null +++ b/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py @@ -0,0 +1,56 @@ +"""Add SyncRecord + +Revision ID: 97dbb53fa8c8 +Revises: 369644546676 +Create Date: 2025-01-11 19:39:50.426302 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "97dbb53fa8c8" +down_revision = "369644546676" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "sync_record", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("entity_id", sa.Integer(), nullable=False), + sa.Column( + "sync_type", + sa.Enum( + "DOCUMENT_SET", + "USER_GROUP", + "CONNECTOR_DELETION", + name="synctype", + native_enum=False, + length=40, + ), + nullable=False, + ), + sa.Column( + "sync_status", + sa.Enum( + "IN_PROGRESS", + "SUCCESS", + "FAILED", + "CANCELED", + name="syncstatus", + native_enum=False, + length=40, + ), + nullable=False, + ), + sa.Column("num_docs_synced", sa.Integer(), nullable=False), + sa.Column("sync_start_time", sa.DateTime(timezone=True), nullable=False), + sa.Column("sync_end_time", sa.DateTime(timezone=True), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + +def downgrade() -> None: + op.drop_table("sync_record") diff --git a/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py b/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py new file mode 100644 index 00000000000..aa3bbf4ecd1 --- /dev/null +++ b/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py @@ -0,0 +1,41 @@ +"""Add time_updated to UserGroup and DocumentSet + +Revision ID: fec3db967bf7 +Revises: 97dbb53fa8c8 +Create Date: 2025-01-12 15:49:02.289100 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "fec3db967bf7" +down_revision = "97dbb53fa8c8" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "document_set", + sa.Column( + "time_updated", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + ) + op.add_column( + "user_group", + sa.Column( + "time_updated", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + ) + + +def downgrade() -> None: + op.drop_column("user_group", "time_updated") + op.drop_column("document_set", "time_updated") diff --git a/backend/ee/onyx/background/celery/tasks/vespa/tasks.py b/backend/ee/onyx/background/celery/tasks/vespa/tasks.py index bd6cd4c9f0b..45c65b73ce9 100644 --- a/backend/ee/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/ee/onyx/background/celery/tasks/vespa/tasks.py @@ -8,6 +8,9 @@ from ee.onyx.db.user_group import mark_user_group_as_synced from ee.onyx.db.user_group import prepare_user_group_for_deletion from onyx.background.celery.apps.app_base import task_logger +from onyx.db.enums import SyncStatus +from onyx.db.enums import SyncType +from onyx.db.sync_record import update_sync_record_status from onyx.redis.redis_usergroup import RedisUserGroup from onyx.utils.logger import setup_logger @@ -43,24 +46,59 @@ def monitor_usergroup_taskset( f"User group sync progress: usergroup_id={usergroup_id} remaining={count} initial={initial_count}" ) if count > 0: + update_sync_record_status( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + sync_status=SyncStatus.IN_PROGRESS, + num_docs_synced=count, + ) return user_group = fetch_user_group(db_session=db_session, user_group_id=usergroup_id) if user_group: usergroup_name = user_group.name - if user_group.is_up_for_deletion: - # this prepare should have been run when the deletion was scheduled, - # but run it again to be sure we're ready to go - mark_user_group_as_synced(db_session, user_group) - prepare_user_group_for_deletion(db_session, usergroup_id) - delete_user_group(db_session=db_session, user_group=user_group) - task_logger.info( - f"Deleted usergroup: name={usergroup_name} id={usergroup_id}" - ) - else: - mark_user_group_as_synced(db_session=db_session, user_group=user_group) - task_logger.info( - f"Synced usergroup. name={usergroup_name} id={usergroup_id}" + try: + if user_group.is_up_for_deletion: + # this prepare should have been run when the deletion was scheduled, + # but run it again to be sure we're ready to go + mark_user_group_as_synced(db_session, user_group) + prepare_user_group_for_deletion(db_session, usergroup_id) + delete_user_group(db_session=db_session, user_group=user_group) + + update_sync_record_status( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + sync_status=SyncStatus.SUCCESS, + num_docs_synced=initial_count, + ) + + task_logger.info( + f"Deleted usergroup: name={usergroup_name} id={usergroup_id}" + ) + else: + mark_user_group_as_synced(db_session=db_session, user_group=user_group) + + update_sync_record_status( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + sync_status=SyncStatus.SUCCESS, + num_docs_synced=initial_count, + ) + + task_logger.info( + f"Synced usergroup. name={usergroup_name} id={usergroup_id}" + ) + except Exception as e: + update_sync_record_status( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + sync_status=SyncStatus.FAILED, + num_docs_synced=initial_count, ) + raise e rug.reset() diff --git a/backend/ee/onyx/db/user_group.py b/backend/ee/onyx/db/user_group.py index 1b1fcca74b4..bae25e368b9 100644 --- a/backend/ee/onyx/db/user_group.py +++ b/backend/ee/onyx/db/user_group.py @@ -374,7 +374,7 @@ def _add_user_group__cc_pair_relationships__no_commit( def insert_user_group(db_session: Session, user_group: UserGroupCreate) -> UserGroup: - db_user_group = UserGroup(name=user_group.name) + db_user_group = UserGroup(name=user_group.name, time_updated=func.now()) db_session.add(db_user_group) db_session.flush() # give the group an ID @@ -630,6 +630,10 @@ def update_user_group( select(User).where(User.id.in_(removed_user_ids)) # type: ignore ).unique() _validate_curator_status__no_commit(db_session, list(removed_users)) + + # update "time_updated" to now + db_user_group.time_updated = func.now() + db_session.commit() return db_user_group diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 8c7647468d0..0b13b541165 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -17,7 +17,10 @@ from onyx.db.connector_credential_pair import get_connector_credential_pairs from onyx.db.engine import get_session_with_tenant from onyx.db.enums import ConnectorCredentialPairStatus +from onyx.db.enums import SyncType from onyx.db.search_settings import get_all_search_settings +from onyx.db.sync_record import cleanup_sync_records +from onyx.db.sync_record import insert_sync_record from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_delete import RedisConnectorDeletePayload from onyx.redis.redis_pool import get_redis_client @@ -118,6 +121,13 @@ def try_generate_document_cc_pair_cleanup_tasks( return None if cc_pair.status != ConnectorCredentialPairStatus.DELETING: + # there should be no in-progress sync records if this is up to date + # clean it up just in case things got into a bad state + cleanup_sync_records( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.CONNECTOR_DELETION, + ) return None # set a basic fence to start @@ -127,6 +137,11 @@ def try_generate_document_cc_pair_cleanup_tasks( ) redis_connector.delete.set_fence(fence_payload) + insert_sync_record( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.CONNECTOR_DELETION, + ) try: # do not proceed if connector indexing or connector pruning are running diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index edaa55220cd..3cdc0bf29e2 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -19,15 +19,23 @@ from onyx.configs.constants import OnyxCeleryTask from onyx.db.engine import get_session_with_tenant from onyx.db.enums import IndexingStatus +from onyx.db.enums import SyncType from onyx.db.models import ConnectorCredentialPair +from onyx.db.models import DocumentSet from onyx.db.models import IndexAttempt +from onyx.db.models import SyncRecord +from onyx.db.models import UserGroup from onyx.redis.redis_pool import get_redis_client from onyx.utils.telemetry import optional_telemetry from onyx.utils.telemetry import RecordType -_CONNECTOR_INDEX_ATTEMPT_KEY_FMT = ( - "monitoring_connector_index_attempt:{cc_pair_id}:{index_attempt_id}" +_CONNECTOR_INDEX_ATTEMPT_START_LATENCY_KEY_FMT = ( + "monitoring_connector_index_attempt_start_latency:{cc_pair_id}:{index_attempt_id}" +) + +_CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT = ( + "monitoring_connector_index_attempt_run_success:{cc_pair_id}:{index_attempt_id}" ) @@ -57,13 +65,36 @@ def log(self) -> None: task_logger.info(json.dumps(data)) def emit(self) -> None: + # Convert value to appropriate type + float_value = ( + float(self.value) if isinstance(self.value, (int, float)) else None + ) + int_value = int(self.value) if isinstance(self.value, int) else None + string_value = str(self.value) if isinstance(self.value, str) else None + bool_value = bool(self.value) if isinstance(self.value, bool) else None + + if ( + float_value is None + and int_value is None + and string_value is None + and bool_value is None + ): + task_logger.error( + f"Invalid metric value type: {type(self.value)} " + f"({self.value}) for metric {self.name}." + ) + return + data = { - "metric": self.name, - "value": self.value, + "metric_name": self.name, + "float_value": float_value, + "int_value": int_value, + "string_value": string_value, + "bool_value": bool_value, "tags": self.tags, } optional_telemetry( - record_type=RecordType.USAGE, + record_type=RecordType.METRIC, data=data, ) @@ -72,14 +103,14 @@ def _collect_queue_metrics(redis_celery: Redis) -> list[Metric]: """Collect metrics about queue lengths for different Celery queues""" metrics = [] queue_mappings = { - "celery": "celery", - "indexing": OnyxCeleryQueues.CONNECTOR_INDEXING, - "sync": OnyxCeleryQueues.VESPA_METADATA_SYNC, - "deletion": OnyxCeleryQueues.CONNECTOR_DELETION, - "pruning": OnyxCeleryQueues.CONNECTOR_PRUNING, - "permissions_sync": OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, - "external_group_sync": OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, - "permissions_upsert": OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, + "celery_queue_length": "celery", + "indexing_queue_length": "indexing", + "sync_queue_length": "sync", + "deletion_queue_length": "deletion", + "pruning_queue_length": "pruning", + "permissions_sync_queue_length": OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC, + "external_group_sync_queue_length": OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, + "permissions_upsert_queue_length": OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, } for name, queue in queue_mappings.items(): @@ -95,6 +126,85 @@ def _collect_queue_metrics(redis_celery: Redis) -> list[Metric]: return metrics +def _build_start_latency_metric( + cc_pair: ConnectorCredentialPair, + recent_attempt: IndexAttempt, + second_most_recent_attempt: IndexAttempt | None, + redis_std: Redis, +) -> Metric | None: + if not recent_attempt.time_started: + return None + + # check if we already emitted a metric for this index attempt + metric_key = _CONNECTOR_INDEX_ATTEMPT_START_LATENCY_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=recent_attempt.id, + ) + if _has_metric_been_emitted(redis_std, metric_key): + task_logger.info( + f"Skipping metric for connector {cc_pair.connector.id} " + f"index attempt {recent_attempt.id} because it has already been " + "emitted" + ) + return None + + # Connector start latency + # first run case - we should start as soon as it's created + if not second_most_recent_attempt: + desired_start_time = cc_pair.connector.time_created + else: + if not cc_pair.connector.refresh_freq: + task_logger.error( + "Found non-initial index attempt for connector " + "without refresh_freq. This should never happen." + ) + return None + + desired_start_time = second_most_recent_attempt.time_updated + timedelta( + seconds=cc_pair.connector.refresh_freq + ) + + start_latency = (recent_attempt.time_started - desired_start_time).total_seconds() + + return Metric( + key=metric_key, + name="connector_start_latency", + value=start_latency, + tags={}, + ) + + +def _build_run_success_metric( + cc_pair: ConnectorCredentialPair, recent_attempt: IndexAttempt, redis_std: Redis +) -> Metric | None: + metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=recent_attempt.id, + ) + + if _has_metric_been_emitted(redis_std, metric_key): + task_logger.info( + f"Skipping metric for connector {cc_pair.connector.id} " + f"index attempt {recent_attempt.id} because it has already been " + "emitted" + ) + return None + + if recent_attempt.status in [ + IndexingStatus.SUCCESS, + IndexingStatus.FAILED, + IndexingStatus.CANCELED, + ]: + return Metric( + key=metric_key, + name="connector_run_succeeded", + value=(1 if recent_attempt.status == IndexingStatus.SUCCESS else 0), + tags={"source": str(cc_pair.connector.source)}, + ) + + return None + + def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: """Collect metrics about connector runs from the past hour""" one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) @@ -104,11 +214,6 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me metrics = [] for cc_pair in cc_pairs: - base_tags = { - "source": cc_pair.connector.source, - "connector_id": str(cc_pair.connector.id), - } - # Get most recent attempt in the last hour recent_attempts = ( db_session.query(IndexAttempt) @@ -126,81 +231,132 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me ) # if no metric to emit, skip - if not recent_attempt or not recent_attempt.time_started: + if not recent_attempt: continue - # check if we already emitted a metric for this index attempt - metric_key = _CONNECTOR_INDEX_ATTEMPT_KEY_FMT.format( - cc_pair_id=cc_pair.id, - index_attempt_id=recent_attempt.id, + # Connector start latency + start_latency_metric = _build_start_latency_metric( + cc_pair, recent_attempt, second_most_recent_attempt, redis_std + ) + if start_latency_metric: + metrics.append(start_latency_metric) + + # Connector run success/failure + run_success_metric = _build_run_success_metric( + cc_pair, recent_attempt, redis_std ) + if run_success_metric: + metrics.append(run_success_metric) + + return metrics + + +def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: + """Collect metrics about document set and group syncing speed""" + one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + + # Get all sync records from the last hour + recent_sync_records = db_session.scalars( + select(SyncRecord) + .where(SyncRecord.sync_start_time >= one_hour_ago) + .order_by(SyncRecord.sync_start_time.desc()) + ).all() + + metrics = [] + for sync_record in recent_sync_records: + # Skip if no end time (sync still in progress) + if not sync_record.sync_end_time: + continue + + # Check if we already emitted a metric for this sync record + metric_key = f"sync_speed:{sync_record.sync_type}:{sync_record.entity_id}:{sync_record.id}" if _has_metric_been_emitted(redis_std, metric_key): - task_logger.info( - f"Skipping metric for connector {cc_pair.connector.id} " - f"index attempt {recent_attempt.id} because it has already been " - "emitted" + task_logger.debug( + f"Skipping metric for sync record {sync_record.id} " + "because it has already been emitted" ) continue - # Connector start latency - # first run case - we should start as soon as it's created - if not second_most_recent_attempt: - desired_start_time = cc_pair.connector.time_created + # Calculate sync duration in minutes + sync_duration_mins = ( + sync_record.sync_end_time - sync_record.sync_start_time + ).total_seconds() / 60.0 + + # Calculate sync speed (docs/min) - avoid division by zero + sync_speed = ( + sync_record.num_docs_synced / sync_duration_mins + if sync_duration_mins > 0 + else None + ) + + if sync_speed is None: + task_logger.error( + "Something went wrong with sync speed calculation. " + f"Sync record: {sync_record.id}" + ) + continue + + metrics.append( + Metric( + key=metric_key, + name="sync_speed_docs_per_min", + value=sync_speed, + tags={ + "sync_type": str(sync_record.sync_type), + "status": str(sync_record.sync_status), + }, + ) + ) + + # Add sync start latency metric + start_latency_key = f"sync_start_latency:{sync_record.sync_type}:{sync_record.entity_id}:{sync_record.id}" + if _has_metric_been_emitted(redis_std, start_latency_key): + task_logger.debug( + f"Skipping start latency metric for sync record {sync_record.id} " + "because it has already been emitted" + ) + continue + + # Get the entity's last update time based on sync type + entity: DocumentSet | UserGroup | None = None + if sync_record.sync_type == SyncType.DOCUMENT_SET: + entity = db_session.scalar( + select(DocumentSet).where(DocumentSet.id == sync_record.entity_id) + ) + elif sync_record.sync_type == SyncType.USER_GROUP: + entity = db_session.scalar( + select(UserGroup).where(UserGroup.id == sync_record.entity_id) + ) else: - if not cc_pair.connector.refresh_freq: - task_logger.error( - "Found non-initial index attempt for connector " - "without refresh_freq. This should never happen." - ) - continue - - desired_start_time = second_most_recent_attempt.time_updated + timedelta( - seconds=cc_pair.connector.refresh_freq + # Skip other sync types + continue + + if entity is None: + task_logger.error( + f"Could not find entity for sync record {sync_record.id} " + f"with type {sync_record.sync_type} and id {sync_record.entity_id}" ) + continue + # Calculate start latency in seconds start_latency = ( - recent_attempt.time_started - desired_start_time + sync_record.sync_start_time - entity.time_updated ).total_seconds() metrics.append( Metric( - key=metric_key, - name="connector_start_latency", + key=start_latency_key, + name="sync_start_latency_seconds", value=start_latency, tags={ - **base_tags, - "index_attempt_id": str(recent_attempt.id), + "sync_type": str(sync_record.sync_type), }, ) ) - # Connector run success/failure - if recent_attempt.status in [ - IndexingStatus.SUCCESS, - IndexingStatus.FAILED, - IndexingStatus.CANCELED, - ]: - metrics.append( - Metric( - key=metric_key, - name="connector_run_succeeded", - value=(1 if recent_attempt.status == IndexingStatus.SUCCESS else 0), - tags={ - **base_tags, - "index_attempt_id": str(recent_attempt.id), - }, - ) - ) - return metrics -def _collect_sync_metrics(db_session: Session) -> list[Metric]: - """Collect metrics about document set and group syncing speed""" - # TODO: Implement this - return [] - - @shared_task( name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, soft_time_limit=JOB_TIMEOUT, @@ -226,7 +382,7 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: metric_functions: list[Callable[[], list[Metric]]] = [ lambda: _collect_queue_metrics(redis_celery), lambda: _collect_connector_metrics(db_session, redis_std), - lambda: _collect_sync_metrics(db_session), + lambda: _collect_sync_metrics(db_session, redis_std), ] # Collect and log each metric with get_session_with_tenant(tenant_id) as db_session: diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 8eabeb7d8a8..6600c4378d8 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -1,6 +1,7 @@ import random import time import traceback +from collections.abc import Callable from datetime import datetime from datetime import timezone from http import HTTPStatus @@ -53,10 +54,16 @@ from onyx.db.document_set import mark_document_set_as_synced from onyx.db.engine import get_session_with_tenant from onyx.db.enums import IndexingStatus +from onyx.db.enums import SyncStatus +from onyx.db.enums import SyncType from onyx.db.index_attempt import delete_index_attempts from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import mark_attempt_failed from onyx.db.models import DocumentSet +from onyx.db.models import UserGroup +from onyx.db.sync_record import cleanup_sync_records +from onyx.db.sync_record import insert_sync_record +from onyx.db.sync_record import update_sync_record_status from onyx.document_index.document_index_utils import get_both_index_names from onyx.document_index.factory import get_default_document_index from onyx.document_index.interfaces import VespaDocumentFields @@ -283,6 +290,13 @@ def try_generate_document_set_sync_tasks( return None if document_set.is_up_to_date: + # there should be no in-progress sync records if this is up to date + # clean it up just in case things got into a bad state + cleanup_sync_records( + db_session=db_session, + entity_id=document_set_id, + sync_type=SyncType.DOCUMENT_SET, + ) return None # add tasks to celery and build up the task set to monitor in redis @@ -313,6 +327,11 @@ def try_generate_document_set_sync_tasks( # set this only after all tasks have been added rds.set_fence(tasks_generated) + insert_sync_record( + db_session=db_session, + entity_id=document_set_id, + sync_type=SyncType.DOCUMENT_SET, + ) return tasks_generated @@ -332,8 +351,9 @@ def try_generate_user_group_sync_tasks( return None # race condition with the monitor/cleanup function if we use a cached result! - fetch_user_group = fetch_versioned_implementation( - "onyx.db.user_group", "fetch_user_group" + fetch_user_group = cast( + Callable[[Session, int], UserGroup | None], + fetch_versioned_implementation("onyx.db.user_group", "fetch_user_group"), ) usergroup = fetch_user_group(db_session, usergroup_id) @@ -341,6 +361,13 @@ def try_generate_user_group_sync_tasks( return None if usergroup.is_up_to_date: + # there should be no in-progress sync records if this is up to date + # clean it up just in case things got into a bad state + cleanup_sync_records( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + ) return None # add tasks to celery and build up the task set to monitor in redis @@ -370,6 +397,11 @@ def try_generate_user_group_sync_tasks( # set this only after all tasks have been added rug.set_fence(tasks_generated) + insert_sync_record( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + ) return tasks_generated @@ -419,6 +451,13 @@ def monitor_document_set_taskset( f"remaining={count} initial={initial_count}" ) if count > 0: + update_sync_record_status( + db_session=db_session, + entity_id=document_set_id, + sync_type=SyncType.DOCUMENT_SET, + sync_status=SyncStatus.IN_PROGRESS, + num_docs_synced=count, + ) return document_set = cast( @@ -437,6 +476,13 @@ def monitor_document_set_taskset( task_logger.info( f"Successfully synced document set: document_set={document_set_id}" ) + update_sync_record_status( + db_session=db_session, + entity_id=document_set_id, + sync_type=SyncType.DOCUMENT_SET, + sync_status=SyncStatus.SUCCESS, + num_docs_synced=initial_count, + ) rds.reset() @@ -470,6 +516,14 @@ def monitor_connector_deletion_taskset( f"Connector deletion progress: cc_pair={cc_pair_id} remaining={remaining} initial={fence_data.num_tasks}" ) if remaining > 0: + with get_session_with_tenant(tenant_id) as db_session: + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.CONNECTOR_DELETION, + sync_status=SyncStatus.IN_PROGRESS, + num_docs_synced=remaining, + ) return with get_session_with_tenant(tenant_id) as db_session: @@ -545,11 +599,29 @@ def monitor_connector_deletion_taskset( ) db_session.delete(connector) db_session.commit() + + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.CONNECTOR_DELETION, + sync_status=SyncStatus.SUCCESS, + num_docs_synced=fence_data.num_tasks, + ) + except Exception as e: db_session.rollback() stack_trace = traceback.format_exc() error_message = f"Error: {str(e)}\n\nStack Trace:\n{stack_trace}" add_deletion_failure_message(db_session, cc_pair_id, error_message) + + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.CONNECTOR_DELETION, + sync_status=SyncStatus.FAILED, + num_docs_synced=fence_data.num_tasks, + ) + task_logger.exception( f"Connector deletion exceptioned: " f"cc_pair={cc_pair_id} connector={cc_pair.connector_id} credential={cc_pair.credential_id}" diff --git a/backend/onyx/db/document_set.py b/backend/onyx/db/document_set.py index dfc4f53a189..79edbcc919f 100644 --- a/backend/onyx/db/document_set.py +++ b/backend/onyx/db/document_set.py @@ -218,6 +218,7 @@ def insert_document_set( description=document_set_creation_request.description, user_id=user_id, is_public=document_set_creation_request.is_public, + time_updated=func.now(), ) db_session.add(new_document_set_row) db_session.flush() # ensure the new document set gets assigned an ID @@ -293,7 +294,7 @@ def update_document_set( document_set_row.description = document_set_update_request.description document_set_row.is_up_to_date = False document_set_row.is_public = document_set_update_request.is_public - + document_set_row.time_updated = func.now() versioned_private_doc_set_fn = fetch_versioned_implementation( "onyx.db.document_set", "make_doc_set_private" ) diff --git a/backend/onyx/db/enums.py b/backend/onyx/db/enums.py index 0ccb1470ca7..b32825298e3 100644 --- a/backend/onyx/db/enums.py +++ b/backend/onyx/db/enums.py @@ -24,12 +24,27 @@ class IndexingMode(str, PyEnum): REINDEX = "reindex" -# these may differ in the future, which is why we're okay with this duplication -class DeletionStatus(str, PyEnum): - NOT_STARTED = "not_started" +class SyncType(str, PyEnum): + DOCUMENT_SET = "document_set" + USER_GROUP = "user_group" + CONNECTOR_DELETION = "connector_deletion" + + def __str__(self) -> str: + return self.value + + +class SyncStatus(str, PyEnum): IN_PROGRESS = "in_progress" SUCCESS = "success" FAILED = "failed" + CANCELED = "canceled" + + def is_terminal(self) -> bool: + terminal_states = { + SyncStatus.SUCCESS, + SyncStatus.FAILED, + } + return self in terminal_states # Consistent with Celery task statuses diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index ff1c98d13d8..f6793aa9b19 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -44,7 +44,7 @@ from onyx.configs.constants import DocumentSource from onyx.configs.constants import FileOrigin from onyx.configs.constants import MessageType -from onyx.db.enums import AccessType, IndexingMode +from onyx.db.enums import AccessType, IndexingMode, SyncType, SyncStatus from onyx.configs.constants import NotificationType from onyx.configs.constants import SearchFeedbackType from onyx.configs.constants import TokenRateLimitScope @@ -880,6 +880,31 @@ def __repr__(self) -> str: ) +class SyncRecord(Base): + """ + Represents the status of a "sync" operation (e.g. document set, user group, deletion). + + A "sync" operation is an operation which needs to update a set of documents within + Vespa, usually to match the state of Postgres. + """ + + __tablename__ = "sync_record" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + # document set id, user group id, or deletion id + entity_id: Mapped[int] = mapped_column(Integer) + + sync_type: Mapped[SyncType] = mapped_column(Enum(SyncType, native_enum=False)) + sync_status: Mapped[SyncStatus] = mapped_column(Enum(SyncStatus, native_enum=False)) + + num_docs_synced: Mapped[int] = mapped_column(Integer, default=0) + + sync_start_time: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True)) + sync_end_time: Mapped[datetime.datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + + class DocumentByConnectorCredentialPair(Base): """Represents an indexing of a document by a specific connector / credential pair""" @@ -1283,6 +1308,11 @@ class DocumentSet(Base): # given access to it either via the `users` or `groups` relationships is_public: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + # Last time a user updated this document set + time_updated: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + connector_credential_pairs: Mapped[list[ConnectorCredentialPair]] = relationship( "ConnectorCredentialPair", secondary=DocumentSet__ConnectorCredentialPair.__table__, @@ -1762,6 +1792,11 @@ class UserGroup(Base): Boolean, nullable=False, default=False ) + # Last time a user updated this user group + time_updated: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + users: Mapped[list[User]] = relationship( "User", secondary=User__UserGroup.__table__, diff --git a/backend/onyx/db/sync_record.py b/backend/onyx/db/sync_record.py new file mode 100644 index 00000000000..3a7fcc8bd5a --- /dev/null +++ b/backend/onyx/db/sync_record.py @@ -0,0 +1,114 @@ +from datetime import datetime +from datetime import timezone + +from sqlalchemy import and_ +from sqlalchemy import desc +from sqlalchemy import select +from sqlalchemy import update +from sqlalchemy.orm import Session + +from onyx.db.enums import SyncStatus +from onyx.db.enums import SyncType +from onyx.db.models import SyncRecord + + +def insert_sync_record( + db_session: Session, + entity_id: int | None, + sync_type: SyncType, +) -> SyncRecord: + """Insert a new sync record into the database. + + Args: + db_session: The database session to use + entity_id: The ID of the entity being synced (document set ID, user group ID, etc.) + sync_type: The type of sync operation + """ + sync_record = SyncRecord( + entity_id=entity_id, + sync_type=sync_type, + sync_status=SyncStatus.IN_PROGRESS, + num_docs_synced=0, + sync_start_time=datetime.now(timezone.utc), + ) + db_session.add(sync_record) + db_session.commit() + + return sync_record + + +def fetch_latest_sync_record( + db_session: Session, + entity_id: int, + sync_type: SyncType, +) -> SyncRecord | None: + """Fetch the most recent sync record for a given entity ID and status. + + Args: + db_session: The database session to use + entity_id: The ID of the entity to fetch sync record for + sync_type: The type of sync operation + """ + stmt = ( + select(SyncRecord) + .where( + and_( + SyncRecord.entity_id == entity_id, + SyncRecord.sync_type == sync_type, + ) + ) + .order_by(desc(SyncRecord.sync_start_time)) + .limit(1) + ) + + result = db_session.execute(stmt) + return result.scalar_one_or_none() + + +def update_sync_record_status( + db_session: Session, + entity_id: int, + sync_type: SyncType, + sync_status: SyncStatus, + num_docs_synced: int | None = None, +) -> None: + """Update the status of a sync record. + + Args: + db_session: The database session to use + entity_id: The ID of the entity being synced + sync_type: The type of sync operation + sync_status: The new status to set + num_docs_synced: Optional number of documents synced to update + """ + sync_record = fetch_latest_sync_record(db_session, entity_id, sync_type) + if sync_record is None: + raise ValueError( + f"No sync record found for entity_id={entity_id} " f"sync_type={sync_type}" + ) + + sync_record.sync_status = sync_status + if num_docs_synced is not None: + sync_record.num_docs_synced = num_docs_synced + + if sync_status.is_terminal(): + sync_record.sync_end_time = datetime.now(timezone.utc) + + db_session.commit() + + +def cleanup_sync_records( + db_session: Session, entity_id: int, sync_type: SyncType +) -> None: + """Cleanup sync records for a given entity ID and sync type by marking them as failed.""" + stmt = ( + update(SyncRecord) + .where(SyncRecord.entity_id == entity_id) + .where(SyncRecord.sync_type == sync_type) + .where(SyncRecord.sync_status == SyncStatus.IN_PROGRESS) + .values( + sync_status=SyncStatus.CANCELED, sync_end_time=datetime.now(timezone.utc) + ) + ) + db_session.execute(stmt) + db_session.commit() diff --git a/backend/onyx/utils/telemetry.py b/backend/onyx/utils/telemetry.py index 23a239a5564..793bab8ce64 100644 --- a/backend/onyx/utils/telemetry.py +++ b/backend/onyx/utils/telemetry.py @@ -33,6 +33,7 @@ class RecordType(str, Enum): USAGE = "usage" LATENCY = "latency" FAILURE = "failure" + METRIC = "metric" def get_or_generate_uuid() -> str: From 896fd1883e1c5ca0df3f2790568dd1b99461f005 Mon Sep 17 00:00:00 2001 From: Weves Date: Sun, 12 Jan 2025 16:22:07 -0800 Subject: [PATCH 5/7] Add indices, standardize timing --- .../versions/97dbb53fa8c8_add_syncrecord.py | 18 ++++++++++++++++- .../background/celery/tasks/beat_schedule.py | 2 +- .../celery/tasks/monitoring/tasks.py | 20 ++++++++++++++----- backend/onyx/db/sync_record.py | 12 ++++------- 4 files changed, 37 insertions(+), 15 deletions(-) diff --git a/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py b/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py index 871d8032867..1504de39cbf 100644 --- a/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py +++ b/backend/alembic/versions/97dbb53fa8c8_add_syncrecord.py @@ -10,7 +10,7 @@ # revision identifiers, used by Alembic. revision = "97dbb53fa8c8" -down_revision = "369644546676" +down_revision = "be2ab2aa50ee" branch_labels = None depends_on = None @@ -51,6 +51,22 @@ def upgrade() -> None: sa.PrimaryKeyConstraint("id"), ) + # Add index for fetch_latest_sync_record query + op.create_index( + "ix_sync_record_entity_id_sync_type_sync_start_time", + "sync_record", + ["entity_id", "sync_type", "sync_start_time"], + ) + + # Add index for cleanup_sync_records query + op.create_index( + "ix_sync_record_entity_id_sync_type_sync_status", + "sync_record", + ["entity_id", "sync_type", "sync_status"], + ) + def downgrade() -> None: + op.drop_index("ix_sync_record_entity_id_sync_type_sync_status") + op.drop_index("ix_sync_record_entity_id_sync_type_sync_start_time") op.drop_table("sync_record") diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index f6026b5fd6d..58e27b91e5b 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -72,7 +72,7 @@ { "name": "monitor-background-processes", "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - "schedule": timedelta(seconds=60), + "schedule": timedelta(minutes=5), "options": { "priority": OnyxCeleryPriority.LOW, "expires": BEAT_EXPIRES_DEFAULT, diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index 3cdc0bf29e2..c4c08bda60e 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -1,8 +1,6 @@ import json from collections.abc import Callable -from datetime import datetime from datetime import timedelta -from datetime import timezone from typing import Any from celery import shared_task @@ -17,6 +15,7 @@ from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.db.engine import get_db_current_time from onyx.db.engine import get_session_with_tenant from onyx.db.enums import IndexingStatus from onyx.db.enums import SyncType @@ -207,7 +206,8 @@ def _build_run_success_metric( def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: """Collect metrics about connector runs from the past hour""" - one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + # NOTE: use get_db_current_time since the IndexAttempt times are set based on DB time + one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1) # Get all connector credential pairs cc_pairs = db_session.scalars(select(ConnectorCredentialPair)).all() @@ -253,7 +253,8 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]: """Collect metrics about document set and group syncing speed""" - one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1) + # NOTE: use get_db_current_time since the SyncRecord times are set based on DB time + one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1) # Get all sync records from the last hour recent_sync_records = db_session.scalars( @@ -309,7 +310,10 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] ) # Add sync start latency metric - start_latency_key = f"sync_start_latency:{sync_record.sync_type}:{sync_record.entity_id}:{sync_record.id}" + start_latency_key = ( + f"sync_start_latency:{sync_record.sync_type}" + f":{sync_record.entity_id}:{sync_record.id}" + ) if _has_metric_been_emitted(redis_std, start_latency_key): task_logger.debug( f"Skipping start latency metric for sync record {sync_record.id} " @@ -329,6 +333,12 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] ) else: # Skip other sync types + task_logger.debug( + f"Skipping sync record {sync_record.id} " + f"with type {sync_record.sync_type} " + f"and id {sync_record.entity_id} " + "because it is not a document set or user group" + ) continue if entity is None: diff --git a/backend/onyx/db/sync_record.py b/backend/onyx/db/sync_record.py index 3a7fcc8bd5a..e59c0b56ce0 100644 --- a/backend/onyx/db/sync_record.py +++ b/backend/onyx/db/sync_record.py @@ -1,8 +1,6 @@ -from datetime import datetime -from datetime import timezone - from sqlalchemy import and_ from sqlalchemy import desc +from sqlalchemy import func from sqlalchemy import select from sqlalchemy import update from sqlalchemy.orm import Session @@ -29,7 +27,7 @@ def insert_sync_record( sync_type=sync_type, sync_status=SyncStatus.IN_PROGRESS, num_docs_synced=0, - sync_start_time=datetime.now(timezone.utc), + sync_start_time=func.now(), ) db_session.add(sync_record) db_session.commit() @@ -92,7 +90,7 @@ def update_sync_record_status( sync_record.num_docs_synced = num_docs_synced if sync_status.is_terminal(): - sync_record.sync_end_time = datetime.now(timezone.utc) + sync_record.sync_end_time = func.now() # type: ignore db_session.commit() @@ -106,9 +104,7 @@ def cleanup_sync_records( .where(SyncRecord.entity_id == entity_id) .where(SyncRecord.sync_type == sync_type) .where(SyncRecord.sync_status == SyncStatus.IN_PROGRESS) - .values( - sync_status=SyncStatus.CANCELED, sync_end_time=datetime.now(timezone.utc) - ) + .values(sync_status=SyncStatus.CANCELED, sync_end_time=func.now()) ) db_session.execute(stmt) db_session.commit() From 5c0ccab9b3d84223009b937f2cf6a983a399541f Mon Sep 17 00:00:00 2001 From: Weves Date: Sun, 12 Jan 2025 16:26:30 -0800 Subject: [PATCH 6/7] Small cleanup --- backend/onyx/background/celery/tasks/monitoring/tasks.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index c4c08bda60e..541e1b0a289 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -270,7 +270,10 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] continue # Check if we already emitted a metric for this sync record - metric_key = f"sync_speed:{sync_record.sync_type}:{sync_record.entity_id}:{sync_record.id}" + metric_key = ( + f"sync_speed:{sync_record.sync_type}:" + f"{sync_record.entity_id}:{sync_record.id}" + ) if _has_metric_been_emitted(redis_std, metric_key): task_logger.debug( f"Skipping metric for sync record {sync_record.id} " From d95a00d1753fb832cc7a146ac1d9b30831c82b60 Mon Sep 17 00:00:00 2001 From: Weves Date: Mon, 13 Jan 2025 11:54:19 -0800 Subject: [PATCH 7/7] Address comments --- .vscode/launch.template.jsonc | 29 ++++++++++++++- ...7bf7_add_time_updated_to_usergroup_and_.py | 8 ++--- backend/ee/onyx/db/user_group.py | 2 +- .../onyx/background/celery/apps/monitoring.py | 2 +- .../background/celery/configs/monitoring.py | 2 +- .../celery/tasks/connector_deletion/tasks.py | 4 ++- .../celery/tasks/monitoring/tasks.py | 35 +++++++++++++------ .../background/celery/tasks/vespa/tasks.py | 13 ++++--- backend/onyx/configs/constants.py | 5 --- backend/onyx/db/document_set.py | 2 +- backend/onyx/db/models.py | 19 ++++++++-- backend/onyx/db/sync_record.py | 2 +- 12 files changed, 90 insertions(+), 33 deletions(-) diff --git a/.vscode/launch.template.jsonc b/.vscode/launch.template.jsonc index c3dd6d9914d..8c965d36e80 100644 --- a/.vscode/launch.template.jsonc +++ b/.vscode/launch.template.jsonc @@ -28,6 +28,7 @@ "Celery heavy", "Celery indexing", "Celery beat", + "Celery monitoring", ], "presentation": { "group": "1", @@ -51,7 +52,8 @@ "Celery light", "Celery heavy", "Celery indexing", - "Celery beat" + "Celery beat", + "Celery monitoring", ], "presentation": { "group": "1", @@ -269,6 +271,31 @@ }, "consoleTitle": "Celery indexing Console" }, + { + "name": "Celery monitoring", + "type": "debugpy", + "request": "launch", + "module": "celery", + "cwd": "${workspaceFolder}/backend", + "envFile": "${workspaceFolder}/.vscode/.env", + "env": {}, + "args": [ + "-A", + "onyx.background.celery.versioned_apps.monitoring", + "worker", + "--pool=solo", + "--concurrency=1", + "--prefetch-multiplier=1", + "--loglevel=INFO", + "--hostname=monitoring@%n", + "-Q", + "monitoring", + ], + "presentation": { + "group": "2", + }, + "consoleTitle": "Celery monitoring Console" + }, { "name": "Celery beat", "type": "debugpy", diff --git a/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py b/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py index aa3bbf4ecd1..99bc3ffae0c 100644 --- a/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py +++ b/backend/alembic/versions/fec3db967bf7_add_time_updated_to_usergroup_and_.py @@ -19,7 +19,7 @@ def upgrade() -> None: op.add_column( "document_set", sa.Column( - "time_updated", + "time_last_modified_by_user", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now(), @@ -28,7 +28,7 @@ def upgrade() -> None: op.add_column( "user_group", sa.Column( - "time_updated", + "time_last_modified_by_user", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now(), @@ -37,5 +37,5 @@ def upgrade() -> None: def downgrade() -> None: - op.drop_column("user_group", "time_updated") - op.drop_column("document_set", "time_updated") + op.drop_column("user_group", "time_last_modified_by_user") + op.drop_column("document_set", "time_last_modified_by_user") diff --git a/backend/ee/onyx/db/user_group.py b/backend/ee/onyx/db/user_group.py index bae25e368b9..0adff8097a9 100644 --- a/backend/ee/onyx/db/user_group.py +++ b/backend/ee/onyx/db/user_group.py @@ -632,7 +632,7 @@ def update_user_group( _validate_curator_status__no_commit(db_session, list(removed_users)) # update "time_updated" to now - db_user_group.time_updated = func.now() + db_user_group.time_last_modified_by_user = func.now() db_session.commit() return db_user_group diff --git a/backend/onyx/background/celery/apps/monitoring.py b/backend/onyx/background/celery/apps/monitoring.py index b0113b457f9..49c78cafe1d 100644 --- a/backend/onyx/background/celery/apps/monitoring.py +++ b/backend/onyx/background/celery/apps/monitoring.py @@ -59,7 +59,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_MONITORING_APP_NAME) - SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=3) app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/configs/monitoring.py b/backend/onyx/background/celery/configs/monitoring.py index 5116fbd8c0a..90f7b889cce 100644 --- a/backend/onyx/background/celery/configs/monitoring.py +++ b/backend/onyx/background/celery/configs/monitoring.py @@ -17,5 +17,5 @@ # Monitoring worker specific settings worker_concurrency = 1 # Single worker is sufficient for monitoring -worker_pool = "threads" +worker_pool = "solo" worker_prefetch_multiplier = 1 diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 0b13b541165..bf7e949d3f8 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -136,12 +136,14 @@ def try_generate_document_cc_pair_cleanup_tasks( submitted=datetime.now(timezone.utc), ) - redis_connector.delete.set_fence(fence_payload) + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created insert_sync_record( db_session=db_session, entity_id=cc_pair_id, sync_type=SyncType.CONNECTOR_DELETION, ) + redis_connector.delete.set_fence(fence_payload) try: # do not proceed if connector indexing or connector pruning are running diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index 541e1b0a289..a78939ef036 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -84,13 +84,18 @@ def emit(self) -> None: ) return + # don't send None values over the wire data = { - "metric_name": self.name, - "float_value": float_value, - "int_value": int_value, - "string_value": string_value, - "bool_value": bool_value, - "tags": self.tags, + k: v + for k, v in { + "metric_name": self.name, + "float_value": float_value, + "int_value": int_value, + "string_value": string_value, + "bool_value": bool_value, + "tags": self.tags, + }.items() + if v is not None } optional_telemetry( record_type=RecordType.METRIC, @@ -125,7 +130,7 @@ def _collect_queue_metrics(redis_celery: Redis) -> list[Metric]: return metrics -def _build_start_latency_metric( +def _build_connector_start_latency_metric( cc_pair: ConnectorCredentialPair, recent_attempt: IndexAttempt, second_most_recent_attempt: IndexAttempt | None, @@ -197,7 +202,7 @@ def _build_run_success_metric( return Metric( key=metric_key, name="connector_run_succeeded", - value=(1 if recent_attempt.status == IndexingStatus.SUCCESS else 0), + value=recent_attempt.status == IndexingStatus.SUCCESS, tags={"source": str(cc_pair.connector.source)}, ) @@ -235,7 +240,7 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me continue # Connector start latency - start_latency_metric = _build_start_latency_metric( + start_latency_metric = _build_connector_start_latency_metric( cc_pair, recent_attempt, second_most_recent_attempt, redis_std ) if start_latency_metric: @@ -353,8 +358,16 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] # Calculate start latency in seconds start_latency = ( - sync_record.sync_start_time - entity.time_updated + sync_record.sync_start_time - entity.time_last_modified_by_user ).total_seconds() + if start_latency < 0: + task_logger.error( + f"Start latency is negative for sync record {sync_record.id} " + f"with type {sync_record.sync_type} and id {sync_record.entity_id}." + "This is likely because the entity was updated between the time the " + "time the sync finished and this job ran. Skipping." + ) + continue metrics.append( Metric( @@ -373,7 +386,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] @shared_task( name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, soft_time_limit=JOB_TIMEOUT, - queue="monitoring", + queue=OnyxCeleryQueues.MONITORING, bind=True, ) def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 6600c4378d8..dea1981f0fa 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -325,13 +325,15 @@ def try_generate_document_set_sync_tasks( f"document_set={document_set.id} tasks_generated={tasks_generated}" ) - # set this only after all tasks have been added - rds.set_fence(tasks_generated) + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created insert_sync_record( db_session=db_session, entity_id=document_set_id, sync_type=SyncType.DOCUMENT_SET, ) + # set this only after all tasks have been added + rds.set_fence(tasks_generated) return tasks_generated @@ -395,13 +397,16 @@ def try_generate_user_group_sync_tasks( f"usergroup={usergroup.id} tasks_generated={tasks_generated}" ) - # set this only after all tasks have been added - rug.set_fence(tasks_generated) + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created insert_sync_record( db_session=db_session, entity_id=usergroup_id, sync_type=SyncType.USER_GROUP, ) + # set this only after all tasks have been added + rug.set_fence(tasks_generated) + return tasks_generated diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 3fee9ae3a8c..a3d21bdc724 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -48,11 +48,6 @@ POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy" POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing" POSTGRES_CELERY_WORKER_MONITORING_APP_NAME = "celery_worker_monitoring" -POSTGRES_CELERY_BEAT_APP_NAME = "celery_beat" -POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME = "celery_worker_primary" -POSTGRES_CELERY_WORKER_LIGHT_APP_NAME = "celery_worker_light" -POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy" -POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing" POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME = "celery_worker_indexing_child" POSTGRES_PERMISSIONS_APP_NAME = "permissions" POSTGRES_UNKNOWN_APP_NAME = "unknown" diff --git a/backend/onyx/db/document_set.py b/backend/onyx/db/document_set.py index 79edbcc919f..54e9c3fc50c 100644 --- a/backend/onyx/db/document_set.py +++ b/backend/onyx/db/document_set.py @@ -294,7 +294,7 @@ def update_document_set( document_set_row.description = document_set_update_request.description document_set_row.is_up_to_date = False document_set_row.is_public = document_set_update_request.is_public - document_set_row.time_updated = func.now() + document_set_row.time_last_modified_by_user = func.now() versioned_private_doc_set_fn = fetch_versioned_implementation( "onyx.db.document_set", "make_doc_set_private" ) diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index f6793aa9b19..044ea0f42a1 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -904,6 +904,21 @@ class SyncRecord(Base): DateTime(timezone=True), nullable=True ) + __table_args__ = ( + Index( + "ix_sync_record_entity_id_sync_type_sync_start_time", + "entity_id", + "sync_type", + "sync_start_time", + ), + Index( + "ix_sync_record_entity_id_sync_type_sync_status", + "entity_id", + "sync_type", + "sync_status", + ), + ) + class DocumentByConnectorCredentialPair(Base): """Represents an indexing of a document by a specific connector / credential pair""" @@ -1309,7 +1324,7 @@ class DocumentSet(Base): is_public: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) # Last time a user updated this document set - time_updated: Mapped[datetime.datetime] = mapped_column( + time_last_modified_by_user: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now() ) @@ -1793,7 +1808,7 @@ class UserGroup(Base): ) # Last time a user updated this user group - time_updated: Mapped[datetime.datetime] = mapped_column( + time_last_modified_by_user: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now() ) diff --git a/backend/onyx/db/sync_record.py b/backend/onyx/db/sync_record.py index e59c0b56ce0..81b613c271c 100644 --- a/backend/onyx/db/sync_record.py +++ b/backend/onyx/db/sync_record.py @@ -82,7 +82,7 @@ def update_sync_record_status( sync_record = fetch_latest_sync_record(db_session, entity_id, sync_type) if sync_record is None: raise ValueError( - f"No sync record found for entity_id={entity_id} " f"sync_type={sync_type}" + f"No sync record found for entity_id={entity_id} sync_type={sync_type}" ) sync_record.sync_status = sync_status