From ab1edcc3da34c5bc35a41bcab3be852f9afaf91f Mon Sep 17 00:00:00 2001
From: Charles Turner
Date: Mon, 1 Sep 2025 18:25:03 +0800
Subject: [PATCH 1/4] Test round trip changing format between csv & parquet

Add note on defaults that might have changed?
---
 intake_esm/cat.py  |  49 +++++++++++++++----
 intake_esm/core.py |  32 +++++++++++--
 tests/test_core.py | 115 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 184 insertions(+), 12 deletions(-)

diff --git a/intake_esm/cat.py b/intake_esm/cat.py
index 67f3b40d..47d4024c 100644
--- a/intake_esm/cat.py
+++ b/intake_esm/cat.py
@@ -7,6 +7,7 @@
 import json
 import os
 import typing
+import warnings
 
 import fsspec
 import pandas as pd
@@ -19,7 +20,7 @@
 from ._search import search, search_apply_require_all_on
 
 __framereaders__ = [pl, pd]
-__filetypes__ = ['csv', 'csv.bz2', 'csv.gz', 'parquet']
+__filetypes__ = ['csv', 'csv.bz2', 'csv.gz', 'csv.zip', 'csv.xz', 'parquet']
 
 
 def _allnan_or_nonan(df, column: str) -> bool:
@@ -155,6 +156,8 @@ def save(
         *,
         directory: str | None = None,
         catalog_type: str = 'dict',
+        file_format: str = 'csv',
+        write_kwargs: dict | None = None,
         to_csv_kwargs: dict | None = None,
         json_dump_kwargs: dict | None = None,
         storage_options: dict[str, typing.Any] | None = None,
@@ -172,8 +175,12 @@
         catalog_type: str
             The type of catalog to save. Whether to save the catalog table as a dictionary
             in the JSON file or as a separate CSV file. Valid options are 'dict' and 'file'.
+        file_format: str
+            The file format to use when saving the catalog table. Either 'csv' or 'parquet'.
+            If catalog_type is 'dict', this parameter is ignored.
+        write_kwargs : dict, optional
+            Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv`
+            or :py:meth:`~pandas.DataFrame.to_parquet` method.
+            Compression is currently ignored when serializing to parquet.
         to_csv_kwargs : dict, optional
-            Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method.
+            Deprecated alias for `write_kwargs`.
         json_dump_kwargs : dict, optional
             Additional keyword arguments passed through to the :py:func:`~json.dump` function.
         storage_options: dict
@@ -187,6 +194,22 @@
 
         """
 
+        if to_csv_kwargs is not None:
+            warnings.warn(
+                'to_csv_kwargs is deprecated and will be removed in a future version. '
+                'Please use write_kwargs instead.',
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if write_kwargs is not None:
+                raise ValueError(
+                    'Cannot provide both `to_csv_kwargs` and `write_kwargs`. '
+                    'Please use `write_kwargs`.'
+                )
+            write_kwargs = to_csv_kwargs
+
+        write_kwargs = write_kwargs or {}
+
         if catalog_type not in {'file', 'dict'}:
             raise ValueError(
                 f'catalog_type must be either "dict" or "file". Received catalog_type={catalog_type}'
             )

@@ -207,7 +230,9 @@
         for key in {'catalog_dict', 'catalog_file'}:
             data.pop(key, None)
         data['id'] = name
-        data['last_updated'] = datetime.datetime.now().utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
+        data['last_updated'] = datetime.datetime.now(datetime.timezone.utc).strftime(
+            '%Y-%m-%dT%H:%M:%SZ'
+        )
 
         _tmp_df = self.df.copy(deep=True)
 
@@ -216,14 +241,20 @@
 
         if catalog_type == 'file':
             csv_kwargs: dict[str, typing.Any] = {'index': False}
-            csv_kwargs |= to_csv_kwargs or {}
+            csv_kwargs |= write_kwargs or {}
             compression = csv_kwargs.get('compression', '')
             extensions = {'gzip': '.gz', 'bz2': '.bz2', 'zip': '.zip', 'xz': '.xz'}
-            csv_file_name = f'{csv_file_name}{extensions.get(compression, "")}'
-            data['catalog_file'] = str(csv_file_name)
-
-            with fs.open(csv_file_name, 'wb') as csv_outfile:
-                _tmp_df.to_csv(csv_outfile, **csv_kwargs)
+            if file_format == 'csv':
+                csv_file_name = f'{csv_file_name}{extensions.get(compression, "")}'
+                data['catalog_file'] = str(csv_file_name)
+                with fs.open(csv_file_name, 'wb') as csv_outfile:
+                    _tmp_df.to_csv(csv_outfile, **csv_kwargs)
+            elif file_format == 'parquet':
+                # removesuffix (not rstrip) so that only a literal '.csv' suffix is
+                # dropped; rstrip('.csv') strips a character set and would mangle
+                # file names ending in 'c', 's' or 'v'.
+                pq_file_name = f'{csv_file_name.removesuffix(".csv")}.parquet'
+                data['catalog_file'] = str(pq_file_name)
+                write_kwargs.pop('compression', None)
+                with fs.open(pq_file_name, 'wb') as pq_outfile:
+                    _tmp_df.to_parquet(pq_outfile, **write_kwargs)
+            else:
+                raise ValueError(
+                    f'file_format must be either "csv" or "parquet". '
+                    f'Received file_format={file_format}'
+                )
         else:
             data['catalog_dict'] = _tmp_df.to_dict(orient='records')
 
@@ -601,7 +632,7 @@
         elif self.catalog_file.endswith('.parquet'):
             self.driver = 'polars'
             self.filetype = 'parquet'
-        elif self.catalog_file.endswith('.csv.bz2'):
+        elif self.catalog_file.endswith('.csv.bz2') or self.catalog_file.endswith('.csv.xz'):
             self.driver = 'pandas'
             self.filetype = 'csv'
         else:
diff --git a/intake_esm/core.py b/intake_esm/core.py
index ce71e094..066085d9 100644
--- a/intake_esm/core.py
+++ b/intake_esm/core.py
@@ -492,7 +492,7 @@
         esmcat_results = pd.concat([esmcat_results, *derivedcat_results])
         esmcat_results = esmcat_results[~esmcat_results.astype(str).duplicated()]
 
-        cat = self.__class__({'esmcat': self.esmcat.dict(), 'df': esmcat_results})
+        cat = self.__class__({'esmcat': self.esmcat.model_dump(), 'df': esmcat_results})
         cat.esmcat.catalog_file = None  # Don't save the catalog file
         if self.esmcat.has_multiple_variable_assets:
             requested_variables = list(set(variables or []).union(dependents))
@@ -515,6 +515,8 @@
         name: pydantic.StrictStr,
         directory: pydantic.DirectoryPath | pydantic.StrictStr | None = None,
         catalog_type: str = 'dict',
+        file_format: str = 'csv',
+        write_kwargs: dict[typing.Any, typing.Any] | None = None,
         to_csv_kwargs: dict[typing.Any, typing.Any] | None = None,
         json_dump_kwargs: dict[typing.Any, typing.Any] | None = None,
         storage_options: dict[str, typing.Any] | None = None,
@@ -529,8 +531,15 @@
             The path to the local directory. If None, use the current directory
         catalog_type: str, default 'dict'
             Whether to save the catalog table as a dictionary in the JSON file or as a separate CSV file.
+        file_format: str, default 'csv'
+            The file format to use when saving the catalog table. Either 'csv' or 'parquet'.
+            If catalog_type is 'dict', this parameter is ignored.
+        write_kwargs: dict, optional
+            Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` or
+            :py:meth:`~pandas.DataFrame.to_parquet` method.
+            Compression is currently ignored when serializing to parquet.
         to_csv_kwargs : dict, optional
-            Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method.
+            Deprecated alias for `write_kwargs`.
         json_dump_kwargs : dict, optional
             Additional keyword arguments passed through to the :py:func:`~json.dump` function.
         storage_options: dict
@@ -555,11 +564,28 @@
 
         >>> cat_subset.serialize(name='cmip6_bcc_esm1', catalog_type='file')
+        >>> cat_subset.serialize(name='cmip6_bcc_esm1', catalog_type='file', file_format='parquet')
         """
+        if to_csv_kwargs is not None:
+            warnings.warn(
+                'to_csv_kwargs is deprecated and will be removed in a future version. '
+                'Please use write_kwargs instead.',
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            if write_kwargs is not None:
+                raise ValueError(
+                    'Cannot provide both `to_csv_kwargs` and `write_kwargs`. '
+                    'Please use `write_kwargs`.'
+                )
+            write_kwargs = to_csv_kwargs
+
+        write_kwargs = write_kwargs or {}
+
         self.esmcat.save(
             name,
             directory=directory,
             catalog_type=catalog_type,
-            to_csv_kwargs=to_csv_kwargs,
+            file_format=file_format,
+            write_kwargs=write_kwargs,
             json_dump_kwargs=json_dump_kwargs,
             storage_options=storage_options,
         )
diff --git a/tests/test_core.py b/tests/test_core.py
index 6d4f5bc5..d9f4cf3b 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -1,4 +1,5 @@
 import ast
+import json
 import os
 
 from unittest import mock
@@ -24,6 +25,7 @@
     catalog_dict_records,
     cdf_cat_sample_cesmle,
     cdf_cat_sample_cmip5,
+    cdf_cat_sample_cmip5_pq,
     cdf_cat_sample_cmip6,
     cdf_cat_sample_cmip6_noagg,
     mixed_cat_sample_cmip6,
@@ -186,6 +188,119 @@
     )
 
 
+@pytest.mark.parametrize(
+    'datastore, file_format',
+    [
+        (catalog_dict_records, 'csv'),
+        (cdf_cat_sample_cmip6, 'csv'),
+        (zarr_cat_aws_cesm, 'csv'),
+        (zarr_cat_pangeo_cmip6, 'csv'),
+        (cdf_cat_sample_cmip5_pq, 'parquet'),
+        (multi_variable_cat, 'csv'),
+        ({'esmcat': sample_esmcat_data, 'df': sample_df}, 'csv'),
+        (intake_esm.cat.ESMCatalogModel.load(cdf_cat_sample_cmip6), 'csv'),
+    ],
+)
+@pytest.mark.parametrize('catalog_type', ['file', 'dict'])
+def test_write_csv_conflict(tmp_path, datastore, file_format, catalog_type):
+    """Test that an error is raised when `to_csv_kwargs` conflicts with `write_kwargs`."""
+    cat = intake.open_esm_datastore(datastore)
+
+    kwargs = {
+        'name': 'test_catalog',
+        'directory': tmp_path,
+        'catalog_type': catalog_type,
+        'file_format': file_format,
+        'write_kwargs': {'compression': 'gzip'},
+    }
+
+    # Works when the inputs are consistent
+    cat.serialize(**kwargs)
+
+    # Fails when both the new kwarg and its deprecated alias are supplied
+    kwargs['to_csv_kwargs'] = kwargs['write_kwargs']
+    with pytest.raises(ValueError):
+        cat.serialize(**kwargs)
+
+    # Passing only the deprecated alias warns but still succeeds
+    kwargs.pop('write_kwargs')
+    with pytest.warns(
+        DeprecationWarning,
+        match='to_csv_kwargs is deprecated and will be removed in a future version. ',
+    ):
+        cat.serialize(**kwargs)
+
+
+@pytest.mark.parametrize(
+    'file_format',
+    [
+        'csv',
+        'parquet',
+    ],
+)
+@pytest.mark.parametrize(
+    'datastore',
+    [
+        cdf_cat_sample_cmip5_pq,
+        cdf_cat_sample_cmip5,
+    ],
+)
+def test_open_and_reserialize(tmp_path, datastore, file_format):
+    """
+    Open a catalog, and then re-serialize it into `tmp_path`. We want to
+    make sure that the reserialized catalog is the same as the original, but that
+    the catalog file is stored in the correct format.
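+    Fields that legitimately differ between the two files (`catalog_file`,
+    `last_updated`, `id` and `title`) are dropped before the comparison.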
+ """ + catalog = intake.open_esm_datastore(datastore) + + catalog.serialize( + name='test_catalog', + directory=tmp_path, + catalog_type='file', + file_format=file_format, + storage_options={}, + ) + + with open(datastore) as f: + catalog_json = json.load(f) + + with open(tmp_path / 'test_catalog.json') as f: + saved_json = json.load(f) + + assert saved_json.get('catalog_file', '').endswith(file_format) + + # Remove fields that are expected to change + changed_fieldnames = ['catalog_file', 'last_updated', 'id', 'title'] + for field in changed_fieldnames: + saved_json.pop(field, None) + catalog_json.pop(field, None) + + # This seems like an change to defaults? + for l_item in saved_json['aggregation_control']['aggregations']: + # l_item is a dict - remove empty options dicts + if l_item.get('options', {}) == {}: + l_item.pop('options', None) + + # This also seems like a change to defaults? + if saved_json.get('assets', {}).get('format_column_name', None) is None: + saved_json['assets'].pop('format_column_name', None) + + assert saved_json == catalog_json + + @pytest.mark.parametrize( 'query,regex', [ From b4c3cd7bef49948a12ad004faa448f6b34677727 Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 2 Sep 2025 07:53:20 +0800 Subject: [PATCH 2/4] Double serialise-unserialise - defaults changed? --- tests/test_core.py | 56 +++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index d9f4cf3b..a9b0aaa2 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -245,60 +245,64 @@ def test_write_csv_conflict(tmp_path, datastore, file_format, catalog_type): @pytest.mark.parametrize( - 'file_format', + 'file_format_1', [ 'csv', 'parquet', ], ) @pytest.mark.parametrize( - 'datastore', + 'file_format_2', [ - cdf_cat_sample_cmip5_pq, - cdf_cat_sample_cmip5, + 'csv', + 'parquet', ], ) -def test_open_and_reserialize(tmp_path, datastore, file_format): +def test_open_and_reserialize(tmp_path, file_format_1, file_format_2): """ Open a catalog, and then re-serialize it into `tmp_path`. We want to make sure that the reserialised catalog is the same as the original, but that the catalog file is stored in the correct format. + + Note: Round-trip twice as it seems the defaults have changed since the test dataset was + created. Only affects unspecified fields. 
""" - catalog = intake.open_esm_datastore(datastore) + catalog = intake.open_esm_datastore(cdf_cat_sample_cmip5) + + catalog.serialize( + name='serialized', + directory=tmp_path, + catalog_type='file', + file_format=file_format_1, + storage_options={}, + ) + + catalog = intake.open_esm_datastore(tmp_path / 'serialized.json') catalog.serialize( - name='test_catalog', + name='reserialized', directory=tmp_path, catalog_type='file', - file_format=file_format, + file_format=file_format_2, storage_options={}, ) - with open(datastore) as f: - catalog_json = json.load(f) + with open(tmp_path / 'serialized.json') as f: + serialized = json.load(f) - with open(tmp_path / 'test_catalog.json') as f: - saved_json = json.load(f) + with open(tmp_path / 'reserialized.json') as f: + reserialized = json.load(f) - assert saved_json.get('catalog_file', '').endswith(file_format) + assert serialized.get('catalog_file', '').endswith(file_format_1) + assert reserialized.get('catalog_file', '').endswith(file_format_2) # Remove fields that are expected to change changed_fieldnames = ['catalog_file', 'last_updated', 'id', 'title'] for field in changed_fieldnames: - saved_json.pop(field, None) - catalog_json.pop(field, None) - - # This seems like an change to defaults? - for l_item in saved_json['aggregation_control']['aggregations']: - # l_item is a dict - remove empty options dicts - if l_item.get('options', {}) == {}: - l_item.pop('options', None) - - # This also seems like a change to defaults? - if saved_json.get('assets', {}).get('format_column_name', None) is None: - saved_json['assets'].pop('format_column_name', None) + serialized.pop(field, None) + reserialized.pop(field, None) - assert saved_json == catalog_json + assert serialized == reserialized @pytest.mark.parametrize( From 448ea688b3d56f5e6069378b368329f9166bc24a Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 2 Sep 2025 08:04:40 +0800 Subject: [PATCH 3/4] Minor docstring change (for API reference) --- intake_esm/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/intake_esm/core.py b/intake_esm/core.py index 066085d9..05e76b37 100644 --- a/intake_esm/core.py +++ b/intake_esm/core.py @@ -521,7 +521,7 @@ def serialize( json_dump_kwargs: dict[typing.Any, typing.Any] | None = None, storage_options: dict[str, typing.Any] | None = None, ) -> None: - """Serialize catalog to corresponding json and csv files. + """Serialize catalog to corresponding json and csv (or parquet) files. Parameters ---------- @@ -549,7 +549,7 @@ def serialize( Notes ----- Large catalogs can result in large JSON files. To keep the JSON file size manageable, call with - `catalog_type='file'` to save catalog as a separate CSV file. + `catalog_type='file'` to save catalog as a separate CSV/parquet file. Examples -------- From 31985a7fd83ceedd2d3f66e267b9eeb923991c6f Mon Sep 17 00:00:00 2001 From: Charles Turner Date: Tue, 2 Sep 2025 08:16:03 +0800 Subject: [PATCH 4/4] Update ESM Catalog Specification --- docs/source/reference/esm-catalog-spec.md | 34 +++++++++++------------ 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/source/reference/esm-catalog-spec.md b/docs/source/reference/esm-catalog-spec.md index 44c44d3b..a2752331 100644 --- a/docs/source/reference/esm-catalog-spec.md +++ b/docs/source/reference/esm-catalog-spec.md @@ -43,8 +43,8 @@ The descriptor is a single json file, inspired by the [STAC spec](https://github ### Catalog The collection points to a single catalog. -A catalog is a CSV file. 
-The meaning of the columns in the csv file is defined by the parent collection.
+A catalog is a CSV or parquet file.
+The meaning of the columns in the csv/parquet file is defined by the parent collection.

```
activity_id,source_id,path
@@ -65,29 +65,29 @@ They should be either [URIs](https://en.wikipedia.org/wiki/Uniform_Resource_Iden

| id | string | **REQUIRED.** Identifier for the catalog. |
| title | string | A short descriptive one-line title for the catalog. |
| description | string | **REQUIRED.** Detailed multi-line description to fully explain the catalog. [CommonMark 0.28](http://commonmark.org/) syntax MAY be used for rich text representation. |
| catalog_file | string | **REQUIRED.** Path to the CSV/parquet file with the catalog contents. |
| catalog_dict | array | If specified, it is mutually exclusive with `catalog_file`. An array of dictionaries that represents the data that would otherwise be in the csv/parquet file. |
| attributes | [[Attribute Object](#attribute-object)] | **REQUIRED.** A list of attribute columns in the data set. |
| assets | [Assets Object](#assets-object) | **REQUIRED.** Description of how the assets (data files) are referenced in the CSV/parquet catalog file. |
| aggregation_control | [Aggregation Control Object](#aggregation-control-object) | **OPTIONAL.** Description of how to support aggregation of multiple assets into a single xarray data set. |

### Attribute Object

An attribute object describes a column in the catalog CSV/parquet file.
The column names can optionally be associated with a controlled vocabulary, such as the [CMIP6 CVs](https://github.com/WCRP-CMIP/CMIP6_CVs), which explain how to interpret the attribute values.

| Element | Type | Description |
| ----------- | ------ | ---------------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the attribute column. Must be in the header of the CSV/parquet file. |
| vocabulary | string | Link to the controlled vocabulary for the attribute in the format of a URL. |

### Assets Object

An assets object describes the columns in the CSV/parquet file relevant for opening the actual data files.

| Element | Type | Description |
| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV/parquet file. |
| format | string | The data format. Valid values are `netcdf`, `zarr`, `zarr2`, `zarr3`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. |
| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. |

@@ -128,11 +128,11 @@ If `zarr2` or `zarr3` is specified in the `format` field, the `async` flag will

An aggregation control object defines necessary information to use when aggregating multiple assets into a single xarray data set.

| Element | Type | Description |
| -------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------- |
| variable_column_name | string | **REQUIRED.** Name of the attribute column in the csv/parquet file that contains the variable name. |
| groupby_attrs | array | Column names (attributes) that define data sets that can be aggregated. |
| aggregations | [[Aggregation Object](#aggregation-object)] | **OPTIONAL.** List of aggregations to apply to query results. |

### Aggregation Object