25
25
"scheduler_type" ,
26
26
"scheduler_id" ,
27
27
]
28
-
29
28
EXCLUDED_JOB_STATE_FIELDS = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS + ["job_input" ]
30
-
31
29
OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS = EXCLUDED_JOB_STATE_FIELDS + ["user" , "wsid" ]
32
30
33
31
EXTRA_JOB_STATE_FIELDS = ["batch_id" , "child_jobs" ]
@@ -100,7 +98,7 @@ def __init__(self, ee2_state, extra_data=None, children=None):
100
98
if ee2_state .get ("job_id" ) is None :
101
99
raise ValueError ("Cannot create a job without a job ID!" )
102
100
103
- self ._acc_state = ee2_state
101
+ self ._update_state ( ee2_state )
104
102
self .extra_data = extra_data
105
103
106
104
# verify parent-children relationship
@@ -325,20 +323,30 @@ def _update_state(self, state: dict) -> None:
325
323
"""
326
324
given a state data structure (as emitted by ee2), update the stored state in the job object
327
325
"""
328
- if state :
326
+ if not isinstance (state , dict ):
327
+ raise TypeError ("state must be a dict" )
329
328
329
+ # Check job_id match
330
+ if self ._acc_state :
330
331
if "job_id" in state and state ["job_id" ] != self .job_id :
331
332
raise ValueError (
332
333
f"Job ID mismatch in _update_state: job ID: { self .job_id } ; state ID: { state ['job_id' ]} "
333
334
)
334
335
335
- state = copy .deepcopy (state )
336
- if self ._acc_state is None :
337
- self ._acc_state = state
338
- else :
339
- self ._acc_state .update (state )
336
+ # Check if there would be no change in updating
337
+ # i.e., if state <= self._acc_state
338
+ if self ._acc_state is not None :
339
+ if {** self ._acc_state , ** state } == self ._acc_state :
340
+ return
341
+
342
+ state = copy .deepcopy (state )
343
+ if self ._acc_state is None :
344
+ self ._acc_state = state
345
+ else :
346
+ self ._acc_state .update (state )
347
+ self .last_updated = time .time_ns ()
340
348
341
- def state (self , force_refresh = False ):
349
+ def state (self , force_refresh = False , exclude = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS ):
342
350
"""
343
351
Queries the job service to see the state of the current job.
344
352
"""
@@ -347,47 +355,63 @@ def state(self, force_refresh=False):
347
355
state = self .query_ee2_state (self .job_id , init = False )
348
356
self ._update_state (state )
349
357
350
- return self ._internal_state (JOB_INIT_EXCLUDED_JOB_STATE_FIELDS )
358
+ return self ._internal_state (exclude )
351
359
352
360
def _internal_state (self , exclude = None ):
353
361
"""Wrapper for self._acc_state"""
354
362
state = copy .deepcopy (self ._acc_state )
355
363
self ._trim_ee2_state (state , exclude )
356
364
return state
357
365
358
- def output_state (self , state = None ) -> dict :
366
+ def output_state (self , state = None , no_refresh = False ) -> dict :
359
367
"""
360
- :param state: can be queried individually from ee2/cache with self.state(),
361
- but sometimes want it to be queried in bulk from ee2 upstream
362
- :return: dict, with structure
363
-
364
- {
365
- outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result
366
- jobState: {
367
- job_id: string,
368
- status: string,
369
- created: epoch ms,
370
- updated: epoch ms,
371
- queued: optional - epoch ms,
372
- finished: optional - epoc ms,
373
- terminated_code: optional - int,
374
- tag: string (release, beta, dev),
375
- parent_job_id: optional - string or null,
376
- run_id: string,
377
- cell_id: string,
378
- errormsg: optional - string,
379
- error (optional): {
380
- code: int,
381
- name: string,
382
- message: string (should be for the user to read),
383
- error: string, (likely a stacktrace)
384
- },
385
- error_code: optional - int
386
- }
387
- }
368
+ :param state: Supplied when the state is queried beforehand from EE2 in bulk,
369
+ or when it is retrieved from a cache. If not supplied, must be
370
+ queried with self.state() or self._internal_state()
371
+ :return: dict - with structure generally like (not accounting for error modes):
372
+ {
373
+ "job_id": string,
374
+ "jobState": {
375
+ "status": string - enum,
376
+ "created": epoch ms,
377
+ "updated": epoch ms,
378
+ "queued": epoch ms,
379
+ "running": epoch ms,
380
+ "finished": epoch ms,
381
+ "batch_job": bool,
382
+ "job_output": {
383
+ "version": string,
384
+ "result": [
385
+ {
386
+ "obj_ref": string,
387
+ "report_name": string,
388
+ "report_ref": string,
389
+ }
390
+ ],
391
+ "id": string
392
+ },
393
+ "batch_id": string,
394
+ "child_jobs": list,
395
+ "retry_ids": list,
396
+ "retry_count": int,
397
+ "job_id": string,
398
+ "created": epoch ms
399
+ },
400
+ "outputWidgetInfo": { # None if not finished
401
+ "name": string,
402
+ "tag": string - (release, beta, dev),
403
+ "params": {
404
+ "wsName": string,
405
+ "obj_ref": string,
406
+ "report_name": string,
407
+ "report_ref": string
408
+ "report_window_line_height": string
409
+ }
410
+ }
411
+ }
388
412
"""
389
413
if not state :
390
- state = self .state ()
414
+ state = self ._internal_state () if no_refresh else self . state ()
391
415
else :
392
416
self ._update_state (state )
393
417
state = self ._internal_state ()
0 commit comments