Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 1 addition & 33 deletions apps/dc_tools/odc/apps/dc_tools/_docs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""These should probably be in datacube library."""

import json
import sys
from typing import Sequence, Union
from uuid import UUID, uuid5

from datacube.index.hl import Doc2Dataset
from datacube.utils.documents import parse_yaml
from datacube.utils.documents import parse_doc_stream

# Some random UUID to be ODC namespace
ODC_NS = UUID("6f34c6f4-13d6-43c0-8e4e-42b6c13203af")
Expand Down Expand Up @@ -73,37 +72,6 @@ def from_metadata_stream(metadata_stream, index, **kwargs):
yield (None, f"Error: {uri}, {err}")


def parse_doc_stream(doc_stream, on_error=None, transform=None):
    """
    Turn a stream of raw document bytes/strings into parsed dicts.

    Stream[(uri, bytes)] -> Stream[(uri, dict)]

    Documents whose uri ends in ``.json`` are parsed as JSON; everything
    else is parsed as YAML.

    :param doc_stream: sequence of (uri, doc: bytes|string)
    :param on_error: Callback uri, doc -> None, invoked when parsing or
        transforming a document fails
    :param transform: dict -> dict, optional extra transform applied to
        each successfully parsed document

    Yields (uri, dict) pairs; the dict is ``None`` when parsing or the
    transform raised an error.
    """
    for uri, raw in doc_stream:
        try:
            # Pick the parser from the uri extension, then optionally
            # run the caller-supplied transform on the result.
            parsed = json.loads(raw) if uri.endswith(".json") else parse_yaml(raw)
            if transform is not None:
                parsed = transform(parsed)
        except Exception:  # pylint: disable=broad-except
            # Best-effort: report the failure (if asked to) and keep
            # streaming with a None placeholder for this document.
            if on_error is not None:
                on_error(uri, raw)
            parsed = None

        yield uri, parsed


def from_yaml_doc_stream(doc_stream, index, logger=None, transform=None, **kwargs):
"""
Stream of yaml documents to a stream of Dataset results.
Expand Down
3 changes: 1 addition & 2 deletions apps/dc_tools/odc/apps/dc_tools/s3_to_dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from datacube import Datacube
from datacube.index.hl import Doc2Dataset
from datacube.ui.click import environment_option, pass_config
from datacube.utils.documents import parse_doc_stream
from odc.aio import S3Fetcher, s3_find_glob
from odc.apps.dc_tools._docs import parse_doc_stream
from odc.apps.dc_tools.utils import (
IndexingException,
SkippedException,
Expand Down Expand Up @@ -286,7 +286,6 @@ def cli(
"Any wildcard characters will be escaped."
)
# Get a generator from supplied S3 Uri for candidate documents
fetcher = None
# Grab the URL from the resulting S3 item
if is_glob:
fetcher = S3Fetcher(aws_unsigned=no_sign_request)
Expand Down
2 changes: 1 addition & 1 deletion apps/dc_tools/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ authors = [
]
dependencies = [
"click",
"datacube>=1.9.6",
"datacube>=1.9.9",
"datadog",
"eodatasets3>=1.9",
"fsspec",
Expand Down
2 changes: 1 addition & 1 deletion tests/test-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies:
- python=3.12

# Datacube
- datacube>=1.9.0
- datacube>=1.9.9
- sqlalchemy>=2.0

# odc.ui
Expand Down