From 302d50f8b959545ec71a0299edce942e4cda2642 Mon Sep 17 00:00:00 2001 From: elienVandermaesenVITO Date: Mon, 16 Jun 2025 09:39:14 +0200 Subject: [PATCH 1/3] issue 1111 merge new stac --- openeo_driver/workspace.py | 26 ++++++++++++++++++-------- tests/test_workspace.py | 10 ++++++---- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 2746e365..9c51da59 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -90,29 +90,37 @@ def item_func(item: Item, parent_dir: str) -> str: return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func) - def replace_asset_href(asset_key: str, asset: Asset) -> Asset: + def replace_asset_href(asset_key: str, asset: Asset, collection_href:str) -> Asset: if urlparse(asset.href).scheme not in ["", "file"]: # TODO: convenient place; move elsewhere? raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}") # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs; # it ends up in the asset metadata on disk - asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href() - asset.href = Path(asset_key).name # asset key matches the asset filename, becomes the relative path + asset_href = asset.get_absolute_href() + asset.extra_fields["_original_absolute_href"] = asset_href + if asset_href.startswith("s3"): + asset.href = Path(asset_href).name + else: + common_path = os.path.commonpath([asset_href,collection_href]) + asset.href = os.path.relpath(asset_href,common_path) return asset + collection_href = new_collection.get_self_href() if not existing_collection: new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy()) - new_collection = new_collection.map_assets(replace_asset_href) + new_collection = new_collection.map_assets(lambda k,v: replace_asset_href(k,v,collection_href)) new_collection.save(CatalogType.SELF_CONTAINED) for new_item in new_collection.get_items(): for asset in new_item.get_assets().values(): + asset_path = Path(new_item.get_self_href()).parent / Path(asset.href).parent + asset_path.mkdir(parents=True) file_operation( - asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent) + asset.extra_fields["_original_absolute_href"], str(asset_path) ) else: merged_collection = _merge_collection_metadata(existing_collection, new_collection) - new_collection = new_collection.map_assets(replace_asset_href) + new_collection = new_collection.map_assets(lambda k,v: replace_asset_href(k,v,collection_href)) for new_item in new_collection.get_items(): new_item.clear_links() # sever ties with previous collection @@ -123,13 +131,15 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset: for new_item in new_collection.get_items(): for asset in new_item.get_assets().values(): + asset_path = Path(new_item.get_self_href()).parent / Path(asset.href).parent + asset_path.mkdir(parents=True) file_operation( - asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent + asset.extra_fields["_original_absolute_href"], str(asset_path) ) for item in new_collection.get_items(): for asset in item.assets.values(): - workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}" + workspace_uri = f"file:{Path(item.get_self_href()).parent / asset.href}" asset.extra_fields["alternate"] = {"file": workspace_uri} return new_collection diff --git a/tests/test_workspace.py b/tests/test_workspace.py index c3011768..6c229846 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -67,7 +67,7 @@ def test_merge_from_disk_new(tmp_path): for asset_key, asset in item.get_assets().items() } assert asset_workspace_uris == { - "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif'}" + "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif' / 'asset.tif'}" } # load it again @@ -81,7 +81,8 @@ def test_merge_from_disk_new(tmp_path): for item in exported_collection.get_items(): for asset in item.get_assets().values(): - assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent + item_path_parts = Path(item.get_self_href()).parent.parts + assert item_path_parts == Path(asset.get_absolute_href()).parts[0:len(item_path_parts)] def test_merge_from_disk_into_existing(tmp_path): @@ -115,7 +116,7 @@ def test_merge_from_disk_into_existing(tmp_path): for asset_key, asset in item.get_assets().items() } assert asset_workspace_uris == { - "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset2.tif'}", + "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items'/ 'asset2.tif' / 'asset2.tif'}", } # load it again @@ -143,7 +144,8 @@ def test_merge_from_disk_into_existing(tmp_path): for item in exported_collection.get_items(): for asset in item.get_assets().values(): - assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent + item_path_parts = Path(item.get_self_href()).parent.parts + assert item_path_parts == Path(asset.get_absolute_href()).parts[0:len(item_path_parts)] def test_adjacent_collections_do_not_have_interfering_items_and_assets(tmp_path): From 0748d95c97698eac95c49fa4680f86a34f679165 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 8 Jul 2025 13:51:14 +0200 Subject: [PATCH 2/3] DiskWorkspace: avoid assumption that item ID/asset key == relative asset path https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111 --- openeo_driver/workspace.py | 60 ++++++++++++++++++++++---------------- tests/test_workspace.py | 31 ++++++++++---------- 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 9c51da59..c150c831 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -84,43 +84,50 @@ def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str: return str(Path(parent_dir) / target.name) def item_func(item: Item, parent_dir: str) -> str: - # prevent items/assets of 2 adjacent Collection documents from interfering with each other: + # item ID can be a relative_asset_path but does not have to be + unique_item_filename = item.id.replace("/", "_") + # prevent items/assets of 2 adjacent Collection documents from interfering with each other; # unlike an object storage object, a Collection file cannot act as a parent "directory" as well - return f"{parent_dir}/{target.name}_items/{item.id}.json" + return f"{parent_dir}/{target.name}_items/{unique_item_filename}.json" return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func) - def replace_asset_href(asset_key: str, asset: Asset, collection_href:str) -> Asset: - if urlparse(asset.href).scheme not in ["", "file"]: # TODO: convenient place; move elsewhere? + def replace_asset_href(asset: Asset, src_collection_path: Path) -> Asset: + # pystac will handle STAC but not underlying assets; set asset hrefs up front + asset_uri_parts = urlparse(asset.get_absolute_href()) + if asset_uri_parts.scheme not in ["", "file"]: # TODO: convenient place; move elsewhere? raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}") + absolute_asset_path = Path(asset_uri_parts.path) # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs; # it ends up in the asset metadata on disk - asset_href = asset.get_absolute_href() - asset.extra_fields["_original_absolute_href"] = asset_href - if asset_href.startswith("s3"): - asset.href = Path(asset_href).name - else: - common_path = os.path.commonpath([asset_href,collection_href]) - asset.href = os.path.relpath(asset_href,common_path) + asset.extra_fields["_original_absolute_path"] = str(absolute_asset_path) + relative_asset_path = absolute_asset_path.relative_to(src_collection_path.parent) + asset.href = str(relative_asset_path) # relative to item document return asset - collection_href = new_collection.get_self_href() + new_collection_path = Path(new_collection.get_self_href()) + if not existing_collection: new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy()) - new_collection = new_collection.map_assets(lambda k,v: replace_asset_href(k,v,collection_href)) + new_collection = new_collection.map_assets( + lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path) + ) new_collection.save(CatalogType.SELF_CONTAINED) for new_item in new_collection.get_items(): for asset in new_item.get_assets().values(): - asset_path = Path(new_item.get_self_href()).parent / Path(asset.href).parent - asset_path.mkdir(parents=True) - file_operation( - asset.extra_fields["_original_absolute_href"], str(asset_path) - ) + relative_asset_path = asset.href + asset_parent_dir = ( + Path(new_collection.get_self_href()).parent / f"{target.name}_items" / relative_asset_path + ).parent + asset_parent_dir.mkdir(parents=True, exist_ok=True) # asset might not end up next to item + file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir)) else: merged_collection = _merge_collection_metadata(existing_collection, new_collection) - new_collection = new_collection.map_assets(lambda k,v: replace_asset_href(k,v,collection_href)) + new_collection = new_collection.map_assets( + lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path) + ) for new_item in new_collection.get_items(): new_item.clear_links() # sever ties with previous collection @@ -131,15 +138,18 @@ def replace_asset_href(asset_key: str, asset: Asset, collection_href:str) -> Ass for new_item in new_collection.get_items(): for asset in new_item.get_assets().values(): - asset_path = Path(new_item.get_self_href()).parent / Path(asset.href).parent - asset_path.mkdir(parents=True) - file_operation( - asset.extra_fields["_original_absolute_href"], str(asset_path) - ) + relative_asset_path = asset.href + asset_parent_dir = ( + Path(merged_collection.get_self_href()).parent + / f"{target.name}_items" + / relative_asset_path + ).parent + asset_parent_dir.mkdir(parents=True, exist_ok=True) + file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir)) for item in new_collection.get_items(): for asset in item.assets.values(): - workspace_uri = f"file:{Path(item.get_self_href()).parent / asset.href}" + workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}" asset.extra_fields["alternate"] = {"file": workspace_uri} return new_collection diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 92b790ba..eec49014 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -66,8 +66,8 @@ def test_merge_from_disk_new(tmp_path): assert _paths_relative_to(workspace_dir) == { Path("collection.json"), + Path("collection.json_items") / "ASSET.TIF.json", Path("collection.json_items") / "asset.tif", - Path("collection.json_items") / "asset.tif.json", } assert isinstance(imported_collection, Collection) @@ -77,7 +77,7 @@ def test_merge_from_disk_new(tmp_path): for asset_key, asset in item.get_assets().items() } assert asset_workspace_uris == { - "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif' / 'asset.tif'}" + "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif'}" } # load it again @@ -90,8 +90,7 @@ def test_merge_from_disk_new(tmp_path): for item in exported_collection.get_items(): for asset in item.get_assets().values(): - item_path_parts = Path(item.get_self_href()).parent.parts - assert item_path_parts == Path(asset.get_absolute_href()).parts[0:len(item_path_parts)] + assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent def test_merge_from_disk_into_existing(tmp_path): @@ -122,9 +121,9 @@ def test_merge_from_disk_into_existing(tmp_path): assert _paths_relative_to(workspace_dir) == { Path("collection.json"), Path("collection.json_items") / "asset1.tif", - Path("collection.json_items") / "asset1.tif.json", + Path("collection.json_items") / "ASSET1.TIF.json", Path("collection.json_items") / "asset2.tif", - Path("collection.json_items") / "asset2.tif.json", + Path("collection.json_items") / "ASSET2.TIF.json", } assert isinstance(imported_collection, Collection) @@ -134,7 +133,7 @@ def test_merge_from_disk_into_existing(tmp_path): for asset_key, asset in item.get_assets().items() } assert asset_workspace_uris == { - "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items'/ 'asset2.tif' / 'asset2.tif'}", + "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset2.tif'}", } # load it again @@ -161,8 +160,7 @@ def test_merge_from_disk_into_existing(tmp_path): for item in exported_collection.get_items(): for asset in item.get_assets().values(): - item_path_parts = Path(item.get_self_href()).parent.parts - assert item_path_parts == Path(asset.get_absolute_href()).parts[0:len(item_path_parts)] + assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent def test_adjacent_collections_do_not_have_interfering_items_and_assets(tmp_path): @@ -193,18 +191,18 @@ def asset_contents(collection_filename: str): return f.read() workspace.merge(collection1, target=Path("collection1.json")) - assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n" + assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n" # put collection2 next to collection1 workspace.merge(collection2, target=Path("collection2.json")) - assert asset_contents(collection_filename="collection2.json") == "collection2-asset1.tif-asset1.tif\n" + assert asset_contents(collection_filename="collection2.json") == "collection2-ASSET1.TIF-asset1.tif\n" # separate collection files assert (workspace.root_directory / "collection1.json").exists() assert (workspace.root_directory / "collection2.json").exists() # collection2 should not overwrite collection1's items/assets - assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n" + assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n" def _collection( @@ -223,10 +221,13 @@ def _collection( # note: filepath_per_band behavior is tested in e.g. openeo-geopyspark-driver's # test_batch_result.test_export_workspace_merge_filepath_per_band - item = Item(id=asset_filename, geometry=None, bbox=None, datetime=now_utc(), properties={}) + item_id = asset_filename.upper() # different from asset_filename: unique yet predictable for the tests + item = Item(id=item_id, geometry=None, bbox=None, datetime=now_utc(), properties={}) - asset_path = root_path / item.id / asset_filename - asset = Asset(href=asset_path.name) # relative to item + # TODO: implementation assumes that relative asset path is a sibling of the collection file so this setup mirrors + # that; is it possible to assert this or even avoid entirely? + asset_path = root_path / asset_filename + asset = Asset(href=str(asset_path.absolute())) item.add_asset(key=asset_filename, asset=asset) collection.add_item(item) From 56d403681d45b0f599c3434f3599729907841d27 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 10 Jul 2025 15:51:59 +0200 Subject: [PATCH 3/3] adapt version and CHANGELOG https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111 --- CHANGELOG.md | 1 + openeo_driver/_version.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f59e0e8..c3e4084f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and start a new "In Progress" section above it. - Have `integrations.s3` for interaction with Object Storage that follows the S3 API. - `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232)) +- `DiskWorkspace`: support unified asset keys ([Open-EO/openeo-geopyspark-driver#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111)) ## 0.134.0 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index 7fd448da..28322a79 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.135.0a4" +__version__ = "0.135.0a5"