Skip to content

Commit a96b615

Browse files
committed
Issue #366 add support for extracting job option defaults from remote process definitions
1 parent 93dfaaa commit a96b615

File tree

9 files changed

+260
-35
lines changed

9 files changed

+260
-35
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ and start a new "In Progress" section above it.
2121

2222
## In progress
2323

24+
25+
## 0.126.0
26+
2427
- Add STAC collections conformance class ([#195](https://github.com/Open-EO/openeo-python-driver/issues/195))
2528
- Update openeo_driver/specs/openeo-api/1.x submodule to tag `1.2.0` ([#195](https://github.com/Open-EO/openeo-python-driver/issues/195))
29+
- Extract job option defaults from UDPs and remote process descriptions ([#366](https://github.com/Open-EO/openeo-python-driver/issues/366), based on Process Parameter Extension: [Open-EO/openeo-api#471](https://github.com/Open-EO/openeo-api/pull/471))
2630

2731

2832
## 0.125.0

openeo_driver/ProcessGraphDeserializer.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
CollectionNotFoundException, ProcessUnsupportedException,
5959
)
6060
from openeo_driver.processes import ProcessRegistry, ProcessSpec, DEFAULT_NAMESPACE, ProcessArgs
61-
from openeo_driver.processgraph import get_process_definition_from_url
61+
from openeo_driver.processgraph import get_process_definition_from_url, ProcessDefinition
6262
from openeo_driver.save_result import (
6363
JSONResult,
6464
SaveResult,
@@ -71,6 +71,7 @@
7171
from openeo_driver.specs import SPECS_ROOT, read_spec
7272
from openeo_driver.util.date_math import month_shift
7373
from openeo_driver.util.geometry import geojson_to_geometry, geojson_to_multipolygon, spatial_extent_union, BoundingBox
74+
from openeo_driver.util.http import is_http_url
7475
from openeo_driver.util.utm import auto_utm_epsg_for_geometry
7576
from openeo_driver.utils import EvalEnv, smart_bool
7677

@@ -1789,7 +1790,7 @@ def apply_process(process_id: str, args: dict, namespace: Union[str, None], env:
17891790
args = {name: convert_node(expr, env=env) for (name, expr) in sorted(args.items())}
17901791

17911792
# when all arguments and dependencies are resolved, we can run the process
1792-
if namespace and any(namespace.startswith(p) for p in ["http://", "https://"]):
1793+
if is_http_url(namespace):
17931794
if namespace.startswith("http://"):
17941795
_log.warning(f"HTTP protocol for namespace based remote process definitions is discouraged: {namespace!r}")
17951796
# TODO: security aspects: only allow for certain users, only allow whitelisted domains, support content hash verification ...?
@@ -2067,7 +2068,7 @@ def evaluate_process_from_url(process_id: str, namespace: str, args: dict, env:
20672068
:param namespace: URL of process definition
20682069
"""
20692070
try:
2070-
process_definition = get_process_definition_from_url(process_id=process_id, url=namespace)
2071+
process_definition: ProcessDefinition = get_process_definition_from_url(process_id=process_id, url=namespace)
20712072
except OpenEOApiException:
20722073
raise
20732074
except Exception as e:

openeo_driver/_version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.125.0a1"
1+
__version__ = "0.126.0a1"

openeo_driver/processgraph.py

+76-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,33 @@
11
import logging
22
import requests
3-
from typing import NamedTuple, List, Optional
3+
from typing import NamedTuple, List, Optional, Union
44
from openeo_driver.errors import OpenEOApiException
5+
from openeo_driver.util.http import is_http_url
56

67
_log = logging.getLogger(__name__)
78

89

10+
class ProcessGraphFlatDict(dict):
11+
"""
12+
Wrapper for the classic "flat dictionary" representation
13+
of an openEO process graph, e.g.
14+
15+
{
16+
"lc1": {"process_id": "load_collection", "arguments": {...
17+
"rd1": {"process_id": "reduce_dimension", "arguments": {...
18+
...
19+
}
20+
21+
- To be used as type annotation where one wants to clarify
22+
what exact kind of dictionary-based process graph representation is expected.
23+
- Implemented as a subclass of `dict` to be directly compatible
24+
with existing, legacy code that expects a simple dictionary.
25+
"""
26+
27+
# TODO: move this to openeo python client library?
28+
pass
29+
30+
931
class ProcessDefinition(NamedTuple):
1032
"""
1133
Like `UserDefinedProcessMetadata`, but with different defaults
@@ -21,6 +43,11 @@ class ProcessDefinition(NamedTuple):
2143
# Definition what the process returns
2244
returns: Optional[dict] = None
2345

46+
# TODO: official naming of these "processing parameter" related properties is undecided at the moment.
47+
# see https://github.com/Open-EO/openeo-api/pull/471#discussion_r1904253964
48+
default_job_options: Optional[dict] = None
49+
default_synchronous_options: Optional[dict] = None
50+
2451

2552
def get_process_definition_from_url(process_id: str, url: str) -> ProcessDefinition:
2653
"""
@@ -69,9 +96,57 @@ def get_process_definition_from_url(process_id: str, url: str) -> ProcessDefinit
6996
message=f"No valid process definition for {process_id!r} found at {url!r}.",
7097
)
7198

99+
# TODO: official property name for these "processing parameters" is undecided at the moment.
100+
# see https://github.com/Open-EO/openeo-api/pull/471#discussion_r1904253964
101+
if "default_job_parameters" in spec:
102+
_log.warning("Extracting experimental 'default_job_parameters' from process definition.")
103+
default_job_options = spec["default_job_parameters"]
104+
else:
105+
default_job_options = None
106+
if "default_synchronous_parameters" in spec:
107+
_log.warning("Extracting experimental 'default_synchronous_parameters' from process definition.")
108+
default_synchronous_options = spec["default_synchronous_parameters"]
109+
else:
110+
default_synchronous_options = None
111+
72112
return ProcessDefinition(
73113
id=process_id,
74114
process_graph=spec["process_graph"],
75115
parameters=spec.get("parameters", []),
76116
returns=spec.get("returns"),
117+
default_job_options=default_job_options,
118+
default_synchronous_options=default_synchronous_options,
77119
)
120+
121+
122+
def extract_default_job_options_from_process_graph(
123+
process_graph: ProcessGraphFlatDict, processing_mode: str = "batch_job"
124+
) -> Union[dict, None]:
125+
"""
126+
Extract default job options from the remote process definitions referenced in a process graph.
127+
based on "Processing Parameters" extension.
128+
129+
:param process_graph: process graph in flat graph format
130+
:param processing_mode: "batch_job" or "synchronous"
131+
"""
132+
133+
job_options = []
134+
for node in process_graph.values():
135+
namespace = node.get("namespace")
136+
process_id = node["process_id"]
137+
if is_http_url(namespace):
138+
process_definition = get_process_definition_from_url(process_id=process_id, url=namespace)
139+
if processing_mode == "batch_job" and process_definition.default_job_options:
140+
job_options.append(process_definition.default_job_options)
141+
elif processing_mode == "synchronous" and process_definition.default_synchronous_options:
142+
job_options.append(process_definition.default_synchronous_options)
143+
144+
if len(job_options) == 0:
145+
return None
146+
elif len(job_options) == 1:
147+
return job_options[0]
148+
else:
149+
# TODO: how to combine multiple defaults for the same parameter?
150+
raise NotImplementedError(
151+
"Merging multiple default job options from different process definitions is not yet implemented."
152+
)

openeo_driver/util/http.py

+8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@
88
from openeo.util import repr_truncate
99

1010

11+
def is_http_url(data: Union[str, None], *, allow_http: bool = True, allow_https: bool = True) -> bool:
12+
"""Does given string look like a valid HTTP(S) URL?"""
13+
if not isinstance(data, str):
14+
return False
15+
data = data.lower()
16+
return (allow_http and data.startswith("http://")) or (allow_https and data.startswith("https://"))
17+
18+
1119
def requests_with_retry(
1220
total: int = 3, # the number of retries, not attempts (which is retries + 1)
1321
backoff_factor: float = 1,

openeo_driver/views.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from openeo.utils.version import ComparableVersion
2828
from openeo.util import dict_no_none, deep_get, Rfc3339, TimingLogger
29+
from openeo_driver.processgraph import extract_default_job_options_from_process_graph, ProcessGraphFlatDict
2930
from openeo_driver.urlsigning import UrlSigner
3031
from openeo_driver.backend import (
3132
ServiceMetadata,
@@ -590,11 +591,12 @@ def me(user: User):
590591
)
591592

592593

593-
def _extract_process_graph(post_data: dict) -> dict:
594+
def _extract_process_graph(post_data: dict) -> ProcessGraphFlatDict:
594595
"""
595596
Extract process graph dictionary from POST data
596597
597598
see https://github.com/Open-EO/openeo-api/pull/262
599+
:return: process graph in flat graph format
598600
"""
599601
try:
600602
if requested_api_version().at_least("1.0.0"):
@@ -607,7 +609,7 @@ def _extract_process_graph(post_data: dict) -> dict:
607609
if not isinstance(pg, dict):
608610
# TODO: more validity checks for (flat) process graph?
609611
raise ProcessGraphInvalidException
610-
return pg
612+
return ProcessGraphFlatDict(pg)
611613

612614

613615
def _extract_job_options(post_data: dict, to_ignore: typing.Container[str]) -> Union[dict, None]:
@@ -874,12 +876,20 @@ def register_views_batch_jobs(
874876
def create_job(user: User):
875877
# TODO: wrap this job specification in a 1.0-style ProcessGrahpWithMetadata?
876878
post_data = request.get_json()
879+
process_graph = _extract_process_graph(post_data)
877880
# TODO: preserve original non-process_graph process fields too?
878-
process = {"process_graph": _extract_process_graph(post_data)}
879-
# TODO: this "job_options" is not part of official API. See https://github.com/Open-EO/openeo-api/issues/276
881+
process = {"process_graph": process_graph}
882+
880883
job_options = _extract_job_options(
881884
post_data, to_ignore=["process", "process_graph", "title", "description", "plan", "budget", "log_level"]
882885
)
886+
job_option_defaults = extract_default_job_options_from_process_graph(
887+
process_graph=process_graph, processing_mode="batch_job"
888+
)
889+
if job_option_defaults:
890+
_log.info(f"Extending {job_options=} with extracted {job_option_defaults=}")
891+
job_options = {**job_option_defaults, **(job_options or {})}
892+
883893
metadata = {k: post_data[k] for k in ["title", "description", "plan", "budget"] if k in post_data}
884894
metadata["log_level"] = post_data.get("log_level", DEFAULT_LOG_LEVEL_PROCESSING)
885895
job_info = backend_implementation.batch_jobs.create_job(

tests/test_processgraph.py

+47-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
from openeo_driver.errors import OpenEOApiException
2-
from openeo_driver.processgraph import get_process_definition_from_url
2+
from openeo_driver.processgraph import (
3+
get_process_definition_from_url,
4+
extract_default_job_options_from_process_graph,
5+
ProcessGraphFlatDict,
6+
)
37

48
import pytest
59

@@ -53,3 +57,45 @@ def test_get_process_definition_from_url_listing(self, requests_mock):
5357
}
5458
assert pd.parameters == [{"name": "delta", "schema": {"type": "number", "optional": True, "default": 1}}]
5559
assert pd.returns == {"schema": {"type": "number"}}
60+
61+
62+
def test_extract_default_job_options_from_process_graph(requests_mock):
63+
requests_mock.get(
64+
"https://share.test/add3.json",
65+
json={
66+
"id": "add3",
67+
"process_graph": {
68+
"add": {"process_id": "add", "arguments": {"x": {"from_parameter": "x", "y": 3}, "result": True}}
69+
},
70+
"parameters": [
71+
{"name": "x", "schema": {"type": "number"}},
72+
],
73+
"returns": {"schema": {"type": "number"}},
74+
"default_job_parameters": {
75+
"memory": "2GB",
76+
"cpu": "yellow",
77+
},
78+
"default_synchronous_parameters": {
79+
"cpu": "green",
80+
},
81+
},
82+
)
83+
84+
pg = ProcessGraphFlatDict(
85+
{
86+
"add12": {"process_id": "add", "arguments": {"x": 1, "y": 2}},
87+
"add3": {
88+
"process_id": "add3",
89+
"namespace": "https://share.test/add3.json",
90+
"arguments": {"x": {"from_node": "add12"}},
91+
"result": True,
92+
},
93+
}
94+
)
95+
assert extract_default_job_options_from_process_graph(pg, processing_mode="batch_job") == {
96+
"memory": "2GB",
97+
"cpu": "yellow",
98+
}
99+
assert extract_default_job_options_from_process_graph(pg, processing_mode="synchronous") == {
100+
"cpu": "green",
101+
}

0 commit comments

Comments
 (0)