Skip to content

Commit 96cb081

Browse files
evan-onyx authored and aponcedeleonch committed
feat: connector indexing decoupling (onyx-dot-app#4893)
* WIP * renamed and moved tasks (WIP) * minio migration * bug fixes and finally add document batch storage * WIP: can succeed but status is error * WIP * import fixes * working v1 of decoupled * catastrophe handling * refactor * remove unused db session in prep for new approach * renaming and docstrings (untested) * renames * WIP with no more indexing fences * robustness improvements * clean up rebase * migration and salesforce rate limits * minor tweaks * test fix * connector pausing behavior * correct checkpoint resumption logic * cleanups in docfetching * add heartbeat file * update template jsonc * deployment fixes * fix vespa httpx pool * error handling * cosmetic fixes * dumb * logging improvements and non-checkpointed connector fixes * didn't save * misc fixes * fix import * fix deletion of old files * add in attempt prefix * fix attempt prefix * tiny log improvement * minor changes * fixed resumption behavior * passing int tests * fix unit test * fixed unit tests * trying timeout bump to see if int tests pass * trying timeout bump to see if int tests pass * fix autodiscovery * helm chart fixes * helm and logging
1 parent 004b517 commit 96cb081

File tree

107 files changed

+4980
-2605
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+4980
-2605
lines changed

.vscode/launch.template.jsonc

Lines changed: 51 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,8 @@
4646
"Celery primary",
4747
"Celery light",
4848
"Celery heavy",
49-
"Celery indexing",
49+
"Celery docfetching",
50+
"Celery docprocessing",
5051
"Celery user files indexing",
5152
"Celery beat",
5253
"Celery monitoring"
@@ -226,35 +227,66 @@
226227
"consoleTitle": "Celery heavy Console"
227228
},
228229
{
229-
"name": "Celery indexing",
230+
"name": "Celery docfetching",
230231
"type": "debugpy",
231232
"request": "launch",
232233
"module": "celery",
233234
"cwd": "${workspaceFolder}/backend",
234235
"envFile": "${workspaceFolder}/.vscode/.env",
235236
"env": {
236-
"ENABLE_MULTIPASS_INDEXING": "false",
237-
"LOG_LEVEL": "DEBUG",
238-
"PYTHONUNBUFFERED": "1",
239-
"PYTHONPATH": "."
237+
"LOG_LEVEL": "DEBUG",
238+
"PYTHONUNBUFFERED": "1",
239+
"PYTHONPATH": "."
240240
},
241241
"args": [
242-
"-A",
243-
"onyx.background.celery.versioned_apps.indexing",
244-
"worker",
245-
"--pool=threads",
246-
"--concurrency=1",
247-
"--prefetch-multiplier=1",
248-
"--loglevel=INFO",
249-
"--hostname=indexing@%n",
250-
"-Q",
251-
"connector_indexing"
242+
"-A",
243+
"onyx.background.celery.versioned_apps.docfetching",
244+
"worker",
245+
"--pool=threads",
246+
"--concurrency=1",
247+
"--prefetch-multiplier=1",
248+
"--loglevel=INFO",
249+
"--hostname=docfetching@%n",
250+
"-Q",
251+
"connector_doc_fetching,user_files_indexing"
252252
],
253253
"presentation": {
254-
"group": "2"
254+
"group": "2"
255255
},
256-
"consoleTitle": "Celery indexing Console"
257-
},
256+
"consoleTitle": "Celery docfetching Console",
257+
"justMyCode": false
258+
},
259+
{
260+
"name": "Celery docprocessing",
261+
"type": "debugpy",
262+
"request": "launch",
263+
"module": "celery",
264+
"cwd": "${workspaceFolder}/backend",
265+
"envFile": "${workspaceFolder}/.vscode/.env",
266+
"env": {
267+
"ENABLE_MULTIPASS_INDEXING": "false",
268+
"LOG_LEVEL": "DEBUG",
269+
"PYTHONUNBUFFERED": "1",
270+
"PYTHONPATH": "."
271+
},
272+
"args": [
273+
"-A",
274+
"onyx.background.celery.versioned_apps.docprocessing",
275+
"worker",
276+
"--pool=threads",
277+
"--concurrency=6",
278+
"--prefetch-multiplier=1",
279+
"--loglevel=INFO",
280+
"--hostname=docprocessing@%n",
281+
"-Q",
282+
"docprocessing"
283+
],
284+
"presentation": {
285+
"group": "2"
286+
},
287+
"consoleTitle": "Celery docprocessing Console",
288+
"justMyCode": false
289+
},
258290
{
259291
"name": "Celery monitoring",
260292
"type": "debugpy",
@@ -303,35 +335,6 @@
303335
},
304336
"consoleTitle": "Celery beat Console"
305337
},
306-
{
307-
"name": "Celery user files indexing",
308-
"type": "debugpy",
309-
"request": "launch",
310-
"module": "celery",
311-
"cwd": "${workspaceFolder}/backend",
312-
"envFile": "${workspaceFolder}/.vscode/.env",
313-
"env": {
314-
"LOG_LEVEL": "DEBUG",
315-
"PYTHONUNBUFFERED": "1",
316-
"PYTHONPATH": "."
317-
},
318-
"args": [
319-
"-A",
320-
"onyx.background.celery.versioned_apps.indexing",
321-
"worker",
322-
"--pool=threads",
323-
"--concurrency=1",
324-
"--prefetch-multiplier=1",
325-
"--loglevel=INFO",
326-
"--hostname=user_files_indexing@%n",
327-
"-Q",
328-
"user_files_indexing"
329-
],
330-
"presentation": {
331-
"group": "2"
332-
},
333-
"consoleTitle": "Celery user files indexing Console"
334-
},
335338
{
336339
"name": "Pytest",
337340
"consoleName": "Pytest",

backend/alembic/versions/12635f6655b7_drive_canonical_ids.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,7 @@ def get_google_drive_documents_from_database() -> list[dict]:
9696
result = bind.execute(
9797
sa.text(
9898
"""
99-
SELECT d.id, cc.id as cc_pair_id
99+
SELECT d.id
100100
FROM document d
101101
JOIN document_by_connector_credential_pair dcc ON d.id = dcc.id
102102
JOIN connector_credential_pair cc ON dcc.connector_id = cc.connector_id
@@ -109,7 +109,7 @@ def get_google_drive_documents_from_database() -> list[dict]:
109109

110110
documents = []
111111
for row in result:
112-
documents.append({"document_id": row.id, "cc_pair_id": row.cc_pair_id})
112+
documents.append({"document_id": row.id})
113113

114114
return documents
115115

Lines changed: 115 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,115 @@
1+
"""add_indexing_coordination
2+
3+
Revision ID: 2f95e36923e6
4+
Revises: 0816326d83aa
5+
Create Date: 2025-07-10 16:17:57.762182
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "2f95e36923e6"
15+
down_revision = "0816326d83aa"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade() -> None:
21+
# Add database-based coordination fields (replacing Redis fencing)
22+
op.add_column(
23+
"index_attempt", sa.Column("celery_task_id", sa.String(), nullable=True)
24+
)
25+
op.add_column(
26+
"index_attempt",
27+
sa.Column(
28+
"cancellation_requested",
29+
sa.Boolean(),
30+
nullable=False,
31+
server_default="false",
32+
),
33+
)
34+
35+
# Add batch coordination fields (replacing FileStore state)
36+
op.add_column(
37+
"index_attempt", sa.Column("total_batches", sa.Integer(), nullable=True)
38+
)
39+
op.add_column(
40+
"index_attempt",
41+
sa.Column(
42+
"completed_batches", sa.Integer(), nullable=False, server_default="0"
43+
),
44+
)
45+
op.add_column(
46+
"index_attempt",
47+
sa.Column(
48+
"total_failures_batch_level",
49+
sa.Integer(),
50+
nullable=False,
51+
server_default="0",
52+
),
53+
)
54+
op.add_column(
55+
"index_attempt",
56+
sa.Column("total_chunks", sa.Integer(), nullable=False, server_default="0"),
57+
)
58+
59+
# Progress tracking for stall detection
60+
op.add_column(
61+
"index_attempt",
62+
sa.Column("last_progress_time", sa.DateTime(timezone=True), nullable=True),
63+
)
64+
op.add_column(
65+
"index_attempt",
66+
sa.Column(
67+
"last_batches_completed_count",
68+
sa.Integer(),
69+
nullable=False,
70+
server_default="0",
71+
),
72+
)
73+
74+
# Heartbeat tracking for worker liveness detection
75+
op.add_column(
76+
"index_attempt",
77+
sa.Column(
78+
"heartbeat_counter", sa.Integer(), nullable=False, server_default="0"
79+
),
80+
)
81+
op.add_column(
82+
"index_attempt",
83+
sa.Column(
84+
"last_heartbeat_value", sa.Integer(), nullable=False, server_default="0"
85+
),
86+
)
87+
op.add_column(
88+
"index_attempt",
89+
sa.Column("last_heartbeat_time", sa.DateTime(timezone=True), nullable=True),
90+
)
91+
92+
# Add index for coordination queries
93+
op.create_index(
94+
"ix_index_attempt_active_coordination",
95+
"index_attempt",
96+
["connector_credential_pair_id", "search_settings_id", "status"],
97+
)
98+
99+
100+
def downgrade() -> None:
101+
# Remove the new index
102+
op.drop_index("ix_index_attempt_active_coordination", table_name="index_attempt")
103+
104+
# Remove the new columns
105+
op.drop_column("index_attempt", "last_batches_completed_count")
106+
op.drop_column("index_attempt", "last_progress_time")
107+
op.drop_column("index_attempt", "last_heartbeat_time")
108+
op.drop_column("index_attempt", "last_heartbeat_value")
109+
op.drop_column("index_attempt", "heartbeat_counter")
110+
op.drop_column("index_attempt", "total_chunks")
111+
op.drop_column("index_attempt", "total_failures_batch_level")
112+
op.drop_column("index_attempt", "completed_batches")
113+
op.drop_column("index_attempt", "total_batches")
114+
op.drop_column("index_attempt", "cancellation_requested")
115+
op.drop_column("index_attempt", "celery_task_id")

backend/alembic/versions/c9e2cd766c29_add_s3_file_store_table.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -159,7 +159,7 @@ def _migrate_files_to_postgres() -> None:
159159

160160
# only create external store if we have files to migrate. This line
161161
# makes it so we need to have S3/MinIO configured to run this migration.
162-
external_store = get_s3_file_store(db_session=session)
162+
external_store = get_s3_file_store()
163163

164164
for i, file_id in enumerate(files_to_migrate, 1):
165165
print(f"Migrating file {i}/{total_files}: {file_id}")
@@ -219,7 +219,7 @@ def _migrate_files_to_external_storage() -> None:
219219
# Get database session
220220
bind = op.get_bind()
221221
session = Session(bind=bind)
222-
external_store = get_s3_file_store(db_session=session)
222+
external_store = get_s3_file_store()
223223

224224
# Find all files currently stored in PostgreSQL (lobj_oid is not null)
225225
result = session.execute(

backend/ee/onyx/background/celery/apps/heavy.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,7 @@ def export_query_history_task(
9191
with get_session_with_current_tenant() as db_session:
9292
try:
9393
stream.seek(0)
94-
get_default_file_store(db_session).save_file(
94+
get_default_file_store().save_file(
9595
content=stream,
9696
display_name=report_name,
9797
file_origin=FileOrigin.QUERY_HISTORY_CSV,

backend/ee/onyx/background/celery/tasks/doc_permission_syncing/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -422,7 +422,7 @@ def connector_permission_sync_generator_task(
422422

423423
lock: RedisLock = r.lock(
424424
OnyxRedisLocks.CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX
425-
+ f"_{redis_connector.id}",
425+
+ f"_{redis_connector.cc_pair_id}",
426426
timeout=CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT,
427427
thread_local=False,
428428
)

backend/ee/onyx/background/celery/tasks/external_group_syncing/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -383,7 +383,7 @@ def connector_external_group_sync_generator_task(
383383

384384
lock: RedisLock = r.lock(
385385
OnyxRedisLocks.CONNECTOR_EXTERNAL_GROUP_SYNC_LOCK_PREFIX
386-
+ f"_{redis_connector.id}",
386+
+ f"_{redis_connector.cc_pair_id}",
387387
timeout=CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT,
388388
)
389389

backend/ee/onyx/db/usage_export.py

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,6 @@ def get_all_usage_reports(db_session: Session) -> list[UsageReportMetadata]:
114114

115115

116116
def get_usage_report_data(
117-
db_session: Session,
118117
report_display_name: str,
119118
) -> IO:
120119
"""
@@ -128,7 +127,7 @@ def get_usage_report_data(
128127
Returns:
129128
The usage report data.
130129
"""
131-
file_store = get_default_file_store(db_session)
130+
file_store = get_default_file_store()
132131
# usage report may be very large, so don't load it all into memory
133132
return file_store.read_file(
134133
file_id=report_display_name, mode="b", use_tempfile=True

backend/ee/onyx/server/enterprise_settings/api.py

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -134,15 +134,14 @@ def ee_fetch_settings() -> EnterpriseSettings:
134134
def put_logo(
135135
file: UploadFile,
136136
is_logotype: bool = False,
137-
db_session: Session = Depends(get_session),
138137
_: User | None = Depends(current_admin_user),
139138
) -> None:
140-
upload_logo(file=file, db_session=db_session, is_logotype=is_logotype)
139+
upload_logo(file=file, is_logotype=is_logotype)
141140

142141

143142
def fetch_logo_helper(db_session: Session) -> Response:
144143
try:
145-
file_store = get_default_file_store(db_session)
144+
file_store = get_default_file_store()
146145
onyx_file = file_store.get_file_with_mime_type(get_logo_filename())
147146
if not onyx_file:
148147
raise ValueError("get_onyx_file returned None!")
@@ -158,7 +157,7 @@ def fetch_logo_helper(db_session: Session) -> Response:
158157

159158
def fetch_logotype_helper(db_session: Session) -> Response:
160159
try:
161-
file_store = get_default_file_store(db_session)
160+
file_store = get_default_file_store()
162161
onyx_file = file_store.get_file_with_mime_type(get_logotype_filename())
163162
if not onyx_file:
164163
raise ValueError("get_onyx_file returned None!")

backend/ee/onyx/server/enterprise_settings/store.py

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@
66

77
from fastapi import HTTPException
88
from fastapi import UploadFile
9-
from sqlalchemy.orm import Session
109

1110
from ee.onyx.server.enterprise_settings.models import AnalyticsScriptUpload
1211
from ee.onyx.server.enterprise_settings.models import EnterpriseSettings
@@ -99,9 +98,7 @@ def guess_file_type(filename: str) -> str:
9998
return "application/octet-stream"
10099

101100

102-
def upload_logo(
103-
db_session: Session, file: UploadFile | str, is_logotype: bool = False
104-
) -> bool:
101+
def upload_logo(file: UploadFile | str, is_logotype: bool = False) -> bool:
105102
content: IO[Any]
106103

107104
if isinstance(file, str):
@@ -129,7 +126,7 @@ def upload_logo(
129126
display_name = file.filename
130127
file_type = file.content_type or "image/jpeg"
131128

132-
file_store = get_default_file_store(db_session)
129+
file_store = get_default_file_store()
133130
file_store.save_file(
134131
content=content,
135132
display_name=display_name,

0 commit comments

Comments
 (0)