@@ -391,7 +391,12 @@ def connector_indexing_proxy_task(
391
391
tenant_id : str | None ,
392
392
) -> None :
393
393
"""Celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
394
-
394
+ task_logger .info (
395
+ f"Indexing proxy - starting: attempt={ index_attempt_id } "
396
+ f"tenant={ tenant_id } "
397
+ f"cc_pair={ cc_pair_id } "
398
+ f"search_settings={ search_settings_id } "
399
+ )
395
400
client = SimpleJobClient ()
396
401
397
402
job = client .submit (
@@ -405,29 +410,56 @@ def connector_indexing_proxy_task(
405
410
)
406
411
407
412
if not job :
413
+ task_logger .info (
414
+ f"Indexing proxy - spawn failed: attempt={ index_attempt_id } "
415
+ f"tenant={ tenant_id } "
416
+ f"cc_pair={ cc_pair_id } "
417
+ f"search_settings={ search_settings_id } "
418
+ )
408
419
return
409
420
421
+ task_logger .info (
422
+ f"Indexing proxy - spawn succeeded: attempt={ index_attempt_id } "
423
+ f"tenant={ tenant_id } "
424
+ f"cc_pair={ cc_pair_id } "
425
+ f"search_settings={ search_settings_id } "
426
+ )
427
+
410
428
while True :
411
429
sleep (10 )
412
- with get_session_with_tenant (tenant_id ) as db_session :
413
- index_attempt = get_index_attempt (
414
- db_session = db_session , index_attempt_id = index_attempt_id
415
- )
416
430
417
- # do nothing for ongoing jobs that haven't been stopped
418
- if not job .done ():
431
+ # do nothing for ongoing jobs that haven't been stopped
432
+ if not job .done ():
433
+ with get_session_with_tenant (tenant_id ) as db_session :
434
+ index_attempt = get_index_attempt (
435
+ db_session = db_session , index_attempt_id = index_attempt_id
436
+ )
437
+
419
438
if not index_attempt :
420
439
continue
421
440
422
441
if not index_attempt .is_finished ():
423
442
continue
424
443
425
- if job .status == "error" :
426
- logger .error (job .exception ())
444
+ if job .status == "error" :
445
+ task_logger .error (
446
+ f"Indexing proxy - spawned task exceptioned: "
447
+ f"attempt={ index_attempt_id } "
448
+ f"tenant={ tenant_id } "
449
+ f"cc_pair={ cc_pair_id } "
450
+ f"search_settings={ search_settings_id } "
451
+ f"error={ job .exception ()} "
452
+ )
427
453
428
- job .release ()
429
- break
454
+ job .release ()
455
+ break
430
456
457
+ task_logger .info (
458
+ f"Indexing proxy - finished: attempt={ index_attempt_id } "
459
+ f"tenant={ tenant_id } "
460
+ f"cc_pair={ cc_pair_id } "
461
+ f"search_settings={ search_settings_id } "
462
+ )
431
463
return
432
464
433
465
@@ -449,7 +481,17 @@ def connector_indexing_task(
449
481
450
482
Returns None if the task did not run (possibly due to a conflict).
451
483
Otherwise, returns an int >= 0 representing the number of indexed docs.
484
+
485
+ NOTE: if an exception is raised out of this task, the primary worker will detect
486
+ that the task transitioned to a "READY" state but the generator_complete_key doesn't exist.
487
+ This will cause the primary worker to abort the indexing attempt and clean up.
452
488
"""
489
+ logger .info (
490
+ f"Indexing spawned task starting: attempt={ index_attempt_id } "
491
+ f"tenant={ tenant_id } "
492
+ f"cc_pair={ cc_pair_id } "
493
+ f"search_settings={ search_settings_id } "
494
+ )
453
495
454
496
attempt = None
455
497
n_final_progress = 0
@@ -488,19 +530,19 @@ def connector_indexing_task(
488
530
cast (str , fence_json )
489
531
)
490
532
except ValueError :
491
- task_logger .exception (
533
+ logger .exception (
492
534
f"connector_indexing_task: fence_data not decodeable: fence={ rci .fence_key } "
493
535
)
494
536
raise
495
537
496
538
if fence_data .index_attempt_id is None or fence_data .celery_task_id is None :
497
- task_logger .info (
539
+ logger .info (
498
540
f"connector_indexing_task - Waiting for fence: fence={ rci .fence_key } "
499
541
)
500
542
sleep (1 )
501
543
continue
502
544
503
- task_logger .info (
545
+ logger .info (
504
546
f"connector_indexing_task - Fence found, continuing...: fence={ rci .fence_key } "
505
547
)
506
548
break
@@ -512,7 +554,7 @@ def connector_indexing_task(
512
554
513
555
acquired = lock .acquire (blocking = False )
514
556
if not acquired :
515
- task_logger .warning (
557
+ logger .warning (
516
558
f"Indexing task already running, exiting...: "
517
559
f"cc_pair={ cc_pair_id } search_settings={ search_settings_id } "
518
560
)
@@ -555,6 +597,13 @@ def connector_indexing_task(
555
597
rcs .fence_key , rci .generator_progress_key , lock , r
556
598
)
557
599
600
+ logger .info (
601
+ f"Indexing spawned task running entrypoint: attempt={ index_attempt_id } "
602
+ f"tenant={ tenant_id } "
603
+ f"cc_pair={ cc_pair_id } "
604
+ f"search_settings={ search_settings_id } "
605
+ )
606
+
558
607
run_indexing_entrypoint (
559
608
index_attempt_id ,
560
609
tenant_id ,
@@ -573,7 +622,12 @@ def connector_indexing_task(
573
622
574
623
r .set (rci .generator_complete_key , HTTPStatus .OK .value )
575
624
except Exception as e :
576
- task_logger .exception (f"Indexing failed: cc_pair={ cc_pair_id } " )
625
+ logger .exception (
626
+ f"Indexing spawned task failed: attempt={ index_attempt_id } "
627
+ f"tenant={ tenant_id } "
628
+ f"cc_pair={ cc_pair_id } "
629
+ f"search_settings={ search_settings_id } "
630
+ )
577
631
if attempt :
578
632
with get_session_with_tenant (tenant_id ) as db_session :
579
633
mark_attempt_failed (attempt , db_session , failure_reason = str (e ))
@@ -587,4 +641,10 @@ def connector_indexing_task(
587
641
if lock .owned ():
588
642
lock .release ()
589
643
644
+ logger .info (
645
+ f"Indexing spawned task finished: attempt={ index_attempt_id } "
646
+ f"tenant={ tenant_id } "
647
+ f"cc_pair={ cc_pair_id } "
648
+ f"search_settings={ search_settings_id } "
649
+ )
590
650
return n_final_progress
0 commit comments