13
13
from pydantic import BaseModel
14
14
from redis import Redis
15
15
from redis .lock import Lock as RedisLock
16
+ from sqlalchemy import exists
16
17
from sqlalchemy import select
17
18
from sqlalchemy .orm import Session
18
19
@@ -562,7 +563,7 @@ def check_indexing_completion(
562
563
logger .info (f"Database coordination completed for attempt { index_attempt_id } " )
563
564
564
565
565
- def _existing_index_attempts (
566
+ def active_indexing_attempt (
566
567
cc_pair_id : int ,
567
568
search_settings_id : int ,
568
569
db_session : Session ,
@@ -574,9 +575,9 @@ def _existing_index_attempts(
574
575
575
576
Returns True if there's an active indexing attempt, False otherwise.
576
577
"""
577
- existing_attempts = (
578
- db_session . execute (
579
- select ( IndexAttempt ).where (
578
+ active_indexing_attempt = db_session . execute (
579
+ select (
580
+ exists ( ).where (
580
581
IndexAttempt .connector_credential_pair_id == cc_pair_id ,
581
582
IndexAttempt .search_settings_id == search_settings_id ,
582
583
IndexAttempt .status .in_ (
@@ -587,18 +588,15 @@ def _existing_index_attempts(
587
588
),
588
589
)
589
590
)
590
- .scalars ()
591
- .all ()
592
- )
591
+ ).scalar ()
593
592
594
- if existing_attempts :
593
+ if active_indexing_attempt :
595
594
task_logger .debug (
596
- f"_existing_index_attempts - Skipping due to active indexing attempt: "
597
- f"cc_pair={ cc_pair_id } search_settings={ search_settings_id } "
598
- f"active_attempts={ [a .id for a in existing_attempts ]} "
595
+ f"active_indexing_attempt - Skipping due to active indexing attempt: "
596
+ f"cc_pair={ cc_pair_id } search_settings={ search_settings_id } "
599
597
)
600
- return True
601
- return False
598
+
599
+ return bool ( active_indexing_attempt )
602
600
603
601
604
602
def _kickoff_indexing_tasks (
@@ -621,7 +619,7 @@ def _kickoff_indexing_tasks(
621
619
lock_beat .reacquire ()
622
620
623
621
# Lightweight check prior to fetching cc pair
624
- if _existing_index_attempts (
622
+ if active_indexing_attempt (
625
623
cc_pair_id = cc_pair_id ,
626
624
search_settings_id = search_settings .id ,
627
625
db_session = db_session ,
@@ -685,7 +683,7 @@ def _kickoff_indexing_tasks(
685
683
tenant_id ,
686
684
)
687
685
688
- if attempt_id :
686
+ if attempt_id is not None :
689
687
task_logger .info (
690
688
f"Connector indexing queued: "
691
689
f"index_attempt={ attempt_id } "
@@ -694,7 +692,7 @@ def _kickoff_indexing_tasks(
694
692
)
695
693
tasks_created += 1
696
694
else :
697
- task_logger .info (
695
+ task_logger .error (
698
696
f"Failed to create indexing task: "
699
697
f"cc_pair={ cc_pair .id } "
700
698
f"search_settings={ search_settings .id } "
0 commit comments