Skip to content

Commit 547ddd4

Browse files
committed
FIX: Address Copilot review comments in async zarr loading
- Fix typo in comment (rootshalgive -> root)
- Add max_concurrency validation (raise ValueError if < 1)
- Update open_store_async docstring to reflect actual behavior
- Add semaphore bound to _iter_zarr_groups_async
- Add create_default_indexes param to _open_groups_from_stores_async
- Add semaphore bound to create_indexes_async in api.py
- Fix whats-new.rst PR references (10742 -> 11149)
1 parent c204229 commit 547ddd4

File tree

3 files changed

+36
-17
lines changed

3 files changed

+36
-17
lines changed

doc/whats-new.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ New Features
1616

1717
- Added ``max_concurrency`` parameter to :py:func:`open_datatree` to control
1818
the maximum number of concurrent I/O operations when opening groups in parallel
19-
with the Zarr backend (:pull:`10742`).
19+
with the Zarr backend (:pull:`11149`).
2020
By `Alfonso Ladino <https://github.com/aladinor>`_.
2121

2222
Breaking Changes
@@ -153,7 +153,7 @@ Performance
153153
~~~~~~~~~~~
154154

155155
- Improve performance of :py:func:`open_datatree` for zarr stores by using async/concurrent
156-
loading of groups and indexes (:pull:`10742`).
156+
loading of groups and indexes (:pull:`11149`).
157157
By `Alfonso Ladino <https://github.com/aladinor>`_.
158158
- Add a fastpath to the backend plugin system for standard engines (:issue:`10178`, :pull:`10937`).
159159
By `Sam Levang <https://github.com/slevang>`_.

xarray/backends/api.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,17 @@ def _datatree_from_backend_datatree(
362362
async def create_indexes_async() -> dict[str, Dataset]:
363363
import asyncio
364364

365+
sem = asyncio.Semaphore(10)
366+
367+
async def _bounded_create_index(
368+
path: str, ds: Dataset,
369+
) -> tuple[str, Dataset]:
370+
async with sem:
371+
return await _create_index_for_node(path, ds)
372+
365373
results: dict[str, Dataset] = {}
366374
tasks = [
367-
_create_index_for_node(path, node.dataset)
375+
_bounded_create_index(path, node.dataset)
368376
for path, [node] in group_subtrees(backend_tree)
369377
]
370378
for fut in asyncio.as_completed(tasks):

xarray/backends/zarr.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -767,11 +767,13 @@ async def open_store_async(
767767
write_empty: bool | None = None,
768768
cache_members: bool = False,
769769
):
770-
"""Async version of open_store using flat group discovery.
770+
"""Async version of open_store using asynchronous group discovery.
771771
772-
This method uses store.list() to discover all groups in a single
773-
async call, which is significantly faster than recursive traversal
774-
for stores that support listing (like icechunk).
772+
This method discovers all groups by recursively traversing the store,
773+
issuing asynchronous directory listing calls (via store.list_dir when
774+
available) and per-directory store.get calls. As a result, group discovery
775+
may involve multiple asynchronous operations and round-trips, especially
776+
for large or deeply nested hierarchies.
775777
776778
Parameters
777779
----------
@@ -1960,6 +1962,7 @@ async def _open_groups_from_stores_async(
19601962
use_cftime=None,
19611963
decode_timedelta=None,
19621964
max_concurrency: int | None = None,
1965+
create_default_indexes: bool = True,
19631966
) -> dict[str, Dataset]:
19641967
"""Shared async core: open datasets from pre-opened stores concurrently.
19651968
@@ -1981,6 +1984,10 @@ async def _open_groups_from_stores_async(
19811984

19821985
if max_concurrency is None:
19831986
max_concurrency = 10
1987+
elif max_concurrency < 1:
1988+
raise ValueError(
1989+
f"max_concurrency must be None or an integer >= 1, got {max_concurrency!r}"
1990+
)
19841991
sem = asyncio.Semaphore(max_concurrency)
19851992

19861993
async def open_one(path_group: str, store) -> tuple[str, Dataset]:
@@ -1998,7 +2005,8 @@ async def open_one(path_group: str, store) -> tuple[str, Dataset]:
19982005
decode_timedelta=decode_timedelta,
19992006
)
20002007
# Create indexes concurrently
2001-
ds = await _maybe_create_default_indexes_async(ds)
2008+
if create_default_indexes:
2009+
ds = await _maybe_create_default_indexes_async(ds)
20022010
if group:
20032011
group_name = str(NodePath(path_group).relative_to(parent))
20042012
else:
@@ -2232,16 +2240,19 @@ async def discover_subgroups(dir_path: str) -> list[str]:
22322240
subdirs.append(subdir)
22332241

22342242
# Check each subdirectory in parallel
2243+
_sem = asyncio.Semaphore(10)
2244+
22352245
async def check_subdir(subdir: str) -> tuple[str | None, list[str]]:
22362246
"""Check if subdir is a group and discover its subgroups."""
2237-
if await is_zarr_group(subdir):
2238-
group_path = (
2239-
str(parent_nodepath / subdir) if subdir else str(parent_nodepath)
2240-
)
2241-
# Recursively find subgroups
2242-
sub_groups = await discover_subgroups(subdir)
2243-
return group_path, sub_groups
2244-
return None, []
2247+
async with _sem:
2248+
if await is_zarr_group(subdir):
2249+
group_path = (
2250+
str(parent_nodepath / subdir) if subdir else str(parent_nodepath)
2251+
)
2252+
# Recursively find subgroups
2253+
sub_groups = await discover_subgroups(subdir)
2254+
return group_path, sub_groups
2255+
return None, []
22452256

22462257
# Run all subdirectory checks in parallel
22472258
results = await asyncio.gather(*[check_subdir(sd) for sd in subdirs])
@@ -2253,7 +2264,7 @@ async def check_subdir(subdir: str) -> tuple[str | None, list[str]]:
22532264

22542265
return found_groups
22552266

2256-
# Start discovery from rootshalgive
2267+
# Start discovery from root
22572268
subgroups = await discover_subgroups("")
22582269
group_paths.extend(subgroups)
22592270

0 commit comments

Comments
 (0)