Skip to content

Commit cd0194c

Browse files
committed
add error state when no updated jobs
1 parent 3fdeaf5 commit cd0194c

File tree

5 files changed

+163
-93
lines changed

5 files changed

+163
-93
lines changed

src/biokbase/narrative/jobs/jobcomm.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ def cell_id_list(self):
117117

118118
@property
119119
def ts(self):
120-
"""This param is completely optional"""
120+
"""
121+
Optional field sent with STATUS requests indicating to filter out
122+
job states in the STATUS response that have not been updated since
123+
this epoch time (in ns)
124+
"""
121125
return self.rq_data.get(PARAM["TS"])
122126

123127

@@ -200,10 +204,7 @@ def _get_job_ids(self, req: JobRequest) -> List[str]:
200204
if req.has_batch_id():
201205
return self._jm.update_batch_job(req.batch_id)
202206

203-
try:
204-
return req.job_id_list
205-
except Exception as ex:
206-
raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex
207+
return req.job_id_list
207208

208209
def start_job_status_loop(
209210
self,

src/biokbase/narrative/jobs/jobmanager.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import copy
2+
import time
23
from datetime import datetime, timedelta, timezone
34
from typing import List, Tuple
45

@@ -29,13 +30,15 @@
2930
__version__ = "0.0.1"
3031

3132
JOB_NOT_REG_ERR = "Job ID is not registered"
33+
JOB_NOT_REG_2_ERR = "Cannot find job with ID %s" # TODO unify these
3234
JOB_NOT_BATCH_ERR = "Job ID is not for a batch job"
3335

3436
JOBS_TYPE_ERR = "List expected for job_id_list"
3537
JOBS_MISSING_ERR = "No valid job IDs provided"
3638

3739
CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided"
38-
DOES_NOT_EXIST = "does_not_exist"
40+
41+
NO_UPDATED_JOBS_ERR = "No updated jobs"
3942

4043

4144
class JobManager:
@@ -345,8 +348,17 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict:
345348
for job_id in job_ids:
346349
if self.get_job(job_id).last_updated < ts:
347350
del output_states[job_id]
351+
no_updated_jobs = ts is not None and job_ids and not output_states
352+
353+
# add error_ids first in the unlikely case one of the error_ids
354+
# is "error" which is a reserved key which is prioritized
355+
# for indicating an actual error event
356+
self.add_errors_to_results(output_states, error_ids)
348357

349-
return self.add_errors_to_results(output_states, error_ids)
358+
if no_updated_jobs:
359+
output_states["error"] = {"error": NO_UPDATED_JOBS_ERR}
360+
361+
return output_states
350362

351363
def get_all_job_states(self, ignore_refresh_flag=False) -> dict:
352364
"""
@@ -725,7 +737,7 @@ def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict:
725737
for error_id in error_ids:
726738
results[error_id] = {
727739
"job_id": error_id,
728-
"error": f"Cannot find job with ID {error_id}",
740+
"error": JOB_NOT_REG_2_ERR % error_id,
729741
}
730742
return results
731743

src/biokbase/narrative/tests/test_job_util.py

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ def test_load_job_constants__valid(self):
6060

6161

6262
class MergeTest(unittest.TestCase):
63-
def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict):
63+
def _check(self, d0: dict, d1: dict, exp_merge: dict):
64+
d0_copy = copy.deepcopy(d0)
6465
d1_copy = copy.deepcopy(d1)
6566
merge_inplace(d0, d1)
6667
self.assertEqual(
@@ -71,11 +72,19 @@ def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict):
7172
d1,
7273
d1_copy
7374
)
75+
d0 = copy.deepcopy(d0_copy)
76+
dmerge = merge(d0, d1)
77+
self.assertEqual(
78+
dmerge,
79+
exp_merge
80+
)
81+
self.assertEqual(d0, d0_copy)
82+
self.assertEqual(d1, d1_copy)
7483

7584
def test_merge_inplace__empty(self):
7685
d0 = {}
7786
d1 = {}
78-
self._check_merge_inplace(
87+
self._check(
7988
d0,
8089
d1,
8190
{}
@@ -85,7 +94,7 @@ def test_merge_inplace__d0_empty(self):
8594
# flat
8695
d0 = {}
8796
d1 = {"level00": "l00"}
88-
self._check_merge_inplace(
97+
self._check(
8998
d0,
9099
d1,
91100
{"level00": "l00"}
@@ -99,7 +108,7 @@ def test_merge_inplace__d0_empty(self):
99108
"level10": "l10"
100109
}
101110
}
102-
self._check_merge_inplace(
111+
self._check(
103112
d0,
104113
d1,
105114
{
@@ -114,7 +123,7 @@ def test_merge_inplace__d1_empty(self):
114123
# flat
115124
d0 = {"level00": "l00"}
116125
d1 = {}
117-
self._check_merge_inplace(
126+
self._check(
118127
d0,
119128
d1,
120129
{"level00": "l00"}
@@ -128,7 +137,7 @@ def test_merge_inplace__d1_empty(self):
128137
}
129138
}
130139
d1 = {}
131-
self._check_merge_inplace(
140+
self._check(
132141
d0,
133142
d1,
134143
{
@@ -148,7 +157,7 @@ def test_merge_inplace__flat(self):
148157
"level01": "l01_",
149158
"level02": "l02"
150159
}
151-
self._check_merge_inplace(
160+
self._check(
152161
d0,
153162
d1,
154163
{
@@ -163,28 +172,41 @@ def test_merge_inplace__nested(self):
163172
"level00": {
164173
"level10": {
165174
"level20": "l20",
166-
"level21": "l21"
175+
"level21": "l21",
176+
"level23": {
177+
"level30": "l30"
178+
}
167179
}
168180
},
169181
"level01": "l01"
170182
}
171183
d1 = {
172184
"level00": {
173185
"level10": {
174-
"level22": "l22"
186+
"level21": "l21_",
187+
"level22": "l22",
188+
"level24": {
189+
"level30": "l30"
190+
}
175191
}
176192
},
177193
"level01": "l01_"
178194
}
179-
self._check_merge_inplace(
195+
self._check(
180196
d0,
181197
d1,
182198
{
183199
"level00": {
184200
"level10": {
185201
"level20": "l20",
186-
"level21": "l21",
187-
"level22": "l22"
202+
"level21": "l21_",
203+
"level22": "l22",
204+
"level23": {
205+
"level30": "l30"
206+
},
207+
"level24": {
208+
"level30": "l30"
209+
}
188210
}
189211
},
190212
"level01": "l01_"
@@ -205,7 +227,7 @@ def test_merge_inplace__xor_dicts(self):
205227
):
206228
merge_inplace(d0, d1)
207229

208-
def test_merge(self):
230+
def test_random(self):
209231
d0 = {
210232
"level00": "l00",
211233
"level01": {
@@ -222,13 +244,9 @@ def test_merge(self):
222244
}
223245
}
224246
}
225-
d0_copy = copy.deepcopy(d0)
226-
d1_copy = copy.deepcopy(d1)
227-
d0_merge = merge(d0, d1)
228-
self.assertEqual(d0, d0_copy)
229-
self.assertEqual(d1, d1_copy)
230-
self.assertEqual(
231-
d0_merge,
247+
self._check(
248+
d0,
249+
d1,
232250
{
233251
"level00": "l00",
234252
"level01": {

src/biokbase/narrative/tests/test_jobcomm.py

Lines changed: 104 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import itertools
33
import os
44
import re
5+
import sys
56
import time
67
import unittest
78
from unittest import mock
@@ -25,7 +26,9 @@
2526
from biokbase.narrative.jobs.jobmanager import (
2627
JOB_NOT_BATCH_ERR,
2728
JOB_NOT_REG_ERR,
29+
JOB_NOT_REG_2_ERR,
2830
JOBS_MISSING_ERR,
31+
NO_UPDATED_JOBS_ERR,
2932
JobManager,
3033
)
3134
from biokbase.narrative.tests.generate_test_results import (
@@ -501,7 +504,7 @@ def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS):
501504
"""
502505
For STATUS responses, each output_state will have an extra field `last_checked`
503506
that is variable and is not in the test data. Check that here and delete before
504-
other checkd
507+
other checks
505508
"""
506509
for output_state in output_states.values():
507510
self.assertIn("last_checked", output_state)
@@ -683,12 +686,36 @@ def mock_check_jobs(params):
683686
msg,
684687
)
685688

689+
def _reset_last_updated(self):
690+
"""Set last_updated back a minute"""
691+
for job_id in self.jm._running_jobs:
692+
job = self.jm.get_job(job_id)
693+
job.last_updated -= 60 * 1e9
694+
self.assertTrue(job.last_updated > 0) # sanity check
695+
696+
def _check_last_updated(self, exp_updated):
697+
"""Make sure the right jobs had `last_updated` bumped"""
698+
exp_not_updated = list(set(ALL_JOBS) - set(exp_updated)) # exclusion
699+
now = time.time_ns()
700+
701+
exp_updated = [
702+
self.jm.get_job(job_id).last_updated for job_id in exp_updated
703+
]
704+
for ts in exp_updated:
705+
self.assertTrue(ts_are_close(ts, now))
706+
707+
exp_not_updated = [
708+
self.jm.get_job(job_id).last_updated for job_id in exp_not_updated
709+
]
710+
for ts in exp_not_updated:
711+
# was long time ago
712+
self.assertTrue(ts < now)
713+
self.assertFalse(ts_are_close(ts, now))
714+
686715
@mock.patch(CLIENTS, get_mock_client)
687-
def test_get_job_states__last_updated(self):
688-
"""
689-
Copied from test_jobmanager.py
690-
But also tests the last_checked field
691-
"""
716+
def test_get_job_states__by_last_updated(self):
717+
self._reset_last_updated()
718+
692719
# what FE will say was the last time the jobs were checked
693720
ts = time.time_ns()
694721

@@ -733,14 +760,83 @@ def mock_check_jobs(self_, params):
733760
job_state["jobState"]["updated"] += 1
734761
expected[JOB_NOT_FOUND] = {
735762
"job_id": JOB_NOT_FOUND,
736-
"error": f"Cannot find job with ID {JOB_NOT_FOUND}"
763+
"error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND
737764
}
738765

739-
self._check_pop_last_checked(output_states, ts)
766+
self._check_pop_last_checked(output_states, time.time_ns())
740767
self.assertEqual(
741768
expected,
742769
output_states
743770
)
771+
self._check_last_updated(updated_active_ids)
772+
773+
@mock.patch(CLIENTS, get_mock_client)
774+
def test_get_job_states__all_updated_jobs(self):
775+
"""
776+
If theoretically all the jobs were last checked at the beginning of time,
777+
all job states would be returned
778+
"""
779+
self._reset_last_updated()
780+
781+
def mock_check_jobs(self_, params):
782+
"""Mutate all given job states"""
783+
lookup_ids = params["job_ids"]
784+
self.assertCountEqual(ACTIVE_JOBS, lookup_ids) # sanity check
785+
786+
job_states_ret = get_test_jobs(lookup_ids)
787+
for _, job_state in job_states_ret.items():
788+
job_state["updated"] += 1
789+
return job_states_ret
790+
791+
rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": 0})
792+
with mock.patch.object(MockClients, "check_jobs", mock_check_jobs):
793+
output_states = self.jc._handle_comm_message(rq)
794+
795+
expected = {
796+
job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id])
797+
for job_id in ALL_JOBS
798+
}
799+
for job_id, job_state in expected.items():
800+
if job_id in ACTIVE_JOBS:
801+
job_state["jobState"]["updated"] += 1
802+
expected[JOB_NOT_FOUND] = {
803+
"job_id": JOB_NOT_FOUND,
804+
"error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND
805+
}
806+
807+
self._check_pop_last_checked(output_states, time.time_ns())
808+
self.assertEqual(
809+
expected,
810+
output_states
811+
)
812+
self._check_last_updated(ACTIVE_JOBS)
813+
814+
@mock.patch(CLIENTS, get_mock_client)
815+
def test_get_job_states__no_updated_jobs(self):
816+
"""
817+
If theoretically all the jobs were last checked at the end of time,
818+
no job states would be returned, and there would be an error state
819+
to indicate that
820+
"""
821+
self._reset_last_updated()
822+
823+
rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": sys.maxsize})
824+
output_states = self.jc._handle_comm_message(rq)
825+
826+
self._check_pop_last_checked(output_states, time.time_ns())
827+
self.assertEqual(
828+
{
829+
JOB_NOT_FOUND: {
830+
"job_id": JOB_NOT_FOUND,
831+
"error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND
832+
},
833+
"error": {
834+
"error": NO_UPDATED_JOBS_ERR
835+
}
836+
},
837+
output_states
838+
)
839+
self._check_last_updated([])
744840

745841
# -----------------------
746842
# get cell job states

0 commit comments

Comments
 (0)