diff --git a/flox/core.py b/flox/core.py index d8600b37..c444d277 100644 --- a/flox/core.py +++ b/flox/core.py @@ -119,6 +119,15 @@ # _simple_combine. DUMMY_AXIS = -2 +# Thresholds below which we will automatically rechunk to blockwise if it makes sense +# 1. Fractional change in number of chunks after rechunking +BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25 +# 2. Fractional change in max chunk size after rechunking +BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 0.25 +# 3. If input arrays have chunk size smaller than `dask.array.chunk-size`, +# then adjust chunks to meet that size first. +BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR = 1.25 + logger = logging.getLogger("flox") @@ -223,8 +232,11 @@ def identity(x: T) -> T: return x -def _issorted(arr: np.ndarray) -> bool: - return bool((arr[:-1] <= arr[1:]).all()) +def _issorted(arr: np.ndarray, ascending=True) -> bool: + if ascending: + return bool((arr[:-1] <= arr[1:]).all()) + else: + return bool((arr[:-1] >= arr[1:]).all()) def _is_arg_reduction(func: T_Agg) -> bool: @@ -325,6 +337,8 @@ def _get_optimal_chunks_for_groups(chunks, labels): Δl = abs(c - l) if c == 0 or newchunkidx[-1] > l: continue + f = f.item() # noqa + l = l.item() # noqa if Δf < Δl and f > newchunkidx[-1]: newchunkidx.append(f) else: @@ -716,7 +730,9 @@ def rechunk_for_cohorts( return array.rechunk({axis: newchunks}) -def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray: +def rechunk_for_blockwise( + array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True +) -> tuple[T_MethodOpt, DaskArray]: """ Rechunks array so that group boundaries line up with chunk boundaries, allowing embarrassingly parallel group reductions. @@ -739,14 +755,43 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray Rechunked array """ - # TODO: this should be unnecessary? - labels = factorize_((labels,), axes=())[0] + + import dask + from dask.utils import parse_bytes + chunks = array.chunks[axis] - newchunks = _get_optimal_chunks_for_groups(chunks, labels) + if len(chunks) == 1: + return "blockwise", array + + factor = parse_bytes(dask.config.get("array.chunk-size")) / ( + math.prod(array.chunksize) * array.dtype.itemsize + ) + if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR: + new_constant_chunks = math.ceil(factor) * max(chunks) + q, r = divmod(array.shape[axis], new_constant_chunks) + new_input_chunks = (new_constant_chunks,) * q + (r,) + else: + new_input_chunks = chunks + + # FIXME: this should be unnecessary? + labels = factorize_((labels,), axes=())[0] + newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels) if newchunks == chunks: - return array + return "blockwise", array + + Δn = abs(len(newchunks) - len(new_input_chunks)) + if force or ( + (Δn / len(new_input_chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) + and ( + abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks) + < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD + ) + ): + logger.debug("Rechunking to enable blockwise.") + return "blockwise", array.rechunk({axis: newchunks}) else: - return array.rechunk({axis: newchunks}) + logger.debug("Didn't meet thresholds to do automatic rechunking for blockwise reductions.") + return None, array def reindex_numpy(array, from_: pd.Index, to: pd.Index, fill_value, dtype, axis: int): @@ -2712,6 +2757,17 @@ def groupby_reduce( has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_) + if ( + method is None + and is_duck_dask_array(array) + and not any_by_dask + and by_.ndim == 1 + and _issorted(by_, ascending=True) + ): + # Let's try rechunking for sorted 1D by. + (single_axis,) = axis_ + method, array = rechunk_for_blockwise(array, single_axis, by_, force=False) + is_first_last = _is_first_last_reduction(func) if is_first_last: if has_dask and nax != 1: @@ -2899,7 +2955,7 @@ def groupby_reduce( # if preferred method is already blockwise, no need to rechunk if preferred_method != "blockwise" and method == "blockwise" and by_.ndim == 1: - array = rechunk_for_blockwise(array, axis=-1, labels=by_) + _, array = rechunk_for_blockwise(array, axis=-1, labels=by_) result, groups = partial_agg( array=array, diff --git a/flox/xarray.py b/flox/xarray.py index 76f7dcc0..ade30f5d 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +import toolz import xarray as xr from packaging.version import Version @@ -589,7 +590,7 @@ def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_Data DataArray or Dataset Xarray object with rechunked arrays. """ - return _rechunk(rechunk_array_for_blockwise, obj, dim, labels) + return _rechunk(toolz.compose(toolz.last, rechunk_array_for_blockwise), obj, dim, labels) def _rechunk(func, obj, dim, labels, **kwargs):