backend/danswer/background/celery/tasks/indexing/tasks.py (92 changes: 76 additions & 16 deletions)
@@ -388,7 +388,12 @@ def connector_indexing_proxy_task(
tenant_id: str | None,
) -> None:
"""celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""

+    task_logger.info(
+        f"Indexing proxy - starting: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )
client = SimpleJobClient()

job = client.submit(
@@ -402,29 +407,56 @@ def connector_indexing_proxy_task(
)

if not job:
+        task_logger.info(
+            f"Indexing proxy - spawn failed: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )
return

+    task_logger.info(
+        f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )

    while True:
        sleep(10)
-        with get_session_with_tenant(tenant_id) as db_session:
-            index_attempt = get_index_attempt(
-                db_session=db_session, index_attempt_id=index_attempt_id
-            )

-            # do nothing for ongoing jobs that haven't been stopped
-            if not job.done():
+        # do nothing for ongoing jobs that haven't been stopped
+        if not job.done():
+            with get_session_with_tenant(tenant_id) as db_session:
+                index_attempt = get_index_attempt(
+                    db_session=db_session, index_attempt_id=index_attempt_id
+                )

                if not index_attempt:
                    continue

                if not index_attempt.is_finished():
                    continue

-            if job.status == "error":
-                logger.error(job.exception())
+        if job.status == "error":
+            task_logger.error(
+                f"Indexing proxy - spawned task exceptioned: "
+                f"attempt={index_attempt_id} "
+                f"tenant={tenant_id} "
+                f"cc_pair={cc_pair_id} "
+                f"search_settings={search_settings_id} "
+                f"error={job.exception()}"
+            )

-            job.release()
-            break
+        job.release()
+        break

+    task_logger.info(
+        f"Indexing proxy - finished: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )
return
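Editor's note: the proxy pattern in the hunk above follows from the docstring's caveat that Celery's forked workers are unstable for long-running indexing, so the Celery task only spawns and polls a separate process. Below is a minimal sketch of that submit-then-poll loop using the standard library's spawn context rather than Danswer's SimpleJobClient; the run_attempt worker is a hypothetical stand-in, not the PR's API.

import multiprocessing as mp
import time


def run_attempt(attempt_id: int) -> None:
    # Hypothetical stand-in for the spawned indexing task.
    time.sleep(2)


def proxy(attempt_id: int, poll_interval: float = 1.0) -> None:
    # Spawn a fresh interpreter instead of forking, so the child does not
    # inherit unstable state (locks, sockets) from the worker process.
    ctx = mp.get_context("spawn")
    job = ctx.Process(target=run_attempt, args=(attempt_id,))
    job.start()

    while True:
        time.sleep(poll_interval)
        if job.is_alive():
            continue  # still running; keep polling
        if job.exitcode != 0:
            print(f"spawned task failed: exitcode={job.exitcode}")
        job.join()  # reap the child and release its resources
        break


if __name__ == "__main__":
    proxy(attempt_id=1)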


@@ -446,7 +478,17 @@ def connector_indexing_task(

Returns None if the task did not run (possibly due to a conflict).
Otherwise, returns an int >= 0 representing the number of indexed docs.

+    NOTE: if an exception is raised out of this task, the primary worker will detect
+    that the task transitioned to a "READY" state but the generator_complete_key doesn't exist.
+    This will cause the primary worker to abort the indexing attempt and clean up.
"""
+    logger.info(
+        f"Indexing spawned task starting: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )

attempt = None
n_final_progress = 0
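Editor's note: the NOTE added to the docstring describes a completion-key handshake. The spawned task writes generator_complete_key only on success, so if the primary worker sees the Celery task in a READY state while the key is absent, the attempt must have died mid-run and should be aborted. A minimal sketch of that check against Redis, assuming a redis.Redis client; the helper name is illustrative, not the PR's code.

from http import HTTPStatus

import redis


def completed_cleanly(r: redis.Redis, generator_complete_key: str) -> bool:
    # The spawned task sets this key to HTTPStatus.OK.value on success
    # (mirroring the r.set(...) call later in this diff).
    value = r.get(generator_complete_key)
    if value is None:
        # Task reached READY without writing the key: it raised or was
        # killed, so the primary worker aborts the attempt and cleans up.
        return False
    return int(value) == HTTPStatus.OK.value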
@@ -485,19 +527,19 @@ def connector_indexing_task(
cast(str, fence_json)
)
except ValueError:
-            task_logger.exception(
+            logger.exception(
f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}"
)
raise

if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
task_logger.info(
logger.info(
f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}"
)
sleep(1)
continue

-            task_logger.info(
+            logger.info(
f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}"
)
break
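Editor's note: the loop above spins until the fence value in Redis carries both an index_attempt_id and a celery_task_id. From the attribute accesses, the fence payload is evidently JSON with at least those two fields; here is a hedged sketch of the decode step, where the FenceData dataclass is an assumption rather than the PR's actual model.

import json
from dataclasses import dataclass


@dataclass
class FenceData:
    index_attempt_id: int | None = None
    celery_task_id: str | None = None


def decode_fence(raw: str) -> FenceData:
    # Raises ValueError on malformed payloads, matching the except branch above.
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as e:
        raise ValueError(f"fence_data not decodeable: {raw!r}") from e
    return FenceData(
        index_attempt_id=payload.get("index_attempt_id"),
        celery_task_id=payload.get("celery_task_id"),
    )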
@@ -509,7 +551,7 @@ def connector_indexing_task(

acquired = lock.acquire(blocking=False)
if not acquired:
-        task_logger.warning(
+        logger.warning(
f"Indexing task already running, exiting...: "
f"cc_pair={cc_pair_id} search_settings={search_settings_id}"
)
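Editor's note: the non-blocking acquire gives at-most-one-runner semantics per cc_pair; rather than queueing behind the lock, a second task exits immediately. A short usage sketch with redis-py's Lock, where the key name and timeout are illustrative only:

import redis

r = redis.Redis()
# timeout guards against a crashed holder pinning the lock forever.
lock = r.lock("da_lock:indexing:cc_pair_1", timeout=300)

if not lock.acquire(blocking=False):  # returns immediately if already held
    print("indexing task already running, exiting")
else:
    try:
        pass  # ... do the indexing work ...
    finally:
        if lock.owned():  # mirror the diff's guarded release
            lock.release()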
@@ -552,6 +594,13 @@ def connector_indexing_task(
rcs.fence_key, rci.generator_progress_key, lock, r
)

+        logger.info(
+            f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )
+
run_indexing_entrypoint(
index_attempt_id,
tenant_id,
@@ -570,7 +619,12 @@ def connector_indexing_task(

r.set(rci.generator_complete_key, HTTPStatus.OK.value)
except Exception as e:
-        task_logger.exception(f"Indexing failed: cc_pair={cc_pair_id}")
+        logger.exception(
+            f"Indexing spawned task failed: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )
if attempt:
with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(attempt, db_session, failure_reason=str(e))
@@ -584,4 +638,10 @@ def connector_indexing_task(
if lock.owned():
lock.release()

+    logger.info(
+        f"Indexing spawned task finished: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )
return n_final_progress