Skip to content

Commit fe014b4

Browse files
committed
Issue #346 Some more ProcessArgs porting
for less boilerplate code and better/earlier error messages
1 parent aaffbe5 commit fe014b4

File tree

5 files changed

+138
-163
lines changed

5 files changed

+138
-163
lines changed

openeo_driver/ProcessGraphDeserializer.py

+112-156
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import time
1414
import warnings
1515
from pathlib import Path
16-
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union, Sequence
16+
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union, Sequence, Optional
1717

1818
import geopandas as gpd
1919
import numpy as np
@@ -931,22 +931,18 @@ def save_result(args: Dict, env: EvalEnv) -> SaveResult: # TODO: return type no
931931

932932
@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/save_ml_model.json"))
933933
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/save_ml_model.json"))
934-
def save_ml_model(args: dict, env: EvalEnv) -> MlModelResult:
935-
data: DriverMlModel = extract_arg(args, "data", process_id="save_ml_model")
936-
if not isinstance(data, DriverMlModel):
937-
raise ProcessParameterInvalidException(
938-
parameter="data", process="save_ml_model", reason=f"Invalid data type {type(data)!r} expected raster-cube."
939-
)
940-
options = args.get("options", {})
934+
def save_ml_model(args: ProcessArgs, env: EvalEnv) -> MlModelResult:
935+
data = args.get_required("data", expected_type=DriverMlModel)
936+
options = args.get_optional("options", default={}, expected_type=dict)
941937
return MlModelResult(ml_model=data, options=options)
942938

943939

944940
@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/load_ml_model.json"))
945941
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/load_ml_model.json"))
946-
def load_ml_model(args: dict, env: EvalEnv) -> DriverMlModel:
942+
def load_ml_model(args: ProcessArgs, env: EvalEnv) -> DriverMlModel:
947943
if env.get(ENV_DRY_RUN_TRACER):
948944
return DriverMlModel()
949-
job_id = extract_arg(args, "id")
945+
job_id = args.get_required("id", expected_type=str)
950946
return env.backend_implementation.load_ml_model(job_id)
951947

952948

@@ -1182,51 +1178,34 @@ def add_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
11821178

11831179

11841180
@process
1185-
def drop_dimension(args: dict, env: EvalEnv) -> DriverDataCube:
1186-
data_cube = extract_arg(args, 'data')
1187-
if not isinstance(data_cube, DriverDataCube):
1188-
raise ProcessParameterInvalidException(
1189-
parameter="data", process="drop_dimension",
1190-
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
1191-
)
1192-
return data_cube.drop_dimension(name=extract_arg(args, 'name'))
1181+
def drop_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1182+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1183+
name: str = args.get_required("name", expected_type=str)
1184+
return cube.drop_dimension(name=name)
11931185

11941186

11951187
@process
1196-
def dimension_labels(args: dict, env: EvalEnv) -> DriverDataCube:
1197-
data_cube = extract_arg(args, 'data')
1198-
if not isinstance(data_cube, DriverDataCube):
1199-
raise ProcessParameterInvalidException(
1200-
parameter="data", process="dimension_labels",
1201-
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
1202-
)
1203-
return data_cube.dimension_labels(dimension=extract_arg(args, 'dimension'))
1188+
def dimension_labels(args: ProcessArgs, env: EvalEnv) -> List[str]:
1189+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1190+
dimension: str = args.get_required("dimension", expected_type=str)
1191+
return cube.dimension_labels(dimension=dimension)
12041192

12051193

12061194
@process
1207-
def rename_dimension(args: dict, env: EvalEnv) -> DriverDataCube:
1208-
data_cube = extract_arg(args, 'data')
1209-
if not isinstance(data_cube, DriverDataCube):
1210-
raise ProcessParameterInvalidException(
1211-
parameter="data", process="rename_dimension",
1212-
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
1213-
)
1214-
return data_cube.rename_dimension(source=extract_arg(args, 'source'),target=extract_arg(args, 'target'))
1195+
def rename_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1196+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1197+
source: str = args.get_required("source", expected_type=str)
1198+
target: str = args.get_required("target", expected_type=str)
1199+
return cube.rename_dimension(source=source, target=target)
12151200

