Skip to content

Commit 7b6b553

Browse files
committed
feat: save cog with dask to azure
1 parent 796a99c commit 7b6b553

File tree

5 files changed

+408
-68
lines changed

5 files changed

+408
-68
lines changed

odc/geo/cog/_az.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import base64
2+
from typing import Any, Dict, List, Union
3+
4+
from azure.storage.blob import BlobBlock, BlobServiceClient
5+
from dask.delayed import Delayed
6+
7+
from ._mpu import mpu_write
8+
from ._multipart import MultiPartUploadBase
9+
10+
11+
class AzureLimits:
    """
    Common Azure writer settings.

    Mixin exposing the block-blob size/count limits used by the
    multipart upload machinery.
    """

    # One mebibyte, used to express the byte limits below.
    _MiB = 1 << 20

    @property
    def min_write_sz(self) -> int:
        """Minimum size of a staged block (Azure default: 4 MiB)."""
        return 4 * self._MiB

    @property
    def max_write_sz(self) -> int:
        """Maximum size of a staged block (Azure default: 100 MiB)."""
        return 100 * self._MiB

    @property
    def min_part(self) -> int:
        """Smallest valid part number."""
        return 1

    @property
    def max_part(self) -> int:
        """Largest valid part number (Azure allows up to 50,000 blocks per blob)."""
        return 50_000
34+
35+
36+
class MultiPartUpload(AzureLimits, MultiPartUploadBase):
    """
    Multipart (block) upload of a single blob to Azure Blob Storage.
    """

    def __init__(
        self, account_url: str, container: str, blob: str, credential: Any = None
    ):
        """
        Initialise Azure multipart upload.

        :param account_url: URL of the Azure storage account.
        :param container: Name of the container.
        :param blob: Name of the blob.
        :param credential: Authentication credentials (e.g., SAS token or key).
        """
        self.account_url = account_url
        self.container = container
        self.blob = blob
        self.credential = credential

        # Initialise Azure Blob service client
        self.blob_service_client = BlobServiceClient(
            account_url=account_url, credential=credential
        )
        self.container_client = self.blob_service_client.get_container_client(container)
        self.blob_client = self.container_client.get_blob_client(blob)

        # Block ids staged so far, in staging order.
        self.block_ids: List[str] = []

    def initiate(self, **kwargs) -> str:
        """
        Initialise the upload.

        No-op for Azure: blocks can be staged without an explicit
        "begin upload" call, so only a placeholder token is returned.
        """
        return "azure-block-upload"

    def write_part(self, part: int, data: bytes) -> Dict[str, Any]:
        """
        Stage a block in Azure.

        :param part: Part number (unique).
        :param data: Data for this part.
        :return: A dictionary containing part information.
        """
        # Azure requires all block ids of a blob to have the same length,
        # so zero-pad the part number: the bare f"block-{part}" form yields
        # variable-length ids and fails once part numbers gain a digit.
        block_id = base64.b64encode(f"block-{part:06d}".encode()).decode()
        self.blob_client.stage_block(block_id=block_id, data=data)
        self.block_ids.append(block_id)
        return {"PartNumber": part, "BlockId": block_id}

    def finalise(self, parts: List[Dict[str, Any]]) -> str:
        """
        Commit the block list to finalise the upload.

        :param parts: List of uploaded parts metadata, in part order.
        :return: The ETag of the finalised blob.
        """
        block_list = [BlobBlock(block_id=part["BlockId"]) for part in parts]
        self.blob_client.commit_block_list(block_list)
        return self.blob_client.get_blob_properties().etag

    def cancel(self, other: str = ""):
        """
        Cancel the upload by clearing the locally tracked block list.

        No remote call is made; staged-but-uncommitted blocks are
        presumably expired service-side by Azure.

        :param other: Unused; accepted to match MultiPartUploadBase.cancel.
        """
        self.block_ids.clear()

    @property
    def url(self) -> str:
        """
        Get the Azure blob URL.

        :return: The full URL of the blob.
        """
        return self.blob_client.url

    @property
    def started(self) -> bool:
        """
        Check if any blocks have been staged.

        :return: True if blocks have been staged, False otherwise.
        """
        return bool(self.block_ids)

    def writer(self, kw: Dict[str, Any], *, client: Any = None) -> "DelayedAzureWriter":
        """
        Return a stateless writer compatible with Dask.

        :param kw: Additional parameters for the writer.
        :param client: Dask client (unused here; keyword-only to match
            the MultiPartUploadBase.writer signature).
        """
        return DelayedAzureWriter(self, kw)

    def upload(
        self,
        chunks: Union["dask.bag.Bag", List["dask.bag.Bag"]],
        *,
        mk_header: Any = None,
        mk_footer: Any = None,
        user_kw: Union[Dict[str, Any], None] = None,
        writes_per_chunk: int = 1,
        spill_sz: int = 20 * (1 << 20),
        client: Any = None,
        **kw,
    ) -> "Delayed":
        """
        Upload chunks to Azure Blob Storage with multipart uploads.

        :param chunks: Dask bag of chunks to upload.
        :param mk_header: Function to create header data.
        :param mk_footer: Function to create footer data.
        :param user_kw: User-provided metadata for the upload.
        :param writes_per_chunk: Number of writes per chunk.
        :param spill_sz: Spill size for buffering data; 0 disables the writer.
        :param client: Dask client for distributed execution.
        :return: A Dask delayed object representing the finalised upload.
        """
        write = self.writer(kw, client=client) if spill_sz else None
        return mpu_write(
            chunks,
            write,
            mk_header=mk_header,
            mk_footer=mk_footer,
            user_kw=user_kw,
            writes_per_chunk=writes_per_chunk,
            spill_sz=spill_sz,
            dask_name_prefix="azure-finalise",
        )
