Commit bf95c1a

Merge branch 'develop-1.9' into integrate_1.9
2 parents: a2547bc + 51fc07f · commit bf95c1a

File tree: 4 files changed, +66 -40 lines

apps/dc_tools/odc/apps/dc_tools/_stac.py

Lines changed: 11 additions & 2 deletions
@@ -10,7 +10,6 @@
 import numpy
 from datacube.model import Dataset
 from odc.geo.geom import Geometry, box
-from eodatasets3.serialise import from_doc
 from eodatasets3.stac import to_stac_item
 from toolz import get_in
 from urllib.parse import urlparse
@@ -231,7 +230,11 @@ def _get_stac_bands(
         # If transform specified here in the asset it should override
         # the properties-specified transform.
         transform = asset.get("proj:transform") or proj_transform
-        grid = f"g{transform[0]:g}m"
+
+        if transform is not None:
+            grid = f"g{transform[0]:g}m"
+        else:
+            grid = default_grid

         # As per transform, shape here overrides properties
         shape = asset.get("proj:shape") or proj_shape
@@ -373,6 +376,11 @@ def stac_transform(input_stac: Document) -> Document:
         proj_transform=proj_transform,
     )

+    # STAC document may not have top-level proj:shape property;
+    # use one of the bands as a default
+    proj_shape = grids.get("default").get("shape")
+    proj_transform = grids.get("default").get("transform")
+
     stac_properties, lineage = _get_stac_properties_lineage(input_stac)

     epsg = properties["proj:epsg"]
@@ -444,6 +452,7 @@ def transform_geom_json_coordinates_to_list(geom_json):

 def ds_to_stac(ds: Dataset) -> dict:
     """Get STAC document from dataset with eo3 metadata"""
+    from eodatasets3.serialise import from_doc
     if ds.is_eo3:
         if not ds.uris:
             raise ValueError("Can't find dataset location")
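
The `_get_stac_bands` change stops assuming every asset carries a projection transform: the grid name is derived from the transform's pixel size only when a transform is available, and falls back to the default grid otherwise. A minimal sketch of that fallback in isolation (the sample transform and the "default" grid name are illustrative assumptions, not values taken from this commit):

    from typing import Optional, Sequence

    DEFAULT_GRID = "default"  # assumed value of the module's default_grid

    def grid_name(transform: Optional[Sequence[float]]) -> str:
        # The first element of an affine transform is the pixel size in
        # CRS units, so a 30 m pixel size yields the grid name "g30m".
        if transform is not None:
            return f"g{transform[0]:g}m"
        return DEFAULT_GRID

    assert grid_name([30.0, 0.0, 0.0, 0.0, -30.0, 0.0]) == "g30m"
    assert grid_name(None) == "default"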

apps/dc_tools/odc/apps/dc_tools/stac_api_to_dc.py

Lines changed: 28 additions & 15 deletions
@@ -9,9 +9,10 @@

 import click
 from datacube import Datacube
-from datacube.index.hl import Doc2Dataset
+from datacube.model import Dataset
 from datacube.ui.click import environment_option, pass_config
-from odc.apps.dc_tools._stac import stac_transform
+from odc.stac.eo3 import stac2ds
+
 from odc.apps.dc_tools.utils import (
     SkippedException,
     allow_unsafe,
@@ -64,8 +65,9 @@ def _parse_options(options: Optional[str]) -> Dict[str, Any]:

 def item_to_meta_uri(
     item: Item,
+    dc: Datacube,
     rename_product: Optional[str] = None,
-) -> Generator[Tuple[dict, str, bool], None, None]:
+) -> Generator[Tuple[Dataset, str, bool], None, None]:
     for link in item.links:
         if link.rel == "self":
             uri = link.target
@@ -75,31 +77,44 @@ def item_to_meta_uri(
             uri = link.target
             break

-    metadata = item.to_dict()
     if rename_product is not None:
-        metadata["properties"]["odc:product"] = rename_product
-    stac = metadata
-    metadata = stac_transform(metadata)
+        item.properties["odc:product"] = rename_product
+
+    # Try to get the Datacube product for the dataset
+    product_name = item.properties.get("odc:product", item.collection_id)
+    product_name_sanitised = product_name.replace("-", "_")
+    product = dc.index.products.get_by_name(product_name_sanitised)
+
+    if product is None:
+        logging.warning(
+            "Couldn't find matching product for product name: %s",
+            product_name_sanitised,
+        )
+        raise SkippedException(f"Couldn't find matching product for product name: {product_name_sanitised}")
+
+    # Convert the STAC Item to a Dataset
+    dataset = next(stac2ds([item]))
+    # And assign the product
+    dataset.product = product

-    return (metadata, uri, stac)
+    return (dataset, uri, item.to_dict())


 def process_item(
     item: Item,
     dc: Datacube,
-    doc2ds: Doc2Dataset,
     update_if_exists: bool,
     allow_unsafe: bool,
     rename_product: Optional[str] = None,
     archive_less_mature: int = None,
     publish_action: bool = False,
 ):
-    meta, uri, stac = item_to_meta_uri(item, rename_product)
+    dataset, uri, stac = item_to_meta_uri(item, dc, rename_product)
     index_update_dataset(
-        meta,
+        dataset,
         uri,
         dc,
-        doc2ds,
+        None,
         update_if_exists=update_if_exists,
         allow_unsafe=allow_unsafe,
         archive_less_mature=archive_less_mature,
@@ -118,7 +133,6 @@ def stac_api_to_odc(
     archive_less_mature: int = None,
     publish_action: Optional[str] = None,
 ) -> Tuple[int, int, int]:
-    doc2ds = Doc2Dataset(dc.index)
     client = Client.open(catalog_href)

     search = client.search(**config)
@@ -143,14 +157,13 @@
             process_item,
             item,
             dc,
-            doc2ds,
             update_if_exists=update_if_exists,
             allow_unsafe=allow_unsafe,
             rename_product=rename_product,
             archive_less_mature=archive_less_mature,
             publish_action=publish_action,
         ): item.id
-        for item in search.get_all_items()
+        for item in search.items()
     }
     for future in concurrent.futures.as_completed(future_to_item):
         item = future_to_item[future]
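
With this change the STAC-to-ODC path no longer round-trips items through `stac_transform` and `Doc2Dataset`: each `pystac.Item` is converted directly to a datacube `Dataset` with `odc.stac.eo3.stac2ds` and matched by name to an indexed product. (The move from `search.get_all_items()` to `search.items()` tracks pystac-client, which deprecated the former.) A minimal sketch of the new conversion path, assuming a configured datacube database; the item URL is a placeholder, not taken from the commit:

    import pystac
    from datacube import Datacube
    from odc.stac.eo3 import stac2ds

    def stac_item_to_dataset(item_url: str):
        # Requires a configured datacube database; item_url is hypothetical.
        dc = Datacube()
        item = pystac.Item.from_file(item_url)

        # Convert the STAC Item straight to a datacube Dataset.
        dataset = next(stac2ds([item]))

        # Attach the indexed product, sanitising the name the same
        # way the commit does.
        name = item.properties.get("odc:product", item.collection_id)
        dataset.product = dc.index.products.get_by_name(name.replace("-", "_"))
        return dataset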

apps/dc_tools/odc/apps/dc_tools/utils.py

Lines changed: 26 additions & 23 deletions
@@ -4,6 +4,7 @@
 import click
 import importlib_resources
 from datacube import Datacube
+from datacube.model import Dataset
 from datacube.index.hl import Doc2Dataset
 from datacube.utils import changes
 from datadog import initialize, statsd
@@ -183,14 +184,15 @@ def get_esri_list():


 def index_update_dataset(
-    metadata: dict,
+    dataset: dict | Dataset,
     uri: str,
     dc: Datacube,
-    doc2ds: Doc2Dataset,
+    doc2ds: Doc2Dataset | None,
     update: bool = False,
     update_if_exists: bool = False,
     allow_unsafe: bool = False,
     archive_less_mature: Optional[int] = None,
+    auto_add_lineage: Optional[bool] = False,
     publish_action: Optional[str] = None,
     stac_doc: Optional[dict] = None,
 ) -> int:
@@ -215,19 +217,17 @@ def index_update_dataset(
     :param stac_doc: STAC document for publication to SNS topic.
     :return: Returns nothing. Raises an exception if anything goes wrong.
     """
-    if uri is None:
-        raise IndexingException("Failed to get URI from metadata doc")
     # Make sure we can create a dataset first
-    try:
-        ds, err = doc2ds(metadata, uri)
-    except ValueError as e:
-        raise IndexingException(
-            f"Exception thrown when trying to create dataset: '{e}'\n The URI was {uri}"
-        ) from e
-    if ds is None:
-        raise IndexingException(
-            f"Failed to create dataset with error {err}\n The URI was {uri}"
-        )
+    if not isinstance(dataset, Dataset):
+        print("Not a dataset: ", dataset)
+        try:
+            if doc2ds is None:
+                doc2ds = Doc2Dataset(dc.index)
+            dataset, _ = doc2ds(dataset, uri)
+        except ValueError as e:
+            raise IndexingException(
+                f"Exception thrown when trying to create dataset: '{e}'\n The URI was {uri}"
+            ) from e

     with dc.index.transaction():
         # Process in a transaction
@@ -236,13 +236,13 @@ def index_update_dataset(
         updated = False

         if isinstance(archive_less_mature, int) and publish_action:
-            dupes = dc.index.datasets.find_less_mature(ds, archive_less_mature)
+            dupes = dc.index.datasets.find_less_mature(dataset, archive_less_mature)
             for dupe in dupes:
                 archive_stacs.append(ds_to_stac(dupe))

         # Now do something with the dataset
         # Note that any of the exceptions raised below will rollback any archiving performed above.
-        if dc.index.datasets.has(metadata.get("id")):
+        if dc.index.datasets.has(dataset.id):
             # Update
             if update or update_if_exists:
                 # Set up update fields
@@ -252,7 +252,7 @@ def index_update_dataset(
                 # Do the updating
                 try:
                     dc.index.datasets.update(
-                        ds,
+                        dataset,
                         updates_allowed=updates,
                         archive_less_mature=archive_less_mature,
                     )
@@ -262,9 +262,8 @@ def index_update_dataset(
                     f"Updating the dataset raised an exception: {e}"
                 )
             else:
-                logging.warning("Dataset already exists, not indexing")
                 raise SkippedException(
-                    f"Dataset {metadata.get('id')} already exists, not indexing"
+                    f"Dataset {dataset.id} already exists, not indexing"
                 )
         else:
             if update:
@@ -273,22 +272,26 @@ def index_update_dataset(
                     "Can't update dataset because it doesn't exist."
                 )
             # Everything is working as expected, add the dataset
-            dc.index.datasets.add(ds, archive_less_mature=archive_less_mature)
+            dc.index.datasets.add(
+                dataset,
+                with_lineage=auto_add_lineage,
+                archive_less_mature=archive_less_mature,
+            )
             added = True

         if publish_action:
             for arch_stac in archive_stacs:
                 publish_to_topic(arn=publish_action, action="ARCHIVED", stac=arch_stac)

         if added:
-            logging.info("New Dataset Added: %s", ds.id)
+            logging.info("New Dataset Added: %s", dataset.id)
             if publish_action:
                 # if STAC was not provided, generate from dataset
-                stac_doc = stac_doc if stac_doc else ds_to_stac(ds)
+                stac_doc = stac_doc if stac_doc else ds_to_stac(dataset)
                 publish_to_topic(arn=publish_action, action="ADDED", stac=stac_doc)

         if updated:
-            logging.info("Existing Dataset Updated: %s", ds.id)
+            logging.info("Existing Dataset Updated: %s", dataset.id)


 def statsd_gauge_reporting(value, tags=None, statsd_setting="localhost:8125"):
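
`index_update_dataset` now accepts either a raw metadata document or an already-resolved `Dataset`, and the `Doc2Dataset` resolver becomes optional: when a plain document arrives without a resolver, one is built on demand from the index. The new `auto_add_lineage` flag is threaded through to `datasets.add` as `with_lineage` and defaults to False. A sketch of the dict-or-Dataset dispatch on its own, with a hypothetical `as_dataset` helper name and assuming a configured datacube database:

    from datacube import Datacube
    from datacube.index.hl import Doc2Dataset
    from datacube.model import Dataset

    def as_dataset(dataset, uri: str, dc: Datacube,
                   doc2ds: Doc2Dataset | None = None) -> Dataset:
        # Pass ready-made Datasets through untouched; resolve plain
        # documents against the index, building a resolver lazily.
        if not isinstance(dataset, Dataset):
            if doc2ds is None:
                doc2ds = Doc2Dataset(dc.index)
            dataset, err = doc2ds(dataset, uri)
            if dataset is None:
                raise ValueError(f"Could not resolve dataset: {err}")
        return dataset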

apps/dc_tools/setup.cfg

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ install_requires =
     odc_io
     odc-cloud[ASYNC]>=0.2.3
     odc-geo
+    odc-stac
     pystac>=1.0.0
     rio-stac
     urlpath
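
The new odc-stac requirement supplies the `odc.stac.eo3.stac2ds` helper that stac_api_to_dc.py now imports.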
