Skip to content

Commit 7f1e4a0

Browse files
Feature/kill indexing (#3213)

* checkpoint
* add celery termination of the task
* rename to RedisConnectorPermissionSyncPayload, add RedisLock to more places, add get_active_search_settings
* rename payload
* pretty sure these weren't named correctly
* testing in progress
* cleanup
* remove space
* merge fix
* three dots animation on Pausing
* improve messaging when connector is stopped or killed and animate buttons

---------

Co-authored-by: Richard Kuo <rkuo@rkuo.com>
1 parent 5be7d27 commit 7f1e4a0

File tree

21 files changed

+539
-126
lines changed

21 files changed

+539
-126
lines changed

backend/danswer/background/celery/tasks/connector_deletion/tasks.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from celery import shared_task
66
from celery import Task
77
from celery.exceptions import SoftTimeLimitExceeded
8-
from redis import Redis
98
from redis.lock import Lock as RedisLock
109
from sqlalchemy.orm import Session
1110

@@ -37,7 +36,7 @@ class TaskDependencyError(RuntimeError):
3736
def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> None:
3837
r = get_redis_client(tenant_id=tenant_id)
3938

40-
lock_beat = r.lock(
39+
lock_beat: RedisLock = r.lock(
4140
DanswerRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK,
4241
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
4342
)
@@ -60,7 +59,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
6059
redis_connector = RedisConnector(tenant_id, cc_pair_id)
6160
try:
6261
try_generate_document_cc_pair_cleanup_tasks(
63-
self.app, cc_pair_id, db_session, r, lock_beat, tenant_id
62+
self.app, cc_pair_id, db_session, lock_beat, tenant_id
6463
)
6564
except TaskDependencyError as e:
6665
# this means we wanted to start deleting but dependent tasks were running
@@ -86,7 +85,6 @@ def try_generate_document_cc_pair_cleanup_tasks(
8685
app: Celery,
8786
cc_pair_id: int,
8887
db_session: Session,
89-
r: Redis,
9088
lock_beat: RedisLock,
9189
tenant_id: str | None,
9290
) -> int | None:

backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from celery import Task
99
from celery.exceptions import SoftTimeLimitExceeded
1010
from redis import Redis
11+
from redis.lock import Lock as RedisLock
1112

1213
from danswer.access.models import DocExternalAccess
1314
from danswer.background.celery.apps.app_base import task_logger
@@ -27,7 +28,7 @@
2728
from danswer.db.users import batch_add_ext_perm_user_if_not_exists
2829
from danswer.redis.redis_connector import RedisConnector
2930
from danswer.redis.redis_connector_doc_perm_sync import (
30-
RedisConnectorPermissionSyncData,
31+
RedisConnectorPermissionSyncPayload,
3132
)
3233
from danswer.redis.redis_pool import get_redis_client
3334
from danswer.utils.logger import doc_permission_sync_ctx
@@ -138,7 +139,7 @@ def try_creating_permissions_sync_task(
138139

139140
LOCK_TIMEOUT = 30
140141

141-
lock = r.lock(
142+
lock: RedisLock = r.lock(
142143
DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_generate_permissions_sync_tasks",
143144
timeout=LOCK_TIMEOUT,
144145
)
@@ -162,7 +163,7 @@ def try_creating_permissions_sync_task(
162163

163164
custom_task_id = f"{redis_connector.permissions.generator_task_key}_{uuid4()}"
164165

165-
app.send_task(
166+
result = app.send_task(
166167
"connector_permission_sync_generator_task",
167168
kwargs=dict(
168169
cc_pair_id=cc_pair_id,
@@ -174,8 +175,8 @@ def try_creating_permissions_sync_task(
174175
)
175176

176177
# set a basic fence to start
177-
payload = RedisConnectorPermissionSyncData(
178-
started=None,
178+
payload = RedisConnectorPermissionSyncPayload(
179+
started=None, celery_task_id=result.id
179180
)
180181

181182
redis_connector.permissions.set_fence(payload)
@@ -247,9 +248,11 @@ def connector_permission_sync_generator_task(
247248

248249
logger.info(f"Syncing docs for {source_type} with cc_pair={cc_pair_id}")
249250

250-
payload = RedisConnectorPermissionSyncData(
251-
started=datetime.now(timezone.utc),
252-
)
251+
payload = redis_connector.permissions.payload
252+
if not payload:
253+
raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}")
254+
255+
payload.started = datetime.now(timezone.utc)
253256
redis_connector.permissions.set_fence(payload)
254257

255258
document_external_accesses: list[DocExternalAccess] = doc_sync_func(cc_pair)

backend/danswer/background/celery/tasks/external_group_syncing/tasks.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from celery import Task
99
from celery.exceptions import SoftTimeLimitExceeded
1010
from redis import Redis
11+
from redis.lock import Lock as RedisLock
1112

1213
from danswer.background.celery.apps.app_base import task_logger
1314
from danswer.configs.app_configs import JOB_TIMEOUT
@@ -24,6 +25,9 @@
2425
from danswer.db.enums import ConnectorCredentialPairStatus
2526
from danswer.db.models import ConnectorCredentialPair
2627
from danswer.redis.redis_connector import RedisConnector
28+
from danswer.redis.redis_connector_ext_group_sync import (
29+
RedisConnectorExternalGroupSyncPayload,
30+
)
2731
from danswer.redis.redis_pool import get_redis_client
2832
from danswer.utils.logger import setup_logger
2933
from ee.danswer.db.connector_credential_pair import get_all_auto_sync_cc_pairs
@@ -107,7 +111,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
107111
cc_pair_ids_to_sync.append(cc_pair.id)
108112

109113
for cc_pair_id in cc_pair_ids_to_sync:
110-
tasks_created = try_creating_permissions_sync_task(
114+
tasks_created = try_creating_external_group_sync_task(
111115
self.app, cc_pair_id, r, tenant_id
112116
)
113117
if not tasks_created:
@@ -125,7 +129,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
125129
lock_beat.release()
126130

127131

128-
def try_creating_permissions_sync_task(
132+
def try_creating_external_group_sync_task(
129133
app: Celery,
130134
cc_pair_id: int,
131135
r: Redis,
@@ -156,7 +160,7 @@ def try_creating_permissions_sync_task(
156160

157161
custom_task_id = f"{redis_connector.external_group_sync.taskset_key}_{uuid4()}"
158162

159-
_ = app.send_task(
163+
result = app.send_task(
160164
"connector_external_group_sync_generator_task",
161165
kwargs=dict(
162166
cc_pair_id=cc_pair_id,
@@ -166,8 +170,13 @@ def try_creating_permissions_sync_task(
166170
task_id=custom_task_id,
167171
priority=DanswerCeleryPriority.HIGH,
168172
)
169-
# set a basic fence to start
170-
redis_connector.external_group_sync.set_fence(True)
173+
174+
payload = RedisConnectorExternalGroupSyncPayload(
175+
started=datetime.now(timezone.utc),
176+
celery_task_id=result.id,
177+
)
178+
179+
redis_connector.external_group_sync.set_fence(payload)
171180

172181
except Exception:
173182
task_logger.exception(
@@ -203,7 +212,7 @@ def connector_external_group_sync_generator_task(
203212

204213
r = get_redis_client(tenant_id=tenant_id)
205214

206-
lock = r.lock(
215+
lock: RedisLock = r.lock(
207216
DanswerRedisLocks.CONNECTOR_EXTERNAL_GROUP_SYNC_LOCK_PREFIX
208217
+ f"_{redis_connector.id}",
209218
timeout=CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT,
@@ -253,7 +262,6 @@ def connector_external_group_sync_generator_task(
253262
)
254263

255264
mark_cc_pair_as_external_group_synced(db_session, cc_pair.id)
256-
257265
except Exception as e:
258266
task_logger.exception(
259267
f"Failed to run external group sync: cc_pair={cc_pair_id}"
@@ -264,6 +272,6 @@ def connector_external_group_sync_generator_task(
264272
raise e
265273
finally:
266274
# we always want to clear the fence after the task is done or failed so it doesn't get stuck
267-
redis_connector.external_group_sync.set_fence(False)
275+
redis_connector.external_group_sync.set_fence(None)
268276
if lock.owned():
269277
lock.release()

backend/danswer/background/celery/tasks/indexing/tasks.py

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@
3939
from danswer.db.index_attempt import get_all_index_attempts_by_status
4040
from danswer.db.index_attempt import get_index_attempt
4141
from danswer.db.index_attempt import get_last_attempt_for_cc_pair
42+
from danswer.db.index_attempt import mark_attempt_canceled
4243
from danswer.db.index_attempt import mark_attempt_failed
4344
from danswer.db.models import ConnectorCredentialPair
4445
from danswer.db.models import IndexAttempt
4546
from danswer.db.models import SearchSettings
47+
from danswer.db.search_settings import get_active_search_settings
4648
from danswer.db.search_settings import get_current_search_settings
47-
from danswer.db.search_settings import get_secondary_search_settings
4849
from danswer.db.swap_index import check_index_swap
4950
from danswer.indexing.indexing_heartbeat import IndexingHeartbeatInterface
5051
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
@@ -209,17 +210,10 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
209210

210211
redis_connector = RedisConnector(tenant_id, cc_pair_id)
211212
with get_session_with_tenant(tenant_id) as db_session:
212-
# Get the primary search settings
213-
primary_search_settings = get_current_search_settings(db_session)
214-
search_settings = [primary_search_settings]
215-
216-
# Check for secondary search settings
217-
secondary_search_settings = get_secondary_search_settings(db_session)
218-
if secondary_search_settings is not None:
219-
# If secondary settings exist, add them to the list
220-
search_settings.append(secondary_search_settings)
221-
222-
for search_settings_instance in search_settings:
213+
search_settings_list: list[SearchSettings] = get_active_search_settings(
214+
db_session
215+
)
216+
for search_settings_instance in search_settings_list:
223217
redis_connector_index = redis_connector.new_index(
224218
search_settings_instance.id
225219
)
@@ -237,21 +231,21 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
237231
)
238232

239233
search_settings_primary = False
240-
if search_settings_instance.id == primary_search_settings.id:
234+
if search_settings_instance.id == search_settings_list[0].id:
241235
search_settings_primary = True
242236

243237
if not _should_index(
244238
cc_pair=cc_pair,
245239
last_index=last_attempt,
246240
search_settings_instance=search_settings_instance,
247241
search_settings_primary=search_settings_primary,
248-
secondary_index_building=len(search_settings) > 1,
242+
secondary_index_building=len(search_settings_list) > 1,
249243
db_session=db_session,
250244
):
251245
continue
252246

253247
reindex = False
254-
if search_settings_instance.id == primary_search_settings.id:
248+
if search_settings_instance.id == search_settings_list[0].id:
255249
# the indexing trigger is only checked and cleared with the primary search settings
256250
if cc_pair.indexing_trigger is not None:
257251
if cc_pair.indexing_trigger == IndexingMode.REINDEX:
@@ -284,7 +278,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
284278
f"Connector indexing queued: "
285279
f"index_attempt={attempt_id} "
286280
f"cc_pair={cc_pair.id} "
287-
f"search_settings={search_settings_instance.id} "
281+
f"search_settings={search_settings_instance.id}"
288282
)
289283
tasks_created += 1
290284

@@ -529,8 +523,11 @@ def try_creating_indexing_task(
529523
return index_attempt_id
530524

531525

532-
@shared_task(name="connector_indexing_proxy_task", acks_late=False, track_started=True)
526+
@shared_task(
527+
name="connector_indexing_proxy_task", bind=True, acks_late=False, track_started=True
528+
)
533529
def connector_indexing_proxy_task(
530+
self: Task,
534531
index_attempt_id: int,
535532
cc_pair_id: int,
536533
search_settings_id: int,
@@ -543,6 +540,10 @@ def connector_indexing_proxy_task(
543540
f"cc_pair={cc_pair_id} "
544541
f"search_settings={search_settings_id}"
545542
)
543+
544+
if not self.request.id:
545+
task_logger.error("self.request.id is None!")
546+
546547
client = SimpleJobClient()
547548

548549
job = client.submit(
@@ -571,8 +572,30 @@ def connector_indexing_proxy_task(
571572
f"search_settings={search_settings_id}"
572573
)
573574

575+
redis_connector = RedisConnector(tenant_id, cc_pair_id)
576+
redis_connector_index = redis_connector.new_index(search_settings_id)
577+
574578
while True:
575-
sleep(10)
579+
sleep(5)
580+
581+
if self.request.id and redis_connector_index.terminating(self.request.id):
582+
task_logger.warning(
583+
"Indexing proxy - termination signal detected: "
584+
f"attempt={index_attempt_id} "
585+
f"tenant={tenant_id} "
586+
f"cc_pair={cc_pair_id} "
587+
f"search_settings={search_settings_id}"
588+
)
589+
590+
with get_session_with_tenant(tenant_id) as db_session:
591+
mark_attempt_canceled(
592+
index_attempt_id,
593+
db_session,
594+
"Connector termination signal detected",
595+
)
596+
597+
job.cancel()
598+
break
576599

577600
# do nothing for ongoing jobs that haven't been stopped
578601
if not job.done():

backend/danswer/background/celery/tasks/vespa/tasks.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from danswer.db.document_set import get_document_set_by_id
4747
from danswer.db.document_set import mark_document_set_as_synced
4848
from danswer.db.engine import get_session_with_tenant
49+
from danswer.db.enums import IndexingStatus
4950
from danswer.db.index_attempt import delete_index_attempts
5051
from danswer.db.index_attempt import get_index_attempt
5152
from danswer.db.index_attempt import mark_attempt_failed
@@ -58,7 +59,7 @@
5859
from danswer.redis.redis_connector_delete import RedisConnectorDelete
5960
from danswer.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
6061
from danswer.redis.redis_connector_doc_perm_sync import (
61-
RedisConnectorPermissionSyncData,
62+
RedisConnectorPermissionSyncPayload,
6263
)
6364
from danswer.redis.redis_connector_index import RedisConnectorIndex
6465
from danswer.redis.redis_connector_prune import RedisConnectorPrune
@@ -588,17 +589,15 @@ def monitor_ccpair_permissions_taskset(
588589
if remaining > 0:
589590
return
590591

591-
payload: RedisConnectorPermissionSyncData | None = (
592+
payload: RedisConnectorPermissionSyncPayload | None = (
592593
redis_connector.permissions.payload
593594
)
594595
start_time: datetime | None = payload.started if payload else None
595596

596597
mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), start_time)
597598
task_logger.info(f"Successfully synced permissions for cc_pair={cc_pair_id}")
598599

599-
redis_connector.permissions.taskset_clear()
600-
redis_connector.permissions.generator_clear()
601-
redis_connector.permissions.set_fence(None)
600+
redis_connector.permissions.reset()
602601

603602

604603
def monitor_ccpair_indexing_taskset(
@@ -678,11 +677,15 @@ def monitor_ccpair_indexing_taskset(
678677

679678
index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
680679
if index_attempt:
681-
mark_attempt_failed(
682-
index_attempt_id=payload.index_attempt_id,
683-
db_session=db_session,
684-
failure_reason=msg,
685-
)
680+
if (
681+
index_attempt.status != IndexingStatus.CANCELED
682+
and index_attempt.status != IndexingStatus.FAILED
683+
):
684+
mark_attempt_failed(
685+
index_attempt_id=payload.index_attempt_id,
686+
db_session=db_session,
687+
failure_reason=msg,
688+
)
686689

687690
redis_connector_index.reset()
688691
return
@@ -692,6 +695,7 @@ def monitor_ccpair_indexing_taskset(
692695
task_logger.info(
693696
f"Connector indexing finished: cc_pair={cc_pair_id} "
694697
f"search_settings={search_settings_id} "
698+
f"progress={progress} "
695699
f"status={status_enum.name} "
696700
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
697701
)
@@ -724,7 +728,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
724728

725729
# print current queue lengths
726730
r_celery = self.app.broker_connection().channel().client # type: ignore
727-
n_celery = celery_get_queue_length("celery", r)
731+
n_celery = celery_get_queue_length("celery", r_celery)
728732
n_indexing = celery_get_queue_length(
729733
DanswerCeleryQueues.CONNECTOR_INDEXING, r_celery
730734
)

0 commit comments

Comments (0)