     log_memory,
     to_jsonable,
     wait_till_path_available,
+    unzip,
 )

 logger = logging.getLogger('openeogeotrellis.deploy.batch_job')
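Note: `unzip` is imported here but not defined in this diff. Judging from the call sites further down (`unzip(*map(result_write_assets, results))`), it is assumed to be the usual transpose helper; a minimal sketch under that assumption, not the actual utils implementation:

```python
def unzip(*tuples):
    # Hypothetical sketch: transpose N k-tuples into k N-tuples, e.g.
    # unzip((a1, i1), (a2, i2)) -> ((a1, a2), (i1, i2)).
    # Callers below splat an iterable of (assets, items) pairs into it.
    return tuple(zip(*tuples))
```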
@@ -367,6 +368,7 @@ def run_job(
     )
     # perform a first metadata write _before_ actually computing the result. This provides a bit more info, even if the job fails.
     write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
+    items = {}

     for result in results:
         result.options["batch_mode"] = True
@@ -387,17 +389,16 @@ def run_job(
             ml_model_metadata = result.get_model_metadata(str(output_file))
             logger.info("Extracted ml model metadata from %s" % output_file)

-    def result_write_assets(result_arg) -> dict:
+    def result_write_assets(result_arg) -> (dict, dict):
         items = result_arg.write_assets(str(output_file))
         assets = {
             asset_key: asset for item in items.values() for asset_key, asset in item.get("assets", {}).items()
         }
-        # TODO: write items to job_metadata.json
-        return assets
+        return assets, items

     concurrent_save_results = int(job_options.get("concurrent-save-results", 1))
     if concurrent_save_results == 1:
-        assets_metadata = list(map(result_write_assets, results))
+        assets_metadata, results_items = unzip(*map(result_write_assets, results))
     elif concurrent_save_results > 1:
         with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_save_results) as executor:
             futures = []
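For context on the refactor above: `write_assets` is assumed to return a mapping of item id to a STAC-item-like dict whose `"assets"` entry maps asset keys to asset metadata; the comprehension flattens that into one assets dict, and returning the pair `(assets, items)` keeps both views available. A toy illustration (names and shapes are illustrative, not from the source):

```python
# Hypothetical shape of the mapping returned by write_assets:
items = {
    "item-1": {"id": "item-1", "assets": {"openEO_2021.tif": {"href": "out/openEO_2021.tif"}}},
    "item-2": {"id": "item-2", "assets": {"openEO_2022.tif": {"href": "out/openEO_2022.tif"}}},
}
# The comprehension in result_write_assets collapses it to a flat asset dict:
assets = {
    asset_key: asset for item in items.values() for asset_key, asset in item.get("assets", {}).items()
}
assert set(assets) == {"openEO_2021.tif", "openEO_2022.tif"}
```

Aside: `(dict, dict)` is accepted at runtime as a return annotation but is not a real type; `Tuple[dict, dict]` (or `tuple[dict, dict]` on Python 3.9+) would be the conventional spelling.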
@@ -406,9 +407,13 @@ def result_write_assets(result_arg) -> dict:
             for _ in concurrent.futures.as_completed(futures):
                 continue
-        assets_metadata = list(map(lambda f: f.result(), futures))
+        assets_metadata, results_items = unzip(*map(lambda f: f.result(), futures))
     else:
         raise ValueError(f"Invalid concurrent_save_results: {concurrent_save_results}")
+    assets_metadata = list(assets_metadata)
+
+    # flatten the items of each result into one list
+    items = [item for result in results_items for item in result.values()]

     for the_assets_metadata in assets_metadata:
         for name, asset in the_assets_metadata.items():
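Putting the two changes together: `unzip` transposes the sequence of `(assets, items)` pairs into two parallel sequences, and the comprehension then flattens the per-result item mappings into a single list of item dicts. A small end-to-end sketch, under the same `unzip` assumption as above and with toy data:

```python
# Two results, each contributing one (assets, items) pair.
pairs = [
    ({"a.tif": {}}, {"item-a": {"id": "item-a"}}),
    ({"b.tif": {}}, {"item-b": {"id": "item-b"}}),
]
assets_metadata, results_items = tuple(zip(*pairs))  # what unzip(*pairs) is assumed to do
assets_metadata = list(assets_metadata)  # materialize, as the diff does
items = [item for result in results_items for item in result.values()]
assert items == [{"id": "item-a"}, {"id": "item-b"}]
```

One consequence worth noting: `items` starts life as the empty dict initialized near the top of `run_job` (so the `finally:` metadata write always has a defined value) and is rebound to a list here once all results have been written.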
@@ -491,7 +496,7 @@ def result_write_assets(result_arg) -> dict:
         ml_model_metadata=ml_model_metadata,
     )

-    write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
+    write_metadata({**result_metadata, **_get_tracker_metadata(""), **{"items": items}}, metadata_file)
     logger.debug("Starting GDAL-based retrieval of asset metadata")
     result_metadata = _assemble_result_metadata(
         tracer=tracer,
@@ -519,7 +524,7 @@ def result_write_assets(result_arg) -> dict:
             enable_merge=job_options.get("export-workspace-enable-merge", False),
         )
     finally:
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
+        write_metadata({**result_metadata, **_get_tracker_metadata(""), **{"items": items}}, metadata_file)


 def write_metadata(metadata: dict, metadata_file: Path):
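With these changes, both metadata writes merge an `"items"` entry into the payload, so job_metadata.json carries the collected items alongside the tracker metadata. A hedged sketch of the merge semantics (all keys other than `"items"` are illustrative):

```python
result_metadata = {"assets": {"a.tif": {}}}          # illustrative
tracker_metadata = {"usage": {"cpu": {"value": 1}}}  # illustrative
items = [{"id": "item-a"}]
merged = {**result_metadata, **tracker_metadata, **{"items": items}}
# **{"items": items} is equivalent to a plain "items": items entry;
# later entries win on key collisions, so "items" always reflects this list.
assert merged["items"] == items
```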