Description
Continuing from coiled/feedback#229.
Previous Discussion
When multiple assets are in a single chunk, stackstac reads them serially:
stackstac/stackstac/to_dask.py, lines 180 to 182 in d53f3dc:

```python
# NOTE: when there are multiple assets, we _could_ parallelize these reads with our own threadpool.
# However, that would probably increase memory usage, since the internal, thread-local GDAL datasets
# would end up copied to even more threads.
```

> Chunks are still processed in parallel, so you'll always have `nthreads` concurrent reads at once. Generally, when working in any parallel framework, you don't want to layer parallelism: https://stackoverflow.com/a/39425693/17100540. Because these reads are mostly (but not entirely!) network bound, I can see an argument for stackstac parallelizing them internally, but it still would likely increase memory usage. The general best practice is that things running in a Dask task should be single-threaded.
I understand what you're saying about layering parallelism. I thought asyncio was an option, but after reading more, it looks like rasterio/GDAL are incompatible with asyncio, so threads are the only option. However, it still seems like threads within a task would be the best option for a time-series use case.
The additional memory burden of threads within a task would, in the worst case, be the number of simultaneous reads * the extra data (to be thrown away) per read. Since I'm reading from a different image in each thread, I presume I don't need to copy any GDALDatasets. In my case the size of each chunk (50 images) is tiny (~1 MB), whereas each read task takes about 50 sec. If we assume the extra data per read is on the same order of magnitude as the data I keep, each task would use 50 MB max.
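To spell out the arithmetic (the ~1 MB of transient data per read is my assumption, per the same-order-of-magnitude guess above):

```python
# Back-of-envelope worst case for threading the reads within one task.
# extra_per_read_mb is an assumed figure (same order as the data I keep).
n_reads = 50            # images per chunk, all read concurrently
extra_per_read_mb = 1   # transient, thrown-away data per read (assumed)
worst_case_mb = n_reads * extra_per_read_mb
print(worst_case_mb)    # 50 -> ~50 MB extra per task, worst case
```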
So threads within the task would get me 1 sec per chunk for 50 MB per chunk: a 50x speedup without significantly increasing memory for me.

The other option is to keep each chunk/task single-threaded and put lots of threads on Dask. This would require a graph with 1 image per chunk, so 50x the graph size, to get the same performance with 50 threads per worker, and the worker still uses 50x the memory. However, this makes everything on my cluster use 50 threads per worker, instead of just this specific operation, where I know the execution time is very large in comparison to the memory usage.
I've experimented with the second option, and it causes memory pressure since other operations are also executing on the cluster (e.g. zarr reads, rechunking, resampling). I get out-of-memory errors even with nthreads = 2 * ncores instead of nthreads = ncores. It also makes the task graph very large (>300K tasks).

A similar logic applies if I want a very large number of disjoint spatial points instead of a time series.
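For concreteness, the second option looks roughly like this (a minimal sketch with a hypothetical local cluster; in a real deployment the thread count would come from the worker CLI or config):

```python
# Minimal sketch of the "second option": single-threaded tasks, with the
# parallelism raised at the Dask level instead. The 4-core worker and
# thread count here are hypothetical examples.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=8,  # nthreads = 2 * ncores on a 4-core machine
)
client = Client(cluster)
```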
Please let me know if I've got anything wrong here, or there's another better option.
Attempted Solution
I attempted to multithread the reads in `fetch_raster_window`:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
from rasterio import windows


def fetch_raster_window(
    ...
    all_empty: bool = True
    entry: ReaderTableEntry

    thread_pool = ThreadPoolExecutor(len(reader_table))
    futures = []
    for index, entry in np.ndenumerate(reader_table):
        if entry:
            reader, asset_window = entry
            # Only read if the window we're fetching actually overlaps with the asset
            if windows.intersect(current_window, asset_window):
                # TODO when the Reader won't be rescaling, support passing `output` to avoid the copy?
                # Bind `index` and `reader` as defaults so each task captures the
                # current loop values; a bare `lambda` would late-bind and see
                # only the final iteration's values.
                futures.append(
                    thread_pool.submit(lambda i=index, r=reader: (i, r.read(current_window)))
                )

    for future in as_completed(futures):
        index, data = future.result()
        if all_empty:
            # Turn `output` from a broadcast-trick array to a real array, so it's writeable
            if (
                np.isnan(data) if np.isnan(fill_value) else np.equal(data, fill_value)
            ).all():
                # Unless the data we just read is all empty anyway
                continue
            output = np.array(output)
            all_empty = False

        output[index] = data

    thread_pool.shutdown()
```

Problem
Unfortunately, this produces an error:
```
...
  File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/to_dask.py", line 186, in <lambda>
    futures.append(thread_pool.submit(lambda: (index, reader.read(current_window))))
  File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 385, in read
    reader = self.dataset
  File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 381, in dataset
    self._dataset = self._open()
  File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 337, in _open
    raise RuntimeError(msg) from e
RuntimeError: Error opening 'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/H/WC/2017/10/S2B_18HWC_20171024_0_L2A/B04.tif': RasterioIOError('CURL error: getaddrinfo() thread failed to start')
```

What I Tried:
- Increasing the number of `max_threads` in the `thread_pool`
- Ensuring AWS credentials and requester pays were enabled in the `gdal_env` (`rasterio.session`); see the sketch after this list
- Using Dask's `synchronous` scheduler
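Roughly what I mean by the session setup (a minimal sketch; `items` is assumed to come from an earlier STAC search, and the exact `LayeredEnv` wiring is illustrative rather than authoritative):

```python
# Minimal sketch of enabling requester-pays AWS access for the reads.
# `items` (a STAC item collection) is assumed from an earlier search;
# treat the exact LayeredEnv wiring as illustrative.
import rasterio
import stackstac
from rasterio.session import AWSSession

env = stackstac.LayeredEnv(
    always=rasterio.Env(session=AWSSession(requester_pays=True))
)
stack = stackstac.stack(items, gdal_env=env)
```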
Do you have any suggestions for how to deal with this problem? Or is there a better approach for me to achieve parallelism here?