From 6baec6eb9a55f43448892b848051303e376daf84 Mon Sep 17 00:00:00 2001 From: Evan Lohn Date: Tue, 6 May 2025 20:00:19 -0700 Subject: [PATCH 1/4] fix slowness --- .../onyx/connectors/google_drive/connector.py | 84 +++++++++++++------ 1 file changed, 60 insertions(+), 24 deletions(-) diff --git a/backend/onyx/connectors/google_drive/connector.py b/backend/onyx/connectors/google_drive/connector.py index 3687596c9a0..e028f44bf50 100644 --- a/backend/onyx/connectors/google_drive/connector.py +++ b/backend/onyx/connectors/google_drive/connector.py @@ -10,6 +10,7 @@ from typing import Protocol from urllib.parse import urlparse +from google.auth.exceptions import RefreshError # type: ignore from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore from googleapiclient.errors import HttpError # type: ignore @@ -66,13 +67,16 @@ from onyx.utils.retry_wrapper import retry_builder from onyx.utils.threadpool_concurrency import parallel_yield from onyx.utils.threadpool_concurrency import run_functions_tuples_in_parallel +from onyx.utils.threadpool_concurrency import run_with_timeout from onyx.utils.threadpool_concurrency import ThreadSafeDict logger = setup_logger() # TODO: Improve this by using the batch utility: https://googleapis.github.io/google-api-python-client/docs/batch.html # All file retrievals could be batched and made at once -BATCHES_PER_CHECKPOINT = 10 +BATCHES_PER_CHECKPOINT = 1 + +DRIVE_BATCH_SIZE = 80 def _extract_str_list_from_comma_str(string: str | None) -> list[str]: @@ -184,8 +188,6 @@ def __init__( "shared_folder_urls, or my_drive_emails" ) - self.batch_size = batch_size - specific_requests_made = False if bool(shared_drive_urls) or bool(my_drive_emails) or bool(shared_folder_urls): specific_requests_made = True @@ -306,14 +308,14 @@ def _get_all_user_emails(self) -> list[str]: return user_emails def get_all_drive_ids(self) -> set[str]: - primary_drive_service = get_drive_service( - creds=self.creds, - user_email=self.primary_admin_email, - ) + return self._get_all_drives_for_user(self.primary_admin_email) + + def _get_all_drives_for_user(self, user_email: str) -> set[str]: + drive_service = get_drive_service(self.creds, user_email) is_service_account = isinstance(self.creds, ServiceAccountCredentials) - all_drive_ids = set() + all_drive_ids: set[str] = set() for drive in execute_paginated_retrieval( - retrieval_function=primary_drive_service.drives().list, + retrieval_function=drive_service.drives().list, list_key="drives", useDomainAdminAccess=is_service_account, fields="drives(id),nextPageToken", @@ -373,6 +375,10 @@ def record_drive_processing(drive_id: str) -> None: if drive_id in self._retrieved_folder_and_drive_ids else DriveIdStatus.AVAILABLE ) + logger.debug( + f"Drive id status: {len(drive_id_status)}, user email: {thread_id}," + f"processed drive ids: {len(completion.processed_drive_ids)}" + ) # wake up other threads waiting for work cv.notify_all() @@ -423,6 +429,7 @@ def _impersonate_user_for_retrieval( curr_stage = checkpoint.completion_map[user_email] resuming = True if curr_stage.stage == DriveRetrievalStage.START: + logger.info(f"Setting stage to {DriveRetrievalStage.MY_DRIVE_FILES.value}") curr_stage.stage = DriveRetrievalStage.MY_DRIVE_FILES resuming = False drive_service = get_drive_service(self.creds, user_email) @@ -430,9 +437,12 @@ def _impersonate_user_for_retrieval( # validate that the user has access to the drive APIs by performing a simple # request and checking for a 401 try: + logger.debug(f"Getting root folder id for user {user_email}") # default is ~17mins of retries, don't do that here for cases so we don't # waste 17mins everytime we run into a user without access to drive APIs - retry_builder(tries=3, delay=1)(get_root_folder_id)(drive_service) + retry_builder(tries=3, delay=1)( + lambda: run_with_timeout(30, get_root_folder_id, drive_service) + )() except HttpError as e: if e.status_code == 401: # fail gracefully, let the other impersonations continue @@ -445,14 +455,31 @@ def _impersonate_user_for_retrieval( curr_stage.stage = DriveRetrievalStage.DONE return raise - + except TimeoutError: + logger.warning( + f"User '{user_email}' timed out when trying to access the drive APIs." + ) + # mark this user as done so we don't try to retrieve anything for them + # again + curr_stage.stage = DriveRetrievalStage.DONE + return + except RefreshError as e: + logger.warning( + f"User '{user_email}' could not refresh their token. Error: {e}" + ) + # mark this user as done so we don't try to retrieve anything for them + # again + curr_stage.stage = DriveRetrievalStage.DONE + return # if we are including my drives, try to get the current user's my # drive if any of the following are true: # - include_my_drives is true # - the current user's email is in the requested emails if curr_stage.stage == DriveRetrievalStage.MY_DRIVE_FILES: if self.include_my_drives or user_email in self._requested_my_drive_emails: - logger.info(f"Getting all files in my drive as '{user_email}'") + logger.info( + f"Getting all files in my drive as '{user_email}. Resuming: {resuming}" + ) yield from add_retrieval_info( get_all_files_in_my_drive_and_shared( @@ -505,7 +532,7 @@ def _yield_from_drive( for drive_id in concurrent_drive_itr(user_email): logger.info( - f"Getting files in shared drive '{drive_id}' as '{user_email}'" + f"Getting files in shared drive '{drive_id}' as '{user_email}. Resuming: {resuming}" ) curr_stage.completed_until = 0 curr_stage.current_folder_or_drive_id = drive_id @@ -593,6 +620,7 @@ def _manage_service_account_retrieval( sorted_drive_ids, sorted_folder_ids = self._determine_retrieval_ids( checkpoint, is_slim, DriveRetrievalStage.MY_DRIVE_FILES ) + all_drive_ids = set(sorted_drive_ids) # Setup initial completion map on first connector run for email in all_org_emails: @@ -602,6 +630,8 @@ def _manage_service_account_retrieval( checkpoint.completion_map[email] = StageCompletion( stage=DriveRetrievalStage.START, completed_until=0, + processed_drive_ids=all_drive_ids + - self._get_all_drives_for_user(email), ) # we've found all users and drives, now time to actually start @@ -627,7 +657,7 @@ def _manage_service_account_retrieval( # to the drive APIs. Without this, we could loop through these emails for # more than 3 hours, causing a timeout and stalling progress. email_batch_takes_us_to_completion = True - MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = 50 + MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = MAX_DRIVE_WORKERS if len(non_completed_org_emails) > MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING: non_completed_org_emails = non_completed_org_emails[ :MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING @@ -871,6 +901,10 @@ def _checkpointed_retrieval( return for file in drive_files: + logger.debug( + f"Updating checkpoint for file: {file.drive_file.get('name')}. " + f"Seen: {file.drive_file.get('id') in checkpoint.all_retrieved_file_ids}" + ) checkpoint.completion_map[file.user_email].update( stage=file.completion_stage, completed_until=datetime.fromisoformat( @@ -1047,24 +1081,22 @@ def _yield_batch( continue files_batch.append(retrieved_file) - if len(files_batch) < self.batch_size: + if len(files_batch) < DRIVE_BATCH_SIZE: continue - logger.info( - f"Yielding batch of {len(files_batch)} files; num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}" - ) yield from _yield_batch(files_batch) files_batch = [] - if batches_complete > BATCHES_PER_CHECKPOINT: - checkpoint.retrieved_folder_and_drive_ids = ( - self._retrieved_folder_and_drive_ids - ) - return # create a new checkpoint - + logger.info( + f"Processing remaining files: {[file.drive_file.get('name') for file in files_batch]}" + ) # Process any remaining files if files_batch: yield from _yield_batch(files_batch) + checkpoint.retrieved_folder_and_drive_ids = ( + self._retrieved_folder_and_drive_ids + ) + except Exception as e: logger.exception(f"Error extracting documents from Google Drive: {e}") raise e @@ -1083,6 +1115,10 @@ def load_from_checkpoint( "Credentials missing, should not call this method before calling load_credentials" ) + logger.info( + f"Loading from checkpoint with completion stage: {checkpoint.completion_stage}," + f"num retrieved ids: {len(checkpoint.all_retrieved_file_ids)}" + ) checkpoint = copy.deepcopy(checkpoint) self._retrieved_folder_and_drive_ids = checkpoint.retrieved_folder_and_drive_ids try: From 14522be1162975a586ecd6611828d71da807ecac Mon Sep 17 00:00:00 2001 From: Evan Lohn Date: Wed, 7 May 2025 09:27:07 -0700 Subject: [PATCH 2/4] no more silent failing for users --- .../onyx/connectors/google_drive/connector.py | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/backend/onyx/connectors/google_drive/connector.py b/backend/onyx/connectors/google_drive/connector.py index e028f44bf50..15215870ce1 100644 --- a/backend/onyx/connectors/google_drive/connector.py +++ b/backend/onyx/connectors/google_drive/connector.py @@ -67,7 +67,6 @@ from onyx.utils.retry_wrapper import retry_builder from onyx.utils.threadpool_concurrency import parallel_yield from onyx.utils.threadpool_concurrency import run_functions_tuples_in_parallel -from onyx.utils.threadpool_concurrency import run_with_timeout from onyx.utils.threadpool_concurrency import ThreadSafeDict logger = setup_logger() @@ -440,9 +439,7 @@ def _impersonate_user_for_retrieval( logger.debug(f"Getting root folder id for user {user_email}") # default is ~17mins of retries, don't do that here for cases so we don't # waste 17mins everytime we run into a user without access to drive APIs - retry_builder(tries=3, delay=1)( - lambda: run_with_timeout(30, get_root_folder_id, drive_service) - )() + retry_builder(tries=3, delay=1)(get_root_folder_id)(drive_service) except HttpError as e: if e.status_code == 401: # fail gracefully, let the other impersonations continue @@ -455,20 +452,20 @@ def _impersonate_user_for_retrieval( curr_stage.stage = DriveRetrievalStage.DONE return raise - except TimeoutError: - logger.warning( - f"User '{user_email}' timed out when trying to access the drive APIs." - ) - # mark this user as done so we don't try to retrieve anything for them - # again - curr_stage.stage = DriveRetrievalStage.DONE - return except RefreshError as e: logger.warning( f"User '{user_email}' could not refresh their token. Error: {e}" ) # mark this user as done so we don't try to retrieve anything for them # again + yield RetrievedDriveFile( + completion_stage=DriveRetrievalStage.DONE, + drive_file={ + "refresh_creds_error": f"Error refreshing credentials for user {user_email}" + }, + user_email=user_email, + error=e, + ) curr_stage.stage = DriveRetrievalStage.DONE return # if we are including my drives, try to get the current user's my @@ -620,7 +617,6 @@ def _manage_service_account_retrieval( sorted_drive_ids, sorted_folder_ids = self._determine_retrieval_ids( checkpoint, is_slim, DriveRetrievalStage.MY_DRIVE_FILES ) - all_drive_ids = set(sorted_drive_ids) # Setup initial completion map on first connector run for email in all_org_emails: @@ -630,8 +626,7 @@ def _manage_service_account_retrieval( checkpoint.completion_map[email] = StageCompletion( stage=DriveRetrievalStage.START, completed_until=0, - processed_drive_ids=all_drive_ids - - self._get_all_drives_for_user(email), + processed_drive_ids=set(), ) # we've found all users and drives, now time to actually start From 6827dc3371c89f0c6bb5d38a4fec59c7128089ee Mon Sep 17 00:00:00 2001 From: Evan Lohn Date: Wed, 7 May 2025 13:11:03 -0700 Subject: [PATCH 3/4] nits --- backend/onyx/connectors/google_drive/connector.py | 11 ++++++++++- .../onyx/connectors/google_drive/doc_conversion.py | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/backend/onyx/connectors/google_drive/connector.py b/backend/onyx/connectors/google_drive/connector.py index 15215870ce1..431b4f68a88 100644 --- a/backend/onyx/connectors/google_drive/connector.py +++ b/backend/onyx/connectors/google_drive/connector.py @@ -461,7 +461,8 @@ def _impersonate_user_for_retrieval( yield RetrievedDriveFile( completion_stage=DriveRetrievalStage.DONE, drive_file={ - "refresh_creds_error": f"Error refreshing credentials for user {user_email}" + "error_type": "refresh_creds_error", + "error_message": f"Error refreshing credentials for user {user_email}", }, user_email=user_email, error=e, @@ -601,6 +602,14 @@ def _manage_service_account_retrieval( start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None, ) -> Iterator[RetrievedDriveFile]: + """ + The current implementation of the service account retrieval does some + initial setup work using the primary admin email, then runs MAX_DRIVE_WORKERS + concurrent threads, each of which impersonates a different user and retrieves + files for that user. Technically, the actual work each thread does is "yield the + next file retrieved by the user", at which point it returns to the thread pool; + see parallel_yield for more details. + """ if checkpoint.completion_stage == DriveRetrievalStage.START: checkpoint.completion_stage = DriveRetrievalStage.USER_EMAILS diff --git a/backend/onyx/connectors/google_drive/doc_conversion.py b/backend/onyx/connectors/google_drive/doc_conversion.py index 22604b2d781..4f2b9820b9a 100644 --- a/backend/onyx/connectors/google_drive/doc_conversion.py +++ b/backend/onyx/connectors/google_drive/doc_conversion.py @@ -327,12 +327,16 @@ def convert_drive_item_to_document( doc_or_failure = _convert_drive_item_to_document( creds, allow_images, size_threshold, retriever_email, file ) + + # There are a variety of permissions-based errors that occasionally occur + # when retrieving files. Often when these occur, there is another user + # that can successfully retrieve the file, so we try the next user. if ( doc_or_failure is None or isinstance(doc_or_failure, Document) or not ( isinstance(doc_or_failure.exception, HttpError) - and doc_or_failure.exception.status_code in [403, 404] + and doc_or_failure.exception.status_code in [401, 403, 404] ) ): return doc_or_failure From 1ae822cde63c0f650116cbb7ed5fb62b5a77cab9 Mon Sep 17 00:00:00 2001 From: Evan Lohn Date: Wed, 7 May 2025 14:50:32 -0700 Subject: [PATCH 4/4] no silly info transfer --- backend/onyx/connectors/google_drive/connector.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/backend/onyx/connectors/google_drive/connector.py b/backend/onyx/connectors/google_drive/connector.py index 431b4f68a88..1bf726a9845 100644 --- a/backend/onyx/connectors/google_drive/connector.py +++ b/backend/onyx/connectors/google_drive/connector.py @@ -460,10 +460,7 @@ def _impersonate_user_for_retrieval( # again yield RetrievedDriveFile( completion_stage=DriveRetrievalStage.DONE, - drive_file={ - "error_type": "refresh_creds_error", - "error_message": f"Error refreshing credentials for user {user_email}", - }, + drive_file={}, user_email=user_email, error=e, )