34 changes: 17 additions & 17 deletions docs/source/reference/esm-catalog-spec.md
@@ -43,8 +43,8 @@ The descriptor is a single json file, inspired by the [STAC spec](https://github
### Catalog

The collection points to a single catalog.
A catalog is a CSV file.
The meaning of the columns in the csv file is defined by the parent collection.
A catalog is a CSV or parquet file.
The meaning of the columns in the csv/parquet file is defined by the parent collection.

```
activity_id,source_id,path
```
@@ -65,29 +65,29 @@ They should be either [URIs](https://en.wikipedia.org/wiki/Uniform_Resource_Iden
| id | string | **REQUIRED.** Identifier for the catalog. |
| title | string | A short descriptive one-line title for the catalog. |
| description | string | **REQUIRED.** Detailed multi-line description to fully explain the catalog. [CommonMark 0.28](http://commonmark.org/) syntax MAY be used for rich text representation. |
| catalog_file | string | **REQUIRED.** Path to a the CSV file with the catalog contents. |
| catalog_dict | array | If specified, it is mutually exclusive with `catalog_file`. An array of dictionaries that represents the data that would otherwise be in the csv. |
| catalog_file | string | **REQUIRED.** Path to the CSV/parquet file with the catalog contents. |
| catalog_dict | array | If specified, it is mutually exclusive with `catalog_file`. An array of dictionaries that represents the data that would otherwise be in the csv/parquet. |
| attributes | [[Attribute Object](#attribute-object)] | **REQUIRED.** A list of attribute columns in the data set. |
| assets | [Assets Object](#assets-object) | **REQUIRED.** Description of how the assets (data files) are referenced in the CSV catalog file. |
| assets | [Assets Object](#assets-object) | **REQUIRED.** Description of how the assets (data files) are referenced in the CSV/parquet catalog file. |
| aggregation_control | [Aggregation Control Object](#aggregation-control-object) | **OPTIONAL.** Description of how to support aggregation of multiple assets into a single xarray data set. |
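To make the field list above concrete, here is a hedged sketch of a minimal descriptor assembled in Python; every value (the id, paths, and column names) is hypothetical, and only required fields from the table are shown:

```python
import json

# Hypothetical minimal descriptor; all field values are illustrative only.
descriptor = {
    "id": "sample-cmip6",
    "description": "A small sample catalog of CMIP6-style model output.",
    "catalog_file": "sample-cmip6.csv",  # a .parquet path is equally valid
    "attributes": [
        {"column_name": "activity_id"},
        {"column_name": "source_id"},
    ],
    "assets": {"column_name": "path", "format": "netcdf"},
}

# The descriptor is a single JSON file on disk.
print(json.dumps(descriptor, indent=2))
```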

### Attribute Object

An attribute object describes a column in the catalog CSV file.
An attribute object describes a column in the catalog CSV/parquet file.
The column names can optionally be associated with a controlled vocabulary, such as the [CMIP6 CVs](https://github.com/WCRP-CMIP/CMIP6_CVs), which explain how to interpret the attribute values.

| Element | Type | Description |
| ----------- | ------ | -------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the attribute column. Must be in the header of the CSV file. |
| vocabulary | string | Link to the controlled vocabulary for the attribute in the format of a URL. |
| Element | Type | Description |
| ----------- | ------ | ---------------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the attribute column. Must be in the header of the CSV/parquet file. |
| vocabulary | string | Link to the controlled vocabulary for the attribute in the format of a URL. |

### Assets Object

An assets object describes the columns in the CSV file relevant for opening the actual data files.
An assets object describes the columns in the CSV/parquet file relevant for opening the actual data files.

| Element | Type | Description |
| ------------------ | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV file. |
| column_name | string | **REQUIRED.** The name of the column containing the path to the asset. Must be in the header of the CSV/parquet file. |
| format | string | The data format. Valid values are `netcdf`, `zarr`, `zarr2`, `zarr3`, `opendap` or `reference` ([`kerchunk`](https://github.com/fsspec/kerchunk) reference files). If specified, it means that all data in the catalog is the same type. |
| format_column_name | string | The column name which contains the data format, allowing for variable data types in one catalog. Mutually exclusive with `format`. |
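As a small illustration of the mutual exclusivity noted above (column names here are hypothetical), an assets object declares the format either once for the whole catalog or per row, never both:

```python
# All assets share one format: declare `format` directly.
assets_uniform = {"column_name": "path", "format": "zarr"}

# Formats vary per row: point at a format column instead.
assets_mixed = {"column_name": "path", "format_column_name": "format"}

# A simple validity check mirroring the spec's mutual-exclusivity rule.
def is_valid_assets(obj: dict) -> bool:
    return "column_name" in obj and ("format" in obj) != ("format_column_name" in obj)

assert is_valid_assets(assets_uniform)
assert is_valid_assets(assets_mixed)
assert not is_valid_assets(
    {"column_name": "path", "format": "zarr", "format_column_name": "format"}
)
```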

@@ -128,11 +128,11 @@ If `zarr2` or `zarr3` is specified in the `format` field, the `async` flag will

An aggregation control object defines the necessary information for aggregating multiple assets into a single xarray data set.

| Element | Type | Description |
| -------------------- | ------------------------------------------- | --------------------------------------------------------------------------------------- |
| variable_column_name | string | **REQUIRED.** Name of the attribute column in csv file that contains the variable name. |
| groupby_attrs | array | Column names (attributes) that define data sets that can be aggegrated. |
| aggregations | [[Aggregation Object](#aggregation-object)] | **OPTIONAL.** List of aggregations to apply to query results |
| Element | Type | Description |
| -------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------- |
| variable_column_name | string | **REQUIRED.** Name of the attribute column in csv/parquet file that contains the variable name. |
| groupby_attrs | array | Column names (attributes) that define data sets that can be aggregated. |
| aggregations | [[Aggregation Object](#aggregation-object)] | **OPTIONAL.** List of aggregations to apply to query results |
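A hedged sketch of an aggregation control fragment (the column names and the aggregation entry are hypothetical, modeled loosely on CMIP6-style catalogs):

```python
# Hypothetical aggregation_control block for a CMIP6-style catalog.
aggregation_control = {
    # Column holding the variable name of each asset.
    "variable_column_name": "variable_id",
    # Rows sharing these attribute values can be merged into one dataset.
    "groupby_attrs": ["activity_id", "source_id"],
    # Optional list of aggregation objects (exact shape assumed here).
    "aggregations": [{"type": "union", "attribute_name": "variable_id"}],
}
```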

### Aggregation Object

49 changes: 40 additions & 9 deletions intake_esm/cat.py
@@ -7,6 +7,7 @@
import json
import os
import typing
import warnings

import fsspec
import pandas as pd
@@ -19,7 +20,7 @@
from ._search import search, search_apply_require_all_on

__framereaders__ = [pl, pd]
__filetypes__ = ['csv', 'csv.bz2', 'csv.gz', 'parquet']
__filetypes__ = ['csv', 'csv.bz2', 'csv.gz', 'csv.zip', 'csv.xz', 'parquet']


def _allnan_or_nonan(df, column: str) -> bool:
@@ -155,6 +156,8 @@ def save(
*,
directory: str | None = None,
catalog_type: str = 'dict',
file_format: str = 'csv',
write_kwargs: dict | None = None,
to_csv_kwargs: dict | None = None,
json_dump_kwargs: dict | None = None,
storage_options: dict[str, typing.Any] | None = None,
@@ -172,8 +175,12 @@
catalog_type: str
The type of catalog to save. Whether to save the catalog table as a dictionary
in the JSON file or as a separate CSV file. Valid options are 'dict' and 'file'.
file_format: str
The file format to use when saving the catalog table. Either 'csv' or 'parquet'.
If catalog_type is 'dict', this parameter is ignored.
write_kwargs : dict, optional
Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv`
or :py:meth:`~pandas.DataFrame.to_parquet` method.
Compression is currently ignored when serializing to parquet.
to_csv_kwargs : dict, optional
Deprecated alias for ``write_kwargs``.
json_dump_kwargs : dict, optional
Additional keyword arguments passed through to the :py:func:`~json.dump` function.
storage_options: dict
@@ -187,6 +194,22 @@

"""

if to_csv_kwargs is not None:
warnings.warn(
'to_csv_kwargs is deprecated and will be removed in a future version. '
'Please use write_kwargs instead.',
DeprecationWarning,
stacklevel=2,
)
if write_kwargs is not None:
raise ValueError(
'Cannot provide both `to_csv_kwargs` and `write_kwargs`. '
'Please use `write_kwargs`.'
)
write_kwargs = to_csv_kwargs

write_kwargs = write_kwargs or {}

if catalog_type not in {'file', 'dict'}:
raise ValueError(
f'catalog_type must be either "dict" or "file". Received catalog_type={catalog_type}'
@@ -207,7 +230,9 @@
for key in {'catalog_dict', 'catalog_file'}:
data.pop(key, None)
data['id'] = name
data['last_updated'] = datetime.datetime.now().utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
data['last_updated'] = datetime.datetime.now(datetime.timezone.utc).strftime(
'%Y-%m-%dT%H:%M:%SZ'
)

_tmp_df = self.df.copy(deep=True)

@@ -216,14 +241,20 @@

if catalog_type == 'file':
csv_kwargs: dict[str, typing.Any] = {'index': False}
csv_kwargs |= to_csv_kwargs or {}
csv_kwargs |= write_kwargs or {}
compression = csv_kwargs.get('compression', '')
extensions = {'gzip': '.gz', 'bz2': '.bz2', 'zip': '.zip', 'xz': '.xz'}
csv_file_name = f'{csv_file_name}{extensions.get(compression, "")}'
data['catalog_file'] = str(csv_file_name)

with fs.open(csv_file_name, 'wb') as csv_outfile:
_tmp_df.to_csv(csv_outfile, **csv_kwargs)
if file_format == 'csv':
csv_file_name = f'{csv_file_name}{extensions.get(compression, "")}'
data['catalog_file'] = str(csv_file_name)
with fs.open(csv_file_name, 'wb') as csv_outfile:
_tmp_df.to_csv(csv_outfile, **csv_kwargs)
elif file_format == 'parquet':
pq_file_name = f'{csv_file_name.removesuffix(".csv")}.parquet'
data['catalog_file'] = str(pq_file_name)
write_kwargs.pop('compression', None)
with fs.open(pq_file_name, 'wb') as pq_outfile:
_tmp_df.to_parquet(pq_outfile, **write_kwargs)
else:
data['catalog_dict'] = _tmp_df.to_dict(orient='records')
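One caveat worth noting about deriving the parquet file name: `str.rstrip` removes any trailing characters drawn from the given set, not a literal suffix, so `str.removesuffix` (Python 3.9+) is the safer choice. A quick illustration:

```python
name = "my_cats.csv"

# rstrip treats ".csv" as a character set {'.', 'c', 's', 'v'} and keeps
# stripping from the right until it hits a character outside the set,
# so the trailing 's' of the stem is lost too.
assert name.rstrip(".csv") == "my_cat"

# removesuffix removes the literal suffix exactly once, if present.
assert name.removesuffix(".csv") == "my_cats"
```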

@@ -601,7 +632,7 @@ def __init__(
elif self.catalog_file.endswith('.parquet'):
self.driver = 'polars'
self.filetype = 'parquet'
elif self.catalog_file.endswith('.csv.bz2'):
elif self.catalog_file.endswith(('.csv.bz2', '.csv.xz', '.csv.zip')):
self.driver = 'pandas'
self.filetype = 'csv'
else:
36 changes: 31 additions & 5 deletions intake_esm/core.py
@@ -492,7 +492,7 @@ def search(
esmcat_results = pd.concat([esmcat_results, *derivedcat_results])
esmcat_results = esmcat_results[~esmcat_results.astype(str).duplicated()]

cat = self.__class__({'esmcat': self.esmcat.dict(), 'df': esmcat_results})
cat = self.__class__({'esmcat': self.esmcat.model_dump(), 'df': esmcat_results})
cat.esmcat.catalog_file = None # Don't save the catalog file
if self.esmcat.has_multiple_variable_assets:
requested_variables = list(set(variables or []).union(dependents))
@@ -515,11 +515,13 @@
name: pydantic.StrictStr,
directory: pydantic.DirectoryPath | pydantic.StrictStr | None = None,
catalog_type: str = 'dict',
file_format: str = 'csv',
write_kwargs: dict[typing.Any, typing.Any] | None = None,
to_csv_kwargs: dict[typing.Any, typing.Any] | None = None,
json_dump_kwargs: dict[typing.Any, typing.Any] | None = None,
storage_options: dict[str, typing.Any] | None = None,
) -> None:
"""Serialize catalog to corresponding json and csv files.
"""Serialize catalog to corresponding json and csv (or parquet) files.

Parameters
----------
@@ -529,8 +531,15 @@
The path to the local directory. If None, use the current directory
catalog_type: str, default 'dict'
Whether to save the catalog table as a dictionary in the JSON file or as a separate CSV file.
file_format: str, default 'csv'
The file format to use when saving the catalog table. Either 'csv' or 'parquet'.
If catalog_type is 'dict', this parameter is ignored.
write_kwargs: dict, optional
Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` or
:py:meth:`~pandas.DataFrame.to_parquet` methods.
Compression is currently ignored when serializing to parquet.
to_csv_kwargs : dict, optional
Additional keyword arguments passed through to the :py:meth:`~pandas.DataFrame.to_csv` method.
Deprecated alias for `write_kwargs`.
json_dump_kwargs : dict, optional
Additional keyword arguments passed through to the :py:func:`~json.dump` function.
storage_options: dict
@@ -540,7 +549,7 @@
Notes
-----
Large catalogs can result in large JSON files. To keep the JSON file size manageable, call with
`catalog_type='file'` to save catalog as a separate CSV file.
`catalog_type='file'` to save catalog as a separate CSV/parquet file.

Examples
--------
@@ -555,11 +564,28 @@
>>> cat_subset.serialize(name='cmip6_bcc_esm1', catalog_type='file')
>>> cat_subset.serialize(name='cmip6_bcc_esm1', catalog_type='file', file_format='parquet')
"""

if to_csv_kwargs is not None:
warnings.warn(
'to_csv_kwargs is deprecated and will be removed in a future version. '
'Please use write_kwargs instead.',
DeprecationWarning,
stacklevel=2,
)
if write_kwargs is not None:
raise ValueError(
'Cannot provide both `to_csv_kwargs` and `write_kwargs`. '
'Please use `write_kwargs`.'
)
write_kwargs = to_csv_kwargs

write_kwargs = write_kwargs or {}

self.esmcat.save(
name,
directory=directory,
catalog_type=catalog_type,
to_csv_kwargs=to_csv_kwargs,
file_format=file_format,
write_kwargs=write_kwargs,
json_dump_kwargs=json_dump_kwargs,
storage_options=storage_options,
)