@@ -31,6 +31,7 @@ def __init__(self, node_id: str, builder: GraphBuilder, session: 'Connection', m
31
31
self .session = session
32
32
self .graph = builder .processes
33
33
self .metadata = metadata
34
+ self .dynamic = DynamicCubeMethodDelegator (cube = self )
34
35
35
36
def __str__ (self ):
36
37
return "ImageCollection: %s" % self .node_id
@@ -1070,3 +1071,59 @@ def to_graphviz(self):
1070
1071
# TODO: add subgraph for "callback" arguments?
1071
1072
1072
1073
return graph
1074
+
1075
+
1076
+ class DynamicProcessException (Exception ):
1077
+ pass
1078
+
1079
+
1080
+ class _DynamicCubeMethod :
1081
+ """
1082
+ A dynamically detected process bound to a raster cube.
1083
+ The process should have a single "raster-cube" parameter.
1084
+ """
1085
+
1086
+ def __init__ (self , cube : ImageCollectionClient , process_id : str , parameters : List [dict ]):
1087
+ self .cube = cube
1088
+ self .process_id = process_id
1089
+ self .parameters = parameters
1090
+
1091
+ # Find raster-cube parameter.
1092
+ expected_schema = {"type" : "object" , "subtype" : "raster-cube" }
1093
+ names = [p ["name" ] for p in self .parameters if p ["schema" ] == expected_schema ]
1094
+ if len (names ) != 1 :
1095
+ raise DynamicProcessException ("Need one raster-cube parameter but found {c}" .format (c = len (names )))
1096
+ self .cube_parameter = names [0 ]
1097
+
1098
+ def __call__ (self , * args , ** kwargs ):
1099
+ """Call the "cube method": pass cube and other arguments to the process."""
1100
+ arguments = {
1101
+ self .cube_parameter : {"from_node" : self .cube .node_id }
1102
+ }
1103
+ # TODO: more advanced parameter checking (required vs optional), normalization based on type, ...
1104
+ for i , arg in enumerate (args ):
1105
+ arguments [self .parameters [i ]["name" ]] = arg
1106
+ for key , value in kwargs .items ():
1107
+ assert any (p ["name" ] == key for p in self .parameters )
1108
+ assert key not in arguments
1109
+ arguments [key ] = value
1110
+
1111
+ return self .cube .graph_add_process (
1112
+ process_id = self .process_id ,
1113
+ args = arguments ,
1114
+ )
1115
+
1116
+
1117
+ class DynamicCubeMethodDelegator :
1118
+ """
1119
+ Wrapper for a DataCube to group and delegate to dynamically detected processes
1120
+ (depending on a particular backend or API spec)
1121
+ """
1122
+
1123
+ def __init__ (self , cube : ImageCollectionClient ):
1124
+ self .cube = cube
1125
+
1126
+ def __getattr__ (self , process_id ):
1127
+ self .process_registry = self .cube .session .process_registry ()
1128
+ parameters = self .process_registry .get_parameters (process_id )
1129
+ return _DynamicCubeMethod (self .cube , process_id = process_id , parameters = parameters )
0 commit comments