
Support for multiple outputs in prefect tasks #38

@jhamman

Prefect tasks can return multiple outputs, and it would be nice if FunnelResult supported this as well. For example:

from typing import Tuple

import xarray as xr

from funnel import CacheStore
from funnel.prefect.result import FunnelResult
from prefect import task, Flow

store = CacheStore()


@task(result=FunnelResult(store, serializer='xarray.zarr'), target='foo')
def my_task() -> Tuple[xr.Dataset, xr.Dataset]:
    a = xr.DataArray([1, 2, 3], dims='x', name='foo-1').to_dataset()
    b = xr.DataArray([1, 2, 3], dims='x', name='foo-2').to_dataset()
    return a, b


with Flow('foo-flow') as flow:
    ds_a, ds_b = my_task()
    
flow.run()

This currently fails with the following error:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1398, in _construct_dataarray
    variable = self._variables[name]
KeyError: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1502, in __getitem__
    return self._construct_dataarray(key)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1400, in _construct_dataarray
    _, name, variable = _get_virtual_variable(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 158, in _get_virtual_variable
    raise KeyError(key)
KeyError: 0

This has me thinking that FunnelResult may need some slight modifications to handle multiple outputs. As a starting point, I'm curious whether a modified artifact schema is necessary. In the example below, I turn the JSON object into a list of artifacts, one per output:

# /tmp/funnel_metadata_store/foo.artifact.json
[
  {
    "key": "foo[0]",
    "serializer": "xarray.zarr",
    "load_kwargs": {},
    "dump_kwargs": {},
    "created_at": "2021-12-03T23:19:11.186053"
  },
  {
    "key": "foo[1]",
    "serializer": "xarray.zarr",
    "load_kwargs": {},
    "dump_kwargs": {},
    "created_at": "2021-12-03T23:19:11.186053"
  }
]
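
To make the schema idea concrete, here is a rough sketch of how the per-output records might be generated and how a reader could accept both the current single-object file and the proposed list form. The helper names `build_artifact_records` and `normalize_artifact_file` are hypothetical and not part of funnel; the only things taken from the example above are the field names and the `foo[i]` key convention.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Union

ArtifactRecord = Dict[str, Any]


def build_artifact_records(
    target: str, n_outputs: int, serializer: str
) -> List[ArtifactRecord]:
    """Hypothetical helper: one artifact record per task output, keyed target[i]."""
    # Naive ISO 8601 timestamp shaped like the one in the example above.
    # Using UTC here is an assumption, not something the example specifies.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    return [
        {
            "key": f"{target}[{i}]",
            "serializer": serializer,
            "load_kwargs": {},
            "dump_kwargs": {},
            "created_at": created_at,
        }
        for i in range(n_outputs)
    ]


def normalize_artifact_file(
    payload: Union[ArtifactRecord, List[ArtifactRecord]]
) -> List[ArtifactRecord]:
    """Hypothetical helper: treat a single-object file as a one-element list."""
    return payload if isinstance(payload, list) else [payload]


records = build_artifact_records("foo", n_outputs=2, serializer="xarray.zarr")
print(json.dumps(records, indent=2))
```

Wrapping the single-object form in a list on read would let existing artifact files keep working, so the list schema would not have to be a breaking change.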
