Skip to content

Commit 1c17ffc

Browse files
@mgrover1 comment, add some extra tests, allow zarr version specification in the data format
1 parent 1cc1759 commit 1c17ffc

File tree

10 files changed

+425
-23
lines changed

10 files changed

+425
-23
lines changed

docs/source/reference/esm-catalog-spec.md

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ The column names can optionally be associated with a controlled vocabulary, such
8585

8686
An assets object describes the columns in the CSV file relevant for opening the actual data files.
8787

88-
| Element | Type | Description |
89-
| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
90-
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. |
91-
| format | string | The data format. Valid values are `netcdf`, `zarr`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. |
92-
| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. |
88+
| Element | Type | Description |
89+
| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
90+
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. |
91+
| format | string | The data format. Valid values are `netcdf`, `zarr`, `zarr2`, `zarr3`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. |
92+
| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. |
9393

94-
```{note}
94+
````{note}
9595
Zarr v3 is built on asynchronous operations, and requires `xarray_open_kwargs` to contain the following dictionary fragment:
9696
```python
9797
xarray_open_kwargs ={
@@ -103,19 +103,24 @@ An assets object describes the columns in the CSV file relevant for opening the
103103
},
104104
...
105105
}
106-
``````
107-
In contrast, Zarr v2 is synchronous and instead requires:
108-
```python
109-
xarray_open_kwargs ={
110-
"storage_options" : {
111-
"remote_options" : {
112-
"async": false,
113-
...
114-
}
115-
},
116-
...
117-
}
118-
```
106+
````
107+
108+
In contrast, Zarr v2 is synchronous and instead requires:
109+
110+
```python
111+
xarray_open_kwargs ={
112+
"storage_options" : {
113+
"remote_options" : {
114+
"async": false,
115+
...
116+
}
117+
},
118+
...
119+
}
120+
```
121+
122+
If `zarr2` or `zarr3` is specified in the `format` field, the `async` flag will be set automatically. If you specify `zarr` as the format, you must set the `async` flag manually in the `xarray_open_kwargs`.
123+
119124
```
120125
121126
### Aggregation Control Object
@@ -137,3 +142,4 @@ An aggregation object describes types of operations done during the aggregation
137142
| type | string | **REQUIRED.** Type of aggregation operation to apply. Valid values include: `join_new`, `join_existing`, `union` |
138143
| attribute_name | string | Name of attribute (column) across which to aggregate. |
139144
| options | object | **OPTIONAL.** Aggregration settings that are passed as keywords arguments to [`xarray.concat()`](https://xarray.pydata.org/en/stable/generated/xarray.concat.html) or [`xarray.merge()`](https://xarray.pydata.org/en/stable/generated/xarray.merge.html#xarray.merge). For `join_existing`, it must contain the name of the existing dimension to use (for e.g.: something like `{'dim': 'time'}`). |
145+
```

intake_esm/cat.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class AggregationType(str, enum.Enum):
5454
class DataFormat(str, enum.Enum):
5555
netcdf = 'netcdf'
5656
zarr = 'zarr'
57+
zarr2 = 'zarr2'
58+
zarr3 = 'zarr3'
5759
reference = 'reference'
5860
opendap = 'opendap'
5961

intake_esm/source.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from intake.source.base import DataSource, Schema
1010

1111
from .cat import Aggregation, DataFormat
12-
from .utils import OPTIONS
12+
from .utils import OPTIONS, _set_async_flag
1313

1414

