Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/ee/onyx/background/celery/tasks/cleanup/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

from celery import shared_task

from ee.onyx.db.query_history import get_all_query_history_export_tasks
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import TaskStatus
from onyx.db.tasks import delete_task_with_id
from onyx.db.tasks import get_all_query_history_export_tasks
from onyx.utils.logger import setup_logger


Expand Down
104 changes: 104 additions & 0 deletions backend/ee/onyx/background/celery/tasks/cloud/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import time

from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis.lock import Lock as RedisLock

from ee.onyx.server.tenants.product_gating import get_gated_tenants
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_all_tenant_ids
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST


@shared_task(
name=OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
ignore_result=True,
trail=False,
bind=True,
)
def cloud_beat_task_generator(
self: Task,
task_name: str,
queue: str = OnyxCeleryTask.DEFAULT,
priority: int = OnyxCeleryPriority.MEDIUM,
expires: int = BEAT_EXPIRES_DEFAULT,
) -> bool | None:
"""a lightweight task used to kick off individual beat tasks per tenant."""
time_start = time.monotonic()

redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)

lock_beat: RedisLock = redis_client.lock(
f"{OnyxRedisLocks.CLOUD_BEAT_TASK_GENERATOR_LOCK}:{task_name}",
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
)

# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

last_lock_time = time.monotonic()
tenant_ids: list[str] = []
num_processed_tenants = 0

try:
tenant_ids = get_all_tenant_ids()
gated_tenants = get_gated_tenants()
for tenant_id in tenant_ids:
if tenant_id in gated_tenants:
continue

current_time = time.monotonic()
if current_time - last_lock_time >= (CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4):
lock_beat.reacquire()
last_lock_time = current_time

# needed in the cloud
if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST:
continue

self.app.send_task(
task_name,
kwargs=dict(
tenant_id=tenant_id,
),
queue=queue,
priority=priority,
expires=expires,
ignore_result=True,
)

num_processed_tenants += 1
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception("Unexpected exception during cloud_beat_task_generator")
finally:
if not lock_beat.owned():
task_logger.error(
"cloud_beat_task_generator - Lock not owned on completion"
)
redis_lock_dump(lock_beat, redis_client)
else:
lock_beat.release()

time_elapsed = time.monotonic() - time_start
task_logger.info(
f"cloud_beat_task_generator finished: "
f"task={task_name} "
f"num_processed_tenants={num_processed_tenants} "
f"num_tenants={len(tenant_ids)} "
f"elapsed={time_elapsed:.2f}"
)
return True
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from redis import Redis
from redis.lock import Lock as RedisLock

from ee.onyx.background.celery.tasks.external_group_syncing.group_sync_utils import (
mark_all_relevant_cc_pairs_as_external_group_synced,
)
from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs
from ee.onyx.db.connector_credential_pair import get_cc_pairs_by_source
from ee.onyx.db.external_perm import ExternalUserGroup
Expand All @@ -26,9 +29,6 @@
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.tasks.external_group_syncing.group_sync_utils import (
mark_all_relevant_cc_pairs_as_external_group_synced,
)
from onyx.background.error_logging import emit_background_error
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT
Expand Down
17 changes: 0 additions & 17 deletions backend/ee/onyx/configs/app_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,11 @@
SUPER_USERS = json.loads(os.environ.get("SUPER_USERS", "[]"))
SUPER_CLOUD_API_KEY = os.environ.get("SUPER_CLOUD_API_KEY", "api_key")

