diff --git a/openeogeotrellis/deploy/batch_job.py b/openeogeotrellis/deploy/batch_job.py
index fc080b699..9b4d6b2e8 100644
--- a/openeogeotrellis/deploy/batch_job.py
+++ b/openeogeotrellis/deploy/batch_job.py
@@ -80,6 +80,7 @@
     log_memory,
     to_jsonable,
     wait_till_path_available,
+    unzip,
 )
 
 logger = logging.getLogger('openeogeotrellis.deploy.batch_job')
@@ -272,6 +273,7 @@ def run_job(
 
     result_metadata = None
     tracker_metadata = {}
+    items = []
 
     try:
         # We actually expect type Path, but in reality paths as strings tend to
@@ -389,12 +391,16 @@ def run_job(
                 ml_model_metadata = result.get_model_metadata(str(output_file))
                 logger.info("Extracted ml model metadata from %s" % output_file)
 
-        def result_write_assets(result_arg) -> dict:
-            return result_arg.write_assets(str(output_file))
+        def result_write_assets(result_arg) -> Tuple[dict, dict]:
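+            """
+            Write one result and return its (assets, items) metadata.
+
+            Illustrative sketch of the shapes, inferred from the usage below
+            (not normative): ``items`` maps item id to a STAC-Item-like dict,
+            e.g. ``{"item-1": {"id": "item-1", "assets": {"openEO": {"href": "..."}}}}``,
+            and ``assets`` flattens the asset dicts of all those items,
+            e.g. ``{"openEO": {"href": "..."}}``.
+            """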
+            items = result_arg.write_assets(str(output_file))
+            assets = {
+                asset_key: asset for item in items.values() for asset_key, asset in item.get("assets", {}).items()
+            }
+            return assets, items
 
         concurrent_save_results = int(job_options.get("concurrent-save-results", 1))
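+        # NOTE: `unzip` (imported above) is assumed to transpose the per-result
+        # (assets, items) pairs into (all assets_metadata, all results_items),
+        # analogous to zip(*pairs).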
         if concurrent_save_results == 1:
-            assets_metadata = list(map(result_write_assets, results))
+            assets_metadata, results_items = unzip(*map(result_write_assets, results))
         elif concurrent_save_results > 1:
             with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_save_results) as executor:
                 futures = []
@@ -403,9 +409,13 @@ def result_write_assets(result_arg) -> dict:
                 for _ in concurrent.futures.as_completed(futures):
                     continue
 
-            assets_metadata = list(map(lambda f: f.result(), futures))
+            assets_metadata, results_items = unzip(*map(lambda f: f.result(), futures))
         else:
             raise ValueError(f"Invalid concurrent_save_results: {concurrent_save_results}")
+        assets_metadata = list(assets_metadata)
+
+        # flatten the items of each result into one list
+        items = [item for result in results_items for item in result.values()]
 
         for the_assets_metadata in assets_metadata:
             for name, asset in the_assets_metadata.items():
@@ -495,7 +505,7 @@ def result_write_assets(result_arg) -> dict:
                     raise ValueError(
                         f"sar_backscatter: Too many soft errors ({soft_errors} > {max_soft_errors_ratio})"
                     )
-        write_metadata({**result_metadata, **tracker_metadata}, metadata_file)
+        write_metadata({**result_metadata, **tracker_metadata, "items": items}, metadata_file)
         logger.debug("Starting GDAL-based retrieval of asset metadata")
         result_metadata = _assemble_result_metadata(
             tracer=tracer,
@@ -525,7 +535,7 @@ def result_write_assets(result_arg) -> dict:
     finally:
         if tracker_metadata is None:
             tracker_metadata = _get_tracker_metadata("")
-        write_metadata({**result_metadata, **tracker_metadata}, metadata_file)
+        write_metadata({**result_metadata, **tracker_metadata, "items": items}, metadata_file)
 
 
 def write_metadata(metadata: dict, metadata_file: Path):
diff --git a/openeogeotrellis/deploy/batch_job_metadata.py b/openeogeotrellis/deploy/batch_job_metadata.py
index b791864b7..cf57250df 100644
--- a/openeogeotrellis/deploy/batch_job_metadata.py
+++ b/openeogeotrellis/deploy/batch_job_metadata.py
@@ -36,7 +36,7 @@ def _assemble_result_metadata(
     output_file: Path,
     unique_process_ids: Set[str],
     apply_gdal,
-    asset_metadata: Dict = None,
+    asset_metadata: Dict = None,  # TODO: include "items" instead of "assets"
     ml_model_metadata: Dict = None,
 ) -> dict:
     metadata = extract_result_metadata(tracer)
diff --git a/openeogeotrellis/geopysparkdatacube.py b/openeogeotrellis/geopysparkdatacube.py
index 4ca22487d..272f462a6 100644
--- a/openeogeotrellis/geopysparkdatacube.py
+++ b/openeogeotrellis/geopysparkdatacube.py
@@ -7,6 +7,7 @@
 import pathlib
 import subprocess
 import tempfile
+import uuid
 from datetime import datetime, date
 from functools import partial
 from typing import Dict, List, Union, Tuple, Iterable, Callable, Optional
@@ -1759,7 +1760,8 @@ def _to_xarray(self):
     @callsite
     def save_result(self, filename: Union[str, pathlib.Path], format: str, format_options: dict = None) -> str:
         result = self.write_assets(filename, format, format_options)
-        return result.popitem()[1]['href']
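+        # write_assets() now returns a mapping of item id to a STAC-Item-like
+        # dict; collect the assets over all items and return the first href.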
"image/tiff; application=geotiff", + "roles": ["data"], + "bands": ( + [band for i, band in enumerate(bands) if i in band_indices] + if band_indices + else bands + ), + "nodata": nodata, + "datetime": stac_datetime, + "bbox": to_latlng_bbox(bbox), + "geometry": mapping(Polygon.from_bounds(*to_latlng_bbox(bbox))), + } + assets = add_gdalinfo_objects(assets) + + item = { + "id": java_item.id(), + "properties": {"datetime": stac_datetime}, "geometry": mapping(Polygon.from_bounds(*to_latlng_bbox(bbox))), + "bbox": to_latlng_bbox(bbox), + "assets": assets, } - return add_gdalinfo_objects(assets) + + items[java_item.id()] = item + + return items # TODO: retain backwards compatibility else: if tile_grid: - tiles = self._save_stitched_tile_grid(max_level, str(save_filename), tile_grid, crop_bounds, - zlevel=zlevel, filename_prefix=filename_prefix) - - # noinspection PyProtectedMember - return add_gdalinfo_objects( - { - str(pathlib.Path(tile._1()).name): { - "href": tile._1(), - "bbox": to_latlng_bbox(tile._2()), - "geometry": mapping(Polygon.from_bounds(*to_latlng_bbox(tile._2()))), - "type": "image/tiff; application=geotiff", - "roles": ["data"], - } - for tile in tiles - } + java_items = self._save_stitched_tile_grid( + max_level, + str(save_filename), + tile_grid, + crop_bounds, + zlevel=zlevel, + filename_prefix=filename_prefix, ) else: - paths_tuples = get_jvm().org.openeo.geotrellis.geotiff.package.saveRDDAllowAssetPerBand( + java_items = get_jvm().org.openeo.geotrellis.geotiff.package.saveRDDAllowAssetPerBand( max_level_rdd, band_count, str(save_filename), @@ -2110,22 +2161,41 @@ def add_gdalinfo_objects(assets_original): gtiff_options, ) + items = {} + for java_item in java_items: assets = {} - for path, bbox, band_indices in map(to_tuple, paths_tuples): - file_name = str(pathlib.Path(path).relative_to(save_directory)) - assets[file_name] = { + + bbox = java_item.bbox() + + for asset_key, asset in java_item.assets().items(): + path = asset.path() + band_indices = asset.bandIndices() + + assets[asset_key] = { "href": str(path), "type": "image/tiff; application=geotiff", "roles": ["data"], - "bands": [band for i, band in enumerate(bands) if i in band_indices], "nodata": nodata, } + if band_indices is not None: + assets[asset_key]["bands"] = [ + band for i, band in enumerate(bands) if i in band_indices + ], if bbox: - assets[file_name]["bbox"] = to_latlng_bbox(bbox) - assets[file_name]["geometry"] = mapping(Polygon.from_bounds(*to_latlng_bbox(bbox))) + assets[asset_key]["bbox"] = to_latlng_bbox(bbox) + assets[asset_key]["geometry"] = mapping(Polygon.from_bounds(*to_latlng_bbox(bbox))) + + assets = add_gdalinfo_objects(assets) + item = { + "id": java_item.id(), + "geometry": mapping(Polygon.from_bounds(*to_latlng_bbox(bbox))), + "bbox": to_latlng_bbox(bbox), + "assets": assets, + } - return add_gdalinfo_objects(assets) + items[java_item.id()] = item + return items else: if not save_filename.endswith(".png"): save_filename = save_filename + ".png" @@ -2139,11 +2209,18 @@ def add_gdalinfo_objects(assets_original): get_jvm().org.openeo.geotrellis.png.package.saveStitched(max_level.srdd.rdd(), save_filename, crop_extent, png_options) else: get_jvm().org.openeo.geotrellis.png.package.saveStitched(max_level.srdd.rdd(), save_filename, png_options) + + item_id = str(uuid.uuid4()) return { - str(pathlib.Path(save_filename).name): { - "href": save_filename, - "type": "image/png", - "roles": ["data"] + item_id: { + "id": item_id, + "assets": { + "openEO": { + "href": save_filename, + "type": 
"image/png", + "roles": ["data"] + } + } } } @@ -2168,9 +2245,9 @@ def add_gdalinfo_objects(assets_original): geometries = GeometryCollection(geometries.geoms) projected_polygons = to_projected_polygons(get_jvm(), geometries) labels = self.get_labels(geometries,feature_id_property) - if(max_level.layer_type != gps.LayerType.SPATIAL): + if max_level.layer_type != gps.LayerType.SPATIAL: _log.debug(f"projected_polygons carries {len(projected_polygons.polygons())} polygons") - asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamples( + java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamples( max_level.srdd.rdd(), save_directory, projected_polygons, @@ -2181,7 +2258,7 @@ def add_gdalinfo_objects(assets_original): filename_prefix, ) else: - asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamplesSpatial( + java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamplesSpatial( max_level.srdd.rdd(), save_directory, projected_polygons, @@ -2192,7 +2269,7 @@ def add_gdalinfo_objects(assets_original): filename_prefix, ) - return return_netcdf_assets(asset_paths, bands, nodata) + return return_netcdf_items(java_items, bands, nodata) else: originalName = pathlib.Path(filename) filename_tmp = format_options.get("filename_prefix", "openEO") + ".nc" if originalName.name == "out" else originalName.name @@ -2205,25 +2282,23 @@ def add_gdalinfo_objects(assets_original): options.setAttributes(global_metadata) options.setZLevel(zlevel) options.setCropBounds(crop_extent) - asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters( + java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters( max_level.srdd.rdd(), filename,options ) else: - if(max_level.layer_type != gps.LayerType.SPATIAL): - asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDF(max_level.srdd.rdd(), - filename, - band_names, - dim_names,global_metadata,zlevel + if max_level.layer_type != gps.LayerType.SPATIAL: + java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDF( + max_level.srdd.rdd(), filename, band_names, dim_names, global_metadata, zlevel ) else: - asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDFSpatial( + java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDFSpatial( max_level.srdd.rdd(), filename, band_names, dim_names, global_metadata, zlevel ) - return return_netcdf_assets(asset_paths, bands, nodata) + return return_netcdf_items(java_items, bands, nodata) else: if not tiled: @@ -2236,11 +2311,20 @@ def add_gdalinfo_objects(assets_original): asset = { "href": filename, "roles": ["data"], - "type": "application/x-netcdf" + "type": "application/x-netcdf", } if bands is not None: asset["bands"] = bands - return {filename_tmp: asset} + + item_id = str(uuid.uuid4()) + return { + item_id: { + "id": item_id, + "assets": { + "openEO": asset, + }, + } + } elif format == "JSON": # saving to json, this is potentially big in memory @@ -2270,7 +2354,22 @@ def add_gdalinfo_objects(assets_original): message="Format {f!r} is not supported".format(f=format), code="FormatUnsupported", status_code=400 ) - return {str(os.path.basename(filename)): {"href": filename, "roles": ["data"]}} + + item_id = str(uuid.uuid4()) + return { + item_id: { + "id": item_id, + "properties": {"datetime": None}, + "geometry": None, + "bbox": None, + "assets": { + "openEO": { + "href": filename, + "roles": ["data"], 
+            item_id = str(uuid.uuid4())
             return {
-                str(pathlib.Path(save_filename).name): {
-                    "href": save_filename,
-                    "type": "image/png",
-                    "roles": ["data"]
+                item_id: {
+                    "id": item_id,
+                    "assets": {
+                        "openEO": {
+                            "href": save_filename,
+                            "type": "image/png",
+                            "roles": ["data"]
+                        }
+                    }
                 }
             }
@@ -2168,9 +2245,9 @@ def add_gdalinfo_objects(assets_original):
                 geometries = GeometryCollection(geometries.geoms)
             projected_polygons = to_projected_polygons(get_jvm(), geometries)
             labels = self.get_labels(geometries,feature_id_property)
-            if(max_level.layer_type != gps.LayerType.SPATIAL):
+            if max_level.layer_type != gps.LayerType.SPATIAL:
                 _log.debug(f"projected_polygons carries {len(projected_polygons.polygons())} polygons")
-                asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamples(
+                java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamples(
                     max_level.srdd.rdd(),
                     save_directory,
                     projected_polygons,
@@ -2181,7 +2258,7 @@ def add_gdalinfo_objects(assets_original):
                     filename_prefix,
                 )
             else:
-                asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamplesSpatial(
+                java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSamplesSpatial(
                     max_level.srdd.rdd(),
                     save_directory,
                     projected_polygons,
@@ -2192,7 +2269,7 @@ def add_gdalinfo_objects(assets_original):
                     filename_prefix,
                 )
 
-            return return_netcdf_assets(asset_paths, bands, nodata)
+            return return_netcdf_items(java_items, bands, nodata)
         else:
             originalName = pathlib.Path(filename)
             filename_tmp = format_options.get("filename_prefix", "openEO") + ".nc" if originalName.name == "out" else originalName.name
@@ -2205,25 +2282,23 @@ def add_gdalinfo_objects(assets_original):
                 options.setAttributes(global_metadata)
                 options.setZLevel(zlevel)
                 options.setCropBounds(crop_extent)
-                asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(
+                java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.writeRasters(
                     max_level.srdd.rdd(),
                     filename,options
                 )
             else:
-                if(max_level.layer_type != gps.LayerType.SPATIAL):
-                    asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDF(max_level.srdd.rdd(),
-                                                                                                          filename,
-                                                                                                          band_names,
-                                                                                                          dim_names,global_metadata,zlevel
+                if max_level.layer_type != gps.LayerType.SPATIAL:
+                    java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDF(
+                        max_level.srdd.rdd(), filename, band_names, dim_names, global_metadata, zlevel
                     )
                 else:
-                    asset_paths = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDFSpatial(
+                    java_items = get_jvm().org.openeo.geotrellis.netcdf.NetCDFRDDWriter.saveSingleNetCDFSpatial(
                         max_level.srdd.rdd(),
                         filename,
                         band_names,
                         dim_names,
                         global_metadata,
                         zlevel
                     )
 
-            return return_netcdf_assets(asset_paths, bands, nodata)
+            return return_netcdf_items(java_items, bands, nodata)
 
         else:
             if not tiled:
@@ -2236,11 +2311,20 @@ def add_gdalinfo_objects(assets_original):
             asset = {
                 "href": filename,
                 "roles": ["data"],
-                "type": "application/x-netcdf"
+                "type": "application/x-netcdf",
             }
             if bands is not None:
                 asset["bands"] = bands
-            return {filename_tmp: asset}
+
+            item_id = str(uuid.uuid4())
+            return {
+                item_id: {
+                    "id": item_id,
+                    "assets": {
+                        "openEO": asset,
+                    },
+                }
+            }
 
     elif format == "JSON":
         # saving to json, this is potentially big in memory
@@ -2270,7 +2354,22 @@ def add_gdalinfo_objects(assets_original):
                 message="Format {f!r} is not supported".format(f=format),
                 code="FormatUnsupported", status_code=400
             )
-        return {str(os.path.basename(filename)): {"href": filename, "roles": ["data"]}}
+
+        item_id = str(uuid.uuid4())
+        return {
+            item_id: {
+                "id": item_id,
+                "properties": {"datetime": None},
+                "geometry": None,
+                "bbox": None,
+                "assets": {
+                    "openEO": {
+                        "href": filename,
+                        "roles": ["data"],
+                    },
+                },
+            },
+        }
 
     def get_labels(self, geometries, feature_id_property=None):
         # TODO: return more descriptive labels/ids than these autoincrement strings (when possible)?
diff --git a/openeogeotrellis/integrations/gdal.py b/openeogeotrellis/integrations/gdal.py
index 326a7f8a7..7840858b3 100644
--- a/openeogeotrellis/integrations/gdal.py
+++ b/openeogeotrellis/integrations/gdal.py
@@ -170,7 +170,7 @@ def _extract_gdal_asset_raster_metadata(
             asset_md,
             job_dir,
         )
-        for asset_path, asset_md in asset_metadata.items()
+        for asset_path, asset_md in asset_metadata.items()  # FIXME: this is an asset key rather than an asset path
        if "roles" not in asset_md or "data" in asset_md.get("roles")
     ]
     results = exec_parallel_with_fallback(_get_metadata_callback, argument_tuples)
diff --git a/scripts/get-jars.py b/scripts/get-jars.py
index 5164d2f75..327eb943e 100755
--- a/scripts/get-jars.py
+++ b/scripts/get-jars.py
@@ -62,8 +62,8 @@ def main():
 
     for url in [
         # TODO: list these URLs in a simple text/CSV file so it can be consumed by other tools too?
-        "https://artifactory.vgt.vito.be/artifactory/libs-snapshot-public/org/openeo/geotrellis-extensions/2.4.0_2.12-SNAPSHOT/geotrellis-extensions-2.4.0_2.12-SNAPSHOT.jar",
-        "https://artifactory.vgt.vito.be/artifactory/libs-snapshot-public/org/openeo/openeo-logging/2.4.0_2.12-SNAPSHOT/openeo-logging-2.4.0_2.12-SNAPSHOT.jar",
+        "https://artifactory.vgt.vito.be/artifactory/libs-release-public/org/openeo/geotrellis-extensions/PR-410/geotrellis-extensions-PR-410.jar",
+        "https://artifactory.vgt.vito.be/artifactory/libs-release-public/org/openeo/openeo-logging/PR-410/openeo-logging-PR-410.jar",
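+        # NOTE: presumably temporary PR-build artifacts; to be reverted to the
+        # regular SNAPSHOT jars once the corresponding geotrellis-extensions PR
+        # is merged.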
     ]:
         download_jar(jar_dir, url=url, force=force_download)
diff --git a/tests/test_batch_result.py b/tests/test_batch_result.py
index 9963f4515..158eb4d58 100644
--- a/tests/test_batch_result.py
+++ b/tests/test_batch_result.py
@@ -78,7 +78,7 @@ def test_png_export(tmp_path):
     assert metadata["start_datetime"] == "2021-01-05T00:00:00Z"
     assets = metadata["assets"]
     assert len(assets) == 1
-    assert assets["out.png"]
+    assert assets["openEO"]
 
     for asset in assets:
         theAsset = assets[asset]
@@ -177,11 +177,9 @@ def test_ep3899_netcdf_no_bands(tmp_path):
     with metadata_file.open() as f:
         metadata = json.load(f)
     assert metadata["start_datetime"] == "2021-01-01T00:00:00Z"
-    assets = metadata["assets"]
+    assets = [asset for item in metadata["items"] for asset in item["assets"].values()]
     assert len(assets) == 1
-    for asset in assets:
-        theAsset = assets[asset]
-
+    for theAsset in assets:
         assert 'application/x-netcdf' == theAsset['type']
         href = theAsset['href']
         from osgeo.gdal import Info
@@ -194,7 +192,6 @@
 
 @pytest.mark.parametrize("prefix", [None, "prefixTest"])
 def test_ep3874_sample_by_feature_filter_spatial_inline_geojson(prefix, tmp_path):
-    print("tmp_path: ", tmp_path)
     job_spec = {"process_graph":{
         "lc": {
             "process_id": "load_collection",
@@ -237,30 +234,32 @@
             "result": True,
         }
     }}
-    metadata_file = tmp_path / "metadata.json"
+    job_dir = tmp_path
+    metadata_file = job_dir / "metadata.json"
     run_job(
         job_spec,
-        output_file=tmp_path / "out",
+        output_file=job_dir / "out",
         metadata_file=metadata_file,
         api_version="1.0.0",
-        job_dir=ensure_dir(tmp_path / "job_dir"),
+        job_dir=job_dir,
         dependencies={},
         user_id="jenkins",
     )
     with metadata_file.open() as f:
         metadata = json.load(f)
     assert metadata["start_datetime"] == "2021-01-04T00:00:00Z"
-    assets = metadata["assets"]
+    assets = [(asset_key, asset) for item in metadata["items"] for asset_key, asset in item["assets"].items()]
     assert len(assets) == 2
+    assert assets[0][0] == assets[1][0] == "openEO"
+    asset_filenames = [Path(asset["href"]).name for _, asset in assets]
     if prefix:
-        assert assets[prefix + "_22.nc"]
-        assert assets[prefix + "_myTextId.nc"]
+        assert f"{prefix}_22.nc" in asset_filenames
+        assert f"{prefix}_myTextId.nc" in asset_filenames
     else:
-        assert assets["openEO_22.nc"]
-        assert assets["openEO_myTextId.nc"]
+        assert "openEO_22.nc" in asset_filenames
+        assert "openEO_myTextId.nc" in asset_filenames
 
-    for asset in assets:
-        theAsset = assets[asset]
+    for _, theAsset in assets:
         bands = [Band(**b) for b in theAsset["bands"]]
         assert len(bands) == 1
         da = xarray.open_dataset(theAsset['href'], engine='h5netcdf')
@@ -269,7 +268,7 @@
 
 
 @pytest.mark.parametrize(
-    ["from_node", "expected_names"],
+    ["from_node", "expected_filenames"],
     [
         (
            "loadcollection_sentinel2",
@@ -283,7 +282,7 @@
         ("reducedimension_temporal", {"openEO_TileRow.tif", "openEO_TileCol.tif"}),
     ],
 )
-def test_separate_asset_per_band(tmp_path, from_node, expected_names):
+def test_separate_asset_per_band(tmp_path, from_node, expected_filenames):
     job_spec = {
         "process_graph": {
             "loadcollection_sentinel2": {
@@ -337,13 +336,12 @@
     with metadata_file.open() as f:
         metadata = json.load(f)
     assert metadata["start_datetime"] == "2021-06-01T00:00:00Z"
-    assets = metadata["assets"]
+    assets = [(asset_key, asset) for item in metadata["items"] for asset_key, asset in item["assets"].items()]
 
     # get file names as set:
-    asset_names = set(assets.keys())
-    assert asset_names == expected_names
+    asset_filenames = {Path(asset["href"]).name for _, asset in assets}
+    assert asset_filenames == expected_filenames
 
-    for asset_key in assets:
-        asset = assets[asset_key]
+    for _, asset in assets:
         assert len(asset["bands"]) == 1
         assert len(asset["raster:bands"]) == 1
         assert asset["bands"][0]["name"] == asset["raster:bands"][0]["name"]
@@ -375,7 +373,9 @@ def test_separate_asset_per_band_throw(tmp_path):
         "parameters": [],
     }
     metadata_file = tmp_path / "metadata.json"
-    with pytest.raises(OpenEOApiException):
+    with pytest.raises(
+        OpenEOApiException, match="separate_asset_per_band is only supported with format GTIFF. Was: NETCDF"
+    ):
Was: NETCDF" + ): run_job( job_spec, output_file=tmp_path / "out", @@ -442,15 +442,15 @@ def test_sample_by_feature_filter_spatial_vector_cube_from_load_url(tmp_path): # Check result metadata with metadata_file.open() as f: result_metadata = json.load(f) - assets = result_metadata["assets"] + assets = [asset for item in result_metadata["items"] for asset in item["assets"].values()] assert len(assets) == 4 # Check asset contents asset_minima = {} - for name, asset_metadata in assets.items(): + for asset_metadata in assets: assert asset_metadata["bands"] == [{"name": "Longitude"}] ds = xarray.open_dataset(asset_metadata["href"]) - asset_minima[name] = ds["Longitude"].min().item() + asset_minima[Path(asset_metadata["href"]).name] = ds["Longitude"].min().item() assert asset_minima == { "openEO_0.nc": 1.0, @@ -790,17 +790,20 @@ def test_spatial_cube_to_netcdf_sample_by_feature(tmp_path): assert metadata["start_datetime"] == "2021-01-04T00:00:00Z" assert metadata["end_datetime"] == "2021-01-06T00:00:00Z" - # expected: 2 assets with bboxes that correspond to the input features - assets = metadata["assets"] - assert len(assets) == 2 + # expected: 2 items with bboxes that correspond to the input features + items = metadata["items"] + assert len(items) == 2 - assert assets["openEO_0.nc"]["bbox"] == [0.1, 0.1, 1.8, 1.8] - assert (shape(assets["openEO_0.nc"]["geometry"]).normalize() - .almost_equals(Polygon.from_bounds(0.1, 0.1, 1.8, 1.8).normalize())) + item0 = [item for item in items for asset in item["assets"].values() if asset["href"].endswith("/openEO_0.nc")][0] - assert assets["openEO_1.nc"]["bbox"] == [0.725, -1.29, 2.99, 1.724] - assert (shape(assets["openEO_1.nc"]["geometry"]).normalize() - .almost_equals(Polygon.from_bounds(0.725, -1.29, 2.99, 1.724).normalize())) + assert item0["bbox"] == [0.1, 0.1, 1.8, 1.8] + assert shape(item0["geometry"]).normalize().almost_equals(Polygon.from_bounds(0.1, 0.1, 1.8, 1.8).normalize()) + + item1 = [item for item in items for asset in item["assets"].values() if asset["href"].endswith("/openEO_1.nc")][0] + assert item1["bbox"] == [0.725, -1.29, 2.99, 1.724] + assert ( + shape(item1["geometry"]).normalize().almost_equals(Polygon.from_bounds(0.725, -1.29, 2.99, 1.724).normalize()) + ) def test_multiple_time_series_results(tmp_path): @@ -2604,3 +2607,639 @@ def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: assert block_shape == (expected_tile_size, expected_tile_size) assert_cog(output_tiff) + + +@pytest.mark.parametrize( + ["separate_asset_per_band", "expected_tiff_files", "expected_asset_keys"], + [ + (False, {"openEO_2025-04-05Z.tif", "openEO_2025-04-15Z.tif"}, {"openEO"}), + ( + True, + { + "openEO_2025-04-05Z_Flat:0.tif", + "openEO_2025-04-05Z_Flat:1.tif", + "openEO_2025-04-05Z_Flat:2.tif", + "openEO_2025-04-15Z_Flat:0.tif", + "openEO_2025-04-15Z_Flat:1.tif", + "openEO_2025-04-15Z_Flat:2.tif", + }, + {"openEO_Flat:0", "openEO_Flat:1", "openEO_Flat:2"}, + ), + ], +) +def test_unified_asset_keys_spatiotemporal_geotiff( + tmp_path, separate_asset_per_band, expected_tiff_files, expected_asset_keys +): + process_graph = { + "load2": { + "process_id": "load_collection", + "arguments": { + "bands": [ + "Flat:0", + "Flat:1", + "Flat:2", + ], + "id": "TestCollection-LonLat16x16", + "spatial_extent": { + "west": 0, + "south": 50, + "east": 5, + "north": 55, + }, + "temporal_extent": ["2025-04-01", "2025-04-21"], + }, + }, + "save1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "load2"}, + "format": "GTIFF", + 
"options": {"separate_asset_per_band": separate_asset_per_band}, + }, + "result": True, + }, + } + + process = { + "process_graph": process_graph, + } + + job_dir = tmp_path + metadata_file = job_dir / "job_metadata.json" + + run_job( + process, + output_file=job_dir / "out", + metadata_file=metadata_file, + api_version="2.0.0", + job_dir=job_dir, + dependencies=[], + ) + + tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tif")} + assert tiff_files == expected_tiff_files + + with open(metadata_file) as f: + items = json.load(f)["items"] + + print(f"items={json.dumps(items, indent=2)}") + + assert len(items) == 2 + assert len({item["id"] for item in items}) == 2 + assert {item["properties"]["datetime"] for item in items} == {"2025-04-05T00:00:00Z", "2025-04-15T00:00:00Z"} + + for item in items: + assert set(item["assets"].keys()) == expected_asset_keys + + +def test_unified_asset_keys_tile_grid(tmp_path): + process_graph = { + "load2": { + "process_id": "load_collection", + "arguments": { + "bands": [ + "Flat:0", + "Flat:1", + "Flat:2", + ], + "id": "TestCollection-LonLat16x16", + "spatial_extent": { + "west": 0, + "south": 50, + "east": 2, + "north": 51, + }, + "temporal_extent": ["2025-04-01", "2025-04-21"], + }, + }, + "save1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "load2"}, + "format": "GTIFF", + "options": {"tile_grid": "wgs84-1degree"}, + }, + "result": True, + }, + } + + process = { + "process_graph": process_graph, + } + + job_dir = tmp_path + metadata_file = job_dir / "job_metadata.json" + + run_job( + process, + output_file=job_dir / "out", + metadata_file=metadata_file, + api_version="2.0.0", + job_dir=job_dir, + dependencies=[], + ) + + tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tif")} + assert tiff_files == { + "openEO_2025-04-05Z_N50E000.tif", + "openEO_2025-04-05Z_N50E001.tif", + "openEO_2025-04-15Z_N50E000.tif", + "openEO_2025-04-15Z_N50E001.tif", + } + + with open(metadata_file) as f: + items = json.load(f)["items"] + + print(f"items={json.dumps(items, indent=2)}") + + assert len(items) == 4 + assert len({item["id"] for item in items}) == 4 + assert {item["properties"]["datetime"] for item in items} == {"2025-04-05T00:00:00Z", "2025-04-15T00:00:00Z"} + + for item in items: + assert set(item["assets"].keys()) == {"openEO"} + + +def test_unified_asset_keys_tile_grid_spatial(tmp_path): + process_graph = { + "load2": { + "process_id": "load_collection", + "arguments": { + "bands": [ + "Flat:0", + "Flat:1", + "Flat:2", + ], + "id": "TestCollection-LonLat16x16", + "spatial_extent": { + "west": 0, + "south": 50, + "east": 2, + "north": 51, + }, + "temporal_extent": ["2025-04-01", "2025-04-21"], + }, + }, + "reducedimension1": { + "process_id": "reduce_dimension", + "arguments": { + "data": {"from_node": "load2"}, + "dimension": "t", + "reducer": { + "process_graph": { + "first1": { + "process_id": "first", + "arguments": {"data": {"from_parameter": "data"}}, + "result": True, + } + } + }, + }, + }, + "save1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "reducedimension1"}, + "format": "GTIFF", + "options": {"tile_grid": "wgs84-1degree"}, + }, + "result": True, + }, + } + + process = { + "process_graph": process_graph, + } + + job_dir = tmp_path + metadata_file = job_dir / "job_metadata.json" + + run_job( + process, + output_file=job_dir / "out", + metadata_file=metadata_file, + api_version="2.0.0", + job_dir=job_dir, + dependencies=[], + ) + + tiff_files = {file for 
+    tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tiff")}
+    assert tiff_files == {
+        "openEO-N50E000.tiff",
+        "openEO-N50E001.tiff",
+    }
+
+    with open(metadata_file) as f:
+        job_metadata = json.load(f)
+
+    items = job_metadata["items"]
+    print(f"items={json.dumps(items, indent=2)}")
+
+    assert len(items) == 2
+    assert len({item["id"] for item in items}) == 2
+
+    # at job-level rather than on Item
+    assert {item.get("properties", {}).get("datetime") for item in items} == {None}
+    assert job_metadata["start_datetime"] == "2025-04-01T00:00:00Z"
+    assert job_metadata["end_datetime"] == "2025-04-21T00:00:00Z"
+
+    for item in items:
+        assert set(item["assets"].keys()) == {"openEO"}
+
+
+def test_unified_asset_keys_sample_by_feature(tmp_path):
+    process_graph = {
+        "load2": {
+            "process_id": "load_collection",
+            "arguments": {
+                "bands": [
+                    "Flat:0",
+                    "Flat:1",
+                    "Flat:2",
+                ],
+                "id": "TestCollection-LonLat16x16",
+                "temporal_extent": ["2025-04-01", "2025-04-21"],
+            },
+        },
+        "filterspatial1": {
+            "process_id": "filter_spatial",
+            "arguments": {
+                "data": {"from_node": "load2"},
+                "geometries": {
+                    "type": "FeatureCollection",
+                    "features": [
+                        {
+                            "type": "Feature",
+                            "properties": {},
+                            "geometry": {
+                                "type": "Polygon",
+                                "coordinates": [[[0, 50], [0, 51], [1, 51], [1, 50], [0, 50]]],
+                            },
+                        },
+                        {
+                            "type": "Feature",
+                            "properties": {},
+                            "geometry": {
+                                "type": "Polygon",
+                                "coordinates": [[[1, 50], [1, 51], [2, 51], [2, 50], [1, 50]]],
+                            },
+                        },
+                    ],
+                },
+            },
+        },
+        "save1": {
+            "process_id": "save_result",
+            "arguments": {
+                "data": {"from_node": "filterspatial1"},
+                "format": "GTIFF",
+                "options": {"sample_by_feature": True},
+            },
+            "result": True,
+        },
+    }
+
+    process = {
+        "process_graph": process_graph,
+    }
+
+    job_dir = tmp_path
+    metadata_file = job_dir / "job_metadata.json"
+
+    run_job(
+        process,
+        output_file=job_dir / "out",
+        metadata_file=metadata_file,
+        api_version="2.0.0",
+        job_dir=job_dir,
+        dependencies=[],
+    )
+
+    tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tif")}
+    assert tiff_files == {
+        "openEO_2025-04-05Z_0.tif",
+        "openEO_2025-04-05Z_1.tif",
+        "openEO_2025-04-15Z_0.tif",
+        "openEO_2025-04-15Z_1.tif",
+    }
+
+    with open(metadata_file) as f:
+        items = json.load(f)["items"]
+
+    print(f"items={json.dumps(items, indent=2)}")
+
+    assert len(items) == 4
+    assert len({item["id"] for item in items}) == 4
+    assert {item["properties"]["datetime"] for item in items} == {"2025-04-05T00:00:00Z", "2025-04-15T00:00:00Z"}
+
+    for item in items:
+        assert set(item["assets"].keys()) == {"openEO"}
+
+
+@pytest.mark.parametrize(
+    ["separate_asset_per_band", "expected_tiff_files", "expected_asset_keys"],
+    [
+        (False, {"openEO.tif"}, {"openEO"}),
+        (
+            True,
+            {
+                "openEO_Flat:0.tif",
+                "openEO_Flat:1.tif",
+                "openEO_Flat:2.tif",
+            },
+            {"openEO_Flat:0", "openEO_Flat:1", "openEO_Flat:2"},
+        ),
+    ],
+)
+def test_unified_asset_keys_spatial_geotiff(
+    tmp_path, separate_asset_per_band, expected_tiff_files, expected_asset_keys
+):
+    process_graph = {
+        "load2": {
+            "process_id": "load_collection",
+            "arguments": {
+                "bands": [
+                    "Flat:0",
+                    "Flat:1",
+                    "Flat:2",
+                ],
+                "id": "TestCollection-LonLat16x16",
+                "spatial_extent": {
+                    "west": 0,
+                    "south": 50,
+                    "east": 5,
+                    "north": 55,
+                },
+                "temporal_extent": ["2025-04-01", "2025-04-21"],
+            },
+        },
+        "reducedimension1": {
+            "process_id": "reduce_dimension",
+            "arguments": {
+                "data": {"from_node": "load2"},
+                "dimension": "t",
+                "reducer": {
+                    "process_graph": {
+                        "first1": {
+                            "process_id": "first",
+                            "arguments": {"data": {"from_parameter": "data"}},
+                            "result": True,
+                        }
+                    }
+                },
+            },
+        },
{"data": {"from_parameter": "data"}}, + "result": True, + } + } + }, + }, + }, + "save1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "reducedimension1"}, + "format": "GTIFF", + "options": {"separate_asset_per_band": separate_asset_per_band}, + }, + "result": True, + }, + } + + process = { + "process_graph": process_graph, + } + + job_dir = tmp_path + metadata_file = job_dir / "job_metadata.json" + + run_job( + process, + output_file=job_dir / "out", + metadata_file=metadata_file, + api_version="2.0.0", + job_dir=job_dir, + dependencies=[], + ) + + tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tif")} + assert tiff_files == expected_tiff_files + + with open(metadata_file) as f: + job_metadata = json.load(f) + + items = job_metadata["items"] + print(f"items={json.dumps(items, indent=2)}") + + assert len(items) == 1 + # single item ID can be anything (no spatial or temporal references) + + # at job-level rather than on Item + assert items[0].get("properties", {}).get("datetime") is None + assert job_metadata["start_datetime"] == "2025-04-01T00:00:00Z" + assert job_metadata["end_datetime"] == "2025-04-21T00:00:00Z" + + assert set(items[0]["assets"].keys()) == expected_asset_keys + + +def test_unified_asset_keys_stitch_geotiff(tmp_path): + process_graph = { + "load2": { + "process_id": "load_collection", + "arguments": { + "bands": [ + "Flat:0", + "Flat:1", + "Flat:2", + ], + "id": "TestCollection-LonLat16x16", + "spatial_extent": { + "west": 0, + "south": 50, + "east": 5, + "north": 55, + }, + "temporal_extent": ["2025-04-01", "2025-04-21"], + }, + }, + "save1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "load2"}, + "format": "GTIFF", + "options": {"stitch": True}, + }, + "result": True, + }, + } + + process = { + "process_graph": process_graph, + } + + job_dir = tmp_path + metadata_file = job_dir / "job_metadata.json" + + run_job( + process, + output_file=job_dir / "out", + metadata_file=metadata_file, + api_version="2.0.0", + job_dir=job_dir, + dependencies=[], + ) + + assert "out" in os.listdir(job_dir) + + with open(metadata_file) as f: + job_metadata = json.load(f) + + items = job_metadata["items"] + print(f"items={json.dumps(items, indent=2)}") + + assert len(items) == 1 + # single item ID can be anything (no spatial or temporal references) + + # at job-level rather than on Item + assert items[0].get("properties", {}).get("datetime") is None + assert job_metadata["start_datetime"] == "2025-04-01T00:00:00Z" + assert job_metadata["end_datetime"] == "2025-04-21T00:00:00Z" + + assert set(items[0]["assets"].keys()) == {"openEO"} + + +def test_unified_asset_keys_stitch_tile_grid(tmp_path): + process_graph = { + "load2": { + "process_id": "load_collection", + "arguments": { + "bands": [ + "Flat:0", + "Flat:1", + "Flat:2", + ], + "id": "TestCollection-LonLat16x16", + "spatial_extent": { + "west": 0, + "south": 50, + "east": 2, + "north": 51, + }, + "temporal_extent": ["2025-04-01", "2025-04-21"], + }, + }, + "save1": { + "process_id": "save_result", + "arguments": { + "data": {"from_node": "load2"}, + "format": "GTIFF", + "options": { + "stitch": True, + "tile_grid": "wgs84-1degree", + }, + }, + "result": True, + }, + } + + process = { + "process_graph": process_graph, + } + + job_dir = tmp_path + metadata_file = job_dir / "job_metadata.json" + + run_job( + process, + output_file=job_dir / "out", + metadata_file=metadata_file, + api_version="2.0.0", + job_dir=job_dir, + dependencies=[], + ) + + tiff_files = 
+    tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tiff")}
+    assert tiff_files == {
+        "openEO-N50E000.tiff",
+        "openEO-N50E001.tiff",
+    }
+
+    with open(metadata_file) as f:
+        job_metadata = json.load(f)
+
+    items = job_metadata["items"]
+    print(f"items={json.dumps(items, indent=2)}")
+
+    assert len(items) == 2
+    assert len({item["id"] for item in items}) == 2
+
+    # at job-level rather than on Item
+    assert {item.get("properties", {}).get("datetime") for item in items} == {None}
+    assert job_metadata["start_datetime"] == "2025-04-01T00:00:00Z"
+    assert job_metadata["end_datetime"] == "2025-04-21T00:00:00Z"
+
+    for item in items:
+        assert set(item["assets"].keys()) == {"openEO"}
+
+
+def test_unified_asset_keys_catalog(tmp_path):
+    process_graph = {
+        "load2": {
+            "process_id": "load_collection",
+            "arguments": {
+                "bands": [
+                    "Flat:0",
+                    "Flat:1",
+                    "Flat:2",
+                ],
+                "id": "TestCollection-LonLat16x16",
+                "spatial_extent": {
+                    "west": 0,
+                    "south": 50,
+                    "east": 2,
+                    "north": 51,
+                },
+                "temporal_extent": ["2025-04-01", "2025-04-21"],
+            },
+        },
+        "save1": {
+            "process_id": "save_result",
+            "arguments": {
+                "data": {"from_node": "load2"},
+                "format": "GTIFF",
+                "options": {
+                    "parameters": {"catalog": True},
+                },
+            },
+            "result": True,
+        },
+    }
+
+    process = {
+        "process_graph": process_graph,
+    }
+
+    job_dir = tmp_path
+    metadata_file = job_dir / "job_metadata.json"
+
+    run_job(
+        process,
+        output_file=job_dir / "catalog.tiff",
+        metadata_file=metadata_file,
+        api_version="2.0.0",
+        job_dir=job_dir,
+        dependencies=[],
+    )
+
+    tiff_files = {file for file in os.listdir(job_dir) if file.endswith(".tiff")}
+    assert tiff_files == {"catalog.tiff"}
+
+    with open(metadata_file) as f:
+        job_metadata = json.load(f)
+
+    items = job_metadata["items"]
+    print(f"items={json.dumps(items, indent=2)}")
+
+    assert len(items) == 1
+    item = items[0]
+
+    # at job-level rather than on Item
+    assert item.get("properties", {}).get("datetime") is None
+    assert job_metadata["start_datetime"] == "2025-04-01T00:00:00Z"
+    assert job_metadata["end_datetime"] == "2025-04-21T00:00:00Z"
+
+    assert set(item["assets"].keys()) == {"openEO"}
diff --git a/tests/test_download.py b/tests/test_download.py
index 64c21fea0..4c4089b35 100644
--- a/tests/test_download.py
+++ b/tests/test_download.py
@@ -232,11 +232,10 @@ def test_write_assets_samples_tile_grid_batch(self, tmp_path):
 
     @pytest.mark.parametrize("space_type", ["spacetime", "spatial"])
     @pytest.mark.parametrize("stitch", [False, True])
     @pytest.mark.parametrize("catalog", [False, True])
-    @pytest.mark.parametrize("sample_by_feature", [False, True])
     @pytest.mark.parametrize("format_arg", ["netCDF"])  # "GTIFF" behaves different from "netCDF", so not testing now
     def test_write_assets_parameterize_batch(self, tmp_path, imagecollection_with_two_bands_and_three_dates,
                                              imagecollection_with_two_bands_spatial_only,
-                                             format_arg, sample_by_feature, catalog, stitch, space_type,
+                                             format_arg, catalog, stitch, space_type,
                                              tile_grid, filename_prefix):
         d = locals()
         d = {i: d[i] for i in d if i != 'self' and i != "tmp_path" and i != "d"}
@@ -248,7 +247,7 @@ def test_write_assets_parameterize_batch(self, tmp_path, imagecollection_with_tw
             imagecollection = imagecollection_with_two_bands_spatial_only
         geometries = geojson_to_geometry(self.features)
-        assets = imagecollection.write_assets(
+        items = imagecollection.write_assets(
             str(tmp_path / "ignored<\0>.extension"),  # null byte to cause error if filename would be written to fs
             format=format_arg,
             format_options={
@@ -261,6 +260,8 @@ def test_write_assets_parameterize_batch(self, tmp_path, imagecollection_with_tw
                 "tile_grid": tile_grid,
             }
         )
+
+        assets = [(asset_key, asset) for item in items.values() for asset_key, asset in item["assets"].items()]
         with open(self.test_write_assets_parameterize_batch_path + test_name + ".json", 'w') as fp:
             json.dump(assets, fp, indent=2)
@@ -269,13 +270,15 @@ def test_write_assets_parameterize_batch(self, tmp_path, imagecollection_with_tw
         if format_arg == "netCDF":
             extension = ".nc"
         else:
             extension = ".tif"
         assert len(assets) >= 3
-        assert len(assets) <= geometries.length
+        assert len(assets) <= geometries.length  # a netCDF asset contains all dates
+        assert {asset_key for asset_key, _ in assets} == {"openEO"}
+        asset_filenames = {Path(asset["href"]).name for _, asset in assets}
         if format_arg == "netCDF":
             if filename_prefix:
-                assert assets[filename_prefix + "_0" + extension]
+                assert filename_prefix + "_0" + extension in asset_filenames
             else:
-                assert assets["openEO_0" + extension]
-            name, asset = next(iter(assets.items()))
+                assert "openEO_0" + extension in asset_filenames
+            _, asset = assets[0]
             assert Path(asset['href']).parent == tmp_path
             if filename_prefix:
                 assert filename_prefix in asset['href']
@@ -340,7 +343,7 @@ def test_write_assets_parameterize(self, tmp_path, imagecollection_with_two_band
             assert False
         filename = "test_download_result" + extension
         geometries = geojson_to_geometry(self.features)
-        assets_all = imagecollection.write_assets(
+        items_all = imagecollection.write_assets(
             str(tmp_path / filename),
             format=format_arg,
             format_options={
@@ -365,10 +368,11 @@ def test_write_assets_parameterize(self, tmp_path, imagecollection_with_two_band
         # with open(self.test_write_assets_parameterize_path + test_name + ".json", 'w') as fp:
         #     json.dump(assets, fp, indent=2)
 
-        assets_data = {k: v for (k, v) in assets_all.items() if "data" in v["roles"]}
-        name, asset = next(iter(assets_data.items()))
+        assets_all = [(asset_key, asset) for item in items_all.values() for asset_key, asset in item["assets"].items()]
+        assets_data = [(asset_key, asset) for asset_key, asset in assets_all if "data" in asset["roles"]]
+        name, asset = assets_data[0]
         print("href of first asset: " + asset["href"])
-        assets_metadata = {k: v for (k, v) in assets_all.items() if "data" not in v["roles"]}
+        assets_metadata = [(asset_key, asset) for asset_key, asset in assets_all if "data" not in asset["roles"]]
         if format_arg == "GTIFF" and not catalog:
             if attach_gdalinfo_assets:
                 assert len(assets_metadata) == len(assets_data)
@@ -376,7 +380,7 @@ def test_write_assets_parameterize(self, tmp_path, imagecollection_with_two_band
                 assert len(assets_metadata) == 0
 
         if len(assets_data) == 1:
-            assert assets_data[filename]
+            assert assets_data[0][0] == "openEO"
             assert filename in asset['href']
         else:
             if filename_prefix: