
Commit 1f55f97

Issue #115 Improve handling of failed sub batch job creation

1 parent 90303c1

3 files changed: +94 -50 lines


src/openeo_aggregator/partitionedjobs/tracking.py

+53 -49

@@ -112,57 +112,61 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
                 }
             }
 
-        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
-            process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id
-        ):
-            subjobs[sjob_id] = subjob
-            dependencies[sjob_id] = subjob_dependencies
-            try:
-                # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
-                con = self._backends.get_connection(subjob.backend_id)
-                with con.authenticated_from_request(request=flask.request), con.override(
-                    default_timeout=CONNECTION_TIMEOUT_JOB_START
-                ):
-                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
-                        job = con.create_job(
-                            process_graph=subjob.process_graph,
-                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
-                            plan=metadata.get("plan"),
-                            budget=metadata.get("budget"),
-                            additional=job_options,
-                        )
-                _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
-                batch_jobs[sjob_id] = job
-                title = f"Partitioned job {pjob_id=} {sjob_id=}"
-                self._db.insert_sjob(
-                    user_id=user_id,
-                    pjob_id=pjob_id,
-                    sjob_id=sjob_id,
-                    subjob=subjob,
-                    title=title,
-                    status=STATUS_CREATED,
-                )
-                self._db.set_backend_job_id(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
-                )
-                create_stats[STATUS_CREATED] += 1
-            except Exception as exc:
-                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
-                msg = f"Create failed: {exc}"
-                self._db.set_sjob_status(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
-                )
-                create_stats[STATUS_ERROR] += 1
+        try:
+            for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+                process_graph=process["process_graph"],
+                get_replacement=get_replacement,
+                main_subgraph_id=main_subgraph_id,
+            ):
+                subjobs[sjob_id] = subjob
+                dependencies[sjob_id] = subjob_dependencies
+                try:
+                    title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                    self._db.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob, title=title)
+
+                    # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
+                    con = self._backends.get_connection(subjob.backend_id)
+                    with con.authenticated_from_request(request=flask.request), con.override(
+                        default_timeout=CONNECTION_TIMEOUT_JOB_START
+                    ):
+                        with TimingLogger(
+                            title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info
+                        ):
+                            job = con.create_job(
+                                process_graph=subjob.process_graph,
+                                title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                                plan=metadata.get("plan"),
+                                budget=metadata.get("budget"),
+                                additional=job_options,
+                            )
+                    _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                    batch_jobs[sjob_id] = job
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_CREATED
+                    )
+                    self._db.set_backend_job_id(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                    )
+                    create_stats[STATUS_CREATED] += 1
+                except Exception as exc:
+                    _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                    msg = f"Create failed: {exc}"
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                    )
+                    create_stats[STATUS_ERROR] += 1
 
-        # TODO: this is currently unused, don't bother building it at all?
-        partitioned_job = PartitionedJob(
-            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
-        )
+            # TODO: this is currently unused, don't bother building it at all?
+            partitioned_job = PartitionedJob(
+                process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+            )
 
-        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
-        self._db.set_pjob_status(
-            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
-        )
+            pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+            self._db.set_pjob_status(
+                user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+            )
+        except Exception as exc:
+            self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_ERROR, message=str(exc))
 
         return pjob_id
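Two things change in this hunk: each sub-job is now registered in the tracker database before the upstream batch job is created (so a failed creation still leaves a traceable record, which set_sjob_status promotes to "created" only on success), and the whole splitting/creation loop gains an outer try/except that flags the partitioned job itself as errored when something fails outside the per-sub-job scope (e.g. split_streaming raising mid-iteration). A minimal standalone sketch of this nested error-handling shape, with hypothetical stand-ins for the splitter, the backend connection and the tracker DB:

from typing import Callable, Dict, Iterable, Tuple

STATUS_CREATED = "created"
STATUS_ERROR = "error"


def create_partitioned_job(
    split_streaming: Callable[[], Iterable[Tuple[str, dict]]],  # stand-in for the splitter
    create_batch_job: Callable[[dict], str],                    # stand-in for con.create_job()
    db: Dict[str, dict],                                        # stand-in for the tracker DB
) -> str:
    create_stats = {STATUS_CREATED: 0, STATUS_ERROR: 0}
    try:
        for sjob_id, process_graph in split_streaming():
            # Register the sub-job up front, so a failed creation still leaves a record.
            db[sjob_id] = {"status": "inserted"}
            try:
                job_id = create_batch_job(process_graph)
                db[sjob_id] = {"status": STATUS_CREATED, "job_id": job_id}
                create_stats[STATUS_CREATED] += 1
            except Exception as exc:
                # Per-sub-job failure: record it and keep creating the others.
                db[sjob_id] = {"status": STATUS_ERROR, "message": f"Create failed: {exc}"}
                create_stats[STATUS_ERROR] += 1
        # The partitioned job counts as "created" if at least one sub-job made it.
        status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
        db["pjob"] = {"status": status, "message": repr(create_stats)}
    except Exception as exc:
        # Failure outside the per-sub-job scope (e.g. the splitter itself raising):
        # fail the whole partitioned job instead of leaving it in limbo.
        db["pjob"] = {"status": STATUS_ERROR, "message": str(exc)}
    return "pjob"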

tests/partitionedjobs/conftest.py

+4 -1

@@ -65,6 +65,7 @@ def __init__(self, requests_mock, backend_url: str, job_id_template: str = "job{
         self.job_id_template = job_id_template
         self.jobs: Dict[Tuple[str, str], DummyBatchJobData] = {}
         self.users: Dict[str, str] = {}
+        self.fail_create_job = False
 
     def register_user(self, bearer_token: str, user_id: str):
         self.users[bearer_token] = user_id
@@ -77,7 +78,7 @@ def get_user_id(self, request: requests.Request):
 
     def get_job_data(self, user_id, job_id) -> DummyBatchJobData:
         if (user_id, job_id) not in self.jobs:
-            raise JobNotFoundException
+            raise JobNotFoundException(job_id=job_id)
         return self.jobs[user_id, job_id]
 
     def setup_basic_requests_mocks(self):
@@ -127,6 +128,8 @@ def _handle_get_jobs(self, request: requests.Request, context):
 
     def _handle_post_jobs(self, request: requests.Request, context):
        """`POST /jobs` handler (create job)"""
+        if self.fail_create_job:
+            raise RuntimeError("nope!")
         user_id = self.get_user_id(request)
         job_id = self.job_id_template.format(i=len(self.jobs))
         assert (user_id, job_id) not in self.jobs
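The fail_create_job flag works because requests_mock propagates an exception raised inside a response callback to the client-side requests call, so the aggregator's con.create_job() fails much like it would on a real backend outage. A stripped-down illustration of that mechanism (standalone; the URL and handler name are made up, not the DummyBackend above):

import requests
import requests_mock

fail_create_job = True


def handle_post_jobs(request, context):
    # Raising here surfaces as an exception on the client side.
    if fail_create_job:
        raise RuntimeError("nope!")
    context.status_code = 201
    return {"id": "job-000"}


with requests_mock.Mocker() as m:
    m.post("https://backend.test/jobs", json=handle_post_jobs)
    try:
        requests.post("https://backend.test/jobs", json={})
    except RuntimeError as exc:
        print(f"Job creation failed: {exc}")  # Job creation failed: nope!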

tests/partitionedjobs/test_api.py

+37

@@ -873,3 +873,40 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
                 },
             }
         )
+
+    @now.mock
+    def test_failing_create(self, flask_app, api100, zk_db, dummy1):
+        """Run what happens when creation of sub batch job fails on upstream backend"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        dummy1.fail_create_job = True
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "error",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
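One extra check that would fit at the end of this test (not part of the commit, but consistent with the dummy backend's bookkeeping in conftest.py): since every POST /jobs raised, nothing should have been registered upstream, i.e. dummy1.jobs should still be empty:

        # No sub batch job made it to the dummy backend:
        assert dummy1.jobs == {}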
