|
1 | 1 | """These should probably be in datacube library."""
|
2 | 2 |
|
3 |
| -import json |
4 | 3 | import sys
|
5 | 4 | from typing import Sequence, Union
|
6 | 5 | from uuid import UUID, uuid5
|
7 | 6 |
|
8 | 7 | from datacube.index.hl import Doc2Dataset
|
9 |
| -from datacube.utils.documents import parse_yaml |
| 8 | +from datacube.utils.documents import parse_doc_stream |
10 | 9 |
|
11 | 10 | # Some random UUID to be ODC namespace
|
12 | 11 | ODC_NS = UUID("6f34c6f4-13d6-43c0-8e4e-42b6c13203af")
|
@@ -73,37 +72,6 @@ def from_metadata_stream(metadata_stream, index, **kwargs):
|
73 | 72 | yield (None, f"Error: {uri}, {err}")
|
74 | 73 |
|
75 | 74 |
|
76 |
| -def parse_doc_stream(doc_stream, on_error=None, transform=None): |
77 |
| - """ |
78 |
| - Replace doc bytes/strings with parsed dicts. |
79 |
| -
|
80 |
| - Stream[(uri, bytes)] -> Stream[(uri, dict)] |
81 |
| -
|
82 |
| -
|
83 |
| - :param doc_stream: sequence of (uri, doc: bytes|string) |
84 |
| - :param on_error: Callback uri, doc -> None |
85 |
| - :param transform: dict -> dict if supplied also apply further transform on parsed document |
86 |
| -
|
87 |
| - On output doc is replaced with python dict parsed from yaml, or with None |
88 |
| - if parsing/transform error occurred. |
89 |
| - """ |
90 |
| - for uri, doc in doc_stream: |
91 |
| - try: |
92 |
| - if uri.endswith(".json"): |
93 |
| - metadata = json.loads(doc) |
94 |
| - else: |
95 |
| - metadata = parse_yaml(doc) |
96 |
| - |
97 |
| - if transform is not None: |
98 |
| - metadata = transform(metadata) |
99 |
| - except Exception: # pylint: disable=broad-except |
100 |
| - if on_error is not None: |
101 |
| - on_error(uri, doc) |
102 |
| - metadata = None |
103 |
| - |
104 |
| - yield uri, metadata |
105 |
| - |
106 |
| - |
107 | 75 | def from_yaml_doc_stream(doc_stream, index, logger=None, transform=None, **kwargs):
|
108 | 76 | """
|
109 | 77 | Stream of yaml documents to a stream of Dataset results.
|
|
0 commit comments