Skip to content

Commit f192e07

Browse files
author
Richard Kuo (Onyx)
committed
mitigate memory usage during csv download
1 parent fff701b commit f192e07

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

backend/onyx/connectors/salesforce/connector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ def _fetch_from_salesforce(
253253
)
254254

255255
os.remove(csv_path)
256+
gc.collect()
256257

257258
gc.collect()
258259

backend/onyx/connectors/salesforce/salesforce_calls.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import gc
12
import os
23
from concurrent.futures import ThreadPoolExecutor
34
from datetime import datetime
@@ -144,6 +145,13 @@ def _bulk_retrieve_from_salesforce(
144145
proxies=sf_client.proxies,
145146
session=sf_client.session,
146147
)
148+
149+
# NOTE(rkuo): there are signs this download is allocating large
150+
# amounts of memory instead of streaming the results to disk.
151+
# we call gc.collect() after the download to try to mitigate this.
152+
153+
# see https://github.com/simple-salesforce/simple-salesforce/issues/428 for a
154+
# possible solution
147155
bulk_2_type = SFBulk2Type(
148156
object_name=sf_type,
149157
bulk2_url=bulk_2_handler.bulk2_url,
@@ -172,14 +180,17 @@ def _bulk_retrieve_from_salesforce(
172180
new_file_path = os.path.join(directory, new_filename)
173181
os.rename(original_file_path, new_file_path)
174182
all_download_paths.append(new_file_path)
175-
logger.info(f"Downloaded {sf_type} to {all_download_paths}")
176-
return sf_type, all_download_paths
177183
except Exception as e:
178184
logger.error(
179185
f"Failed to download salesforce csv for object type {sf_type}: {e}"
180186
)
181187
logger.warning(f"Exceptioning query for object type {sf_type}: {query}")
182188
return sf_type, None
189+
finally:
190+
gc.collect()
191+
192+
logger.info(f"Downloaded {sf_type} to {all_download_paths}")
193+
return sf_type, all_download_paths
183194

184195

185196
def fetch_all_csvs_in_parallel(
@@ -229,7 +240,8 @@ def fetch_all_csvs_in_parallel(
229240
time_filter_for_each_object_type[sf_type] = last_modified_time_filter
230241

231242
# Run the bulk retrieve in parallel
232-
with ThreadPoolExecutor() as executor:
243+
# limit to 4 to help with memory usage
244+
with ThreadPoolExecutor(max_workers=4) as executor:
233245
results = executor.map(
234246
lambda object_type: _bulk_retrieve_from_salesforce(
235247
sf_client=sf_client,

0 commit comments

Comments
 (0)