153+
154+
155+
class DelayedAzureWriter(AzureLimits):
    """
    Dask-compatible writer for Azure Blob Storage multipart uploads.

    Delegates all work to a MultiPartUpload instance; inherits the
    Azure size limits from AzureLimits.
    """

    def __init__(self, mpu: MultiPartUpload, kw: Dict[str, Any]):
        """
        Initialise the Azure writer.

        :param mpu: MultiPartUpload instance.
        :param kw: Additional parameters for the writer (e.g. ContentType).
        """
        self.mpu = mpu
        self.kw = kw

    def __call__(self, part: int, data: bytes) -> Dict[str, Any]:
        """
        Write a single part to Azure Blob Storage.

        :param part: Part number.
        :param data: Chunk data.
        :return: Metadata for the written part.
        """
        # Stage the chunk as a block via the upload driver.
        return self.mpu.write_part(part, data)

    def finalise(self, parts: List[Dict[str, Any]]) -> str:
        """
        Finalise the upload by committing the block list.

        :param parts: List of uploaded parts metadata.
        :return: ETag of the finalised blob.
        """
        # Commit every staged block in one call.
        return self.mpu.finalise(parts)

odc/geo/cog/_multipart.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""
2+
Multipart upload interface.
3+
"""
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Any, Dict, List, Union
7+
8+
import dask.bag
9+
10+
11+
class MultiPartUploadBase(ABC):
    """Abstract base class for multipart upload drivers (e.g. S3, Azure)."""

    @abstractmethod
    def initiate(self, **kwargs) -> str:
        """Initiate a multipart upload and return an identifier."""

    @abstractmethod
    def write_part(self, part: int, data: bytes) -> Dict[str, Any]:
        """Upload a single part and return its metadata."""

    @abstractmethod
    def finalise(self, parts: List[Dict[str, Any]]) -> str:
        """Finalise the upload with a list of parts."""

    @abstractmethod
    def cancel(self, other: str = ""):
        """Cancel the multipart upload."""

    @property
    @abstractmethod
    def url(self) -> str:
        """Return the URL of the upload target."""

    @property
    @abstractmethod
    def started(self) -> bool:
        """Check if the multipart upload has been initiated."""

    @abstractmethod
    def writer(self, kw: Dict[str, Any], *, client: Any = None) -> Any:
        """
        Return a Dask-compatible writer for multipart uploads.

        :param kw: Additional parameters for the writer.
        :param client: Dask client for distributed execution.
        """

    @abstractmethod
    def upload(
        self,
        chunks: Union["dask.bag.Bag", List["dask.bag.Bag"]],
        *,
        mk_header: Any = None,
        mk_footer: Any = None,
        user_kw: Union[Dict[str, Any], None] = None,
        writes_per_chunk: int = 1,
        spill_sz: int = 20 * (1 << 20),
        client: Any = None,
        **kw,
    ) -> Any:
        """
        Orchestrate the upload process with multipart uploads.

        :param chunks: Dask bag of chunks to upload.
        :param mk_header: Function to create header data.
        :param mk_footer: Function to create footer data.
        :param user_kw: User-provided metadata for the upload.
        :param writes_per_chunk: Number of writes per chunk.
        :param spill_sz: Spill size for buffering data.
        :param client: Dask client for distributed execution.
        :return: A Dask delayed object representing the finalised upload.
        """

0 commit comments

Comments
 (0)