diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f59e0e8..c3e4084f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and start a new "In Progress" section above it. - Have `integrations.s3` for interaction with Object Storage that follows the S3 API. - `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232)) +- `DiskWorkspace`: support unified asset keys ([Open-EO/openeo-geopyspark-driver#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111)) ## 0.134.0 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index 7fd448da..28322a79 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.135.0a4" +__version__ = "0.135.0a5" diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 2746e365..c150c831 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -84,35 +84,50 @@ def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str: return str(Path(parent_dir) / target.name) def item_func(item: Item, parent_dir: str) -> str: - # prevent items/assets of 2 adjacent Collection documents from interfering with each other: + # item ID can be a relative_asset_path but does not have to be + unique_item_filename = item.id.replace("/", "_") + # prevent items/assets of 2 adjacent Collection documents from interfering with each other; # unlike an object storage object, a Collection file cannot act as a parent "directory" as well - return f"{parent_dir}/{target.name}_items/{item.id}.json" + return f"{parent_dir}/{target.name}_items/{unique_item_filename}.json" return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func) - def replace_asset_href(asset_key: str, asset: Asset) -> Asset: - if urlparse(asset.href).scheme not in ["", "file"]: # TODO: convenient place; move elsewhere? + def replace_asset_href(asset: Asset, src_collection_path: Path) -> Asset: + # pystac will handle STAC but not underlying assets; set asset hrefs up front + asset_uri_parts = urlparse(asset.get_absolute_href()) + if asset_uri_parts.scheme not in ["", "file"]: # TODO: convenient place; move elsewhere? raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}") + absolute_asset_path = Path(asset_uri_parts.path) # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs; # it ends up in the asset metadata on disk - asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href() - asset.href = Path(asset_key).name # asset key matches the asset filename, becomes the relative path + asset.extra_fields["_original_absolute_path"] = str(absolute_asset_path) + relative_asset_path = absolute_asset_path.relative_to(src_collection_path.parent) + asset.href = str(relative_asset_path) # relative to item document return asset + new_collection_path = Path(new_collection.get_self_href()) + if not existing_collection: new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy()) - new_collection = new_collection.map_assets(replace_asset_href) + new_collection = new_collection.map_assets( + lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path) + ) new_collection.save(CatalogType.SELF_CONTAINED) for new_item in new_collection.get_items(): for asset in new_item.get_assets().values(): - file_operation( - asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent) - ) + relative_asset_path = asset.href + asset_parent_dir = ( + Path(new_collection.get_self_href()).parent / f"{target.name}_items" / relative_asset_path + ).parent + asset_parent_dir.mkdir(parents=True, exist_ok=True) # asset might not end up next to item + file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir)) else: merged_collection = _merge_collection_metadata(existing_collection, new_collection) - new_collection = new_collection.map_assets(replace_asset_href) + new_collection = new_collection.map_assets( + lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path) + ) for new_item in new_collection.get_items(): new_item.clear_links() # sever ties with previous collection @@ -123,9 +138,14 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset: for new_item in new_collection.get_items(): for asset in new_item.get_assets().values(): - file_operation( - asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent - ) + relative_asset_path = asset.href + asset_parent_dir = ( + Path(merged_collection.get_self_href()).parent + / f"{target.name}_items" + / relative_asset_path + ).parent + asset_parent_dir.mkdir(parents=True, exist_ok=True) + file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir)) for item in new_collection.get_items(): for asset in item.assets.values(): diff --git a/tests/test_workspace.py b/tests/test_workspace.py index d0fd9fa0..eec49014 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -66,8 +66,8 @@ def test_merge_from_disk_new(tmp_path): assert _paths_relative_to(workspace_dir) == { Path("collection.json"), + Path("collection.json_items") / "ASSET.TIF.json", Path("collection.json_items") / "asset.tif", - Path("collection.json_items") / "asset.tif.json", } assert isinstance(imported_collection, Collection) @@ -121,9 +121,9 @@ def test_merge_from_disk_into_existing(tmp_path): assert _paths_relative_to(workspace_dir) == { Path("collection.json"), Path("collection.json_items") / "asset1.tif", - Path("collection.json_items") / "asset1.tif.json", + Path("collection.json_items") / "ASSET1.TIF.json", Path("collection.json_items") / "asset2.tif", - Path("collection.json_items") / "asset2.tif.json", + Path("collection.json_items") / "ASSET2.TIF.json", } assert isinstance(imported_collection, Collection) @@ -191,18 +191,18 @@ def asset_contents(collection_filename: str): return f.read() workspace.merge(collection1, target=Path("collection1.json")) - assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n" + assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n" # put collection2 next to collection1 workspace.merge(collection2, target=Path("collection2.json")) - assert asset_contents(collection_filename="collection2.json") == "collection2-asset1.tif-asset1.tif\n" + assert asset_contents(collection_filename="collection2.json") == "collection2-ASSET1.TIF-asset1.tif\n" # separate collection files assert (workspace.root_directory / "collection1.json").exists() assert (workspace.root_directory / "collection2.json").exists() # collection2 should not overwrite collection1's items/assets - assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n" + assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n" def _collection( @@ -221,10 +221,13 @@ def _collection( # note: filepath_per_band behavior is tested in e.g. openeo-geopyspark-driver's # test_batch_result.test_export_workspace_merge_filepath_per_band - item = Item(id=asset_filename, geometry=None, bbox=None, datetime=now_utc(), properties={}) + item_id = asset_filename.upper() # different from asset_filename: unique yet predictable for the tests + item = Item(id=item_id, geometry=None, bbox=None, datetime=now_utc(), properties={}) - asset_path = root_path / item.id / asset_filename - asset = Asset(href=asset_path.name) # relative to item + # TODO: implementation assumes that relative asset path is a sibling of the collection file so this setup mirrors + # that; is it possible to assert this or even avoid entirely? + asset_path = root_path / asset_filename + asset = Asset(href=str(asset_path.absolute())) item.add_asset(key=asset_filename, asset=asset) collection.add_item(item)