handle items instead of files #1141

Draft: wants to merge 21 commits into base: master

Changes from all commits (21 commits)
ff8590a
saveRDDTemporalAllowAssetPerBand: handle items instead of files
bossie Apr 8, 2025
e6b5204
Merge branch 'master' into 1111-unify-asset-keys-across-stac-items_it…
bossie Apr 10, 2025
0e27071
Merge branch 'master' into 1111-unify-asset-keys-across-stac-items_it…
bossie Apr 10, 2025
b895948
expose items in job_metadata.json #1111
bossie Apr 10, 2025
f69d5f1
saveRDDTemporalAllowAssetPerBand: test band IDs and asset keys #1111
bossie Apr 10, 2025
7409ad1
saveStitchedTileGridTemporal/saveSamples: handle items instead of fil…
bossie Apr 11, 2025
33d626b
saveRDDAllowAssetPerBand: handle items instead of files #1111
bossie Apr 14, 2025
4ee8682
saveStitched: handle item instead of file #1111
bossie Apr 14, 2025
321bf7e
saveStitchedTileGrid: handle items instead of files #1111
bossie Apr 14, 2025
f504e6c
reference PR version of geotrellis-extensions to fix build #1111
bossie Apr 15, 2025
44ce15e
Merge branch 'master' into 1111-unify-asset-keys-across-stac-items_it…
bossie Apr 15, 2025
c3f51b0
catalog format: return items instead of files #1111
bossie Apr 15, 2025
dc08f70
tile_grid from spatial cube: handle items #1111
bossie Apr 15, 2025
75e69da
relax tests for unset "datetime" #1111
bossie Apr 15, 2025
61ad5c5
fix PNG output #1111
bossie Apr 15, 2025
a9f73b4
fix synchronous requests #1111
bossie Apr 15, 2025
07f825e
netCDF: handle items instead of files #1111
bossie Apr 16, 2025
369eb88
netCDF: update/fix some batch job tests #1111
bossie Apr 16, 2025
b0ba816
netCDF: address TODO and update/fix more batch job tests #1111
bossie Apr 16, 2025
72479b8
update test to illustrate FIXME #1111
bossie Apr 16, 2025
73fa2bb
Merge branch 'master' into 1111-unify-asset-keys-across-stac-items_it…
bossie Apr 16, 2025
22 changes: 16 additions & 6 deletions openeogeotrellis/deploy/batch_job.py
@@ -80,6 +80,7 @@
log_memory,
to_jsonable,
wait_till_path_available,
unzip,
)

logger = logging.getLogger('openeogeotrellis.deploy.batch_job')
@@ -272,6 +273,7 @@ def run_job(
result_metadata = None

tracker_metadata = {}
items = []

try:
# We actually expect type Path, but in reality paths as strings tend to
@@ -389,12 +391,16 @@
ml_model_metadata = result.get_model_metadata(str(output_file))
logger.info("Extracted ml model metadata from %s" % output_file)

def result_write_assets(result_arg) -> dict:
return result_arg.write_assets(str(output_file))
def result_write_assets(result_arg) -> (dict, dict):
items = result_arg.write_assets(str(output_file))
assets = {
asset_key: asset for item in items.values() for asset_key, asset in item.get("assets", {}).items()
}
return assets, items

concurrent_save_results = int(job_options.get("concurrent-save-results", 1))
if concurrent_save_results == 1:
assets_metadata = list(map(result_write_assets, results))
assets_metadata, results_items = unzip(*map(result_write_assets, results))
elif concurrent_save_results > 1:
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_save_results) as executor:
futures = []
@@ -403,9 +409,13 @@ def result_write_assets(result_arg) -> dict:

for _ in concurrent.futures.as_completed(futures):
continue
assets_metadata = list(map(lambda f: f.result(), futures))
assets_metadata, results_items = unzip(*map(lambda f: f.result(), futures))
else:
raise ValueError(f"Invalid concurrent_save_results: {concurrent_save_results}")
assets_metadata = list(assets_metadata)

# flatten the items of each result into one list
items = [item for result in results_items for item in result.values()]

for the_assets_metadata in assets_metadata:
for name, asset in the_assets_metadata.items():
@@ -495,7 +505,7 @@ def result_write_assets(result_arg) -> dict:
raise ValueError(
f"sar_backscatter: Too many soft errors ({soft_errors} > {max_soft_errors_ratio})"
)
write_metadata({**result_metadata, **tracker_metadata}, metadata_file)
write_metadata({**result_metadata, **tracker_metadata, **{"items": items}}, metadata_file)
logger.debug("Starting GDAL-based retrieval of asset metadata")
result_metadata = _assemble_result_metadata(
tracer=tracer,
@@ -525,7 +535,7 @@ def result_write_assets(result_arg) -> dict:
finally:
if tracker_metadata is None:
tracker_metadata = _get_tracker_metadata("")
write_metadata({**result_metadata, **tracker_metadata}, metadata_file)
write_metadata({**result_metadata, **tracker_metadata, **{"items": items}}, metadata_file)


def write_metadata(metadata: dict, metadata_file: Path):
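
For context, a minimal, self-contained sketch of how the new `(assets, items)` return shape can be consumed. The `unzip` helper here is assumed to be a plain transpose-of-pairs implementation, and the item/asset dicts are illustrative only; the real `result_write_assets` obtains its items from `result.write_assets(str(output_file))`.

```python
# Hedged sketch: "unzip" is assumed to transpose pairs; the items/assets below
# are illustrative examples, not actual backend output.

def unzip(*pairs):
    # ((a1, b1), (a2, b2), ...) -> ((a1, a2, ...), (b1, b2, ...))
    return tuple(zip(*pairs)) if pairs else ((), ())

def result_write_assets(items: dict) -> tuple:
    # In the PR, items come from result.write_assets(...); here they are passed in directly.
    # Derive the flat {asset_key: asset} dict from the per-item "assets" dicts.
    assets = {
        asset_key: asset
        for item in items.values()
        for asset_key, asset in item.get("assets", {}).items()
    }
    return assets, items

# Two save results, each producing one item with a single asset:
results = [
    {"item-2021-01-01": {"id": "item-2021-01-01",
                         "assets": {"openEO_2021-01-01Z.tif": {"href": "openEO_2021-01-01Z.tif"}}}},
    {"item-2021-01-02": {"id": "item-2021-01-02",
                         "assets": {"openEO_2021-01-02Z.tif": {"href": "openEO_2021-01-02Z.tif"}}}},
]

assets_metadata, results_items = unzip(*map(result_write_assets, results))
assets_metadata = list(assets_metadata)

# Flatten the items of every result into one list for job_metadata.json.
items = [item for result_items in results_items for item in result_items.values()]

assert len(assets_metadata) == 2 and len(items) == 2
```
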
2 changes: 1 addition & 1 deletion openeogeotrellis/deploy/batch_job_metadata.py
@@ -36,7 +36,7 @@ def _assemble_result_metadata(
output_file: Path,
unique_process_ids: Set[str],
apply_gdal,
asset_metadata: Dict = None,
asset_metadata: Dict = None, # TODO: include "items" instead of "assets"
ml_model_metadata: Dict = None,
) -> dict:
metadata = extract_result_metadata(tracer)
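
For reference, an assumed shape of the new top-level "items" entry written to job_metadata.json, inferred from the batch_job.py changes above; item IDs, asset keys and nested fields are illustrative, and the real structure is whatever `write_assets` returns.

```python
# Illustrative only: inferred from the diff above, not a documented schema.
job_metadata_example = {
    "assets": {},  # existing per-asset metadata, unchanged (elided here)
    "items": [
        {
            "id": "item-2021-01-01",
            "assets": {
                "openEO_2021-01-01Z.tif": {"href": "openEO_2021-01-01Z.tif"},
            },
        },
    ],
}
```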