Parallelize reads for timeseries #199

@bkachroo

Description

Continuing from coiled/feedback#229.

Previous Discussion

When multiple assets are in a single chunk, stackstac reads them serially:

# NOTE: when there are multiple assets, we _could_ parallelize these reads with our own threadpool.
# However, that would probably increase memory usage, since the internal, thread-local GDAL datasets
# would end up copied to even more threads.

Chunks are still processed in parallel, so you'll always have nthreads concurrent reads running. Generally, when working in any parallel framework, you don't want to layer parallelism: https://stackoverflow.com/a/39425693/17100540. Because these reads are mostly (but not entirely!) network bound, I can see an argument for stackstac parallelizing them internally, but it would still likely increase memory usage. The general best practice is that anything running inside a Dask task should be single-threaded.

I understand what you're saying about layering parallelism. I thought asyncio was an option, but after reading more it looks like rasterio/GDAL are incompatible with asyncio, so threads are the only option. However, threads within a task still seem like the best option for a time-series use case.

The additional memory burden of threads within a task would, in the worst case, be the number of simultaneous reads times the extra data (to be thrown away) per read. Since I'm reading from a different image in each thread, I presume no GDALDatasets need to be copied. In my case the size of each chunk (50 images) is tiny (1 MB), whereas each read task takes about 50 seconds. If we assume the extra data per read is on the same order of magnitude as the data I keep, each task would use at most 50 MB.
So threads within the task would get me ~1 second per chunk at ~50 MB per chunk: a 50x speedup without significantly changing my memory usage.
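
To make that arithmetic explicit, a quick back-of-envelope (the per-read waste figure is my assumption, not a measurement):

# Worst-case memory and time per chunk, using the numbers above
n_reads = 50              # images per chunk, all read concurrently
kept_mb = 1.0             # data actually kept per chunk (MB)
extra_per_read_mb = 1.0   # assumed throwaway data per read, same order as kept_mb

peak_mb = n_reads * extra_per_read_mb   # ~50 MB held at the worst moment
serial_s = 50                           # observed serial read time per chunk
parallel_s = serial_s / n_reads         # ~1 s if the reads fully overlap

print(f"~{peak_mb:.0f} MB peak per task, ~{parallel_s:.0f} s per chunk")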

The other option is to keep each chunk/task single-threaded and put lots of threads on dask. This would require a graph with one image per chunk, so 50x the graph size, to get the same performance with 50 threads per worker, and the worker still uses 50x the memory. However, this makes everything on my cluster use 50 threads per worker, instead of just this specific operation, where I know the execution time is very large compared to the memory usage.
I've experimented with the second option, and it causes memory pressure since other operations are also executing on the cluster (e.g. zarr reads, rechunking, resampling). I get out-of-memory errors even at nthreads = 2 * ncores (versus the default nthreads = ncores). It also makes the task graph very large (>300K tasks).
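
For concreteness, the second option looks roughly like this (a sketch only: the chunksize tuple and worker/thread counts are illustrative, exact chunking semantics depend on the stackstac version, and `items` stands in for the STAC items from the surrounding workflow):

import stackstac
from distributed import Client

# One image per chunk along the time axis, so each read task touches a
# single asset -- at the cost of a ~50x larger task graph
arr = stackstac.stack(items, chunksize=(1, 1, 1024, 1024))

# Compensate with many threads per worker. This applies to every task on
# the cluster (zarr reads, rechunking, resampling), not just these
# network-bound reads -- which is what creates the memory pressure
client = Client(n_workers=1, threads_per_worker=50)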

Similar logic applies if I want a very large number of disjoint spatial points instead of a time series.
Please let me know if I've got anything wrong here, or if there's a better option.

Attempted Solution

I attempted to multithread the reads in fetch_raster_window:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_raster_window(

...

all_empty: bool = True
entry: ReaderTableEntry
# One worker per reader-table entry, so every asset in the chunk can be read concurrently
thread_pool = ThreadPoolExecutor(len(reader_table))
futures = []
for index, entry in np.ndenumerate(reader_table):
    if entry:
        reader, asset_window = entry
        # Only read if the window we're fetching actually overlaps with the asset
        if windows.intersect(current_window, asset_window):

            # TODO when the Reader won't be rescaling, support passing `output` to avoid the copy?
            # Bind `index` and `reader` as defaults so each lambda captures the
            # values from this iteration, not the loop's final ones
            futures.append(
                thread_pool.submit(
                    lambda index=index, reader=reader: (index, reader.read(current_window))
                )
            )

for future in as_completed(futures):
    index, data = future.result()
    if all_empty:
        # Turn `output` from a broadcast-trick array to a real array, so it's writeable
        if (
            np.isnan(data)
            if np.isnan(fill_value)
            else np.equal(data, fill_value)
        ).all():
            # Unless the data we just read is all empty anyway
            continue
        output = np.array(output)
        all_empty = False

    output[index] = data

thread_pool.shutdown()

Problem

Unfortunately, this produces an error:

...
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/to_dask.py", line 186, in <lambda>
  futures.append(thread_pool.submit(lambda: (index, reader.read(current_window))))
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 385, in read
  reader = self.dataset
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 381, in dataset
  self._dataset = self._open()
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 337, in _open
  raise RuntimeError(msg) from e
RuntimeError: Error opening 'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/H/WC/2017/10/S2B_18HWC_20171024_0_L2A/B04.tif': RasterioIOError('CURL error: getaddrinfo() thread failed to start')

What I Tried:

  • Increasing the number of max_threads in the thread_pool
  • Ensuring AWS credentials and requester pays were enabled in the gdal_env rasterio.session
  • Using Dask's synchronous scheduler (see the sketch below)
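
For that last item, the scheduler was switched like this (a minimal sketch; `arr` stands in for the stacked array):

import dask

# Run the whole graph in the calling thread, so the only extra threads are
# the ones created by fetch_raster_window's own pool
with dask.config.set(scheduler="synchronous"):
    result = arr.compute()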

Do you have any suggestions for how to deal with this problem? Or is there a better approach for me to achieve parallelism here?
