Skip to content

Commit dba96de

Browse files
committed
Issue #40: add delegator for dynamically detected cube processes
adds a property `dynamic` to ImageCollectionClient instances, which allows to call processes that are dynamically detected from the backend process listing (and not necessarily predefined in the client)
1 parent cd204f4 commit dba96de

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

openeo/rest/connection.py

+30
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def __init__(self, url, auth: AuthBase = None, session: requests.Session = None)
174174
"""
175175
super().__init__(root_url=url, auth=auth, session=session)
176176
self._cached_capabilities = None
177+
self._process_registry = None
177178

178179
# Initial API version check.
179180
if self._api_version.below(self._MINIMUM_API_VERSION):
@@ -267,6 +268,15 @@ def capabilities(self) -> 'Capabilities':
267268

268269
return self._cached_capabilities
269270

271+
def process_registry(self) -> 'ProcessRegistry':
272+
"""
273+
Load all processes supported by the backend (lazy/cached)
274+
:return: ProcessRegistry
275+
"""
276+
if self._process_registry is None:
277+
self._process_registry = ProcessRegistry.from_connection(connection=self)
278+
return self._process_registry
279+
270280
@deprecated("Use 'list_output_formats' instead")
271281
def list_file_types(self) -> dict:
272282
return self.list_output_formats()
@@ -554,3 +564,23 @@ def session(userid=None, endpoint: str = "https://openeo.org/openeo") -> Connect
554564
"""
555565
return connect(url=endpoint)
556566

567+
568+
class ProcessRegistry:
569+
"""
570+
Registry of process specs (e.g. the processes supported by a backend)
571+
"""
572+
def __init__(self, processes: dict):
573+
self._reg = processes
574+
575+
@classmethod
576+
def from_connection(cls, connection=Connection):
577+
"""Factory to load process registry from given backend connection."""
578+
# Get as list from API
579+
processes = connection.get('/processes').json()['processes']
580+
# Make it a dictionary for more efficient retrieval
581+
processes = {p['id']: p for p in processes}
582+
return cls(processes=processes)
583+
584+
def get_parameters(self, process_id: str) -> List[dict]:
585+
"""Get parameters for given process_id."""
586+
return self._reg[process_id]["parameters"]

openeo/rest/imagecollectionclient.py

+57
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(self, node_id: str, builder: GraphBuilder, session: 'Connection', m
3131
self.session = session
3232
self.graph = builder.processes
3333
self.metadata = metadata
34+
self.dynamic = DynamicCubeMethodDelegator(cube=self)
3435

3536
def __str__(self):
3637
return "ImageCollection: %s" % self.node_id
@@ -1070,3 +1071,59 @@ def to_graphviz(self):
10701071
# TODO: add subgraph for "callback" arguments?
10711072

10721073
return graph
1074+
1075+
1076+
class DynamicProcessException(Exception):
1077+
pass
1078+
1079+
1080+
class _DynamicCubeMethod:
1081+
"""
1082+
A dynamically detected process bound to a raster cube.
1083+
The process should have a single "raster-cube" parameter.
1084+
"""
1085+
1086+
def __init__(self, cube: ImageCollectionClient, process_id: str, parameters: List[dict]):
1087+
self.cube = cube
1088+
self.process_id = process_id
1089+
self.parameters = parameters
1090+
1091+
# Find raster-cube parameter.
1092+
expected_schema = {"type": "object", "subtype": "raster-cube"}
1093+
names = [p["name"] for p in self.parameters if p["schema"] == expected_schema]
1094+
if len(names) != 1:
1095+
raise DynamicProcessException("Need one raster-cube parameter but found {c}".format(c=len(names)))
1096+
self.cube_parameter = names[0]
1097+
1098+
def __call__(self, *args, **kwargs):
1099+
"""Call the "cube method": pass cube and other arguments to the process."""
1100+
arguments = {
1101+
self.cube_parameter: {"from_node": self.cube.node_id}
1102+
}
1103+
# TODO: more advanced parameter checking (required vs optional), normalization based on type, ...
1104+
for i, arg in enumerate(args):
1105+
arguments[self.parameters[i]["name"]] = arg
1106+
for key, value in kwargs.items():
1107+
assert any(p["name"] == key for p in self.parameters)
1108+
assert key not in arguments
1109+
arguments[key] = value
1110+
1111+
return self.cube.graph_add_process(
1112+
process_id=self.process_id,
1113+
args=arguments,
1114+
)
1115+
1116+
1117+
class DynamicCubeMethodDelegator:
1118+
"""
1119+
Wrapper for a DataCube to group and delegate to dynamically detected processes
1120+
(depending on a particular backend or API spec)
1121+
"""
1122+
1123+
def __init__(self, cube: ImageCollectionClient):
1124+
self.cube = cube
1125+
1126+
def __getattr__(self, process_id):
1127+
self.process_registry = self.cube.session.process_registry()
1128+
parameters = self.process_registry.get_parameters(process_id)
1129+
return _DynamicCubeMethod(self.cube, process_id=process_id, parameters=parameters)

tests/rest/test_imagecollectionclient.py

+28
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
def session040(requests_mock):
1515
requests_mock.get(API_URL + "/", json={"api_version": "0.4.0"})
1616
session = openeo.connect(API_URL)
17+
# Reset graph builder
18+
GraphBuilder.id_counter = {}
1719
return session
1820

1921

@@ -82,3 +84,29 @@ def result_callback(request, context):
8284
path = tmpdir.join("tmp.tiff")
8385
session040.load_collection("SENTINEL2").download(str(path), format="GTIFF")
8486
assert path.read() == "tiffdata"
87+
88+
89+
def test_dynamic_cube_method(session040, requests_mock):
90+
processes = [
91+
{
92+
"id": "make_larger",
93+
"description": "multiply a raster cube with a factor",
94+
"parameters": [
95+
{"name": "data", "schema": {"type": "object", "subtype": "raster-cube"}},
96+
{"name": "factor", "schema": {"type": "float"}},
97+
]}
98+
]
99+
requests_mock.get(API_URL + '/processes', json={"processes": processes})
100+
requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})
101+
102+
cube = session040.load_collection("SENTINEL2")
103+
evi = cube.dynamic.make_larger(factor=42)
104+
assert set(evi.graph.keys()) == {"loadcollection1", "makelarger1"}
105+
assert evi.graph["makelarger1"] == {
106+
"process_id": "make_larger",
107+
"arguments": {
108+
"data": {"from_node": "loadcollection1"},
109+
"factor": 42,
110+
},
111+
"result": False
112+
}

0 commit comments

Comments
 (0)