Commit 6270002

aws: import odc-cloud modules
This is the remaining synchronous S3 functionality in odc-cloud that is used and isn't already imported in datacube. I have added some partial type annotations that I know are correct, and done a light reformat and lint fixes.
1 parent e7f8447 · commit 6270002

4 files changed, +466 -0 lines changed


datacube/utils/aws/__init__.py

Lines changed: 21 additions & 0 deletions
@@ -9,6 +9,7 @@
 import functools
 import os
 import time
+from collections.abc import Generator
 from typing import IO, Any, TypeAlias
 from urllib.parse import urlparse
 from urllib.request import urlopen
@@ -417,6 +418,26 @@ def s3_dump(data: bytes | str | IO, url: str, s3: MaybeS3 = None, **kwargs):
     return 200 <= code < 300


+def s3_ls_dir(uri: str, s3: BaseClient | None = None, **kw) -> Generator[str]:
+    bucket, prefix = s3_url_parse(uri)
+
+    if len(prefix) > 0 and not prefix.endswith("/"):
+        prefix = prefix + "/"
+
+    s3 = s3 or s3_client()
+    paginator = s3.get_paginator("list_objects_v2")
+
+    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/", **kw):
+        sub_dirs = page.get("CommonPrefixes", [])
+        files = page.get("Contents", [])
+
+        for p in sub_dirs:
+            yield f"s3://{bucket}/{p['Prefix']}"
+
+        for o in files:
+            yield f"s3://{bucket}/{o['Key']}"
+
+
 def get_aws_settings(
     profile: str | None = None,
     region_name: str = "auto",
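
For orientation, a minimal usage sketch of the new s3_ls_dir helper. The bucket and prefix below are placeholders, and the call assumes boto3 can find AWS credentials (the default s3_client() is used when no client is passed):

from datacube.utils.aws import s3_ls_dir

# Hypothetical bucket/prefix, for illustration only.
for uri in s3_ls_dir("s3://example-bucket/some/prefix/"):
    # Yields both immediate "sub-directory" prefixes (ending in "/") and
    # object URLs directly under the prefix, one pagination page at a time.
    print(uri)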

datacube/utils/aws/inventory.py

Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+# This file is part of the Open Data Cube, see https://opendatacube.org for more information
+#
+# Copyright (c) 2015-2025 ODC Contributors
+# SPDX-License-Identifier: Apache-2.0
+import csv
+import json
+from collections.abc import Generator, Iterable
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from gzip import GzipFile
+from io import BytesIO
+from types import SimpleNamespace
+
+from botocore.client import BaseClient
+
+from . import s3_client, s3_fetch, s3_ls_dir
+
+
+def find_latest_manifest(prefix: str, s3: BaseClient | None, **kw) -> str:
+    """
+    Find latest manifest
+    """
+    manifest_dirs = sorted(s3_ls_dir(prefix, s3=s3, **kw), reverse=True)
+    for d in manifest_dirs:
+        if d.endswith("/"):
+            leaf = d.split("/")[-2]
+            if leaf.endswith("Z"):
+                return d + "manifest.json"
+    return ""
+
+
+def retrieve_manifest_files(
+    key: str, s3: BaseClient | None, schema: Iterable, **kw
+) -> Generator[SimpleNamespace]:
+    """
+    Retrieve manifest file and return a namespace
+
+    namespace(
+        Bucket=<bucket_name>,
+        Key=<key_path>,
+        LastModifiedDate=<date>,
+        Size=<size>
+    )
+    """
+    bb = s3_fetch(key, s3=s3, **kw)
+    gz = GzipFile(fileobj=BytesIO(bb), mode="r")
+    csv_rdr = csv.reader(line.decode("utf8") for line in gz)
+    for rec in csv_rdr:
+        yield SimpleNamespace(**dict(zip(schema, rec)))
+
+
+def list_inventory(
+    manifest: str,
+    s3: BaseClient | None = None,
+    prefix: str = "",
+    suffix: str = "",
+    contains: str = "",
+    n_threads: int | None = None,
+    **kw,
+) -> Generator[SimpleNamespace]:
+    """
+    Returns a generator of inventory records
+
+    manifest -- s3:// url to manifest.json or a folder in which case latest one is chosen.
+
+    :param manifest:
+    :param s3:
+    :param prefix:
+    :param suffix:
+    :param contains:
+    :param n_threads: number of threads, if not sent does not use threads
+    :return: SimpleNamespace
+    """
+    # TODO: refactor parallel execution part out of this function
+    # pylint: disable=too-many-locals
+    s3 = s3 or s3_client()
+
+    if manifest.endswith("/"):
+        manifest = find_latest_manifest(manifest, s3, **kw)
+
+    info = json.loads(s3_fetch(manifest, s3=s3, **kw))
+
+    must_have_keys = {"fileFormat", "fileSchema", "files", "destinationBucket"}
+    missing_keys = must_have_keys - set(info)
+    if missing_keys:
+        raise ValueError("Manifest file haven't parsed correctly")
+
+    if info["fileFormat"].upper() != "CSV":
+        raise ValueError("Data is not in CSV format")
+
+    s3_prefix = "s3://" + info["destinationBucket"].split(":")[-1] + "/"
+    data_urls = [s3_prefix + f["key"] for f in info["files"]]
+    schema = tuple(info["fileSchema"].split(", "))
+
+    if n_threads:
+        with ThreadPoolExecutor(max_workers=n_threads) as executor:
+            tasks = [
+                executor.submit(retrieve_manifest_files, key, s3, schema)
+                for key in data_urls
+            ]
+
+            for future in as_completed(tasks):
+                for namespace in future.result():
+                    key = namespace.Key
+                    if (
+                        key.startswith(prefix)
+                        and key.endswith(suffix)
+                        and contains in key
+                    ):
+                        yield namespace
+    else:
+        for u in data_urls:
+            for namespace in retrieve_manifest_files(u, s3, schema):
+                key = namespace.Key
+                if key.startswith(prefix) and key.endswith(suffix) and contains in key:
+                    yield namespace
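
A rough usage sketch for list_inventory: it accepts either a specific manifest.json URL or a folder of dated manifests, in which case the latest one is picked via find_latest_manifest. The inventory location and filters below are hypothetical and assume S3 Inventory is enabled on the source bucket:

from datacube.utils.aws.inventory import list_inventory

# Hypothetical inventory manifest folder, for illustration only.
for rec in list_inventory(
    "s3://example-inventory-bucket/example-bucket/all-objects/",
    prefix="derived/",   # keep only keys starting with this prefix
    suffix=".json",      # ... and ending with this suffix
    n_threads=4,         # fetch the gzipped CSV chunks in parallel
):
    # Record fields follow the manifest's fileSchema, e.g. Bucket, Key, Size.
    print(rec.Key)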

datacube/utils/aws/queue.py

Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
+# This file is part of the Open Data Cube, see https://opendatacube.org for more information
+#
+# Copyright (c) 2015-2025 ODC Contributors
+# SPDX-License-Identifier: Apache-2.0
+import itertools
+import json
+import logging
+from collections.abc import Generator, Iterable, Iterator, Mapping
+from typing import Any
+
+import boto3
+from toolz import dicttoolz
+
+_LOG: logging.Logger = logging.getLogger(__name__)
+
+
+class ODCSQSError(Exception):
+    """Something wrong with ODC/AWS SQS handling"""
+
+
+def redrive_queue(
+    queue_name: str,
+    to_queue_name: str | None = None,
+    limit: int | None = None,
+    dryrun: bool = False,
+    max_wait: int = 5,
+    messages_per_request: int = 10,
+) -> int:
+    """
+    Redrive messages from one queue to another. Default usage is to define
+    a "deadletter" queue, and pick its "alive" counterpart, and redrive
+    messages to that queue.
+    """
+
+    def post_messages(to_queue, messages) -> list:
+        message_bodies = [
+            {"Id": str(n), "MessageBody": m.body} for n, m in enumerate(messages)
+        ]
+        to_queue.send_messages(Entries=message_bodies)
+        # Delete after sending, not before
+        for message in messages:
+            message.delete()
+        return []
+
+    dead_queue = get_queue(queue_name)
+
+    if to_queue_name is not None:
+        alive_queue = get_queue(to_queue_name)
+    else:
+        source_queues = list(dead_queue.dead_letter_source_queues.all())
+        if len(source_queues) == 0:
+            raise ODCSQSError(
+                "No alive queue found for the deadletter queue, please check your configuration."
+            )
+        if len(source_queues) > 1:
+            raise ODCSQSError(
+                "Deadletter queue has more than one source, please specify the target queue name."
+            )
+        alive_queue = source_queues[0]
+
+    messages = get_messages(
+        dead_queue,
+        limit=limit,
+        max_wait=max_wait,
+        messages_per_request=messages_per_request,
+    )
+    count_messages = 0
+    approx_n_messages = dead_queue.attributes.get("ApproximateNumberOfMessages")
+    try:
+        count_messages = int(approx_n_messages)
+    except TypeError:
+        _LOG.warning("Couldn't get approximate number of messages, setting to 0")
+
+    # If there are no messages then there's no work to do. If it's a dryrun, we
+    # don't do anything either.
+    if count_messages == 0 or dryrun:
+        return count_messages
+
+    count = 0
+    message_group = []
+    for message in messages:
+        # Assume this works. Exception handling elsewhere.
+        message_group.append(message)
+        count += 1
+
+        if count % 10 == 0:
+            message_group = post_messages(alive_queue, message_group)
+
+    # Post the last few messages
+    if len(message_group) > 0:
+        _ = post_messages(alive_queue, message_group)
+
+    # Return the number of messages that were re-driven.
+    return count
+
+
+def get_queue(queue_name: str):
+    """
+    Return a queue resource by name, e.g., alex-really-secret-queue
+    """
+    return boto3.resource("sqs").get_queue_by_name(QueueName=queue_name)
+
+
+def get_queues(prefix: str | None = None, contains: str | None = None) -> Generator:
+    """
+    Return a list of sqs queues which the user is allowed to see and filtered by
+    the parameters provided
+    """
+    queues = boto3.resource("sqs").queues.all()
+    if prefix is not None:
+        queues = queues.filter(QueueNamePrefix=prefix)
+
+    if contains is not None:
+        for queue in queues:
+            if contains in queue.attributes.get("QueueArn").split(":")[-1]:
+                yield queue
+    else:
+        yield from queues
+
+
+def publish_message(
+    queue, message: str, message_attributes: Mapping[str, Any] | None = None
+) -> None:
+    """
+    Publish a message to a queue resource. Message should be a JSON object dumped as a
+    string.
+    """
+    if message_attributes is None:
+        message_attributes = {}
+    queue.send_message(
+        QueueUrl=queue.url, MessageBody=message, MessageAttributes=message_attributes
+    )
+
+
+def publish_messages(queue, messages) -> None:
+    """
+    Publish messages to a queue resource.
+    """
+    queue.send_messages(Entries=messages)
+
+
+def _sqs_message_stream(queue, **kw) -> Generator:
+    while True:
+        messages = queue.receive_messages(**kw)
+        if len(messages) == 0:
+            return
+
+        yield from messages
+
+
+def get_messages(
+    queue,
+    limit: int | None = None,
+    visibility_timeout: int = 60,
+    message_attributes: Iterable[str] | None = None,
+    max_wait: int = 1,
+    messages_per_request: int = 1,
+    **kw,
+) -> Iterator:
+    """
+    Get messages from SQS queue resource. Returns a lazy sequence of message objects.
+
+    :queue: queue URL
+    :param limit: the maximum number of messages to return from the queue (default to all)
+    :param visibility_timeout: A period of time in seconds during which Amazon SQS prevents other consumers
+                               from receiving and processing the message
+    :param message_attributes: Select what attributes to include in the messages, default All
+    :param max_wait: Longest to wait in seconds before assuming queue is empty (default: 10)
+    :param messages_per_request:
+    :**kw: Any other arguments are passed to ``.receive_messages()`` boto3 call
+
+    :return: Iterator of sqs messages
+    """
+    if message_attributes is None:
+        message_attributes = ["All"]
+
+    messages = _sqs_message_stream(
+        queue,
+        VisibilityTimeout=visibility_timeout,
+        MaxNumberOfMessages=messages_per_request,
+        WaitTimeSeconds=max_wait,
+        MessageAttributeNames=message_attributes,
+        **kw,
+    )
+
+    if limit is None:
+        return messages
+
+    if limit < 1:
+        raise ODCSQSError(f"Limit {limit} is not valid.")
+
+    return itertools.islice(messages, limit)
+
+
+def capture_attributes(action: str, stac: dict) -> dict:
+    """Determine SNS message attributes"""
+    product = dicttoolz.get_in(["properties", "odc:product"], stac)
+    date_time = dicttoolz.get_in(["properties", "datetime"], stac)
+    maturity = dicttoolz.get_in(["properties", "dea:dataset_maturity"], stac)
+
+    if not product:
+        product = stac.get("collection")
+
+    return {
+        "action": {"DataType": "String", "StringValue": action},
+        "product": {"DataType": "String", "StringValue": product},
+        "datetime": {"DataType": "String", "StringValue": date_time},
+        **(
+            {"maturity": {"DataType": "String", "StringValue": maturity}}
+            if maturity
+            else {}
+        ),
+    }
+
+
+def publish_to_topic(arn: str, action: str, stac: dict) -> None:
+    """
+    Publish 'added' or 'archived' action to the provided sns topic
+    """
+    boto3.client("sns").publish(
+        TopicArn=arn,
+        Message=json.dumps(stac),
+        MessageAttributes=capture_attributes(action, stac),
+    )
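
Finally, a sketch of how the SQS helpers might be combined. The queue names are placeholders, and credentials and region are assumed to come from the environment:

import json

from datacube.utils.aws.queue import get_messages, get_queue, publish_message, redrive_queue

# Hypothetical queue name, for illustration only.
queue = get_queue("example-alive-queue")
publish_message(queue, json.dumps({"id": "example-dataset"}))

# Lazily pull up to 10 messages, then acknowledge each one after processing.
for msg in get_messages(queue, limit=10, max_wait=5, messages_per_request=10):
    print(msg.body)
    msg.delete()

# Move messages parked on the deadletter queue back onto its source queue.
redrive_queue("example-alive-queue-deadletter")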
