From efb3ed90262f3de5e742942d747cabaaf9ba5183 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 17 Oct 2024 15:45:49 -0700 Subject: [PATCH 1/2] always finalize the serialized transaction so that it doesn't leak outside the function --- .../background/indexing/run_indexing.py | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index c48d07ffd8..3c56e050ca 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -387,30 +387,37 @@ def _prepare_index_attempt( # after the next commit: # https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-isolation-for-individual-transactions db_session.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # type: ignore - if tenant_id is not None: - # Explicitly set the search path for the given tenant - db_session.execute(text(f'SET search_path TO "{tenant_id}"')) - # Verify the search path was set correctly - result = db_session.execute(text("SHOW search_path")) - current_search_path = result.scalar() - logger.info(f"Current search path set to: {current_search_path}") - - attempt = get_index_attempt( - db_session=db_session, - index_attempt_id=index_attempt_id, - ) + try: + if tenant_id is not None: + # Explicitly set the search path for the given tenant + db_session.execute(text(f'SET search_path TO "{tenant_id}"')) + # Verify the search path was set correctly + result = db_session.execute(text("SHOW search_path")) + current_search_path = result.scalar() + logger.info(f"Current search path set to: {current_search_path}") + + attempt = get_index_attempt( + db_session=db_session, + index_attempt_id=index_attempt_id, + ) - if attempt is None: - raise RuntimeError(f"Unable to find IndexAttempt for ID '{index_attempt_id}'") + if attempt is None: + raise RuntimeError( + f"Unable to find IndexAttempt for ID '{index_attempt_id}'" + ) - if attempt.status != IndexingStatus.NOT_STARTED: - raise RuntimeError( - f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. " - f"Current status is '{attempt.status}'." - ) + if attempt.status != IndexingStatus.NOT_STARTED: + raise RuntimeError( + f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. " + f"Current status is '{attempt.status}'." + ) + + mark_attempt_in_progress(attempt, db_session) - # only commit once, to make sure this all happens in a single transaction - mark_attempt_in_progress(attempt, db_session) + # only commit once, to make sure this all happens in a single transaction + db_session.commit() + except Exception: + db_session.rollback() return attempt From b9f27aa5fa55b286c6baba88a22d0af70ea5cc53 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 17 Oct 2024 15:52:53 -0700 Subject: [PATCH 2/2] re-raise the exception and log it --- backend/danswer/background/indexing/run_indexing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 3c56e050ca..b4cfea97a2 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -418,6 +418,8 @@ def _prepare_index_attempt( db_session.commit() except Exception: db_session.rollback() + logger.exception("_prepare_index_attempt exceptioned.") + raise return attempt