6
6
The main process loop for `psimulate` runs.
7
7
8
8
"""
9
- import atexit
9
+ from collections import defaultdict
10
10
from pathlib import Path
11
11
from time import sleep , time
12
12
@@ -34,10 +34,11 @@ def process_job_results(
34
34
existing_outputs : pd .DataFrame ,
35
35
output_directory : Path ,
36
36
no_batch : bool ,
37
- ) -> None :
37
+ ) -> defaultdict :
38
38
written_results = existing_outputs
39
39
unwritten_results = []
40
40
batch_size = 0 if no_batch else 200
41
+ status = defaultdict (int )
41
42
42
43
logger .info ("Entering main processing loop." )
43
44
start_time = time ()
@@ -54,7 +55,7 @@ def process_job_results(
54
55
batch_size ,
55
56
)
56
57
57
- registry_manager .update_and_report ()
58
+ status = registry_manager .update_and_report ()
58
59
logger .info (f"Unwritten results: { len (unwritten_results )} " )
59
60
logger .info (f"Elapsed time: { (time () - start_time )/ 60 :.1f} minutes." )
60
61
finally :
@@ -68,6 +69,7 @@ def process_job_results(
68
69
)
69
70
logger .info (f"Unwritten results: { len (unwritten_results )} " )
70
71
logger .info (f"Elapsed time: { (time () - start_time ) / 60 :.1f} minutes." )
72
+ return status
71
73
72
74
73
75
def load_existing_outputs (result_path : Path , restart : bool ) -> pd .DataFrame :
@@ -238,7 +240,7 @@ def main(
238
240
# Enter the main monitoring and processing loop, which will check on
239
241
# all the queues periodically, report status updates, and gather
240
242
# and write results when they are available.
241
- process_job_results (
243
+ status = process_job_results (
242
244
registry_manager = registry_manager ,
243
245
existing_outputs = existing_outputs ,
244
246
output_directory = output_paths .root ,
@@ -248,4 +250,14 @@ def main(
248
250
# Spit out a performance report for the workers.
249
251
try_run_vipin (output_paths .worker_logging_root )
250
252
251
- logger .info (f"Jobs completed. Results written to: { str (output_paths .root )} " )
253
+ # Emit warning if any jobs failed
254
+ if status ["failed" ] > 0 :
255
+ logger .warning (
256
+ f"*** NOTE: There { 'was' if status ['failed' ] == 1 else 'were' } "
257
+ f"{ status ['failed' ]} failed job{ '' if status ['failed' ] == 1 else 's' } . ***"
258
+ )
259
+
260
+ logger .info (
261
+ f"{ status ['finished' ]} of { status ['total' ]} jobs completed successfully. "
262
+ f"Results written to: { str (output_paths .root )} "
263
+ )
0 commit comments