Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/environment-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies:
- watermark
- xarray-datatree >=0.0.9
- xarray >=2024.10
- zarr >=2.12,<3.0
- zarr <3.0|>=3.0.10
- furo >=2022.09.15
- pip:
- git+https://github.com/ncar-xdev/ecgtools
Expand Down
2 changes: 1 addition & 1 deletion ci/environment-upstream-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies:
- scipy
- xarray-datatree
- xgcm
- zarr >=2.10,<3.0
- zarr <3.0|>=3.0.10
- pip:
- git+https://github.com/intake/intake.git
- git+https://github.com/pydata/xarray.git
Expand Down
2 changes: 1 addition & 1 deletion ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ dependencies:
- scipy
- xarray >=2024.10
- xarray-datatree
- zarr >=2.12,<3.0
- zarr <3.0|>=3.0.10
# - pytest-icdiff
43 changes: 38 additions & 5 deletions docs/source/reference/esm-catalog-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,44 @@ The column names can optionally be associated with a controlled vocabulary, such

An assets object describes the columns in the CSV file relevant for opening the actual data files.

| Element | Type | Description |
| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. |
| format | string | The data format. Valid values are `netcdf`, `zarr`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. |
| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. |
| Element | Type | Description |
| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. |
| format | string | The data format. Valid values are `netcdf`, `zarr`, `zarr2`, `zarr3`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. |
| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. |

````{note}
Zarr v3 is built on asynchronous operations, and requires `xarray_open_kwargs` to contain the following dictionary fragment:
```python
xarray_open_kwargs = {
"storage_options" : {
"remote_options" : {
"asynchronous": True,
...
},
...
},
...
}
```

In contrast, Zarr v2 is synchronous and instead requires:

```python
xarray_open_kwargs = {
"storage_options" : {
"remote_options" : {
"asynchronous": False,
...
},
...
},
...
}
```
````

If `zarr2` or `zarr3` is specified in the `format` field, the `asynchronous` flag will be set automatically. If you specify `zarr` as the format, you must set the `asynchronous` flag manually in the `xarray_open_kwargs`.

### Aggregation Control Object

Expand Down
2 changes: 2 additions & 0 deletions intake_esm/cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class AggregationType(str, enum.Enum):
# Closed set of values accepted for an asset's data format (the catalog's
# `format` field / the values found in `format_column_name`).
class DataFormat(str, enum.Enum):
netcdf = 'netcdf'
# Generic zarr store: the async flag is NOT set automatically for this value.
zarr = 'zarr'
# Version-specific zarr values: `_set_async_flag` in utils only acts on these two.
zarr2 = 'zarr2'
zarr3 = 'zarr3'
# kerchunk reference files (opened with the zarr engine).
reference = 'reference'
opendap = 'opendap'

Expand Down
6 changes: 4 additions & 2 deletions intake_esm/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from intake.source.base import DataSource, Schema

from .cat import Aggregation, DataFormat
from .utils import OPTIONS
from .utils import OPTIONS, _set_async_flag


