From d9a7ab572696f5902f505f12a00d0d5757dc1e71 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Fri, 2 Aug 2024 11:06:31 -0600
Subject: [PATCH 1/2] Auto rechunk to enable blockwise reduction

Done when
1. `method` is None
2. Grouping and reducing by a 1D array

We gate this on fractional change in number of chunks and change in
size of largest chunk.

Closes #359
---
 flox/core.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 64 insertions(+), 8 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index d8600b37..6caddb35 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -119,6 +119,15 @@
 # _simple_combine.
 DUMMY_AXIS = -2
 
+# Thresholds below which we will automatically rechunk to blockwise if it makes sense
+# 1. Fractional change in number of chunks after rechunking
+BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25
+# 2. Fractional change in max chunk size after rechunking
+BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 0.25
+# 3. If input arrays have chunk size smaller than `dask.array.chunk-size`,
+#    then adjust chunks to meet that size first.
+BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR = 1.25
+
 
 logger = logging.getLogger("flox")
 
@@ -223,8 +232,11 @@ def identity(x: T) -> T:
     return x
 
 
-def _issorted(arr: np.ndarray) -> bool:
-    return bool((arr[:-1] <= arr[1:]).all())
+def _issorted(arr: np.ndarray, ascending=True) -> bool:
+    if ascending:
+        return bool((arr[:-1] <= arr[1:]).all())
+    else:
+        return bool((arr[:-1] >= arr[1:]).all())
 
 
 def _is_arg_reduction(func: T_Agg) -> bool:
@@ -325,6 +337,8 @@ def _get_optimal_chunks_for_groups(chunks, labels):
         Δl = abs(c - l)
         if c == 0 or newchunkidx[-1] > l:
             continue
+        f = f.item()  # noqa
+        l = l.item()  # noqa
         if Δf < Δl and f > newchunkidx[-1]:
             newchunkidx.append(f)
         else:
@@ -716,7 +730,9 @@ def rechunk_for_cohorts(
     return array.rechunk({axis: newchunks})
 
 
-def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
+def rechunk_for_blockwise(
+    array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True
+) -> tuple[T_MethodOpt, DaskArray]:
     """
     Rechunks array so that group boundaries line up with chunk boundaries, allowing
     embarrassingly parallel group reductions.
@@ -739,14 +755,43 @@ def rechunk_for_blockwise(
     Returns
     -------
     DaskArray
         Rechunked array
     """
-    # TODO: this should be unnecessary?
-    labels = factorize_((labels,), axes=())[0]
+
+    import dask
+    from dask.utils import parse_bytes
+
     chunks = array.chunks[axis]
-    newchunks = _get_optimal_chunks_for_groups(chunks, labels)
+    if len(chunks) == 1:
+        return "blockwise", array
+
+    factor = parse_bytes(dask.config.get("array.chunk-size")) / (
+        math.prod(array.chunksize) * array.dtype.itemsize
+    )
+    if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR:
+        new_constant_chunks = math.ceil(factor) * max(chunks)
+        q, r = divmod(array.shape[axis], new_constant_chunks)
+        new_input_chunks = (new_constant_chunks,) * q + (r,)
+    else:
+        new_input_chunks = chunks
+
+    # FIXME: this should be unnecessary?
+    labels = factorize_((labels,), axes=())[0]
+    newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels)
     if newchunks == chunks:
         return array
+
+    Δn = abs(len(newchunks) - len(new_input_chunks))
+    if force or (
+        (Δn / len(new_input_chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD)
+        and (
+            abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks)
+            < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
+        )
+    ):
+        logger.debug("Rechunking to enable blockwise.")
+        return "blockwise", array.rechunk({axis: newchunks})
     else:
-        return array.rechunk({axis: newchunks})
+        logger.debug("Didn't meet thresholds to do automatic rechunking for blockwise reductions.")
+        return None, array
 
 
 def reindex_numpy(array, from_: pd.Index, to: pd.Index, fill_value, dtype, axis: int):
@@ -2712,6 +2757,17 @@ def groupby_reduce(
     has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
     has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)
 
+    if (
+        method is None
+        and is_duck_dask_array(array)
+        and not any_by_dask
+        and by_.ndim == 1
+        and _issorted(by_, ascending=True)
+    ):
+        # Let's try rechunking for sorted 1D by.
+        (single_axis,) = axis_
+        method, array = rechunk_for_blockwise(array, single_axis, by_, force=False)
+
     is_first_last = _is_first_last_reduction(func)
     if is_first_last:
         if has_dask and nax != 1:
@@ -2899,7 +2955,7 @@ def groupby_reduce(
 
     # if preferred method is already blockwise, no need to rechunk
     if preferred_method != "blockwise" and method == "blockwise" and by_.ndim == 1:
-        array = rechunk_for_blockwise(array, axis=-1, labels=by_)
+        _, array = rechunk_for_blockwise(array, axis=-1, labels=by_)
 
     result, groups = partial_agg(
         array=array,

From c46223d7b7b94570553875a27af8a723fb689b99 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Tue, 17 Jun 2025 10:33:10 -0600
Subject: [PATCH 2/2] Small fix.

---
 flox/core.py   | 2 +-
 flox/xarray.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index 6caddb35..c444d277 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -777,7 +777,7 @@ def rechunk_for_blockwise(
     labels = factorize_((labels,), axes=())[0]
     newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels)
     if newchunks == chunks:
-        return array
+        return "blockwise", array
 
     Δn = abs(len(newchunks) - len(new_input_chunks))
     if force or (
diff --git a/flox/xarray.py b/flox/xarray.py
index 76f7dcc0..ade30f5d 100644
--- a/flox/xarray.py
+++ b/flox/xarray.py
@@ -5,6 +5,7 @@
 
 import numpy as np
 import pandas as pd
+import toolz
 import xarray as xr
 from packaging.version import Version
 
@@ -589,7 +590,7 @@ def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_Data
     DataArray or Dataset
         Xarray object with rechunked arrays.
     """
-    return _rechunk(rechunk_array_for_blockwise, obj, dim, labels)
+    return _rechunk(toolz.compose(toolz.last, rechunk_array_for_blockwise), obj, dim, labels)
 
 
 def _rechunk(func, obj, dim, labels, **kwargs):
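A minimal usage sketch of the new auto-rechunking path (illustrative only: the shape, chunking, and labels below are assumptions and are not taken from the patches). With method=None, a dask-backed array, and a sorted 1D `by`, groupby_reduce now calls rechunk_for_blockwise(..., force=False) and reduces blockwise only when the chunk-count and chunk-size thresholds defined at the top of flox/core.py are met.

# Illustrative sketch, not part of the patch series: exercise the
# method=None + sorted 1D `by` path added in PATCH 1/2.
import dask.array as da
import numpy as np

from flox.core import groupby_reduce

# Sorted 1D labels: 10 groups of 100 elements each.
labels = np.repeat(np.arange(10), 100)

# Chunk boundaries (every 250) do not line up with group boundaries (every 100).
array = da.random.random(1000, chunks=250)

# With method=None, flox may now rechunk `array` so that group boundaries fall
# on chunk boundaries (merging small chunks up toward dask's "array.chunk-size"
# first) and then reduce blockwise; if the rechunk would change the number of
# chunks or the largest chunk by more than the 25% thresholds, it falls back
# to the usual method selection.
result, groups = groupby_reduce(array, labels, func="sum", method=None)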