Add asynchronous load method #10327
Changes from 129 commits
```diff
@@ -30,6 +30,7 @@
     "coveralls",
     "pip",
     "pytest",
+    "pytest-asyncio",
     "pytest-cov",
     "pytest-env",
     "pytest-mypy-plugins",
```
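The `pytest-asyncio` dependency added above is what lets the test suite define coroutine tests for the new async code path. A minimal, hypothetical sketch of such a test (the dataset construction and assertion are illustrative, not taken from this PR):

```python
import numpy as np
import pytest
import xarray as xr


# Hypothetical test: pytest-asyncio's "asyncio" marker lets pytest run the coroutine.
@pytest.mark.asyncio
async def test_load_async_returns_self():
    ds = xr.Dataset({"a": ("x", np.arange(4))})  # small in-memory dataset
    result = await ds.load_async()               # new method added by this PR
    assert result is ds                          # load_async returns self, per the diff
```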
```diff
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import copy
 import datetime
 import io
@@ -539,24 +540,50 @@ def load(self, **kwargs) -> Self:
         dask.compute
         """
         # access .data to coerce everything to numpy or dask arrays
-        lazy_data = {
+        chunked_data = {
             k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
         }
-        if lazy_data:
-            chunkmanager = get_chunked_array_type(*lazy_data.values())
+        if chunked_data:
+            chunkmanager = get_chunked_array_type(*chunked_data.values())

             # evaluate all the chunked arrays simultaneously
             evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
-                *lazy_data.values(), **kwargs
+                *chunked_data.values(), **kwargs
             )

-            for k, data in zip(lazy_data, evaluated_data, strict=False):
+            for k, data in zip(chunked_data, evaluated_data, strict=False):
                 self.variables[k].data = data

         # load everything else sequentially
-        for k, v in self.variables.items():
-            if k not in lazy_data:
-                v.load()
+        [v.load() for k, v in self.variables.items() if k not in chunked_data]

         return self
```
```diff
+    async def load_async(self, **kwargs) -> Self:
+        # TODO refactor this to pull out the common chunked_data codepath
+
+        # this blocks on chunked arrays but not on lazily indexed arrays
```

Review comment: FYI dask has async compute but it seems hard to work in here :)
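For context on the review note above: dask's distributed scheduler does offer an asynchronous compute path, but it requires an async `Client`, which is part of why it is hard to thread through this method. A rough sketch of that API in isolation (not code from this PR):

```python
import asyncio

import dask.array as da
from dask.distributed import Client


async def main():
    # An asynchronous client can be used with async/await directly.
    async with Client(asynchronous=True) as client:
        arr = da.ones((1000, 1000), chunks=(100, 100))
        future = client.compute(arr.sum())  # schedule the graph, get a Future back
        result = await future               # await instead of blocking on .result()
        print(result)


asyncio.run(main())
```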
```diff
+        # access .data to coerce everything to numpy or dask arrays
+        chunked_data = {
+            k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
+        }
+        if chunked_data:
+            chunkmanager = get_chunked_array_type(*chunked_data.values())
+
+            # evaluate all the chunked arrays simultaneously
+            evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
+                *chunked_data.values(), **kwargs
+            )
+
+            for k, data in zip(chunked_data, evaluated_data, strict=False):
+                self.variables[k].data = data
+
+        # load everything else concurrently
+        coros = [
+            v.load_async() for k, v in self.variables.items() if k not in chunked_data
+        ]
+        await asyncio.gather(*coros)
```
Comment on lines +622 to +626

Review comment: We could actually do this same thing inside of the synchronous
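A speculative reading of the comment above, assuming it refers to the synchronous `load`: the same per-variable gather could be driven from blocking code by spinning up an event loop internally. This is only a sketch of that idea, not something this PR does:

```python
import asyncio


def _load_non_chunked_variables_sync(self, chunked_data):
    # Speculative helper: reuse the concurrent per-variable path from synchronous
    # code. Assumes no event loop is already running in this thread.
    async def _gather():
        await asyncio.gather(
            *(
                v.load_async()
                for k, v in self.variables.items()
                if k not in chunked_data
            )
        )

    asyncio.run(_gather())
```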
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should rate-limite all async def async_gather(*coros, concurrency: Optional[int] = None, return_exceptions: bool = False) -> list[Any]:
"""Execute a gather while limiting the number of concurrent tasks.
Args:
coros: coroutines
list of coroutines to execute
concurrency: int
concurrency limit
if None, defaults to config_obj.get('async.concurrency', 4)
if <= 0, no concurrency limit
"""
if concurrency is None:
concurrency = int(config_obj.get("async.concurrency", 4))
if concurrency > 0:
# if concurrency > 0, we use a semaphore to limit the number of concurrent coroutines
semaphore = asyncio.Semaphore(concurrency)
async def sem_coro(coro):
async with semaphore:
return await coro
results = await asyncio.gather(*(sem_coro(c) for c in coros), return_exceptions=return_exceptions)
else:
results = await asyncio.gather(*coros, return_exceptions=return_exceptions)
return results There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Arguably that should be left to the underlying storage layer. Zarr already has its own rate limiting. Why introduce this additional complexity and configuration parameter in Xarray? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does zarr rate-limit per call or globally though? If it's rate-limited per call, and we make lots of concurrent calls from the xarray API, it will exceed the intended rate set in zarr... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not 100% on what Zarr will do but this will rate limit across Xarray variables. We will undoubtedly want to offer control here, even if the default is None for a start. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See #10622 for a more general discussion on this problem. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's document in the docstrings for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Documented in dfaac7e |
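For illustration, the helper proposed in the thread above would replace the bare `asyncio.gather` call roughly like this (hypothetical; `async_gather` and `config_obj` come from that comment, not from the PR as written):

```python
# Hypothetical: cap concurrent per-variable loads instead of gathering them all at once.
coros = [
    v.load_async() for k, v in self.variables.items() if k not in chunked_data
]
await async_gather(*coros, concurrency=4)
```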
```diff
+
+        return self
```
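A short usage sketch of the new method from user code (the file path is hypothetical; `load_async` returns the dataset itself, per the diff above):

```python
import asyncio

import xarray as xr


async def main():
    ds = xr.open_dataset("example.nc")  # hypothetical lazily-backed dataset
    ds = await ds.load_async()          # load all variables concurrently
    print(ds)


asyncio.run(main())
```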