Commit de52b92

DiskWorkspace: support unified asset keys
* issue 1111 merge new stac
* DiskWorkspace: avoid assumption that item ID/asset key == relative asset path Open-EO/openeo-geopyspark-driver#1111
* adapt version and CHANGELOG Open-EO/openeo-geopyspark-driver#1111

---------

Co-authored-by: Jan Van den bosch <jan@bossie.org>
1 parent 97755d2 commit de52b92

File tree

4 files changed: +48, -24 lines

- CHANGELOG.md
- openeo_driver/_version.py
- openeo_driver/workspace.py
- tests/test_workspace.py


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -23,6 +23,7 @@ and start a new "In Progress" section above it.
 
 - Have `integrations.s3` for interaction with Object Storage that follows the S3 API.
 - `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232))
+- `DiskWorkspace`: support unified asset keys ([Open-EO/openeo-geopyspark-driver#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111))
 
 ## 0.134.0
 

openeo_driver/_version.py

Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-__version__ = "0.135.0a4"
+__version__ = "0.135.0a5"

openeo_driver/workspace.py

Lines changed: 34 additions & 14 deletions

@@ -84,35 +84,50 @@ def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str:
                     return str(Path(parent_dir) / target.name)
 
                 def item_func(item: Item, parent_dir: str) -> str:
-                    # prevent items/assets of 2 adjacent Collection documents from interfering with each other:
+                    # item ID can be a relative_asset_path but does not have to be
+                    unique_item_filename = item.id.replace("/", "_")
+                    # prevent items/assets of 2 adjacent Collection documents from interfering with each other;
                     # unlike an object storage object, a Collection file cannot act as a parent "directory" as well
-                    return f"{parent_dir}/{target.name}_items/{item.id}.json"
+                    return f"{parent_dir}/{target.name}_items/{unique_item_filename}.json"
 
                 return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func)
 
-            def replace_asset_href(asset_key: str, asset: Asset) -> Asset:
-                if urlparse(asset.href).scheme not in ["", "file"]:  # TODO: convenient place; move elsewhere?
+            def replace_asset_href(asset: Asset, src_collection_path: Path) -> Asset:
+                # pystac will handle STAC but not underlying assets; set asset hrefs up front
+                asset_uri_parts = urlparse(asset.get_absolute_href())
+                if asset_uri_parts.scheme not in ["", "file"]:  # TODO: convenient place; move elsewhere?
                     raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}")
 
+                absolute_asset_path = Path(asset_uri_parts.path)
                 # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs;
                 #  it ends up in the asset metadata on disk
-                asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href()
-                asset.href = Path(asset_key).name  # asset key matches the asset filename, becomes the relative path
+                asset.extra_fields["_original_absolute_path"] = str(absolute_asset_path)
+                relative_asset_path = absolute_asset_path.relative_to(src_collection_path.parent)
+                asset.href = str(relative_asset_path)  # relative to item document
                 return asset
 
+            new_collection_path = Path(new_collection.get_self_href())
+
             if not existing_collection:
                 new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
-                new_collection = new_collection.map_assets(replace_asset_href)
+                new_collection = new_collection.map_assets(
+                    lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path)
+                )
                 new_collection.save(CatalogType.SELF_CONTAINED)
 
                 for new_item in new_collection.get_items():
                     for asset in new_item.get_assets().values():
-                        file_operation(
-                            asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
-                        )
+                        relative_asset_path = asset.href
+                        asset_parent_dir = (
+                            Path(new_collection.get_self_href()).parent / f"{target.name}_items" / relative_asset_path
+                        ).parent
+                        asset_parent_dir.mkdir(parents=True, exist_ok=True)  # asset might not end up next to item
+                        file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir))
             else:
                 merged_collection = _merge_collection_metadata(existing_collection, new_collection)
-                new_collection = new_collection.map_assets(replace_asset_href)
+                new_collection = new_collection.map_assets(
+                    lambda _, asset: replace_asset_href(asset, src_collection_path=new_collection_path)
+                )
 
                 for new_item in new_collection.get_items():
                     new_item.clear_links()  # sever ties with previous collection
@@ -123,9 +138,14 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset:
 
                 for new_item in new_collection.get_items():
                     for asset in new_item.get_assets().values():
-                        file_operation(
-                            asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent
-                        )
+                        relative_asset_path = asset.href
+                        asset_parent_dir = (
+                            Path(merged_collection.get_self_href()).parent
+                            / f"{target.name}_items"
+                            / relative_asset_path
+                        ).parent
+                        asset_parent_dir.mkdir(parents=True, exist_ok=True)
+                        file_operation(asset.extra_fields["_original_absolute_path"], str(asset_parent_dir))
 
             for item in new_collection.get_items():
                 for asset in item.assets.values():
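
Taken together, the change derives the on-disk layout from the STAC metadata itself instead of assuming that item ID == asset key == relative asset path. Below is a minimal, standalone sketch of that path logic; the helper names are illustrative and not part of openeo_driver.workspace:

from pathlib import Path, PurePath


def item_document_filename(item_id: str, target: PurePath) -> str:
    # item IDs may contain slashes (e.g. a relative asset path); flatten them so that
    # every item document gets a unique, filesystem-safe file name
    unique_item_filename = item_id.replace("/", "_")
    return f"{target.name}_items/{unique_item_filename}.json"


def asset_destination_dir(
    absolute_asset_path: Path, src_collection_path: Path, new_collection_path: Path, target: PurePath
) -> Path:
    # the asset href becomes the asset's path relative to the source collection document ...
    relative_asset_path = absolute_asset_path.relative_to(src_collection_path.parent)
    # ... and the asset file is copied/moved under "<target>_items/", preserving any subdirectories,
    # so it does not necessarily end up next to its item document
    return (new_collection_path.parent / f"{target.name}_items" / relative_asset_path).parent


# an item with ID "some/dir/asset.tif" maps to "collection.json_items/some_dir_asset.tif.json",
# while the asset file itself lands in "collection.json_items/some/dir/"
print(item_document_filename("some/dir/asset.tif", PurePath("collection.json")))
print(asset_destination_dir(
    Path("/data/job/some/dir/asset.tif"),
    src_collection_path=Path("/data/job/collection.json"),
    new_collection_path=Path("/workspace/collection.json"),
    target=PurePath("collection.json"),
))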

tests/test_workspace.py

Lines changed: 12 additions & 9 deletions

@@ -66,8 +66,8 @@ def test_merge_from_disk_new(tmp_path):
 
     assert _paths_relative_to(workspace_dir) == {
         Path("collection.json"),
+        Path("collection.json_items") / "ASSET.TIF.json",
         Path("collection.json_items") / "asset.tif",
-        Path("collection.json_items") / "asset.tif.json",
     }
 
     assert isinstance(imported_collection, Collection)
@@ -121,9 +121,9 @@ def test_merge_from_disk_into_existing(tmp_path):
     assert _paths_relative_to(workspace_dir) == {
         Path("collection.json"),
         Path("collection.json_items") / "asset1.tif",
-        Path("collection.json_items") / "asset1.tif.json",
+        Path("collection.json_items") / "ASSET1.TIF.json",
         Path("collection.json_items") / "asset2.tif",
-        Path("collection.json_items") / "asset2.tif.json",
+        Path("collection.json_items") / "ASSET2.TIF.json",
     }
 
     assert isinstance(imported_collection, Collection)
@@ -191,18 +191,18 @@ def asset_contents(collection_filename: str):
             return f.read()
 
     workspace.merge(collection1, target=Path("collection1.json"))
-    assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n"
+    assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n"
 
     # put collection2 next to collection1
     workspace.merge(collection2, target=Path("collection2.json"))
-    assert asset_contents(collection_filename="collection2.json") == "collection2-asset1.tif-asset1.tif\n"
+    assert asset_contents(collection_filename="collection2.json") == "collection2-ASSET1.TIF-asset1.tif\n"
 
     # separate collection files
     assert (workspace.root_directory / "collection1.json").exists()
    assert (workspace.root_directory / "collection2.json").exists()
 
     # collection2 should not overwrite collection1's items/assets
-    assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n"
+    assert asset_contents(collection_filename="collection1.json") == "collection1-ASSET1.TIF-asset1.tif\n"
 
 
 def _collection(
@@ -221,10 +221,13 @@ def _collection(
     # note: filepath_per_band behavior is tested in e.g. openeo-geopyspark-driver's
     #  test_batch_result.test_export_workspace_merge_filepath_per_band
 
-    item = Item(id=asset_filename, geometry=None, bbox=None, datetime=now_utc(), properties={})
+    item_id = asset_filename.upper()  # different from asset_filename: unique yet predictable for the tests
+    item = Item(id=item_id, geometry=None, bbox=None, datetime=now_utc(), properties={})
 
-    asset_path = root_path / item.id / asset_filename
-    asset = Asset(href=asset_path.name)  # relative to item
+    # TODO: implementation assumes that relative asset path is a sibling of the collection file so this setup mirrors
+    #  that; is it possible to assert this or even avoid entirely?
+    asset_path = root_path / asset_filename
+    asset = Asset(href=str(asset_path.absolute()))
 
     item.add_asset(key=asset_filename, asset=asset)
     collection.add_item(item)
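
The updated _collection test helper highlights the new contract: the item ID (e.g. "ASSET1.TIF") can differ from the asset key and filename ("asset1.tif"). The following is a hedged usage sketch under the same assumptions as the tests; the DiskWorkspace(root_directory=...) constructor and merge(collection, target=...) call mirror the test code above, and the exact setup is illustrative, not prescriptive:

from datetime import datetime, timezone
from pathlib import Path

from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent

from openeo_driver.workspace import DiskWorkspace

# a source collection on disk whose item ID ("ASSET.TIF") differs from the asset filename ("asset.tif")
src_dir = Path("/tmp/src")
src_dir.mkdir(parents=True, exist_ok=True)
(src_dir / "asset.tif").write_text("data\n")

collection = Collection(
    id="collection",
    description="source collection",
    extent=Extent(SpatialExtent([[-180, -90, 180, 90]]), TemporalExtent([[None, None]])),
)
collection.set_self_href(str(src_dir / "collection.json"))

item = Item(id="ASSET.TIF", geometry=None, bbox=None, datetime=datetime.now(timezone.utc), properties={})
item.add_asset(key="asset.tif", asset=Asset(href=str((src_dir / "asset.tif").absolute())))
collection.add_item(item)

workspace = DiskWorkspace(root_directory=Path("/tmp/workspace"))
workspace.merge(collection, target=Path("collection.json"))

# expected layout in the workspace, per test_merge_from_disk_new:
#   collection.json
#   collection.json_items/ASSET.TIF.json   (item document named after the item ID)
#   collection.json_items/asset.tif        (asset file, relative to the source collection)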
