diff --git a/ci/environment-docs.yml b/ci/environment-docs.yml index 121c5094..a2a0b157 100644 --- a/ci/environment-docs.yml +++ b/ci/environment-docs.yml @@ -30,7 +30,7 @@ dependencies: - watermark - xarray-datatree >=0.0.9 - xarray >=2024.10 - - zarr >=2.12,<3.0 + - zarr <3.0|>=3.0.10 - furo >=2022.09.15 - pip: - git+https://github.com/ncar-xdev/ecgtools diff --git a/ci/environment-upstream-dev.yml b/ci/environment-upstream-dev.yml index 645ace32..620e03d0 100644 --- a/ci/environment-upstream-dev.yml +++ b/ci/environment-upstream-dev.yml @@ -37,7 +37,7 @@ dependencies: - scipy - xarray-datatree - xgcm - - zarr >=2.10,<3.0 + - zarr <3.0|>=3.0.10 - pip: - git+https://github.com/intake/intake.git - git+https://github.com/pydata/xarray.git diff --git a/ci/environment.yml b/ci/environment.yml index cd90304e..d70d3c28 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -34,5 +34,5 @@ dependencies: - scipy - xarray >=2024.10 - xarray-datatree - - zarr >=2.12,<3.0 + - zarr <3.0|>=3.0.10 # - pytest-icdiff diff --git a/docs/source/reference/esm-catalog-spec.md b/docs/source/reference/esm-catalog-spec.md index 58b11ae6..44c44d3b 100644 --- a/docs/source/reference/esm-catalog-spec.md +++ b/docs/source/reference/esm-catalog-spec.md @@ -85,11 +85,44 @@ The column names can optionally be associated with a controlled vocabulary, such An assets object describes the columns in the CSV file relevant for opening the actual data files. -| Element | Type | Description | -| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. | -| format | string | The data format. Valid values are `netcdf`, `zarr`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. | -| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. | +| Element | Type | Description | +| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. | +| format | string | The data format. Valid values are `netcdf`, `zarr`, `zarr2`, `zarr3`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. | +| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. | + +````{note} + Zarr v3 is built on asynchronous operations, and requires `xarray_open_kwargs` to contain the following dictionary fragment: + ```python +xarray_open_kwargs = { + "storage_options" : { + "remote_options" : { + "async": true, + ... + }, + ... + }, + ... +} + ``` + +In contrast, Zarr v2 is synchronous and instead requires: + + ```python +xarray_open_kwargs = { + "storage_options" : { + "remote_options" : { + "async": false, + ... + }, + ... + }, + ... +} + ``` +```` + +If `zarr2` or `zarr3` is specified in the `format` field, the `async` flag will be set automatically. If you specify `zarr` as the format, you must set the `async` flag manually in the `xarray_open_kwargs`. ### Aggregation Control Object diff --git a/intake_esm/cat.py b/intake_esm/cat.py index cfa8d520..8f9e5c83 100644 --- a/intake_esm/cat.py +++ b/intake_esm/cat.py @@ -54,6 +54,8 @@ class AggregationType(str, enum.Enum): class DataFormat(str, enum.Enum): netcdf = 'netcdf' zarr = 'zarr' + zarr2 = 'zarr2' + zarr3 = 'zarr3' reference = 'reference' opendap = 'opendap' diff --git a/intake_esm/source.py b/intake_esm/source.py index 338f4a30..2181e6e1 100644 --- a/intake_esm/source.py +++ b/intake_esm/source.py @@ -9,7 +9,7 @@ from intake.source.base import DataSource, Schema from .cat import Aggregation, DataFormat -from .utils import OPTIONS +from .utils import OPTIONS, _set_async_flag class ConcatenationWarning(UserWarning): @@ -23,7 +23,7 @@ class ESMDataSourceError(Exception): def _get_xarray_open_kwargs(data_format, xarray_open_kwargs=None, storage_options=None): xarray_open_kwargs = (xarray_open_kwargs or {}).copy() _default_open_kwargs = { - 'engine': 'zarr' if data_format in {'zarr', 'reference'} else 'netcdf4', + 'engine': 'zarr' if data_format in {'zarr', 'zarr2', 'zarr3', 'reference'} else 'netcdf4', 'chunks': {}, 'backend_kwargs': {}, 'decode_timedelta': False, @@ -40,6 +40,8 @@ def _get_xarray_open_kwargs(data_format, xarray_open_kwargs=None, storage_option ): xarray_open_kwargs['backend_kwargs']['storage_options'] = {} or storage_options + xarray_open_kwargs = _set_async_flag(data_format, xarray_open_kwargs) + return xarray_open_kwargs diff --git a/intake_esm/utils.py b/intake_esm/utils.py index 5b8883ae..559b66ca 100644 --- a/intake_esm/utils.py +++ b/intake_esm/utils.py @@ -5,6 +5,13 @@ from collections import defaultdict import polars as pl +import zarr + +__all__ = [ + 'OPTIONS', + 'set_options', + '_set_async_flag', +] def show_versions(file=sys.stdout): # pragma: no cover @@ -57,6 +64,59 @@ def show_versions(file=sys.stdout): # pragma: no cover print(f'{k}: {stat}', file=file) +def _zarr_async() -> bool: + """ + Zarr went all async in version 3.0.0. This sets the async flag based on + the zarr version in storage options + """ + + return int(zarr.__version__.split('.')[0]) > 2 + + +def _set_async_flag(data_format: str, xarray_open_kwargs: dict) -> dict: + """ + If we have the data format set to either zarr2 or zarr3, the async flag in + `xarray_open_kwargs['storage_options']['remote_opetions']` is constrained to + be either False or True, respectively. + + Parameters + ---------- + data_format : str + + xarray_open_kwargs : dict + The xarray open kwargs dictionary that may contain storage options. + Returns + ------- + dict + The updated xarray open kwargs with the async flag set appropriately. + """ + if data_format not in {'zarr2', 'zarr3'}: + return xarray_open_kwargs + + storage_opts_template = { + 'backend_kwargs': {'storage_options': {'remote_options': {'asynchronous': _zarr_async()}}} + } + if ( + xarray_open_kwargs.get('backend_kwargs', {}) + .get('storage_options', {}) + .get('remote_options', None) + is not None + ): + xarray_open_kwargs['backend_kwargs']['storage_options']['remote_options'][ + 'asynchronous' + ] = _zarr_async() + elif xarray_open_kwargs.get('backend_kwargs', {}).get('storage_options', None) is not None: + xarray_open_kwargs['backend_kwargs']['storage_options'] = storage_opts_template[ + 'backend_kwargs' + ]['storage_options'] + elif xarray_open_kwargs.get('backend_kwargs', None) is not None: + xarray_open_kwargs['backend_kwargs'] = storage_opts_template['backend_kwargs'] + else: + xarray_open_kwargs = storage_opts_template + + return xarray_open_kwargs + + OPTIONS = { 'attrs_prefix': 'intake_esm_attrs', 'dataset_key': 'intake_esm_dataset_key', diff --git a/requirements.txt b/requirements.txt index b2c6f2c0..d8b6e72a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ pydantic>=2.0 pydap!=3.5.5 requests>=2.24.0 xarray>=2024.10 -zarr>=2.12 +# Allow zarr >2.12 or zarr 3.1.0+ +zarr>=2.12,!=3.0.* diff --git a/tests/sample-catalogs/cesm1-lens-zarr2.json b/tests/sample-catalogs/cesm1-lens-zarr2.json new file mode 100644 index 00000000..fd0d0b59 --- /dev/null +++ b/tests/sample-catalogs/cesm1-lens-zarr2.json @@ -0,0 +1,35 @@ +{ + "esmcat_version": "0.1.0", + "id": "sample-cesm1-lens-zarr2", + "description": "This is a sample ESM catalog for CESM1-LENS data in zarr v2 format", + "catalog_file": "./tests/sample-catalogs/cesm1-lens-aws-zarr.csv", + "attributes": [ + { + "column_name": "experiment", + "vocabulary": "" + }, + { + "column_name": "component", + "vocabulary": "" + }, + { + "column_name": "frequency", + "vocabulary": "" + }, + { "column_name": "variable", "vocabulary": "" } + ], + "assets": { + "column_name": "path", + "format": "zarr2" + }, + "aggregation_control": { + "variable_column_name": "variable", + "groupby_attrs": ["component", "experiment", "frequency"], + "aggregations": [ + { + "type": "union", + "attribute_name": "variable" + } + ] + } +} diff --git a/tests/sample-catalogs/cesm1-lens-zarr3.json b/tests/sample-catalogs/cesm1-lens-zarr3.json new file mode 100644 index 00000000..d3c5556b --- /dev/null +++ b/tests/sample-catalogs/cesm1-lens-zarr3.json @@ -0,0 +1,35 @@ +{ + "esmcat_version": "0.1.0", + "id": "sample-cesm1-lens-zarr3", + "description": "This is a sample ESM catalog for CESM1-LENS data in zarr v3 format", + "catalog_file": "./tests/sample-catalogs/cesm1-lens-aws-zarr.csv", + "attributes": [ + { + "column_name": "experiment", + "vocabulary": "" + }, + { + "column_name": "component", + "vocabulary": "" + }, + { + "column_name": "frequency", + "vocabulary": "" + }, + { "column_name": "variable", "vocabulary": "" } + ], + "assets": { + "column_name": "path", + "format": "zarr3" + }, + "aggregation_control": { + "variable_column_name": "variable", + "groupby_attrs": ["component", "experiment", "frequency"], + "aggregations": [ + { + "type": "union", + "attribute_name": "variable" + } + ] + } +} diff --git a/tests/test_cat.py b/tests/test_cat.py index 2975c905..71aaf04d 100644 --- a/tests/test_cat.py +++ b/tests/test_cat.py @@ -21,11 +21,19 @@ sample_pl_df, zarr_cat_aws_cesm, zarr_cat_pangeo_cmip6, + zarr_v2_cat, + zarr_v3_cat, ) @pytest.mark.parametrize( - 'column_name, format, format_column_name', [('test', 'zarr', None), ('test', 'netcdf', None)] + 'column_name, format, format_column_name', + [ + ('test', 'zarr', None), + ('test', 'zarr2', None), + ('test', 'zarr3', None), + ('test', 'netcdf', None), + ], ) def test_assets(column_name, format, format_column_name): a = Assets(column_name=column_name, format=format, format_column_name=format_column_name) @@ -53,6 +61,8 @@ def test_assets_mutually_exclusive(): cdf_cat_sample_cmip6_noagg, cdf_cat_sample_cesmle, multi_variable_cat, + zarr_v2_cat, + zarr_v3_cat, ], ) @pytest.mark.flaky(max_runs=3, min_passes=1) # Cold start related failures diff --git a/tests/test_source.py b/tests/test_source.py index 573a0713..d36fab58 100644 --- a/tests/test_source.py +++ b/tests/test_source.py @@ -15,6 +15,7 @@ _open_dataset, _update_attrs, ) +from intake_esm.utils import _zarr_async dask.config.set(scheduler='single-threaded') @@ -84,8 +85,12 @@ def test_open_dataset_kerchunk(kerchunk_file=kerchunk_file): xarray_open_kwargs = _get_xarray_open_kwargs( 'reference', dict(engine='zarr', consolidated=False), - storage_options={'remote_protocol': 's3', 'remote_options': {'anon': True}}, + storage_options={ + 'remote_protocol': 's3', + 'remote_options': {'anon': True, 'asynchronous': _zarr_async()}, + }, ) + ds = _open_dataset( data_format='reference', urlpath=kerchunk_file, diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..c96221c5 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,259 @@ +from unittest import mock + +import polars as pl +import pytest +from polars.testing import assert_frame_equal + +from intake_esm.utils import MinimalExploder, _set_async_flag + + +@pytest.mark.parametrize( + 'data_format, xarray_open_kwargs, expected', + [ + ('zarr', {}, {}), + ('netcdf', {}, {}), + ('reference', {}, {}), + ( + 'zarr2', + {}, + { + 'backend_kwargs': { + 'storage_options': {'remote_options': {'asynchronous': False}}, + } + }, + ), + ( + 'zarr3', + {}, + { + 'backend_kwargs': { + 'storage_options': {'remote_options': {'asynchronous': True}}, + } + }, + ), + ( + 'zarr2', + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': {}, + 'decode_timedelta': False, + }, + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': { + 'storage_options': {'remote_options': {'asynchronous': False}}, + }, + 'decode_timedelta': False, + }, + ), + ( + 'zarr3', + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': {}, + 'decode_timedelta': False, + }, + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': { + 'storage_options': {'remote_options': {'asynchronous': True}}, + }, + 'decode_timedelta': False, + }, + ), + ( + 'zarr2', + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': {'storage_options': {'remote_options': {'anon': True}}}, + 'decode_timedelta': False, + }, + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': { + 'storage_options': {'remote_options': {'anon': True, 'asynchronous': False}}, + }, + 'decode_timedelta': False, + }, + ), + ( + 'zarr3', + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': {'storage_options': {'remote_options': {'anon': True}}}, + 'decode_timedelta': False, + }, + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': { + 'storage_options': {'remote_options': {'anon': True, 'asynchronous': True}}, + }, + 'decode_timedelta': False, + }, + ), + ( + 'zarr3', + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': { + 'storage_options': { + 'remote_options': {'anon': True}, + 'remote_protocol': 's3', + } + }, + 'decode_timedelta': False, + }, + { + 'engine': 'zarr', + 'chunks': {}, + 'backend_kwargs': { + 'storage_options': { + 'remote_options': {'anon': True, 'asynchronous': True}, + 'remote_protocol': 's3', + } + }, + 'decode_timedelta': False, + }, + ), + ], +) +def test__set_async_flag(data_format, xarray_open_kwargs, expected): + with mock.patch('intake_esm.utils._zarr_async', return_value=data_format == 'zarr3'): + res = _set_async_flag(data_format, xarray_open_kwargs) + assert res == expected + + +def test_MinimalExploder_length_patterns(): + df = pl.DataFrame( + { + 'a': [['a', 'b'], ['c'], ['d', 'e', 'f']], + 'b': [['a'], ['b', 'c'], ['d', 'e', 'f']], + 'c': [['a', 'b', 'c'], ['d'], ['e', 'f']], + 'd': [[1, 2], [3], [4, 5, 6]], # Same as a but all numbers + 'e': ['first', 'second', 'third'], # not iterable`` + } + ) + + me = MinimalExploder(df) + + assert me.length_patterns == { + 'a': (2, 1, 3), + 'b': (1, 2, 3), + 'c': (3, 1, 2), + 'd': (2, 1, 3), + } + + +def test_MinimalExploder_explodable_groups(): + df = pl.DataFrame( + { + 'a': [['a', 'b'], ['c'], ['d', 'e', 'f']], + 'b': [['a'], ['b', 'c'], ['d', 'e', 'f']], + 'c': [['a', 'b', 'c'], ['d'], ['e', 'f']], + 'd': [[1, 2], [3], [4, 5, 6]], # Same as a but all numbers + 'e': ['first', 'second', 'third'], # not iterable`` + } + ) + + me = MinimalExploder(df) + + assert me.explodable_groups == [['a', 'd'], ['b'], ['c']] + + +def test_MinimalExploder_summary(): + df = pl.DataFrame( + { + 'a': [['a', 'b'], ['c'], ['d', 'e', 'f']], + 'b': [['a'], ['b', 'c'], ['d', 'e', 'f']], + 'c': [['a', 'b', 'c'], ['d'], ['e', 'f']], + 'd': [[1, 2], [3], [4, 5, 6]], # Same as a but all numbers + 'e': ['first', 'second', 'third'], # not iterable`` + } + ) + + me = MinimalExploder(df) + + assert me.summary == { + 'explodable_groups': 3, + 'explosion_operations_needed': 3, + 'groups': [ + ['a', 'd'], + ['b'], + ['c'], + ], + 'list_columns': 4, + 'total_columns': 5, + 'unique_patterns': 3, + } + + +def test_MinimalExploder_single_explode(): + """Ensure two columns with the same list lengths are only exploded once""" + df = pl.DataFrame( + { + 'a': [['a', 'b'], ['c'], ['d', 'e', 'f']], + 'b': [[1, 2], [3], [4, 5, 6]], # Same as a but all numbers + 'c': ['first', 'second', 'third'], + } + ) + + exploded_df = MinimalExploder(df)() + assert (len(exploded_df)) == 6 + + expected_df_1 = pl.DataFrame( + { + 'a': ['a', 'b', 'c', 'd', 'e', 'f'], + 'b': [1, 2, 3, 4, 5, 6], + 'c': ['first', 'first', 'second', 'third', 'third', 'third'], + } + ) + + assert_frame_equal(exploded_df, expected_df_1) + + +def test_MinimalExploder_double_explode(): + """ + Make sure that if we have two columns which have different column lengths, + we do explode both of them separately + """ + df = pl.DataFrame( + { + 'a': [[1, 2], [3, 4, 5]], + 'b': [['a', 'b', 'c'], ['d', 'e']], + 'c': ['first', 'second'], + } + ) + + expected_df = pl.DataFrame( + { + 'a': [1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5], + 'b': ['a', 'b', 'c', 'a', 'b', 'c', 'd', 'e', 'd', 'e', 'd', 'e'], + 'c': [ + 'first', + 'first', + 'first', + 'first', + 'first', + 'first', + 'second', + 'second', + 'second', + 'second', + 'second', + 'second', + ], + } + ) + + exploded_df = MinimalExploder(df)() + + assert_frame_equal(exploded_df, expected_df) diff --git a/tests/utils.py b/tests/utils.py index 94aeab0d..85826200 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,6 +20,8 @@ access_columns_with_iterables_cat = os.path.join( here, 'sample-catalogs/access-columns-with-iterables.json' ) +zarr_v2_cat = os.path.join(here, 'sample-catalogs/cesm1-lens-zarr2.json') +zarr_v3_cat = os.path.join(here, 'sample-catalogs/cesm1-lens-zarr2.json') sample_df = pd.DataFrame(