Skip to content

Commit 4f08696

Browse files
rkuo-danswer (Richard Kuo, Onyx)
authored and committed
restructure to signal activity while processing (onyx-dot-app#4712)
Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
1 parent 461f458 commit 4f08696

File tree

2 files changed

+41
-53
lines changed

2 files changed

+41
-53
lines changed

backend/onyx/connectors/salesforce/connector.py

Lines changed: 38 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -150,56 +150,6 @@ def _download_object_csvs(
150150

151151
logger.info(f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}")
152152

153-
@staticmethod
154-
def _load_csvs_to_db(csv_directory: str, sf_db: OnyxSalesforceSQLite) -> set[str]:
155-
updated_ids: set[str] = set()
156-
157-
object_type_to_csv_path = SalesforceConnector.reconstruct_object_types(
158-
csv_directory
159-
)
160-
161-
# NOTE(rkuo): this timing note is meaningless without a reference point in terms
162-
# of number of records, etc
163-
# This takes like 10 seconds
164-
165-
# This is for testing the rest of the functionality if data has
166-
# already been fetched and put in sqlite
167-
# from import onyx.connectors.salesforce.sf_db.sqlite_functions find_ids_by_type
168-
# for object_type in self.parent_object_list:
169-
# updated_ids.update(list(find_ids_by_type(object_type)))
170-
171-
# This takes 10-70 minutes first time (idk why the range is so big)
172-
total_types = len(object_type_to_csv_path)
173-
logger.info(f"Starting to process {total_types} object types")
174-
175-
for i, (object_type, csv_paths) in enumerate(
176-
object_type_to_csv_path.items(), 1
177-
):
178-
logger.info(f"Processing object type {object_type} ({i}/{total_types})")
179-
# If path is None, it means it failed to fetch the csv
180-
if csv_paths is None:
181-
continue
182-
183-
# Go through each csv path and use it to update the db
184-
for csv_path in csv_paths:
185-
logger.debug(
186-
f"Processing CSV: object_type={object_type} "
187-
f"csv={csv_path} "
188-
f"len={Path(csv_path).stat().st_size}"
189-
)
190-
new_ids = sf_db.update_from_csv(
191-
object_type=object_type,
192-
csv_download_path=csv_path,
193-
)
194-
updated_ids.update(new_ids)
195-
logger.debug(
196-
f"Added {len(new_ids)} new/updated records for {object_type}"
197-
)
198-
199-
os.remove(csv_path)
200-
201-
return updated_ids
202-
203153
@staticmethod
204154
def _get_all_types(parent_types: list[str], sf_client: Salesforce) -> set[str]:
205155
all_types: set[str] = set(parent_types)
@@ -236,6 +186,7 @@ def _fetch_from_salesforce(
236186

237187
updated_ids: set[str] = set()
238188
docs_processed = 0
189+
docs_to_yield: list[Document] = []
239190

240191
sf_db = OnyxSalesforceSQLite(os.path.join(temp_dir, "salesforce_db.sqlite"))
241192
sf_db.connect()
@@ -266,7 +217,43 @@ def _fetch_from_salesforce(
266217
gc.collect()
267218

268219
# Step 2 - load CSV's to sqlite
269-
updated_ids = SalesforceConnector._load_csvs_to_db(temp_dir, sf_db)
220+
object_type_to_csv_paths = SalesforceConnector.reconstruct_object_types(
221+
temp_dir
222+
)
223+
224+
total_types = len(object_type_to_csv_paths)
225+
logger.info(f"Starting to process {total_types} object types")
226+
227+
for i, (object_type, csv_paths) in enumerate(
228+
object_type_to_csv_paths.items(), 1
229+
):
230+
logger.info(f"Processing object type {object_type} ({i}/{total_types})")
231+
# If path is None, it means it failed to fetch the csv
232+
if csv_paths is None:
233+
continue
234+
235+
# Go through each csv path and use it to update the db
236+
for csv_path in csv_paths:
237+
logger.debug(
238+
f"Processing CSV: object_type={object_type} "
239+
f"csv={csv_path} "
240+
f"len={Path(csv_path).stat().st_size}"
241+
)
242+
243+
# yield an empty list to keep the connector alive
244+
yield docs_to_yield
245+
246+
new_ids = sf_db.update_from_csv(
247+
object_type=object_type,
248+
csv_download_path=csv_path,
249+
)
250+
updated_ids.update(new_ids)
251+
logger.debug(
252+
f"Added {len(new_ids)} new/updated records for {object_type}"
253+
)
254+
255+
os.remove(csv_path)
256+
270257
gc.collect()
271258

272259
logger.info(f"Found {len(updated_ids)} total updated records")
@@ -276,7 +263,6 @@ def _fetch_from_salesforce(
276263

277264
# Step 3 - extract and index docs
278265
batches_processed = 0
279-
docs_to_yield: list[Document] = []
280266
docs_to_yield_bytes = 0
281267

282268
# Takes 15-20 seconds per batch

backend/onyx/file_processing/image_summarization.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ def _summarize_image(
103103
return message_to_string(llm.invoke(messages))
104104

105105
except Exception as e:
106-
raise ValueError(f"Summarization failed. Messages: {messages}") from e
106+
error_msg = f"Summarization failed. Messages: {messages}"
107+
error_msg = error_msg[:1024]
108+
raise ValueError(error_msg) from e
107109

108110

109111
def _encode_image_for_llm_prompt(image_data: bytes) -> str:

0 commit comments

Comments (0)