OAUTH_SLACK_CLIENT_ID = os.environ.get("OAUTH_SLACK_CLIENT_ID", "")
OAUTH_SLACK_CLIENT_SECRET = os.environ.get("OAUTH_SLACK_CLIENT_SECRET", "")
OAUTH_CONFLUENCE_CLOUD_CLIENT_ID = os.environ.get(
"OAUTH_CONFLUENCE_CLOUD_CLIENT_ID", ""
)
OAUTH_CONFLUENCE_CLOUD_CLIENT_SECRET = os.environ.get(
"OAUTH_CONFLUENCE_CLOUD_CLIENT_SECRET", ""
)
OAUTH_JIRA_CLOUD_CLIENT_ID = os.environ.get("OAUTH_JIRA_CLOUD_CLIENT_ID", "")
OAUTH_JIRA_CLOUD_CLIENT_SECRET = os.environ.get("OAUTH_JIRA_CLOUD_CLIENT_SECRET", "")
OAUTH_GOOGLE_DRIVE_CLIENT_ID = os.environ.get("OAUTH_GOOGLE_DRIVE_CLIENT_ID", "")
OAUTH_GOOGLE_DRIVE_CLIENT_SECRET = os.environ.get(
"OAUTH_GOOGLE_DRIVE_CLIENT_SECRET", ""
)

# The posthog client does not accept empty API keys or hosts however it fails silently
# when the capture is called. These defaults prevent Posthog issues from breaking the Onyx app
POSTHOG_API_KEY = os.environ.get("POSTHOG_API_KEY") or "FooBar"
POSTHOG_HOST = os.environ.get("POSTHOG_HOST") or "https://us.i.posthog.com"

HUBSPOT_TRACKING_URL = os.environ.get("HUBSPOT_TRACKING_URL")

ANONYMOUS_USER_COOKIE_NAME = "onyx_anonymous_user"

GATED_TENANTS_KEY = "gated_tenants"
9 changes: 9 additions & 0 deletions backend/ee/onyx/db/query_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.expression import UnaryExpression

from ee.onyx.background.task_name_builders import QUERY_HISTORY_TASK_NAME_PREFIX
from onyx.configs.constants import QAFeedbackType
from onyx.db.models import ChatMessage
from onyx.db.models import ChatMessageFeedback
from onyx.db.models import ChatSession
from onyx.db.models import TaskQueueState
from onyx.db.tasks import get_all_tasks_with_prefix


def _build_filter_conditions(
Expand Down Expand Up @@ -171,3 +174,9 @@ def fetch_chat_sessions_eagerly_by_time(
chat_sessions = query.all()

return chat_sessions


def get_all_query_history_export_tasks(
db_session: Session,
) -> list[TaskQueueState]:
return get_all_tasks_with_prefix(db_session, QUERY_HISTORY_TASK_NAME_PREFIX)
2 changes: 2 additions & 0 deletions backend/ee/onyx/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ee.onyx.configs.app_configs import OPENID_CONFIG_URL
from ee.onyx.server.analytics.api import router as analytics_router
from ee.onyx.server.auth_check import check_ee_router_auth
from ee.onyx.server.documents.cc_pair import router as ee_document_cc_pair_router
from ee.onyx.server.enterprise_settings.api import (
admin_router as enterprise_settings_admin_router,
)
Expand Down Expand Up @@ -167,6 +168,7 @@ def get_application() -> FastAPI:
include_router_with_global_prefix_prepended(application, chat_router)
include_router_with_global_prefix_prepended(application, standard_answer_router)
include_router_with_global_prefix_prepended(application, ee_oauth_router)
include_router_with_global_prefix_prepended(application, ee_document_cc_pair_router)

# Enterprise-only global settings
include_router_with_global_prefix_prepended(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from ee.onyx.db.standard_answer import fetch_standard_answer_categories_by_names
from ee.onyx.db.standard_answer import find_matching_standard_answers
from ee.onyx.server.manage.models import StandardAnswer as PydanticStandardAnswer
from onyx.configs.constants import MessageType
from onyx.configs.onyxbot_configs import DANSWER_REACT_EMOJI
from onyx.db.chat import create_chat_session
Expand All @@ -24,6 +23,7 @@
from onyx.onyxbot.slack.models import SlackMessageInfo
from onyx.onyxbot.slack.utils import respond_in_thread_or_channel
from onyx.onyxbot.slack.utils import update_emote_react
from onyx.server.manage.models import StandardAnswer as PydanticStandardAnswer
from onyx.utils.logger import OnyxLoggingAdapter
from onyx.utils.logger import setup_logger

Expand Down
Loading