From cc2bd4515259336a4476378c88e4bc1e6a0bb42d Mon Sep 17 00:00:00 2001 From: Nitin75408 Date: Wed, 11 Feb 2026 21:41:42 +0530 Subject: [PATCH 1/4] Fixed open_mfdataset hangs indefinitely --- xarray/backends/api.py | 26 ++++++++------ xarray/backends/file_manager.py | 5 +++ xarray/backends/h5netcdf_.py | 43 ++++++++++++++++++---- xarray/tests/test_distributed.py | 61 ++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 17 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index b93c7d517ac..1323a740ae8 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -140,6 +140,15 @@ def _multi_file_closer(closers): closer() +def _preprocess_mfdataset(ds: Dataset, preprocess: Callable[[Dataset], Dataset]) -> Dataset: + # Preserve the underlying file closer if preprocess returns a Dataset without one. + # This keeps resource cleanup reliable while allowing arbitrary preprocess functions. + processed_ds = preprocess(ds) + if processed_ds._close is None and ds._close is not None: + processed_ds.set_close(ds._close) + return processed_ds + + def load_dataset(filename_or_obj: T_PathFileOrDataStore, **kwargs) -> Dataset: """Open, load into memory, and close a Dataset from a file or file-like object. @@ -1617,14 +1626,10 @@ class (a subclass of ``BackendEntrypoint``) can also be used. if parallel: import dask - # wrap the open_dataset, getattr, and preprocess with delayed + # wrap open_dataset and preprocess with delayed open_ = dask.delayed(open_dataset) - getattr_ = dask.delayed(getattr) - if preprocess is not None: - preprocess = dask.delayed(preprocess) else: open_ = open_dataset - getattr_ = getattr if errors not in ("raise", "warn", "ignore"): raise ValueError( @@ -1652,14 +1657,15 @@ class (a subclass of ``BackendEntrypoint``) can also be used. 
combined_ids_paths = _infer_concat_order_from_positions(paths) ids = list(combined_ids_paths.keys()) - closers = [getattr_(ds, "_close") for ds in datasets] if preprocess is not None: - datasets = [preprocess(ds) for ds in datasets] + datasets = [_preprocess_mfdataset(ds, preprocess) for ds in datasets] if parallel: - # calling compute here will return the datasets/file_objs lists, - # the underlying datasets will still be stored as dask arrays - datasets, closers = dask.compute(datasets, closers) + # calling compute here will return the list of datasets; the underlying + # arrays remain lazy and dask-backed. + (datasets,) = dask.compute(datasets) + + closers = [ds._close for ds in datasets if ds._close is not None] # Combine all datasets, closing them in case of a ValueError try: diff --git a/xarray/backends/file_manager.py b/xarray/backends/file_manager.py index e6f94f93668..0d661c550d6 100644 --- a/xarray/backends/file_manager.py +++ b/xarray/backends/file_manager.py @@ -1,6 +1,7 @@ from __future__ import annotations import atexit +import pickle import threading import uuid import warnings @@ -422,6 +423,10 @@ def __del__(self) -> None: def __getstate__(self): # file is intentionally omitted: we want to open it again opener = _get_none if self._closed else self._opener + # Fail fast if opener arguments are not serializable. Without this guard, + # distributed execution can block while attempting to serialize delayed + # tasks that capture unpickleable file-like handles. 
+ pickle.dumps((self._args, self._kwargs)) return (opener, self._args, self._mode, self._lock, self._kwargs) def __setstate__(self, state) -> None: diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index a838c2798a5..7851a3e96be 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -3,7 +3,7 @@ import functools import io import os -from collections.abc import Iterable +from collections.abc import Iterable, Mapping from typing import TYPE_CHECKING, Any, Self import numpy as np @@ -232,12 +232,20 @@ def open( else: lock = combine_locks([HDF5_LOCK, get_write_lock(filename)]) - manager_cls = ( - CachingFileManager - if isinstance(filename, str) and not is_remote_uri(filename) - else PickleableFileManager - ) - manager = manager_cls(h5netcdf.File, filename, mode=mode, kwargs=kwargs) + if isinstance(filename, str) and not is_remote_uri(filename): + manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs) + elif mode == "r" and _is_fsspec_file_obj(filename): + # Reopen fsspec-backed files from fs/path instead of serializing a live + # file handle across distributed workers. 
+ manager = PickleableFileManager( + _open_h5netcdf_from_fsspec, + filename.fs, + filename.path, + mode=mode, + kwargs={"h5netcdf_kwargs": kwargs}, + ) + else: + manager = PickleableFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs) return cls( manager, @@ -465,6 +473,27 @@ def _normalize_filename_or_obj( return _normalize_path(filename_or_obj) +def _is_fsspec_file_obj(obj: Any) -> bool: + fs = getattr(obj, "fs", None) + path = getattr(obj, "path", None) + return fs is not None and path is not None and callable(getattr(fs, "open", None)) + + +def _open_h5netcdf_from_fsspec( + fs: Any, + path: str, + *, + mode: str = "r", + h5netcdf_kwargs: Mapping[str, Any] | None = None, +): + import h5netcdf + + file_mode = "rb" if mode == "r" else mode + file_obj = fs.open(path, mode=file_mode) + kwargs = {} if h5netcdf_kwargs is None else dict(h5netcdf_kwargs) + return h5netcdf.File(file_obj, mode=mode, **kwargs) + + class H5netcdfBackendEntrypoint(BackendEntrypoint): """ Backend for netCDF files based on the h5netcdf package. 
diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index c802fb65014..561b427571d 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -2,6 +2,7 @@ from __future__ import annotations +import io import pickle from typing import TYPE_CHECKING, Any @@ -40,6 +41,7 @@ has_netCDF4, has_scipy, requires_cftime, + requires_h5netcdf, requires_netCDF4, requires_zarr, ) @@ -176,6 +178,65 @@ def test_open_mfdataset_multiple_files_parallel_distributed(parallel, tmp_path): assert_identical(tf["test"], da) +@requires_h5netcdf +def test_open_mfdataset_file_like_parallel_distributed_h5netcdf(): + time = np.arange(20) + x = np.arange(4) + data = np.arange(80).reshape(20, 4) + da = xr.DataArray(data, coords={"time": time, "x": x}, dims=("time", "x"), name="v") + + file_objs = [] + for i in range(0, 20, 10): + ds = da.isel(time=slice(i, i + 10)).to_dataset() + file_content = ds.to_netcdf(engine="h5netcdf") + file_objs.append(io.BytesIO(file_content)) + + with cluster() as (s, [_a, _b]): + with Client(s["address"]): + with xr.open_mfdataset( + file_objs, + engine="h5netcdf", + parallel=True, + concat_dim="time", + combine="nested", + ) as tf: + assert_identical(tf["v"], da) + + +@requires_h5netcdf +@pytest.mark.timeout(30) +def test_open_mfdataset_parallel_distributed_h5netcdf_fsspec_file_objects(tmp_path): + fsspec = pytest.importorskip("fsspec") + + time = np.arange(20) + x = np.arange(4) + data = np.arange(80).reshape(20, 4) + da = xr.DataArray(data, coords={"time": time, "x": x}, dims=("time", "x"), name="v") + + paths = [] + for i in range(0, 20, 10): + path = tmp_path / f"chunk_{i}.nc" + da.isel(time=slice(i, i + 10)).to_dataset().to_netcdf(path, engine="h5netcdf") + paths.append(path) + + fs = fsspec.filesystem("file") + with fs.open(str(paths[0]), "rb") as f0, fs.open(str(paths[1]), "rb") as f1: + # Regression test for GH10807: + # the buggy implementation builds delayed tasks that serialize closer + # callables 
extracted from worker-side datasets. With h5netcdf and + # file-like inputs this can block indefinitely on distributed. + with cluster() as (s, [_a, _b]): + with Client(s["address"]): + with xr.open_mfdataset( + [f0, f1], + engine="h5netcdf", + parallel=True, + concat_dim="time", + combine="nested", + ) as tf: + assert_identical(tf["v"], da) + + # TODO: move this to test_backends.py @requires_cftime @requires_netCDF4 From 31a43e47a5db0e9cd14fd1501a03a8ea0852ee45 Mon Sep 17 00:00:00 2001 From: Nitin75408 Date: Wed, 11 Feb 2026 22:07:47 +0530 Subject: [PATCH 2/4] Fixed open_mfdataset hangs indefinitely --- doc/whats-new.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 2809ea9002d..c14c2de4b18 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -53,6 +53,9 @@ Bug Fixes :py:func:`open_dataset` is called with a non-existent local file path (:issue:`10896`). By `Kristian Kollsgård `_. +- Fix a regression where :py:func:`open_mfdataset` could hang indefinitely with + ``engine="h5netcdf"`` and ``parallel=True`` on distributed schedulers when + opening file-like objects from remote filesystems (:issue:`10807`). 
Documentation ~~~~~~~~~~~~~ From 92eec7f64aec5171e1b8df253f56765380742837 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 11 Feb 2026 16:39:00 +0000 Subject: [PATCH 3/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/backends/api.py | 4 +++- xarray/backends/h5netcdf_.py | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 1323a740ae8..587a372cb59 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -140,7 +140,9 @@ def _multi_file_closer(closers): closer() -def _preprocess_mfdataset(ds: Dataset, preprocess: Callable[[Dataset], Dataset]) -> Dataset: +def _preprocess_mfdataset( + ds: Dataset, preprocess: Callable[[Dataset], Dataset] +) -> Dataset: # Preserve the underlying file closer if preprocess returns a Dataset without one. # This keeps resource cleanup reliable while allowing arbitrary preprocess functions. processed_ds = preprocess(ds) diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 7851a3e96be..3a5f3d335fb 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -233,7 +233,9 @@ def open( lock = combine_locks([HDF5_LOCK, get_write_lock(filename)]) if isinstance(filename, str) and not is_remote_uri(filename): - manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs) + manager = CachingFileManager( + h5netcdf.File, filename, mode=mode, kwargs=kwargs + ) elif mode == "r" and _is_fsspec_file_obj(filename): # Reopen fsspec-backed files from fs/path instead of serializing a live # file handle across distributed workers. 
@@ -245,7 +247,9 @@ def open( kwargs={"h5netcdf_kwargs": kwargs}, ) else: - manager = PickleableFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs) + manager = PickleableFileManager( + h5netcdf.File, filename, mode=mode, kwargs=kwargs + ) return cls( manager, From d9f99f3d52f3fc91274acab595482cfd50d58d71 Mon Sep 17 00:00:00 2001 From: Nitin75408 Date: Wed, 11 Feb 2026 22:35:07 +0530 Subject: [PATCH 4/4] Fixed open_mfdataset hangs indefinitely --- xarray/backends/h5netcdf_.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 3a5f3d335fb..e2dd4a88ff6 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -232,6 +232,7 @@ def open( else: lock = combine_locks([HDF5_LOCK, get_write_lock(filename)]) + manager: FileManager[Any] if isinstance(filename, str) and not is_remote_uri(filename): manager = CachingFileManager( h5netcdf.File, filename, mode=mode, kwargs=kwargs