67
67
from onyx .utils .retry_wrapper import retry_builder
68
68
from onyx .utils .threadpool_concurrency import parallel_yield
69
69
from onyx .utils .threadpool_concurrency import run_functions_tuples_in_parallel
70
- from onyx .utils .threadpool_concurrency import run_with_timeout
71
70
from onyx .utils .threadpool_concurrency import ThreadSafeDict
72
71
73
72
logger = setup_logger ()
@@ -440,9 +439,7 @@ def _impersonate_user_for_retrieval(
440
439
logger .debug (f"Getting root folder id for user { user_email } " )
441
440
# default is ~17mins of retries; cap the retries here so that we don't
442
441
# waste 17mins every time we run into a user without access to drive APIs
443
- retry_builder (tries = 3 , delay = 1 )(
444
- lambda : run_with_timeout (30 , get_root_folder_id , drive_service )
445
- )()
442
+ retry_builder (tries = 3 , delay = 1 )(get_root_folder_id )(drive_service )
446
443
except HttpError as e :
447
444
if e .status_code == 401 :
448
445
# fail gracefully, let the other impersonations continue
@@ -455,20 +452,20 @@ def _impersonate_user_for_retrieval(
455
452
curr_stage .stage = DriveRetrievalStage .DONE
456
453
return
457
454
raise
458
- except TimeoutError :
459
- logger .warning (
460
- f"User '{ user_email } ' timed out when trying to access the drive APIs."
461
- )
462
- # mark this user as done so we don't try to retrieve anything for them
463
- # again
464
- curr_stage .stage = DriveRetrievalStage .DONE
465
- return
466
455
except RefreshError as e :
467
456
logger .warning (
468
457
f"User '{ user_email } ' could not refresh their token. Error: { e } "
469
458
)
470
459
# mark this user as done so we don't try to retrieve anything for them
471
460
# again
461
+ yield RetrievedDriveFile (
462
+ completion_stage = DriveRetrievalStage .DONE ,
463
+ drive_file = {
464
+ "refresh_creds_error" : f"Error refreshing credentials for user { user_email } "
465
+ },
466
+ user_email = user_email ,
467
+ error = e ,
468
+ )
472
469
curr_stage .stage = DriveRetrievalStage .DONE
473
470
return
474
471
# if we are including my drives, try to get the current user's my
@@ -620,7 +617,6 @@ def _manage_service_account_retrieval(
620
617
sorted_drive_ids , sorted_folder_ids = self ._determine_retrieval_ids (
621
618
checkpoint , is_slim , DriveRetrievalStage .MY_DRIVE_FILES
622
619
)
623
- all_drive_ids = set (sorted_drive_ids )
624
620
625
621
# Set up the initial completion map on the first connector run
626
622
for email in all_org_emails :
@@ -630,8 +626,7 @@ def _manage_service_account_retrieval(
630
626
checkpoint .completion_map [email ] = StageCompletion (
631
627
stage = DriveRetrievalStage .START ,
632
628
completed_until = 0 ,
633
- processed_drive_ids = all_drive_ids
634
- - self ._get_all_drives_for_user (email ),
629
+ processed_drive_ids = set (),
635
630
)
636
631
637
632
# we've found all users and drives, now time to actually start
0 commit comments