12161201

12171202
@process
1218-
def rename_labels(args: dict, env: EvalEnv) -> DriverDataCube:
1219-
data_cube = extract_arg(args, 'data')
1220-
if not isinstance(data_cube, DriverDataCube):
1221-
raise ProcessParameterInvalidException(
1222-
parameter="data", process="rename_labels",
1223-
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
1224-
)
1225-
return data_cube.rename_labels(
1226-
dimension=extract_arg(args, 'dimension'),
1227-
target=extract_arg(args, 'target'),
1228-
source=args.get('source',[])
1229-
)
1203+
def rename_labels(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1204+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1205+
dimension: str = args.get_required("dimension", expected_type=str)
1206+
target: List = args.get_required("target", expected_type=list)
1207+
source: Optional[list] = args.get_optional("source", default=None, expected_type=list)
1208+
return cube.rename_labels(dimension=dimension, target=target, source=source)
12301209

12311210

12321211
@process
@@ -1372,14 +1351,10 @@ def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
13721351

13731352

13741353
@process
1375-
def mask(args: dict, env: EvalEnv) -> DriverDataCube:
1376-
cube = extract_arg(args, 'data')
1377-
if not isinstance(cube, DriverDataCube):
1378-
raise ProcessParameterInvalidException(
1379-
parameter="data", process="mask", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
1380-
)
1381-
mask = extract_arg(args, 'mask')
1382-
replacement = args.get('replacement', None)
1354+
def mask(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1355+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1356+
mask: DriverDataCube = args.get_required("mask", expected_type=DriverDataCube)
1357+
replacement = args.get_optional("replacement", default=None)
13831358
return cube.mask(mask=mask, replacement=replacement)
13841359

13851360

@@ -1411,7 +1386,10 @@ def mask_polygon(args: dict, env: EvalEnv) -> DriverDataCube:
14111386
return image_collection
14121387

14131388

1414-
def _extract_temporal_extent(args: dict, field="extent", process_id="filter_temporal") -> Tuple[str, str]:
1389+
def _extract_temporal_extent(
1390+
args: Union[dict, ProcessArgs], field="extent", process_id="filter_temporal"
1391+
) -> Tuple[str, str]:
1392+
# TODO #346: make this a ProcessArgs method?
14151393
extent = extract_arg(args, name=field, process_id=process_id)
14161394
if len(extent) != 2:
14171395
raise ProcessParameterInvalidException(
@@ -1436,29 +1414,27 @@ def _extract_temporal_extent(args: dict, field="extent", process_id="filter_temp
14361414

14371415

14381416
@process
1439-
def filter_temporal(args: dict, env: EvalEnv) -> DriverDataCube:
1440-
cube = extract_arg(args, 'data')
1441-
if not isinstance(cube, DriverDataCube):
1442-
raise ProcessParameterInvalidException(
1443-
parameter="data", process="filter_temporal",
1444-
reason=f"Invalid data type {type(cube)!r} expected raster-cube."
1445-
)
1417+
def filter_temporal(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1418+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
14461419
extent = _extract_temporal_extent(args, field="extent", process_id="filter_temporal")
14471420
return cube.filter_temporal(start=extent[0], end=extent[1])
14481421

1422+
14491423
@process_registry_100.add_function(spec=read_spec("openeo-processes/1.x/proposals/filter_labels.json"))
14501424
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/filter_labels.json"))
1451-
def filter_labels(args: dict, env: EvalEnv) -> DriverDataCube:
1452-
cube = extract_arg(args, 'data')
1453-
if not isinstance(cube, DriverDataCube):
1454-
raise ProcessParameterInvalidException(
1455-
parameter="data", process="filter_labels",
1456-
reason=f"Invalid data type {type(cube)!r} expected cube."
1457-
)
1425+
def filter_labels(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1426+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1427+
# TODO: validation that condition is a process graph construct
1428+
condition = args.get_required("condition", expected_type=dict)
1429+
dimension = args.get_required("dimension", expected_type=str)
1430+
context = args.get_optional("context", default=None)
1431+
return cube.filter_labels(condition=condition, dimension=dimension, context=context, env=env)
14581432

1459-
return cube.filter_labels(condition=extract_arg(args,"condition"),dimension=extract_arg(args,"dimension"),context=args.get("context",None),env=env)
14601433

1461-
def _extract_bbox_extent(args: dict, field="extent", process_id="filter_bbox", handle_geojson=False) -> dict:
1434+
def _extract_bbox_extent(
1435+
args: Union[dict, ProcessArgs], field="extent", process_id="filter_bbox", handle_geojson=False
1436+
) -> dict:
1437+
# TODO #346: make this a ProcessArgs method?
14621438
extent = extract_arg(args, name=field, process_id=process_id)
14631439
if handle_geojson and extent.get("type") in [
14641440
"Polygon",
@@ -1483,24 +1459,16 @@ def _extract_bbox_extent(args: dict, field="extent", process_id="filter_bbox", h
14831459

14841460

14851461
@process
1486-
def filter_bbox(args: Dict, env: EvalEnv) -> DriverDataCube:
1487-
cube = extract_arg(args, 'data')
1488-
if not isinstance(cube, DriverDataCube):
1489-
raise ProcessParameterInvalidException(
1490-
parameter="data", process="filter_bbox", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
1491-
)
1462+
def filter_bbox(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1463+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
14921464
spatial_extent = _extract_bbox_extent(args, "extent", process_id="filter_bbox")
14931465
return cube.filter_bbox(**spatial_extent)
14941466

14951467

14961468
@process
1497-
def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
1498-
cube = extract_arg(args, 'data')
1499-
geometries = extract_arg(args, 'geometries')
1500-
if not isinstance(cube, DriverDataCube):
1501-
raise ProcessParameterInvalidException(
1502-
parameter="data", process="filter_spatial", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
1503-
)
1469+
def filter_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1470+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1471+
geometries = args.get_required("geometries")
15041472

15051473
if isinstance(geometries, dict):
15061474
if "type" in geometries and geometries["type"] != "GeometryCollection":
@@ -1529,32 +1497,22 @@ def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
15291497

15301498

15311499
@process
1532-
def filter_bands(args: Dict, env: EvalEnv) -> Union[DriverDataCube, DriverVectorCube]:
1533-
cube: Union[DriverDataCube, DriverVectorCube] = extract_arg(args, "data")
1534-
if not isinstance(cube, DriverDataCube) and not isinstance(cube, DriverVectorCube):
1535-
raise ProcessParameterInvalidException(
1536-
parameter="data", process="filter_bands", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
1537-
)
1538-
bands = extract_arg(args, "bands", process_id="filter_bands")
1500+
def filter_bands(args: ProcessArgs, env: EvalEnv) -> Union[DriverDataCube, DriverVectorCube]:
1501+
cube: Union[DriverDataCube, DriverVectorCube] = args.get_required(
1502+
"data", expected_type=(DriverDataCube, DriverVectorCube)
1503+
)
1504+
bands = args.get_required("bands", expected_type=list)
15391505
return cube.filter_bands(bands=bands)
15401506

15411507

15421508
@process
1543-
def apply_kernel(args: Dict, env: EvalEnv) -> DriverDataCube:
1544-
image_collection = extract_arg(args, 'data')
1545-
kernel = np.asarray(extract_arg(args, 'kernel'))
1546-
factor = args.get('factor', 1.0)
1547-
border = args.get('border', 0)
1548-
if not isinstance(image_collection, DriverDataCube):
1549-
raise ProcessParameterInvalidException(
1550-
parameter="data", process="apply_kernel",
1551-
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
1552-
)
1553-
if border == "0":
1554-
# R-client sends `0` border as a string
1555-
border = 0
1556-
replace_invalid = args.get('replace_invalid', 0)
1557-
return image_collection.apply_kernel(kernel=kernel, factor=factor, border=border, replace_invalid=replace_invalid)
1509+
def apply_kernel(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1510+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1511+
kernel = np.asarray(args.get_required("kernel", expected_type=list))
1512+
factor = args.get_optional("factor", default=1.0, expected_type=(int, float))
1513+
border = args.get_optional("border", default=0, expected_type=int)
1514+
replace_invalid = args.get_optional("replace_invalid", default=0, expected_type=(int, float))
1515+
return cube.apply_kernel(kernel=kernel, factor=factor, border=border, replace_invalid=replace_invalid)
15581516

15591517

15601518
@process
@@ -1586,16 +1544,30 @@ def resample_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
15861544

15871545

15881546
@process
1589-
def resample_cube_spatial(args: dict, env: EvalEnv) -> DriverDataCube:
1590-
image_collection = extract_arg(args, 'data')
1591-
target_image_collection = extract_arg(args, 'target')
1592-
method = args.get('method', 'near')
1593-
if not isinstance(image_collection, DriverDataCube):
1594-
raise ProcessParameterInvalidException(
1595-
parameter="data", process="resample_cube_spatial",
1596-
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
1597-
)
1598-
return image_collection.resample_cube_spatial(target=target_image_collection, method=method)
1547+
def resample_cube_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1548+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1549+
target: DriverDataCube = args.get_required("target", expected_type=DriverDataCube)
1550+
method = args.get_enum(
1551+
"method",
1552+
options=[
1553+
"average",
1554+
"bilinear",
1555+
"cubic",
1556+
"cubicspline",
1557+
"lanczos",
1558+
"max",
1559+
"med",
1560+
"min",
1561+
"mode",
1562+
"near",
1563+
"q1",
1564+
"q3",
1565+
"rms",
1566+
"sum",
1567+
],
1568+
default="near",
1569+
)
1570+
return cube.resample_cube_spatial(target=target, method=method)
15991571

16001572

16011573
@process
@@ -1694,20 +1666,17 @@ def run_udf(args: dict, env: EvalEnv):
16941666

16951667

16961668
@process
1697-
def linear_scale_range(args: dict, env: EvalEnv) -> DriverDataCube:
1698-
image_collection = extract_arg(args, 'x')
1699-
1700-
inputMin = extract_arg(args, "inputMin")
1701-
inputMax = extract_arg(args, "inputMax")
1702-
outputMax = args.get("outputMax", 1.0)
1703-
outputMin = args.get("outputMin", 0.0)
1704-
if not isinstance(image_collection, DriverDataCube):
1705-
raise ProcessParameterInvalidException(
1706-
parameter="data", process="linear_scale_range",
1707-
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
1708-
)
1709-
1710-
return image_collection.linear_scale_range(inputMin, inputMax, outputMin, outputMax)
1669+
def linear_scale_range(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
1670+
# TODO: eliminate this top-level linear_scale_range process implementation (should be used as `apply` callback)
1671+
_log.warning("DEPRECATED: linear_scale_range usage directly on cube is deprecated/non-standard.")
1672+
cube: DriverDataCube = args.get_required("x", expected_type=DriverDataCube)
1673+
# Note: non-standard camelCase parameter names (https://github.com/Open-EO/openeo-processes/issues/302)
1674+
input_min = args.get_required("inputMin")
1675+
input_max = args.get_required("inputMax")
1676+
output_min = args.get_optional("outputMin", default=0.0)
1677+
output_max = args.get_optional("outputMax", default=1.0)
1678+
# TODO linear_scale_range is defined on GeopysparkDataCube, but not on DriverDataCube
1679+
return cube.linear_scale_range(input_min, input_max, output_min, output_max)
17111680

17121681

17131682
@process
@@ -1984,14 +1953,10 @@ def get_geometries(args: Dict, env: EvalEnv) -> Union[DelayedVector, dict]:
19841953
.param('data', description="A raster data cube.", schema={"type": "object", "subtype": "raster-cube"})
19851954
.returns("vector-cube", schema={"type": "object", "subtype": "vector-cube"})
19861955
)
1987-
def raster_to_vector(args: Dict, env: EvalEnv):
1988-
image_collection = extract_arg(args, 'data')
1989-
if not isinstance(image_collection, DriverDataCube):
1990-
raise ProcessParameterInvalidException(
1991-
parameter="data", process="raster_to_vector",
1992-
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
1993-
)
1994-
return image_collection.raster_to_vector()
1956+
def raster_to_vector(args: ProcessArgs, env: EvalEnv):
1957+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
1958+
# TODO: raster_to_vector is only defined on GeopysparkDataCube, not DriverDataCube
1959+
return cube.raster_to_vector()
19951960

19961961

19971962
@non_standard_process(
@@ -2231,13 +2196,8 @@ def discard_result(args: ProcessArgs, env: EvalEnv):
22312196

22322197
@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/mask_scl_dilation.json"))
22332198
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/mask_scl_dilation.json"))
2234-
def mask_scl_dilation(args: Dict, env: EvalEnv):
2235-
cube: DriverDataCube = extract_arg(args, 'data')
2236-
if not isinstance(cube, DriverDataCube):
2237-
raise ProcessParameterInvalidException(
2238-
parameter="data", process="mask_scl_dilation",
2239-
reason=f"Invalid data type {type(cube)!r} expected raster-cube."
2240-
)
2199+
def mask_scl_dilation(args: ProcessArgs, env: EvalEnv):
2200+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
22412201
if hasattr(cube, "mask_scl_dilation"):
22422202
the_args = args.copy()
22432203
del the_args["data"]
@@ -2268,13 +2228,8 @@ def to_scl_dilation_mask(args: ProcessArgs, env: EvalEnv):
22682228

22692229
@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/mask_l1c.json"))
22702230
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/mask_l1c.json"))
2271-
def mask_l1c(args: Dict, env: EvalEnv):
2272-
cube: DriverDataCube = extract_arg(args, 'data')
2273-
if not isinstance(cube, DriverDataCube):
2274-
raise ProcessParameterInvalidException(
2275-
parameter="data", process="mask_l1c",
2276-
reason=f"Invalid data type {type(cube)!r} expected raster-cube."
2277-
)
2231+
def mask_l1c(args: ProcessArgs, env: EvalEnv):
2232+
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
22782233
if hasattr(cube, "mask_l1c"):
22792234
return cube.mask_l1c()
22802235
else:
@@ -2369,10 +2324,11 @@ def load_result(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
23692324

23702325
@process_registry_100.add_function(spec=read_spec("openeo-processes/1.x/proposals/inspect.json"))
23712326
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/inspect.json"))
2372-
def inspect(args: dict, env: EvalEnv):
2373-
data = extract_arg(args, "data")
2374-
message = args.get("message", "")
2375-
level = args.get("level", "info")
2327+
def inspect(args: ProcessArgs, env: EvalEnv):
2328+
data = args.get_required("data")
2329+
message = args.get_optional("message", default="")
2330+
code = args.get_optional("code", default="User")
2331+
level = args.get_optional("level", default="info")
23762332
if message:
23772333
_log.log(level=logging.getLevelName(level.upper()), msg=message)
23782334
data_message = str(data)

0 commit comments

Comments
 (0)