1515
class ConcatenationWarning(UserWarning):
@@ -23,7 +23,7 @@ class ESMDataSourceError(Exception):
2323
def _get_xarray_open_kwargs(data_format, xarray_open_kwargs=None, storage_options=None):
2424
xarray_open_kwargs = (xarray_open_kwargs or {}).copy()
2525
_default_open_kwargs = {
26-
'engine': 'zarr' if data_format in {'zarr', 'reference'} else 'netcdf4',
26+
'engine': 'zarr' if data_format in {'zarr', 'zarr2', 'zarr3', 'reference'} else 'netcdf4',
2727
'chunks': {},
2828
'backend_kwargs': {},
2929
'decode_timedelta': False,
@@ -40,6 +40,8 @@ def _get_xarray_open_kwargs(data_format, xarray_open_kwargs=None, storage_option
4040
):
4141
xarray_open_kwargs['backend_kwargs']['storage_options'] = {} or storage_options
4242

43+
xarray_open_kwargs = _set_async_flag(data_format, xarray_open_kwargs)
44+
4345
return xarray_open_kwargs
4446

4547

intake_esm/utils.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
import polars as pl
88
import zarr
99

10+
__all__ = [
11+
'OPTIONS',
12+
'set_options',
13+
'_set_async_flag',
14+
]
15+
1016

1117
def show_versions(file=sys.stdout): # pragma: no cover
1218
"""print the versions of intake-esm and its dependencies.
@@ -67,6 +73,50 @@ def _zarr_async() -> bool:
6773
return int(zarr.__version__.split('.')[0]) > 2
6874

6975

76+
def _set_async_flag(data_format: str, xarray_open_kwargs: dict) -> dict:
77+
"""
78+
If we have the data format set to either zarr2 or zarr3, the async flag in
79+
`xarray_open_kwargs['storage_options']['remote_opetions']` is constrained to
80+
be either False or True, respectively.
81+
82+
Parameters
83+
----------
84+
data_format : str
85+
86+
xarray_open_kwargs : dict
87+
The xarray open kwargs dictionary that may contain storage options.
88+
Returns
89+
-------
90+
dict
91+
The updated xarray open kwargs with the async flag set appropriately.
92+
"""
93+
if data_format not in {'zarr2', 'zarr3'}:
94+
return xarray_open_kwargs
95+
96+
storage_opts_template = {
97+
'backend_kwargs': {'storage_options': {'remote_options': {'asynchronous': _zarr_async()}}}
98+
}
99+
if (
100+
xarray_open_kwargs.get('backend_kwargs', {})
101+
.get('storage_options', {})
102+
.get('remote_options', None)
103+
is not None
104+
):
105+
xarray_open_kwargs['backend_kwargs']['storage_options']['remote_options'][
106+
'asynchronous'
107+
] = _zarr_async()
108+
elif xarray_open_kwargs.get('backend_kwargs', {}).get('storage_options', None) is not None:
109+
xarray_open_kwargs['backend_kwargs']['storage_options'] = storage_opts_template[
110+
'backend_kwargs'
111+
]['storage_options']
112+
elif xarray_open_kwargs.get('backend_kwargs', None) is not None:
113+
xarray_open_kwargs['backend_kwargs'] = storage_opts_template['backend_kwargs']
114+
else:
115+
xarray_open_kwargs = storage_opts_template
116+
117+
return xarray_open_kwargs
118+
119+
70120
OPTIONS = {
71121
'attrs_prefix': 'intake_esm_attrs',
72122
'dataset_key': 'intake_esm_dataset_key',

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ pydantic>=2.0
1010
pydap!=3.5.5
1111
requests>=2.24.0
1212
xarray>=2024.10
13-
zarr>=3.0.10
13+
# Allow zarr 2.x or zarr 3.1.0+
14+
zarr!=3.0.*
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"esmcat_version": "0.1.0",
3+
"id": "sample-cesm1-lens-zarr2",
4+
"description": "This is a sample ESM catalog for CESM1-LENS data in zarr v2 format",
5+
"catalog_file": "./tests/sample-catalogs/cesm1-lens-aws-zarr.csv",
6+
"attributes": [
7+
{
8+
"column_name": "experiment",
9+
"vocabulary": ""
10+
},
11+
{
12+
"column_name": "component",
13+
"vocabulary": ""
14+
},
15+
{
16+
"column_name": "frequency",
17+
"vocabulary": ""
18+
},
19+
{ "column_name": "variable", "vocabulary": "" }
20+
],
21+
"assets": {
22+
"column_name": "path",
23+
"format": "zarr2"
24+
},
25+
"aggregation_control": {
26+
"variable_column_name": "variable",
27+
"groupby_attrs": ["component", "experiment", "frequency"],
28+
"aggregations": [
29+
{
30+
"type": "union",
31+
"attribute_name": "variable"
32+
}
33+
]
34+
}
35+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"esmcat_version": "0.1.0",
3+
"id": "sample-cesm1-lens-zarr3",
4+
"description": "This is a sample ESM catalog for CESM1-LENS data in zarr v3 format",
5+
"catalog_file": "./tests/sample-catalogs/cesm1-lens-aws-zarr.csv",
6+
"attributes": [
7+
{
8+
"column_name": "experiment",
9+
"vocabulary": ""
10+
},
11+
{
12+
"column_name": "component",
13+
"vocabulary": ""
14+
},
15+
{
16+
"column_name": "frequency",
17+
"vocabulary": ""
18+
},
19+
{ "column_name": "variable", "vocabulary": "" }
20+
],
21+
"assets": {
22+
"column_name": "path",
23+
"format": "zarr3"
24+
},
25+
"aggregation_control": {
26+
"variable_column_name": "variable",
27+
"groupby_attrs": ["component", "experiment", "frequency"],
28+
"aggregations": [
29+
{
30+
"type": "union",
31+
"attribute_name": "variable"
32+
}
33+
]
34+
}
35+
}

tests/test_cat.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,19 @@
2121
sample_pl_df,
2222
zarr_cat_aws_cesm,
2323
zarr_cat_pangeo_cmip6,
24+
zarr_v2_cat,
25+
zarr_v3_cat,
2426
)
2527

2628

2729
@pytest.mark.parametrize(
28-
'column_name, format, format_column_name', [('test', 'zarr', None), ('test', 'netcdf', None)]
30+
'column_name, format, format_column_name',
31+
[
32+
('test', 'zarr', None),
33+
('test', 'zarr2', None),
34+
('test', 'zarr3', None),
35+
('test', 'netcdf', None),
36+
],
2937
)
3038
def test_assets(column_name, format, format_column_name):
3139
a = Assets(column_name=column_name, format=format, format_column_name=format_column_name)
@@ -53,6 +61,8 @@ def test_assets_mutually_exclusive():
5361
cdf_cat_sample_cmip6_noagg,
5462
cdf_cat_sample_cesmle,
5563
multi_variable_cat,
64+
zarr_v2_cat,
65+
zarr_v3_cat,
5666
],
5767
)
5868
@pytest.mark.flaky(max_runs=3, min_passes=1) # Cold start related failures

0 commit comments

Comments
 (0)