Skip to content

Commit 6da4943

Browse files
authored
feat: streaming geoparquet writes (#178)
Closes #174. cc @hrodmn

Example usage, from the notebook update:

```python
import itertools

iterator = itertools.batched(items, 499)

with rustac.geoparquet_writer(list(next(iterator)), "items-batched.parquet") as writer:
    for item_batch in iterator:
        writer.write(list(item_batch))

item_collection = await rustac.read("items-batched.parquet")
```
1 parent 1800edd commit 6da4943

File tree

13 files changed

+2192
-1595
lines changed

13 files changed

+2192
-1595
lines changed

Cargo.lock

Lines changed: 281 additions & 259 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,8 @@ tracing = "0.1.41"
4343

4444
[build-dependencies]
4545
cargo-lock = "10"
46+
47+
[patch.crates-io]
48+
stac = { git = 'https://github.com/stac-utils/rustac.git' }
49+
stac-io = { git = 'https://github.com/stac-utils/rustac.git' }
50+
rustac = { git = 'https://github.com/stac-utils/rustac.git' }

docs/api/parquet.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
description: Write stac-geoparquet
3+
---
4+
5+
# parquet
6+
7+
::: rustac.GeoparquetWriter
8+
::: rustac.geoparquet_writer

docs/api/stac.md

Lines changed: 0 additions & 8 deletions
This file was deleted.

docs/notebooks/stac-geoparquet.ipynb

Lines changed: 107 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@
2222
},
2323
{
2424
"cell_type": "code",
25-
"execution_count": 68,
25+
"execution_count": 1,
2626
"id": "37025933",
2727
"metadata": {},
2828
"outputs": [
2929
{
3030
"name": "stdout",
3131
"output_type": "stream",
3232
"text": [
33-
"150.2 kB\n"
33+
"74.0 kB\n"
3434
]
3535
}
3636
],
@@ -89,7 +89,7 @@
8989
},
9090
{
9191
"cell_type": "code",
92-
"execution_count": 69,
92+
"execution_count": 2,
9393
"id": "164ecaee",
9494
"metadata": {},
9595
"outputs": [
@@ -134,6 +134,67 @@
134134
"print(json.dumps(item_collection[\"features\"][0], indent=2))"
135135
]
136136
},
137+
{
138+
"cell_type": "markdown",
139+
"id": "9325e2af",
140+
"metadata": {},
141+
"source": [
142+
"### Writing in chunks\n",
143+
"\n",
144+
"If you have a lot of items, you might not want to load them all into memory at once.\n",
145+
"We provide a context manager for iteratively writing **stac-geoparquet**.\n",
146+
"This example is a bit contrived, but you get the idea."
147+
]
148+
},
149+
{
150+
"cell_type": "code",
151+
"execution_count": 3,
152+
"id": "9045b4b4",
153+
"metadata": {},
154+
"outputs": [
155+
{
156+
"name": "stdout",
157+
"output_type": "stream",
158+
"text": [
159+
"Writing batch of 499 items\n",
160+
"Writing batch of 499 items\n",
161+
"Writing batch of 499 items\n",
162+
"Writing batch of 499 items\n",
163+
"Writing batch of 499 items\n",
164+
"Writing batch of 499 items\n",
165+
"Writing batch of 499 items\n",
166+
"Writing batch of 499 items\n",
167+
"Writing batch of 499 items\n",
168+
"Writing batch of 499 items\n",
169+
"Writing batch of 499 items\n",
170+
"Writing batch of 499 items\n",
171+
"Writing batch of 499 items\n",
172+
"Writing batch of 499 items\n",
173+
"Writing batch of 499 items\n",
174+
"Writing batch of 499 items\n",
175+
"Writing batch of 499 items\n",
176+
"Writing batch of 499 items\n",
177+
"Writing batch of 499 items\n",
178+
"Writing batch of 20 items\n",
179+
"Read back 10000 items\n"
180+
]
181+
}
182+
],
183+
"source": [
184+
"import itertools\n",
185+
"\n",
186+
"iterator = itertools.batched(items, 499)\n",
187+
"\n",
188+
"with rustac.geoparquet_writer(list(next(iterator)), \"items-batched.parquet\") as writer:\n",
189+
" for item_batch in iterator:\n",
190+
" print(f\"Writing batch of {len(item_batch)} items\")\n",
191+
" writer.write(list(item_batch))\n",
192+
"\n",
193+
"\n",
194+
"item_collection = await rustac.read(\"items-batched.parquet\")\n",
195+
"print(\"Read back\", len(item_collection[\"features\"]), \"items\")"
196+
]
197+
},
137198
{
138199
"cell_type": "markdown",
139200
"id": "2223d4ce",
@@ -162,16 +223,16 @@
162223
},
163224
{
164225
"cell_type": "code",
165-
"execution_count": 70,
226+
"execution_count": 4,
166227
"id": "870cbebb",
167228
"metadata": {},
168229
"outputs": [
169230
{
170231
"name": "stdout",
171232
"output_type": "stream",
172233
"text": [
173-
"That took 0.37 seconds to read\n",
174-
"That took 1.20 seconds to write\n",
234+
"That took 0.08 seconds to read\n",
235+
"That took 0.28 seconds to write\n",
175236
"9999 items have a 'foo' property\n"
176237
]
177238
}
@@ -219,46 +280,46 @@
219280
},
220281
{
221282
"cell_type": "code",
222-
"execution_count": 71,
283+
"execution_count": 5,
223284
"id": "0fabaa18",
224285
"metadata": {},
225286
"outputs": [
226287
{
227288
"data": {
228289
"text/plain": [
229-
"┌───────────┬──────────────────────────┬───────────────────────────┐\n",
230-
"│ id │ datetime │ geometry │\n",
231-
"│ varchar │ timestamp with time zone │ geometry\n",
232-
"├───────────┼──────────────────────────┼───────────────────────────┤\n",
233-
"│ item-0 │ 2023-12-31 17:00:00-07 │ POINT (-105.1019 40.1672)\n",
234-
"│ item-1 │ 2023-12-31 18:00:00-07 │ POINT (-105.1019 40.1672)\n",
235-
"│ item-2 │ 2023-12-31 19:00:00-07 │ POINT (-105.1019 40.1672)\n",
236-
"│ item-3 │ 2023-12-31 20:00:00-07 │ POINT (-105.1019 40.1672)\n",
237-
"│ item-4 │ 2023-12-31 21:00:00-07 │ POINT (-105.1019 40.1672)\n",
238-
"│ item-5 │ 2023-12-31 22:00:00-07 │ POINT (-105.1019 40.1672)\n",
239-
"│ item-6 │ 2023-12-31 23:00:00-07 │ POINT (-105.1019 40.1672)\n",
240-
"│ item-7 │ 2024-01-01 00:00:00-07 │ POINT (-105.1019 40.1672)\n",
241-
"│ item-8 │ 2024-01-01 01:00:00-07 │ POINT (-105.1019 40.1672)\n",
242-
"│ item-9 │ 2024-01-01 02:00:00-07 │ POINT (-105.1019 40.1672)\n",
243-
"│ · │ · │ ·\n",
244-
"│ · │ · │ ·\n",
245-
"│ · │ · │ ·\n",
246-
"│ item-9990 │ 2025-02-19 23:00:00-07 │ POINT (-105.1019 40.1672)\n",
247-
"│ item-9991 │ 2025-02-20 00:00:00-07 │ POINT (-105.1019 40.1672)\n",
248-
"│ item-9992 │ 2025-02-20 01:00:00-07 │ POINT (-105.1019 40.1672)\n",
249-
"│ item-9993 │ 2025-02-20 02:00:00-07 │ POINT (-105.1019 40.1672)\n",
250-
"│ item-9994 │ 2025-02-20 03:00:00-07 │ POINT (-105.1019 40.1672)\n",
251-
"│ item-9995 │ 2025-02-20 04:00:00-07 │ POINT (-105.1019 40.1672)\n",
252-
"│ item-9996 │ 2025-02-20 05:00:00-07 │ POINT (-105.1019 40.1672)\n",
253-
"│ item-9997 │ 2025-02-20 06:00:00-07 │ POINT (-105.1019 40.1672)\n",
254-
"│ item-9998 │ 2025-02-20 07:00:00-07 │ POINT (-105.1019 40.1672)\n",
255-
"│ item-9999 │ 2025-02-20 08:00:00-07 │ POINT (-105.1019 40.1672)\n",
256-
"├───────────┴──────────────────────────┴───────────────────────────┤\n",
257-
"│ ? rows (>9999 rows, 20 shown) 3 columns │\n",
258-
"└──────────────────────────────────────────────────────────────────┘"
290+
"┌───────────┬──────────────────────────┬────────────────────────────────────────────────────────────────────\n",
291+
"│ id │ datetime │ geometry \n",
292+
"│ varchar │ timestamp with time zone │ blob \n",
293+
"├───────────┼──────────────────────────┼────────────────────────────────────────────────────────────────────\n",
294+
"│ item-0 │ 2023-12-31 17:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
295+
"│ item-1 │ 2023-12-31 18:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
296+
"│ item-2 │ 2023-12-31 19:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
297+
"│ item-3 │ 2023-12-31 20:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
298+
"│ item-4 │ 2023-12-31 21:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
299+
"│ item-5 │ 2023-12-31 22:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
300+
"│ item-6 │ 2023-12-31 23:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
301+
"│ item-7 │ 2024-01-01 00:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
302+
"│ item-8 │ 2024-01-01 01:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
303+
"│ item-9 │ 2024-01-01 02:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
304+
"│ · │ · │ · \n",
305+
"│ · │ · │ · \n",
306+
"│ · │ · │ · \n",
307+
"│ item-9990 │ 2025-02-19 23:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
308+
"│ item-9991 │ 2025-02-20 00:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
309+
"│ item-9992 │ 2025-02-20 01:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
310+
"│ item-9993 │ 2025-02-20 02:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
311+
"│ item-9994 │ 2025-02-20 03:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
312+
"│ item-9995 │ 2025-02-20 04:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
313+
"│ item-9996 │ 2025-02-20 05:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
314+
"│ item-9997 │ 2025-02-20 06:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
315+
"│ item-9998 │ 2025-02-20 07:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
316+
"│ item-9999 │ 2025-02-20 08:00:00-07 │ \\x01\\x01\\x00\\x00\\x00\\x98\\xDD\\x93\\x87\\x85FZ\\xC0\\x13\\xF2A\\xCFf\\x15D@\n",
317+
"├───────────┴──────────────────────────┴────────────────────────────────────────────────────────────────────\n",
318+
"│ ? rows (>9999 rows, 20 shown) 3 columns │\n",
319+
"└───────────────────────────────────────────────────────────────────────────────────────────────────────────"
259320
]
260321
},
261-
"execution_count": 71,
322+
"execution_count": 5,
262323
"metadata": {},
263324
"output_type": "execute_result"
264325
}
@@ -282,7 +343,7 @@
282343
},
283344
{
284345
"cell_type": "code",
285-
"execution_count": 72,
346+
"execution_count": 6,
286347
"id": "c01c0ef5",
287348
"metadata": {},
288349
"outputs": [
@@ -297,7 +358,7 @@
297358
"└──────────────┘"
298359
]
299360
},
300-
"execution_count": 72,
361+
"execution_count": 6,
301362
"metadata": {},
302363
"output_type": "execute_result"
303364
}
@@ -318,7 +379,7 @@
318379
},
319380
{
320381
"cell_type": "code",
321-
"execution_count": 73,
382+
"execution_count": 7,
322383
"id": "18bc3a4b",
323384
"metadata": {},
324385
"outputs": [
@@ -327,10 +388,10 @@
327388
"evalue": "Binder Error: Referenced column \"foo\" not found in FROM clause!\nCandidate bindings: \"bbox\"",
328389
"output_type": "error",
329390
"traceback": [
330-
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
331-
"\u001b[0;31mBinderException\u001b[0m Traceback (most recent call last)",
332-
"Cell \u001b[0;32mIn[73], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mduckdb\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msql\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mselect id, foo from read_parquet([\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mitems.parquet\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m, \u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mnew-items.parquet\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m])\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n",
333-
"\u001b[0;31mBinderException\u001b[0m: Binder Error: Referenced column \"foo\" not found in FROM clause!\nCandidate bindings: \"bbox\""
391+
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
392+
"\u001b[31mBinderException\u001b[39m Traceback (most recent call last)",
393+
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[7]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[43mduckdb\u001b[49m\u001b[43m.\u001b[49m\u001b[43msql\u001b[49m\u001b[43m(\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43mselect id, foo from read_parquet([\u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[33;43mitems.parquet\u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[33;43m, \u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[33;43mnew-items.parquet\u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[33;43m])\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n",
394+
"\u001b[31mBinderException\u001b[39m: Binder Error: Referenced column \"foo\" not found in FROM clause!\nCandidate bindings: \"bbox\""
334395
]
335396
}
336397
],
@@ -341,7 +402,7 @@
341402
],
342403
"metadata": {
343404
"kernelspec": {
344-
"display_name": ".venv",
405+
"display_name": "rustac-py",
345406
"language": "python",
346407
"name": "python3"
347408
},

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ nav:
3131
- arrow: api/arrow.md
3232
- duckdb: api/duckdb.md
3333
- migrate: api/migrate.md
34+
- parquet: api/parquet.md
3435
- read: api/read.md
3536
- search: api/search.md
3637
- stac: api/stac.md

python/rustac/__init__.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,42 @@
11
from __future__ import annotations
2+
from pathlib import Path
3+
from collections.abc import Generator
4+
from contextlib import contextmanager
5+
from typing import Any
26

37
from .rustac import *
48
from . import store
59

10+
@contextmanager
def geoparquet_writer(
    items: list[dict[str, Any]],
    path: str,
    drop_invalid_attributes: bool = True,
) -> Generator[GeoparquetWriter, None, None]:
    """Open a geoparquet writer in a context manager.

    The items provided to the initial call will be used to build the geoparquet
    schema. All subsequent items must have the same schema.

    The underlying parquet writer will group batches of items into row groups
    based upon its default configuration; the row groups are _not_ determined
    by the size of the item lists passed to the writer.

    The writer is finished when the ``with`` block exits, including when the
    block exits because of an exception, so the writer is never left
    unfinished.

    Args:
        items: The STAC items
        path: The path for the stac-geoparquet file
        drop_invalid_attributes: If true, invalid attributes (e.g. an `id` in
            the `properties` field) will be dropped. If false, raise an error if
            an invalid attribute is encountered.

    Yields:
        The open writer; call its ``write`` method to append more items.

    Examples:

        >>> with geoparquet_writer(item_batches[0], "out.parquet") as w:
        ...     for items in item_batches[1:]:
        ...         w.write(items)
        ...
        >>>
    """
    writer = GeoparquetWriter(items, path, drop_invalid_attributes)
    try:
        yield writer
    finally:
        # Run finish() even if the body raised; otherwise the writer would be
        # abandoned without ever being finished.
        writer.finish()
39+
640
# Re-export the native extension module's documentation as the package's own.
__doc__ = rustac.__doc__
if hasattr(rustac, "__all__"):
    # The native module's ``__all__`` cannot know about helpers defined in
    # this file, so add ``geoparquet_writer`` explicitly; otherwise
    # ``from rustac import *`` would silently omit it.
    __all__ = [*rustac.__all__, "geoparquet_writer"]

python/rustac/rustac.pyi

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,38 @@ from rustac.store import ObjectStore
1111

1212
AnyObjectStore = ObjectStore | ObstoreObjectStore
1313

14+
class GeoparquetWriter:
    """Writes stac-geoparquet to a file from successive batches of items."""

    def __init__(
        self,
        items: list[dict[str, Any]],
        path: str,
        drop_invalid_attributes: bool = True,
    ) -> None:
        """Create a writer for ``path``, seeded with a first batch of items.

        Args:
            items: The first STAC items to write. Their schema becomes the
                schema of the output file; every later batch passed to
                ``write`` needs to have the same schema.
            path: Filesystem path of the stac-geoparquet file to write.
            drop_invalid_attributes: If true, drop invalid attributes found in
                the items' `properties` (e.g. an additional `id` property).
                If false, raise an error instead.
        """
        ...

    def write(self, items: list[dict[str, Any]]) -> None:
        """Append another batch of items to the geoparquet.

        Args:
            items: The items to write. Their schema must match that of the
                items the writer was initialized with.
        """
        ...

    def finish(self) -> None:
        """Finish writing the stac-geoparquet file."""
        ...
45+
1446
class RustacError(Exception):
1547
"""A package-specific exception."""
1648

0 commit comments

Comments
 (0)