Commit 56bf3f5

Issue #115 Initial implementation of "crossbackend" splitting through API
1 parent: 329469b

File tree: 8 files changed (+444 -57 lines)


setup.py (+1 -1)

@@ -28,7 +28,7 @@
         "requests",
         "attrs",
         "openeo>=0.17.0",
-        "openeo_driver>=0.57.1.dev",
+        "openeo_driver>=0.65.0.dev",
         "flask~=2.0",
         "gunicorn~=20.0",
         "python-json-logger>=2.0.0",

src/openeo_aggregator/backend.py (+70 -12)

@@ -85,12 +85,17 @@
 )
 from openeo_aggregator.metadata.reporter import LoggerReporter
 from openeo_aggregator.partitionedjobs import PartitionedJob
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
+)
 from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
 from openeo_aggregator.partitionedjobs.tracking import (
     PartitionedJobConnection,
     PartitionedJobTracker,
 )
 from openeo_aggregator.utils import (
+    Clock,
     FlatPG,
     PGWithMetadata,
     dict_merge,
@@ -620,18 +625,29 @@ def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         })
 
     def create_job(
-        self, user_id: str, process: dict, api_version: str,
-        metadata: dict, job_options: dict = None
+        self,
+        user_id: str,
+        process: dict,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
     ) -> BatchJobMetadata:
         if "process_graph" not in process:
             raise ProcessGraphMissingException()
 
         # TODO: better, more generic/specific job_option(s)?
-        if job_options and (
-            job_options.get(JOB_OPTION_SPLIT_STRATEGY)
-            or job_options.get(JOB_OPTION_TILE_GRID)
-        ):
-            return self._create_partitioned_job(
+        if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
+            if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
+                # TODO: this is a temporary feature flag to trigger "crossbackend" splitting
+                return self._create_crossbackend_job(
+                    user_id=user_id,
+                    process=process,
+                    api_version=api_version,
+                    metadata=metadata,
+                    job_options=job_options,
+                )
+            else:
+                return self._create_partitioned_job(
                 user_id=user_id,
                 process=process,
                 api_version=api_version,
@@ -690,8 +706,9 @@ def _create_job_standard(
             raise OpenEOApiException(f"Failed to create job on backend {backend_id!r}: {e!r}")
         return BatchJobMetadata(
             id=JobIdMapping.get_aggregator_job_id(backend_job_id=job.job_id, backend_id=backend_id),
-            # Note: required, but unused metadata
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _create_partitioned_job(
@@ -719,11 +736,52 @@ def _create_partitioned_job(
             raise ValueError("Could not determine splitting strategy from job options")
         pjob: PartitionedJob = splitter.split(process=process, metadata=metadata, job_options=job_options)
 
-        job_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+        pjob_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+
+        return BatchJobMetadata(
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
+        )
+
+    def _create_crossbackend_job(
+        self,
+        user_id: str,
+        process: PGWithMetadata,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+    ) -> BatchJobMetadata:
+        """
+        Advanced/handled batch job creation:
+
+        - split original job in (possibly) multiple sub-jobs,
+          e.g. split the process graph based on `load_collection` availability
+        - distribute sub-jobs across (possibly) multiple back-ends
+        - keep track of them through a "parent job" in a `PartitionedJobTracker`.
+        """
+        if not self.partitioned_job_tracker:
+            raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")
+
+        def backend_for_collection(collection_id) -> str:
+            return self._catalog.get_backends_for_collection(cid=collection_id)[0]
+
+        splitter = CrossBackendSplitter(
+            backend_for_collection=backend_for_collection,
+            # TODO: job option for `always_split` feature?
+            always_split=True,
+        )
+
+        pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
+            user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
+        )
 
         return BatchJobMetadata(
-            id=JobIdMapping.get_aggregator_job_id(backend_job_id=job_id, backend_id=JobIdMapping.AGG),
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _get_connection_and_backend_job_id(
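
For context, a minimal client-side sketch of how this new code path can be triggered. This is an illustration, not part of the commit: it assumes `JOB_OPTION_SPLIT_STRATEGY` corresponds to the "split_strategy" job option, an openeo Python client version whose `create_job` accepts `job_options`, and placeholder aggregator URL and collection id.

import openeo

# Placeholder aggregator URL and authentication; adapt to the actual deployment.
connection = openeo.connect("https://aggregator.example.test").authenticate_oidc()

cube = connection.load_collection("SENTINEL2_L2A")  # placeholder collection id
job = cube.create_job(
    title="crossbackend split demo",
    # Temporary feature flag from this commit: routes the request through
    # _create_crossbackend_job instead of _create_partitioned_job.
    job_options={"split_strategy": "crossbackend"},
)

Server-side, such a request ends up in `_create_crossbackend_job`, which builds a `CrossBackendSplitter` (picking the first backend offering each collection) and hands the actual splitting and job creation to `create_crossbackend_pjob` in tracking.py below.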

src/openeo_aggregator/partitionedjobs/__init__.py (+1 -0)

@@ -34,6 +34,7 @@ def to_subjobs_dict(
         """Helper to convert a collection of SubJobs to a dictionary"""
         # TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
         if isinstance(subjobs, Sequence):
+            # TODO: eliminate this `Sequence` code path, and just always work with dict?
             return {f"{i:04d}": j for i, j in enumerate(subjobs)}
         elif isinstance(subjobs, dict):
             return {str(k): v for k, v in subjobs.items()}
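
To make the `Sequence` code path questioned in the new TODO concrete: a list of SubJobs is normalized to the dict form with zero-padded index keys. A small sketch, assuming `SubJob` is the `(process_graph, backend_id)` named tuple from this module and `to_subjobs_dict` is the static helper shown in this hunk:

from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob

subjobs = [
    SubJob(process_graph={"lc": {"process_id": "load_collection", "arguments": {}}}, backend_id="b1"),
    SubJob(process_graph={"sr": {"process_id": "save_result", "arguments": {}}}, backend_id="b2"),
]
PartitionedJob.to_subjobs_dict(subjobs)
# -> {"0000": SubJob(...), "0001": SubJob(...)}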

src/openeo_aggregator/partitionedjobs/crossbackend.py (+1 -1)

@@ -93,7 +93,7 @@ def split_streaming(
         (e.g. creating openEO batch jobs on the fly and injecting the corresponding batch job ids appropriately).
 
         :return: tuple containing:
-            - subgraph id
+            - subgraph id, recommended to handle it as opaque id (but usually format '{backend_id}:{node_id}')
             - SubJob
             - dependencies as list of subgraph ids
         """

src/openeo_aggregator/partitionedjobs/tracking.py (+106 -3)

@@ -1,13 +1,14 @@
 import collections
 import contextlib
+import dataclasses
 import datetime
 import logging
 import threading
-from typing import List, Optional
+from typing import Dict, List, Optional, Union
 
 import flask
 from openeo.api.logs import LogEntry
-from openeo.rest.job import ResultAsset
+from openeo.rest.job import BatchJob, ResultAsset
 from openeo.util import TimingLogger, rfc3339
 from openeo_driver.errors import JobNotFinishedException
 from openeo_driver.users import User
@@ -21,10 +22,15 @@
     STATUS_INSERTED,
     STATUS_RUNNING,
     PartitionedJob,
+    SubJob,
+)
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
 )
 from openeo_aggregator.partitionedjobs.splitting import TileGridSplitter
 from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
-from openeo_aggregator.utils import _UNSET, timestamp_to_rfc3339
+from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339
 
 _log = logging.getLogger(__name__)
 
@@ -57,6 +63,103 @@ def create(self, user_id: str, pjob: PartitionedJob, flask_request: flask.Request):
         self.create_sjobs(user_id=user_id, pjob_id=pjob_id, flask_request=flask_request)
         return pjob_id
 
+    def create_crossbackend_pjob(
+        self,
+        *,
+        user_id: str,
+        process: PGWithMetadata,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+        splitter: CrossBackendSplitter,
+    ) -> str:
+        """
+        Crossbackend partitioned job creation differs from the original partitioned job
+        creation because of dependencies between sub-jobs:
+        the batch jobs have to be created in the right order on their respective backends
+        before the sub-process-graphs can be finalised, and only then can their metadata
+        be persisted in the ZooKeeperPartitionedJobDB.
+        """
+        # Start with reserving a new partitioned job id based on initial metadata
+        pjob_node_value = self._db.serialize(
+            user_id=user_id,
+            created=Clock.time(),
+            process=process,
+            metadata=metadata,
+            job_options=job_options,
+        )
+        pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
+        self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
+
+        # Create batch jobs on respective backends, and build the PartitionedJob components along the way
+        subjobs: Dict[str, SubJob] = {}
+        dependencies: Dict[str, List[str]] = {}
+        batch_jobs: Dict[SubGraphId, BatchJob] = {}
+        create_stats = collections.Counter()
+
+        def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
+            # TODO: use `load_stac` instead of `load_result`, and use canonical URL?
+            nonlocal batch_jobs
+            job_id = batch_jobs[subgraph_id].job_id
+            return {
+                node_id: {
+                    "process_id": "load_result",
+                    "arguments": {"id": job_id},
+                }
+            }
+
+        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+            process_graph=process["process_graph"], get_replacement=get_replacement
+        ):
+            subjobs[sjob_id] = subjob
+            dependencies[sjob_id] = subjob_dependencies
+            try:
+                # TODO: how to handle errors in job creation? Fail the whole partitioned job, or try to finish what is possible?
+                con = self._backends.get_connection(subjob.backend_id)
+                with con.authenticated_from_request(request=flask.request), con.override(
+                    default_timeout=CONNECTION_TIMEOUT_JOB_START
+                ):
+                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
+                        job = con.create_job(
+                            process_graph=subjob.process_graph,
+                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                            plan=metadata.get("plan"),
+                            budget=metadata.get("budget"),
+                            additional=job_options,
+                        )
+                        _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                        batch_jobs[sjob_id] = job
+                        title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                        self._db.insert_sjob(
+                            user_id=user_id,
+                            pjob_id=pjob_id,
+                            sjob_id=sjob_id,
+                            subjob=subjob,
+                            title=title,
+                            status=STATUS_CREATED,
+                        )
+                        self._db.set_backend_job_id(
+                            user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                        )
+                        create_stats[STATUS_CREATED] += 1
+            except Exception as exc:
+                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                msg = f"Create failed: {exc}"
+                self._db.set_sjob_status(
+                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                )
+                create_stats[STATUS_ERROR] += 1
+
+        # TODO: this is currently unused, don't bother building it at all?
+        partitioned_job = PartitionedJob(
+            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+        )
+
+        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+        self._db.set_pjob_status(
+            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+        )
+
+        return pjob_id
+
     def create_sjobs(self, user_id: str, pjob_id: str, flask_request: flask.Request):
         """Create all sub-jobs on remote back-end for given partitioned job"""
         pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id)
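
To illustrate the `get_replacement` hook defined above: once the batch job for an upstream subgraph has been created, the node referencing that subgraph in a downstream sub-process-graph is rewritten to a `load_result` call on the new job. With a hypothetical node id "dep" and batch job id "j-abc123", the replacement returned would be:

{
    "dep": {
        "process_id": "load_result",
        "arguments": {"id": "j-abc123"},  # backend job id of the dependency's batch job
    }
}

This is also why `split_streaming` must yield subjobs in dependency order: the replacement for a node can only be produced after the batch job it points to exists in `batch_jobs`.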
