issue 1111 merge new stac #412

Open · wants to merge 2 commits into master
26 changes: 18 additions & 8 deletions openeo_driver/workspace.py
@@ -90,29 +90,37 @@ def item_func(item: Item, parent_dir: str) -> str:

     return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func)

-def replace_asset_href(asset_key: str, asset: Asset) -> Asset:
+def replace_asset_href(asset_key: str, asset: Asset, collection_href: str) -> Asset:
     if urlparse(asset.href).scheme not in ["", "file"]:  # TODO: convenient place; move elsewhere?
         raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}")

     # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs;
     #  it ends up in the asset metadata on disk
-    asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href()
-    asset.href = Path(asset_key).name  # asset key matches the asset filename, becomes the relative path
+    asset_href = asset.get_absolute_href()
+    asset.extra_fields["_original_absolute_href"] = asset_href
+    if asset_href.startswith("s3"):
+        asset.href = Path(asset_href).name
+    else:
+        common_path = os.path.commonpath([asset_href, collection_href])
+        asset.href = os.path.relpath(asset_href, common_path)
     return asset

+collection_href = new_collection.get_self_href()
 if not existing_collection:
     new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
-    new_collection = new_collection.map_assets(replace_asset_href)
+    new_collection = new_collection.map_assets(lambda k, v: replace_asset_href(k, v, collection_href))
     new_collection.save(CatalogType.SELF_CONTAINED)

     for new_item in new_collection.get_items():
         for asset in new_item.get_assets().values():
+            asset_path = Path(new_item.get_self_href()).parent / Path(asset.href).parent
+            asset_path.mkdir(parents=True)
             file_operation(
-                asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
+                asset.extra_fields["_original_absolute_href"], str(asset_path)
             )
 else:
     merged_collection = _merge_collection_metadata(existing_collection, new_collection)
-    new_collection = new_collection.map_assets(replace_asset_href)
+    new_collection = new_collection.map_assets(lambda k, v: replace_asset_href(k, v, collection_href))

     for new_item in new_collection.get_items():
         new_item.clear_links()  # sever ties with previous collection
@@ -123,13 +131,15 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset:

     for new_item in new_collection.get_items():
         for asset in new_item.get_assets().values():
+            asset_path = Path(new_item.get_self_href()).parent / Path(asset.href).parent
+            asset_path.mkdir(parents=True)
             file_operation(
-                asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent
+                asset.extra_fields["_original_absolute_href"], str(asset_path)
             )

 for item in new_collection.get_items():
     for asset in item.assets.values():
-        workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}"
+        workspace_uri = f"file:{Path(item.get_self_href()).parent / asset.href}"
         asset.extra_fields["alternate"] = {"file": workspace_uri}

 return new_collection
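In short: instead of flattening every asset href to its bare filename, the rewritten `replace_asset_href` keeps the asset's path relative to the collection document (s3 hrefs are still flattened to the filename). A minimal standalone sketch of that rewriting logic, with hypothetical paths; `relative_asset_href` is an illustrative name, not the module's API:

```python
import os
from pathlib import Path

def relative_asset_href(asset_href: str, collection_href: str) -> str:
    """Sketch of the href rewriting above; names and paths are illustrative."""
    if asset_href.startswith("s3"):
        # s3 objects are still flattened to their file name
        return Path(asset_href).name
    # local files keep their path relative to what they share with the collection document
    common_path = os.path.commonpath([asset_href, collection_href])
    return os.path.relpath(asset_href, common_path)

# the item ID ("asset.tif") now survives as a directory level in the relative href
assert relative_asset_href(
    "/ws/path/to/collection.json_items/asset.tif/asset.tif",
    "/ws/path/to/collection.json",
) == "collection.json_items/asset.tif/asset.tif"
```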
10 changes: 6 additions & 4 deletions tests/test_workspace.py
@@ -77,7 +77,7 @@ def test_merge_from_disk_new(tmp_path):
         for asset_key, asset in item.get_assets().items()
     }
     assert asset_workspace_uris == {
-        "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif'}"
+        "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif' / 'asset.tif'}"
Contributor:

Was there a reason for now having the filename twice in the path?

Contributor Author:

The path of the asset is the collection path + item ID + asset filename, with item ID = asset.tif and asset filename = asset.tif. The merge now takes the path relative to the collection path, i.e. item ID + asset filename, which is why asset.tif appears twice. See:

asset_path = root_path / item.id / asset_filename
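A worked example of that layout, using the fixture values from the tests (the root path is hypothetical):

```python
from pathlib import Path

root_path = Path("/ws/path/to/collection.json_items")  # collection path (hypothetical)
item_id = "asset.tif"          # in the test fixtures, the item ID equals the asset filename
asset_filename = "asset.tif"

asset_path = root_path / item_id / asset_filename
# relative to the collection path, "asset.tif" shows up twice:
assert asset_path.relative_to(root_path) == Path("asset.tif/asset.tif")
```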

Collaborator:

I'll look into it further, but I'm not sure about this.

The problem with the current workspace implementations is that they assume the asset key contains a relative path (relative to the job directory).

With the introduction of unified asset keys, asset keys no longer carry any meaning (they could be anything), so this assumption no longer holds and this shortcut in the code has to be removed and implemented in a different way.

It's true that the dummy STAC items in these tests mirror the current implementation in that their item ID == asset key == relative asset path; this is a bit confusing with the new implementation in mind, but at the same time the actual item ID should not matter.

I would expect the workspace URI (i.e. the URI an asset gets as it is exported to a workspace) to remain the same: /path/to/collection.json_items/asset.tif, with the item ID not involved.
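To make the disagreement concrete, a hypothetical side-by-side of the two URIs for the same exported asset (values taken from the test above):

```python
# What the PR produces: the item ID ("asset.tif") becomes a directory level.
actual_uri = "file:/ws/path/to/collection.json_items/asset.tif/asset.tif"

# What this reviewer expects: the item ID is not part of the asset path.
expected_uri = "file:/ws/path/to/collection.json_items/asset.tif"
```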

     }

     # load it again
@@ -90,7 +90,8 @@ def test_merge_from_disk_new(tmp_path):

     for item in exported_collection.get_items():
         for asset in item.get_assets().values():
-            assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent
+            item_path_parts = Path(item.get_self_href()).parent.parts
+            assert item_path_parts == Path(asset.get_absolute_href()).parts[0:len(item_path_parts)]


 def test_merge_from_disk_into_existing(tmp_path):

@@ -133,7 +134,7 @@ def test_merge_from_disk_into_existing(tmp_path):
         for asset_key, asset in item.get_assets().items()
     }
     assert asset_workspace_uris == {
-        "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset2.tif'}",
+        "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset2.tif' / 'asset2.tif'}",
     }

     # load it again

@@ -160,7 +161,8 @@ def test_merge_from_disk_into_existing(tmp_path):

     for item in exported_collection.get_items():
         for asset in item.get_assets().values():
-            assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent
+            item_path_parts = Path(item.get_self_href()).parent.parts
+            assert item_path_parts == Path(asset.get_absolute_href()).parts[0:len(item_path_parts)]


 def test_adjacent_collections_do_not_have_interfering_items_and_assets(tmp_path):
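Both tests relax the old same-directory assertion into a prefix check: the item's directory no longer has to be the asset's direct parent, only an ancestor of it. A small sketch of the difference, with hypothetical hrefs:

```python
from pathlib import Path

item_href = "/ws/collection.json_items/asset.tif/asset.tif.json"  # hypothetical
asset_href = "/ws/collection.json_items/asset.tif/asset.tif"      # hypothetical

# Old assertion: the asset must sit directly next to its item document.
assert Path(item_href).parent == Path(asset_href).parent

# New assertion: the item's directory only has to be a path prefix of the asset,
# which also tolerates assets nested one or more levels below the item.
item_path_parts = Path(item_href).parent.parts
assert item_path_parts == Path(asset_href).parts[: len(item_path_parts)]

nested_asset = "/ws/collection.json_items/asset.tif/sub/asset.tif"
assert item_path_parts == Path(nested_asset).parts[: len(item_path_parts)]
# the old check would fail for nested_asset: its parent is .../asset.tif/sub
```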