@@ -137,13 +137,41 @@ def operator_value(criterion: Dict[str, object]) -> (str, object):
 
     collection = None
 
-    # TODO: `user` might be None
-    dependency_job_info = (extract_own_job_info(url, user_id=user.user_id, batch_jobs=batch_jobs) if batch_jobs
-                           else None)
+    backend_config = get_backend_config()
+    max_poll_time = time.time() + backend_config.job_dependencies_max_poll_delay_seconds
+
+    def get_dependency_job_info() -> Optional[BatchJobMetadata]:
+        # TODO: `user` might be None
+        return (extract_own_job_info(url, user_id=user.user_id, batch_jobs=batch_jobs) if batch_jobs
+                else None)
+
+    dependency_job_info = get_dependency_job_info()
 
     if dependency_job_info:
-        # TODO: handle polling own job results as well (~ GpsBatchJobs.poll_job_dependencies)
         logger.info(f"load_stac of results of own job {dependency_job_info.id}")
+
+        while True:
+            partial_job_status = PARTIAL_JOB_STATUS.for_job_status(dependency_job_info.status)
+
+            logger.debug(f"OpenEO batch job results status of own job {dependency_job_info.id}: {partial_job_status}")
+
+            if partial_job_status in [PARTIAL_JOB_STATUS.ERROR, PARTIAL_JOB_STATUS.CANCELED]:
+                logger.error(f"Failing because own OpenEO batch job {dependency_job_info.id} failed")
+            elif partial_job_status in [None, PARTIAL_JOB_STATUS.FINISHED]:
+                break  # not a partial job result or success: proceed
+
+            # still running: continue polling
+            if time.time() >= max_poll_time:
+                max_poll_delay_reached_error = (f"OpenEO batch job results dependency of "
+                                                f"own job {dependency_job_info.id} was not satisfied after"
+                                                f" {backend_config.job_dependencies_max_poll_delay_seconds}s, aborting")
+
+                raise Exception(max_poll_delay_reached_error)
+
+            time.sleep(backend_config.job_dependencies_poll_interval_seconds)
+
+            dependency_job_info = get_dependency_job_info()
+
         intersecting_items = []
 
         for asset_id, asset in batch_jobs.get_result_assets(job_id=dependency_job_info.id,
@@ -185,22 +213,19 @@ def operator_value(criterion: Dict[str, object]) -> (str, object):
         logger.info(f"load_stac of arbitrary URL {url}")
 
         # TODO: move this polling logic to a dedicated method
-        backend_config = get_backend_config()
-        max_poll_time = time.time() + backend_config.job_dependencies_max_poll_delay_seconds
-
         while True:
             # TODO: add retry
             stac_object = pystac.read_file(href=url)  # TODO: does this have a timeout set?
 
-            job_results_status = (stac_object
+            partial_job_status = (stac_object
                                   .to_dict(include_self_link=False, transform_hrefs=False)
                                   .get('openeo:status'))
 
-            logger.debug(f"OpenEO batch job results status for {url}: {job_results_status}")
+            logger.debug(f"OpenEO batch job results status of {url}: {partial_job_status}")
 
-            if job_results_status in [PARTIAL_JOB_STATUS.ERROR, PARTIAL_JOB_STATUS.CANCELED]:
+            if partial_job_status in [PARTIAL_JOB_STATUS.ERROR, PARTIAL_JOB_STATUS.CANCELED]:
                 logger.error(f"Failing because OpenEO batch job with results at {url} failed")
-            elif job_results_status in [None, PARTIAL_JOB_STATUS.FINISHED]:
+            elif partial_job_status in [None, PARTIAL_JOB_STATUS.FINISHED]:
                 break  # not a partial job result or success: proceed
 
             # still running: continue polling