From f4032d1ffd1c091271f4748ec6eb6081de05ea2d Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Tue, 13 Feb 2024 19:57:22 +0100 Subject: [PATCH 1/2] first implementation of run_udf --- .../process_implementations/__init__.py | 1 + .../process_implementations/udf/__init__.py | 1 + .../process_implementations/udf/udf.py | 23 +++++++++++++++++++ openeo_processes_dask/specs/openeo-processes | 2 +- 4 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 openeo_processes_dask/process_implementations/udf/__init__.py create mode 100644 openeo_processes_dask/process_implementations/udf/udf.py diff --git a/openeo_processes_dask/process_implementations/__init__.py b/openeo_processes_dask/process_implementations/__init__.py index aa7c3d15..87641b20 100644 --- a/openeo_processes_dask/process_implementations/__init__.py +++ b/openeo_processes_dask/process_implementations/__init__.py @@ -7,6 +7,7 @@ from .cubes import * from .logic import * from .math import * +from .udf import * try: from .ml import * diff --git a/openeo_processes_dask/process_implementations/udf/__init__.py b/openeo_processes_dask/process_implementations/udf/__init__.py new file mode 100644 index 00000000..6ccc776a --- /dev/null +++ b/openeo_processes_dask/process_implementations/udf/__init__.py @@ -0,0 +1 @@ +from .udf import run_udf diff --git a/openeo_processes_dask/process_implementations/udf/udf.py b/openeo_processes_dask/process_implementations/udf/udf.py new file mode 100644 index 00000000..1b2834df --- /dev/null +++ b/openeo_processes_dask/process_implementations/udf/udf.py @@ -0,0 +1,23 @@ +import xarray as xr +import dask.array as da + +from typing import Optional + +from openeo_processes_dask.process_implementations.data_model import RasterCube +from openeo.udf import UdfData +from openeo.udf.run_code import run_udf_code +from openeo.udf.xarraydatacube import XarrayDataCube + +__all__ = ["run_udf"] + + +def run_udf(data: da.Array, udf: str, runtime: str, context: Optional[dict] = None +) -> RasterCube: + data = XarrayDataCube(xr.DataArray(data)) + data = UdfData(proj={"EPSG": 900913}, datacube_list=[data], user_context=context) + result = run_udf_code(code=udf, data=data) + cubes = result.get_datacube_list() + if len(cubes) != 1: + raise ValueError(f"The provided UDF should return one datacube, but got: {result}") + result_array: xr.DataArray = cubes[0].array + return result_array \ No newline at end of file diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 12444d8b..03d1b2aa 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 12444d8bbc1a983e6b8df0ba956d057dc2d2224e +Subproject commit 03d1b2aa8f98c8868f736025c2bfab91ef58c8ee From 92bde6a54a6b3d66e43fd80cbc2113d3f953b14b Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Tue, 7 Jan 2025 11:17:43 +0100 Subject: [PATCH 2/2] added run_udf unit test --- tests/conftest.py | 2 +- tests/mockdata.py | 4 ++-- tests/test_udf.py | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 tests/test_udf.py diff --git a/tests/conftest.py b/tests/conftest.py index f7dfcf73..8b67d916 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -95,7 +95,7 @@ def polygon_geometry_small( @pytest.fixture def temporal_interval(interval=["2018-05-01", "2018-06-01"]) -> TemporalInterval: - return TemporalInterval.parse_obj(interval) + return TemporalInterval(interval) @pytest.fixture diff --git a/tests/mockdata.py b/tests/mockdata.py index ebd2c85d..1b11ff4d 100644 --- a/tests/mockdata.py +++ b/tests/mockdata.py @@ -42,8 +42,8 @@ def create_fake_rastercube( with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) t_coords = pd.date_range( - start=np.datetime64(temporal_extent.__root__[0].__root__), - end=np.datetime64(temporal_extent.__root__[1].__root__), + start=temporal_extent.start.to_numpy(), + end=temporal_extent.end.to_numpy(), periods=data.shape[2], ).values diff --git a/tests/test_udf.py b/tests/test_udf.py new file mode 100644 index 00000000..752dc3c9 --- /dev/null +++ b/tests/test_udf.py @@ -0,0 +1,39 @@ +import numpy as np +import openeo +import pytest +import xarray as xr + +from openeo_processes_dask.process_implementations.udf.udf import run_udf +from tests.general_checks import general_output_checks +from tests.mockdata import create_fake_rastercube + + +@pytest.mark.parametrize("size", [(6, 5, 4, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_run_udf(temporal_interval, bounding_box, random_raster_data): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + udf = """ +from openeo.udf import XarrayDataCube + +def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: + return cube +""" + + output_cube = run_udf(data=input_cube, udf=udf, runtime="Python") + + general_output_checks( + input_cube=input_cube, + output_cube=output_cube, + verify_attrs=True, + verify_crs=True, + expected_results=input_cube, + ) + + xr.testing.assert_equal(output_cube, input_cube)