Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion backend/onyx/background/celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ def _try_updating_schedule(self) -> None:
current_schedule = self.schedule.items()

# get potential new state
beat_multiplier = OnyxRuntime.get_beat_multiplier()
try:
beat_multiplier = OnyxRuntime.get_beat_multiplier()
except Exception:
beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT

new_schedule = self._generate_schedule(tenant_ids, beat_multiplier)

Expand Down
10 changes: 10 additions & 0 deletions backend/onyx/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]:
"queue": OnyxCeleryQueues.MONITORING,
},
},
{
"name": "celery-beat-heartbeat",
"task": OnyxCeleryTask.CELERY_BEAT_HEARTBEAT,
"schedule": timedelta(minutes=1),
"options": {
"priority": OnyxCeleryPriority.HIGHEST,
"expires": BEAT_EXPIRES_DEFAULT,
"queue": OnyxCeleryQueues.PRIMARY,
},
},
]
)

Expand Down
15 changes: 15 additions & 0 deletions backend/onyx/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.lock import Lock as RedisLock
from tenacity import RetryError

Expand All @@ -15,6 +16,7 @@
from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT
from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import ONYX_CELERY_BEAT_HEARTBEAT_KEY
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
Expand Down Expand Up @@ -353,3 +355,16 @@ def cloud_beat_task_generator(
f"elapsed={time_elapsed:.2f}"
)
return True


@shared_task(name=OnyxCeleryTask.CELERY_BEAT_HEARTBEAT, ignore_result=True, bind=True)
def celery_beat_heartbeat(self: Task, *, tenant_id: str) -> None:
"""When this task runs, it writes a key to Redis with a TTL.

An external observer can check this key to figure out if the celery beat is still running.
"""
time_start = time.monotonic()
r: Redis = get_redis_client()
r.set(ONYX_CELERY_BEAT_HEARTBEAT_KEY, 1, ex=600)
time_elapsed = time.monotonic() - time_start
task_logger.info(f"celery_beat_heartbeat finished: " f"elapsed={time_elapsed:.2f}")
4 changes: 4 additions & 0 deletions backend/onyx/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ class OnyxCeleryTask:
MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes"
MONITOR_CELERY_QUEUES = "monitor_celery_queues"
MONITOR_PROCESS_MEMORY = "monitor_process_memory"
CELERY_BEAT_HEARTBEAT = "celery_beat_heartbeat"

KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task"
CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = (
Expand All @@ -444,6 +445,9 @@ class OnyxCeleryTask:
AUTOGENERATE_USAGE_REPORT_TASK = "autogenerate_usage_report_task"


# this needs to correspond to the matching entry in supervisord
ONYX_CELERY_BEAT_HEARTBEAT_KEY = "onyx:celery:beat:heartbeat"

REDIS_SOCKET_KEEPALIVE_OPTIONS = {}
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3
Expand Down
1 change: 1 addition & 0 deletions backend/onyx/connectors/confluence/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(
}

def set_allow_images(self, value: bool) -> None:
logger.info(f"Setting allow_images to {value}.")
self.allow_images = value

@property
Expand Down
109 changes: 109 additions & 0 deletions backend/onyx/utils/supervisord_watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3

import argparse
import subprocess
import time

from onyx.redis.redis_pool import get_redis_client
from onyx.utils.logger import setup_logger


logger = setup_logger()

MAX_AGE_SECONDS = 900 # how old the heartbeat can be
CHECK_INTERVAL = 60 # how often to check
MAX_LOOKUP_FAILURES = 5


def main(key: str, program: str, conf: str) -> None:
"""This script will restart the watchdog'd supervisord process via supervisorctl.

This process continually looks up a specific redis key. If it is missing for a
consecutive number of times and the last successful lookup is more
than a threshold time, the specified program will be restarted.
"""
logger.info(f"supervisord_watchdog starting: program={program} conf={conf}")

r = get_redis_client()

last_heartbeat = time.monotonic()
num_lookup_failures = 0

try:
while True:
time.sleep(CHECK_INTERVAL)

now = time.monotonic()

# check for the key ... handle any exception gracefully
try:
heartbeat = r.exists(key)
except Exception:
logger.exception(
f"Exception checking for celery beat heartbeat: key={key}."
)
continue

# happy path ... just continue
if heartbeat:
logger.debug(f"Key lookup succeeded: key={key}")
last_heartbeat = time.monotonic()
num_lookup_failures = 0
continue

# if we haven't exceeded the max lookup failures, continue
num_lookup_failures += 1
if num_lookup_failures <= MAX_LOOKUP_FAILURES:
logger.warning(
f"Key lookup failed: key={key} "
f"lookup_failures={num_lookup_failures} "
f"max_lookup_failures={MAX_LOOKUP_FAILURES}"
)
continue

# if we haven't exceeded the max missing key timeout threshold, continue
elapsed = now - last_heartbeat
if elapsed <= MAX_AGE_SECONDS:
logger.warning(
f"Key lookup failed: key={key} "
f"lookup_failures={num_lookup_failures} "
f"max_lookup_failures={MAX_LOOKUP_FAILURES} "
f"elapsed={elapsed:.2f} "
f"elapsed_threshold={MAX_AGE_SECONDS}"
)
continue

# all conditions have been exceeded ... restart the process
logger.warning(
f"Key lookup failure thresholds exceeded - restarting {program}: "
f"key={key} "
f"lookup_failures={num_lookup_failures} "
f"max_lookup_failures={MAX_LOOKUP_FAILURES} "
f"elapsed={elapsed:.2f} "
f"elapsed_threshold={MAX_AGE_SECONDS}"
)

subprocess.call(["supervisorctl", "-c", conf, "restart", program])

# reset state so that we properly delay until the next restart
# instead of continually restarting
num_lookup_failures = 0
last_heartbeat = time.monotonic()
except KeyboardInterrupt:
logger.info("Caught interrupt, exiting watchdog.")

logger.info("supervisord_watchdog exiting.")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Supervisord Watchdog")
parser.add_argument("--key", help="The redis key to watch", required=True)
parser.add_argument(
"--program", help="The supervisord program to restart", required=True
)
parser.add_argument(
"--conf", type=str, help="Path to supervisord config file", required=True
)
args = parser.parse_args()

main(args.key, args.program, args.conf)
2 changes: 1 addition & 1 deletion backend/requirements/default.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ asyncpg==0.27.0
atlassian-python-api==3.41.16
beautifulsoup4==4.12.3
boto3==1.36.23
celery==5.5.0b4
celery==5.5.1
chardet==5.2.0
dask==2023.8.1
ddtrace==2.6.5
Expand Down
27 changes: 27 additions & 0 deletions backend/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@ nodaemon=true
user=root
logfile=/var/log/supervisord.log

# region enable supervisorctl usage
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock

[unix_http_server]
file=/tmp/supervisor.sock
chmod=0700

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
# endregion enable supervisorctl usage

# Background jobs that must be run async due to long time to completion
# NOTE: due to an issue with Celery + SQLAlchemy
# (https://github.com/celery/celery/issues/7007#issuecomment-1740139367)
Expand Down Expand Up @@ -98,6 +110,20 @@ redirect_stderr=true
startsecs=10
stopasgroup=true

# watchdog to detect and restart the beat in case of inactivity
# supervisord only restarts the process if it's dead
# make sure this key matches ONYX_CELERY_BEAT_HEARTBEAT_KEY
[program:supervisord_watchdog_celery_beat]
command=python onyx/utils/supervisord_watchdog.py
--conf /etc/supervisor/conf.d/supervisord.conf
--key "onyx:celery:beat:heartbeat"
--program celery_beat
stdout_logfile=/var/log/supervisord_watchdog_celery_beat.log
stdout_logfile_maxbytes=16MB
redirect_stderr=true
startsecs=10
stopasgroup=true

# Listens for Slack messages and responds with answers
# for all channels that the OnyxBot has been added to.
# If not setup, this will just fail 5 times and then stop.
Expand All @@ -123,6 +149,7 @@ command=tail -qF
/var/log/celery_worker_user_files_indexing.log
/var/log/celery_worker_monitoring.log
/var/log/slack_bot.log
/var/log/supervisord_watchdog_celery_beat.log
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes = 0 # must be set to 0 when stdout_logfile=/dev/stdout
autorestart=true
Loading