class ConcatenationWarning(UserWarning):
Expand All @@ -23,7 +23,7 @@ class ESMDataSourceError(Exception):
def _get_xarray_open_kwargs(data_format, xarray_open_kwargs=None, storage_options=None):
xarray_open_kwargs = (xarray_open_kwargs or {}).copy()
_default_open_kwargs = {
'engine': 'zarr' if data_format in {'zarr', 'reference'} else 'netcdf4',
'engine': 'zarr' if data_format in {'zarr', 'zarr2', 'zarr3', 'reference'} else 'netcdf4',
'chunks': {},
'backend_kwargs': {},
'decode_timedelta': False,
Expand All @@ -40,6 +40,8 @@ def _get_xarray_open_kwargs(data_format, xarray_open_kwargs=None, storage_option
):
xarray_open_kwargs['backend_kwargs']['storage_options'] = {} or storage_options

xarray_open_kwargs = _set_async_flag(data_format, xarray_open_kwargs)

return xarray_open_kwargs


Expand Down
60 changes: 60 additions & 0 deletions intake_esm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
from collections import defaultdict

import polars as pl
import zarr

__all__ = [
'OPTIONS',
'set_options',
'_set_async_flag',
]


def show_versions(file=sys.stdout): # pragma: no cover
Expand Down Expand Up @@ -57,6 +64,59 @@ def show_versions(file=sys.stdout): # pragma: no cover
print(f'{k}: {stat}', file=file)


def _zarr_async() -> bool:
    """
    Tell whether the installed zarr release is one of the async-based ones.

    Zarr became fully asynchronous with version 3.0.0, so the answer is
    derived from the major component of ``zarr.__version__``.

    Returns
    -------
    bool
        True when the installed zarr major version is greater than 2,
        False otherwise.
    """
    major_component, *_ = zarr.__version__.split('.')
    return int(major_component) > 2


def _set_async_flag(data_format: str, xarray_open_kwargs: dict) -> dict:
"""
If we have the data format set to either zarr2 or zarr3, the async flag in
`xarray_open_kwargs['storage_options']['remote_opetions']` is constrained to
be either False or True, respectively.

Parameters
----------
data_format : str

xarray_open_kwargs : dict
The xarray open kwargs dictionary that may contain storage options.
Returns
-------
dict
The updated xarray open kwargs with the async flag set appropriately.
"""
if data_format not in {'zarr2', 'zarr3'}:
return xarray_open_kwargs

storage_opts_template = {
'backend_kwargs': {'storage_options': {'remote_options': {'asynchronous': _zarr_async()}}}
}
if (
xarray_open_kwargs.get('backend_kwargs', {})
.get('storage_options', {})
.get('remote_options', None)
is not None
):
xarray_open_kwargs['backend_kwargs']['storage_options']['remote_options'][
'asynchronous'
] = _zarr_async()
elif xarray_open_kwargs.get('backend_kwargs', {}).get('storage_options', None) is not None:
xarray_open_kwargs['backend_kwargs']['storage_options'] = storage_opts_template[
'backend_kwargs'
]['storage_options']
elif xarray_open_kwargs.get('backend_kwargs', None) is not None:
xarray_open_kwargs['backend_kwargs'] = storage_opts_template['backend_kwargs']
else:
xarray_open_kwargs = storage_opts_template

return xarray_open_kwargs


OPTIONS = {
'attrs_prefix': 'intake_esm_attrs',
'dataset_key': 'intake_esm_dataset_key',
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pydantic>=2.0
pydap!=3.5.5
requests>=2.24.0
xarray>=2024.10
zarr>=2.12
# Allow zarr 2.x (>=2.12) or zarr 3.1.0+; every 3.0.x release is excluded
zarr>=2.12,!=3.0.*
35 changes: 35 additions & 0 deletions tests/sample-catalogs/cesm1-lens-zarr2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"esmcat_version": "0.1.0",
"id": "sample-cesm1-lens-zarr2",
"description": "This is a sample ESM catalog for CESM1-LENS data in zarr v2 format",
"catalog_file": "./tests/sample-catalogs/cesm1-lens-aws-zarr.csv",
"attributes": [
{
"column_name": "experiment",
"vocabulary": ""
},
{
"column_name": "component",
"vocabulary": ""
},
{
"column_name": "frequency",
"vocabulary": ""
},
{ "column_name": "variable", "vocabulary": "" }
],
"assets": {
"column_name": "path",
"format": "zarr2"
},
"aggregation_control": {
"variable_column_name": "variable",
"groupby_attrs": ["component", "experiment", "frequency"],
"aggregations": [
{
"type": "union",
"attribute_name": "variable"
}
]
}
}
35 changes: 35 additions & 0 deletions tests/sample-catalogs/cesm1-lens-zarr3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"esmcat_version": "0.1.0",
"id": "sample-cesm1-lens-zarr3",
"description": "This is a sample ESM catalog for CESM1-LENS data in zarr v3 format",
"catalog_file": "./tests/sample-catalogs/cesm1-lens-aws-zarr.csv",
"attributes": [
{
"column_name": "experiment",
"vocabulary": ""
},
{
"column_name": "component",
"vocabulary": ""
},
{
"column_name": "frequency",
"vocabulary": ""
},
{ "column_name": "variable", "vocabulary": "" }
],
"assets": {
"column_name": "path",
"format": "zarr3"
},
"aggregation_control": {
"variable_column_name": "variable",
"groupby_attrs": ["component", "experiment", "frequency"],
"aggregations": [
{
"type": "union",
"attribute_name": "variable"
}
]
}
}
12 changes: 11 additions & 1 deletion tests/test_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@
sample_pl_df,
zarr_cat_aws_cesm,
zarr_cat_pangeo_cmip6,
zarr_v2_cat,
zarr_v3_cat,
)


@pytest.mark.parametrize(
'column_name, format, format_column_name', [('test', 'zarr', None), ('test', 'netcdf', None)]
'column_name, format, format_column_name',
[
('test', 'zarr', None),
('test', 'zarr2', None),
('test', 'zarr3', None),
('test', 'netcdf', None),
],
)
def test_assets(column_name, format, format_column_name):
a = Assets(column_name=column_name, format=format, format_column_name=format_column_name)
Expand Down Expand Up @@ -53,6 +61,8 @@ def test_assets_mutually_exclusive():
cdf_cat_sample_cmip6_noagg,
cdf_cat_sample_cesmle,
multi_variable_cat,
zarr_v2_cat,
zarr_v3_cat,
],
)
@pytest.mark.flaky(max_runs=3, min_passes=1) # Cold start related failures
Expand Down
7 changes: 6 additions & 1 deletion tests/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
_open_dataset,
_update_attrs,
)
from intake_esm.utils import _zarr_async

dask.config.set(scheduler='single-threaded')

Expand Down Expand Up @@ -84,8 +85,12 @@ def test_open_dataset_kerchunk(kerchunk_file=kerchunk_file):
xarray_open_kwargs = _get_xarray_open_kwargs(
'reference',
dict(engine='zarr', consolidated=False),
storage_options={'remote_protocol': 's3', 'remote_options': {'anon': True}},
storage_options={
'remote_protocol': 's3',
'remote_options': {'anon': True, 'asynchronous': _zarr_async()},
},
)

ds = _open_dataset(
data_format='reference',
urlpath=kerchunk_file,
Expand Down
Loading
Loading