|
1 |
| -import gc |
2 | 1 | import os
|
3 | 2 | from concurrent.futures import ThreadPoolExecutor
|
4 | 3 | from datetime import datetime
|
@@ -145,13 +144,6 @@ def _bulk_retrieve_from_salesforce(
|
145 | 144 | proxies=sf_client.proxies,
|
146 | 145 | session=sf_client.session,
|
147 | 146 | )
|
148 |
| - |
149 |
| - # NOTE(rkuo): there are signs this download is allocating large |
150 |
| - # amounts of memory instead of streaming the results to disk. |
151 |
| - # we're doing a gc.collect to try and mitigate this. |
152 |
| - |
153 |
| - # see https://github.com/simple-salesforce/simple-salesforce/issues/428 for a |
154 |
| - # possible solution |
155 | 147 | bulk_2_type = SFBulk2Type(
|
156 | 148 | object_name=sf_type,
|
157 | 149 | bulk2_url=bulk_2_handler.bulk2_url,
|
@@ -180,17 +172,14 @@ def _bulk_retrieve_from_salesforce(
|
180 | 172 | new_file_path = os.path.join(directory, new_filename)
|
181 | 173 | os.rename(original_file_path, new_file_path)
|
182 | 174 | all_download_paths.append(new_file_path)
|
| 175 | + logger.info(f"Downloaded {sf_type} to {all_download_paths}") |
| 176 | + return sf_type, all_download_paths |
183 | 177 | except Exception as e:
|
184 | 178 | logger.error(
|
185 | 179 | f"Failed to download salesforce csv for object type {sf_type}: {e}"
|
186 | 180 | )
|
187 | 181 | logger.warning(f"Exceptioning query for object type {sf_type}: {query}")
|
188 | 182 | return sf_type, None
|
189 |
| - finally: |
190 |
| - gc.collect() |
191 |
| - |
192 |
| - logger.info(f"Downloaded {sf_type} to {all_download_paths}") |
193 |
| - return sf_type, all_download_paths |
194 | 183 |
|
195 | 184 |
|
196 | 185 | def fetch_all_csvs_in_parallel(
|
@@ -240,8 +229,7 @@ def fetch_all_csvs_in_parallel(
|
240 | 229 | time_filter_for_each_object_type[sf_type] = last_modified_time_filter
|
241 | 230 |
|
242 | 231 | # Run the bulk retrieve in parallel
|
243 |
| - # limit to 4 to help with memory usage |
244 |
| - with ThreadPoolExecutor(max_workers=4) as executor: |
| 232 | + with ThreadPoolExecutor() as executor: |
245 | 233 | results = executor.map(
|
246 | 234 | lambda object_type: _bulk_retrieve_from_salesforce(
|
247 | 235 | sf_client=sf_client,
|
|
0 commit comments