Skip to content

issue 1111 merge new stac #412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and start a new "In Progress" section above it.

- Have `integrations.s3` for interaction with Object Storage that follows the S3 API.
- `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232))
- `DiskWorkspace`: support unified asset keys ([Open-EO/openeo-geopyspark-driver#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111))

## 0.134.0

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.135.0a4"
__version__ = "0.135.0a5"
48 changes: 34 additions & 14 deletions openeo_driver/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,35 +84,50 @@ def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str:
return str(Path(parent_dir) / target.name)

def item_func(item: Item, parent_dir: str) -> str:
# prevent items/assets of 2 adjacent Collection documents from interfering with each other:
# item ID can be a relative_asset_path but does not have to be
unique_item_filename = item.id.replace("/", "_")
# prevent items/assets of 2 adjacent Collection documents from interfering with each other;
# unlike an object storage object, a Collection file cannot act as a parent "directory" as well
return f"{parent_dir}/{target.name}_items/{item.id}.json"
return f"{parent_dir}/{target.name}_items/{unique_item_filename}.json"

return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func)

def replace_asset_href(asset_key: str, asset: Asset) -> Asset:
if urlparse(asset.href).scheme not in ["", "file"]: # TODO: convenient place; move elsewhere?
def replace_asset_href(asset: Asset, src_collection_path: Path) -> Asset:
# pystac will handle STAC but not underlying assets; set asset hrefs up front
asset_uri_parts = urlparse(asset.get_absolute_href())
if asset_uri_parts.scheme not in ["", "file"]: # TODO: convenient place; move elsewhere?
raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}")

absolute_asset_path = Path(asset_uri_parts.path)
# TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs;
# it ends up in the asset metadata on disk
asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href()
asset.href = Path(asset_key).name # asset key matches the asset filename, becomes the relative path
asset.extra_fields["_original_absolute_path"] = str(absolute_asset_path)
relative_asset_path = absolute_asset_path.relative_to(src_collection_path.parent)
asset.href = str(relative_asset_path) # relative to item document
return asset

new_collection_path = Path(new_collection.get_self_href())

if not existing_collection:
new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
new_collection = new_collection.map_assets(replace_asset_href)
new_collection = new_collection.map_assets(
lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path)
)
new_collection.save(CatalogType.SELF_CONTAINED)

for new_item in new_collection.get_items():
for asset in new_item.get_assets().values():
file_operation(
asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
)
relative_asset_path = asset.href
asset_parent_dir = (
Path(new_collection.get_self_href()).parent / f"{target.name}_items" / relative_asset_path
).parent
asset_parent_dir.mkdir(parents=True, exist_ok=True) # asset might not end up next to item
file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir))
else:
merged_collection = _merge_collection_metadata(existing_collection, new_collection)
new_collection = new_collection.map_assets(replace_asset_href)
new_collection = new_collection.map_assets(
lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path)
)

for new_item in new_collection.get_items():
new_item.clear_links() # sever ties with previous collection
Expand All @@ -123,9 +138,14 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset:

for new_item in new_collection.get_items():
for asset in new_item.get_assets().values():
file_operation(
asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent
)
relative_asset_path = asset.href
asset_parent_dir = (
Path(merged_collection.get_self_href()).parent
/ f"{target.name}_items"
/ relative_asset_path
).parent
asset_parent_dir.mkdir(parents=True, exist_ok=True)
file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir))

for item in new_collection.get_items():
for asset in item.assets.values():
Expand Down
21 changes: 12 additions & 9 deletions tests/test_workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def test_merge_from_disk_new(tmp_path):

assert _paths_relative_to(workspace_dir) == {
Path("collection.json"),
Path("collection.json_items") / "ASSET.TIF.json",
Path("collection.json_items") / "asset.tif",
Path("collection.json_items") / "asset.tif.json",
}

assert isinstance(imported_collection, Collection)
Expand Down Expand Up @@ -121,9 +121,9 @@ def test_merge_from_disk_into_existing(tmp_path):
assert _paths_relative_to(workspace_dir) == {
Path("collection.json"),
Path("collection.json_items") / "asset1.tif",
Path("collection.json_items") / "asset1.tif.json",
Path("collection.json_items") / "ASSET1.TIF.json",
Path("collection.json_items") / "asset2.tif",
Path("collection.json_items") / "asset2.tif.json",
Path("collection.json_items") / "ASSET2.TIF.json",
}

assert isinstance(imported_collection, Collection)
Expand Down Expand Up @@ -191,18 +191,18 @@ def asset_contents(collection_filename: str):
return f.read()

workspace.merge(collection1, target=Path("collection1.json"))
assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n"
assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n"

# put collection2 next to collection1
workspace.merge(collection2, target=Path("collection2.json"))
assert asset_contents(collection_filename="collection2.json") == "collection2-asset1.tif-asset1.tif\n"
assert asset_contents(collection_filename="collection2.json") == "collection2-ASSET1.TIF-asset1.tif\n"

# separate collection files
assert (workspace.root_directory / "collection1.json").exists()
assert (workspace.root_directory / "collection2.json").exists()

# collection2 should not overwrite collection1's items/assets
assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n"
assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n"


def _collection(
Expand All @@ -221,10 +221,13 @@ def _collection(
# note: filepath_per_band behavior is tested in e.g. openeo-geopyspark-driver's
# test_batch_result.test_export_workspace_merge_filepath_per_band

item = Item(id=asset_filename, geometry=None, bbox=None, datetime=now_utc(), properties={})
item_id = asset_filename.upper() # different from asset_filename: unique yet predictable for the tests
item = Item(id=item_id, geometry=None, bbox=None, datetime=now_utc(), properties={})

asset_path = root_path / item.id / asset_filename
asset = Asset(href=asset_path.name) # relative to item
# TODO: implementation assumes that relative asset path is a sibling of the collection file so this setup mirrors
# that; is it possible to assert this or even avoid entirely?
asset_path = root_path / asset_filename
asset = Asset(href=str(asset_path.absolute()))

item.add_asset(key=asset_filename, asset=asset)
collection.add_item(item)
Expand Down