Skip to content

Commit e2678e7

Browse files
committed
Align DoubleJobRegistry.*dependency* methods with JobRegistryInterface
refs: #863, #1123
1 parent 098534c commit e2678e7

File tree

5 files changed

+42
-36
lines changed

5 files changed

+42
-36
lines changed

openeogeotrellis/_version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.65.0a10"
1+
__version__ = "0.65.0a11"

openeogeotrellis/backend.py

+15-7
Original file line numberDiff line numberDiff line change
@@ -1595,7 +1595,9 @@ def job_results_status(job_results_dependency: dict) -> (str, Optional[str]):
15951595

15961596
def fail_job():
15971597
with self._double_job_registry as registry:
1598-
registry.set_dependency_status(job_id, user_id, DEPENDENCY_STATUS.ERROR)
1598+
registry.set_dependency_status(
1599+
job_id=job_id, user_id=user_id, dependency_status=DEPENDENCY_STATUS.ERROR
1600+
)
15991601
registry.set_status(job_id, user_id, JOB_STATUS.ERROR)
16001602

16011603
job_info["status"] = JOB_STATUS.ERROR # TODO: avoid mutation
@@ -1633,7 +1635,9 @@ def fail_job():
16331635
job_info.get("dependency_status") != DEPENDENCY_STATUS.AWAITING_RETRY
16341636
): # haven't retried yet: retry
16351637
with self._double_job_registry as registry:
1636-
registry.set_dependency_status(job_id, user_id, DEPENDENCY_STATUS.AWAITING_RETRY)
1638+
registry.set_dependency_status(
1639+
job_id=job_id, user_id=user_id, dependency_status=DEPENDENCY_STATUS.AWAITING_RETRY
1640+
)
16371641

16381642
retries = [retry for details, _, retry in batch_processes.values() if details.status() == "PARTIAL"]
16391643

@@ -1742,11 +1746,15 @@ def processing_units_spent(value_estimate: Decimal, temporal_step: Optional[str]
17421746
logger.debug(f"Total cost of Sentinel Hub batch processes: {batch_process_processing_units} PU",
17431747
extra={'job_id': job_id, 'user_id': user_id})
17441748

1745-
registry.set_dependencies(job_id, user_id, dependencies)
1746-
registry.set_dependency_status(job_id, user_id, DEPENDENCY_STATUS.AVAILABLE)
1749+
registry.set_dependencies(job_id=job_id, user_id=user_id, dependencies=dependencies)
1750+
registry.set_dependency_status(
1751+
job_id=job_id, user_id=user_id, dependency_status=DEPENDENCY_STATUS.AVAILABLE
1752+
)
17471753

17481754
if batch_process_processing_units:
1749-
registry.set_dependency_usage(job_id, user_id, batch_process_processing_units)
1755+
registry.set_dependency_usage(
1756+
job_id=job_id, user_id=user_id, dependency_usage=batch_process_processing_units
1757+
)
17501758

17511759
self._start_job(job_id, User(user_id=user_id), lambda _: vault_token, dependencies)
17521760
else: # still some running: continue polling
@@ -1897,7 +1905,7 @@ def _start_job(self, job_id: str, user: User, get_vault_token: Callable[[str], s
18971905
else get_vault_token(sentinel_hub_client_alias),
18981906
)
18991907
dbl_registry.set_dependency_status(
1900-
job_id, user_id, DEPENDENCY_STATUS.AWAITING
1908+
job_id=job_id, user_id=user_id, dependency_status=DEPENDENCY_STATUS.AWAITING
19011909
)
19021910
dbl_registry.set_status(job_id, user_id, JOB_STATUS.QUEUED)
19031911

@@ -3091,7 +3099,7 @@ def cancel_job(self, job_id: str, user_id: str):
30913099
registry.set_status(job_id, user_id, JOB_STATUS.CANCELED)
30923100
else:
30933101
with self._double_job_registry as registry:
3094-
registry.remove_dependencies(job_id, user_id)
3102+
registry.remove_dependencies(job_id=job_id, user_id=user_id)
30953103
registry.set_status(job_id, user_id, JOB_STATUS.CANCELED)
30963104

30973105
def delete_job(self, job_id: str, user_id: str):

openeogeotrellis/job_registry.py

+24-26
Original file line numberDiff line numberDiff line change
@@ -663,18 +663,18 @@ def set_status(
663663
return self.db[job_id]
664664

665665
def set_dependencies(
666-
self, job_id: str, dependencies: List[Dict[str, str]]
667-
) -> JobDict:
668-
return self._update(job_id=job_id, dependencies=dependencies)
666+
self, job_id: str, *, user_id: Optional[str] = None, dependencies: List[Dict[str, str]]
667+
) -> None:
668+
self._update(job_id=job_id, dependencies=dependencies)
669669

670-
def remove_dependencies(self, job_id: str) -> JobDict:
671-
return self._update(job_id=job_id, dependencies=None, dependency_status=None)
670+
def remove_dependencies(self, job_id: str, *, user_id: Optional[str] = None) -> None:
671+
self._update(job_id=job_id, dependencies=None, dependency_status=None)
672672

673-
def set_dependency_status(self, job_id: str, dependency_status: str) -> JobDict:
674-
return self._update(job_id=job_id, dependency_status=dependency_status)
673+
def set_dependency_status(self, job_id: str, *, user_id: Optional[str] = None, dependency_status: str) -> None:
674+
self._update(job_id=job_id, dependency_status=dependency_status)
675675

676-
def set_dependency_usage(self, job_id: str, dependency_usage: Decimal) -> JobDict:
677-
return self._update(job_id, dependency_usage=str(dependency_usage))
676+
def set_dependency_usage(self, job_id: str, *, user_id: Optional[str] = None, dependency_usage: Decimal) -> None:
677+
self._update(job_id, dependency_usage=str(dependency_usage))
678678

679679
def set_proxy_user(self, job_id: str, proxy_user: str) -> JobDict:
680680
return self._update(job_id=job_id, proxy_user=proxy_user)
@@ -829,7 +829,7 @@ def get_job(self, job_id: str, *, user_id: Optional[str] = None) -> dict:
829829
# TODO: eliminate get_job/get_job_metadata duplication?
830830
zk_job = ejr_job = None
831831
if self.zk_job_registry:
832-
assert user_id
832+
assert user_id, "user_id is required in ZkJobRegistry"
833833
with contextlib.suppress(JobNotFoundException, ZkStrippedSpecification):
834834
zk_job = self.zk_job_registry.get_job(
835835
job_id=job_id, user_id=user_id, parse_specification=True, omit_raw_specification=True
@@ -881,47 +881,45 @@ def set_status(self, job_id: str, user_id: str, status: str,
881881

882882
def delete_job(self, job_id: str, *, user_id: Optional[str] = None) -> None:
883883
if self.zk_job_registry:
884-
assert user_id
884+
assert user_id, "user_id is required in ZkJobRegistry"
885885
self.zk_job_registry.delete(job_id=job_id, user_id=user_id)
886886
if self.elastic_job_registry:
887887
self.elastic_job_registry.delete_job(job_id=job_id, user_id=user_id)
888888

889889
def set_dependencies(
890-
self, job_id: str, user_id: str, dependencies: List[Dict[str, str]]
891-
):
890+
self, job_id: str, *, user_id: Optional[str] = None, dependencies: List[Dict[str, str]]
891+
) -> None:
892892
if self.zk_job_registry:
893+
assert user_id, "user_id is required in ZkJobRegistry"
893894
self.zk_job_registry.set_dependencies(job_id=job_id, user_id=user_id, dependencies=dependencies)
894895
if self.elastic_job_registry:
895-
self.elastic_job_registry.set_dependencies(
896-
job_id=job_id, dependencies=dependencies
897-
)
896+
self.elastic_job_registry.set_dependencies(job_id=job_id, user_id=user_id, dependencies=dependencies)
898897

899-
def remove_dependencies(self, job_id: str, user_id: str):
898+
def remove_dependencies(self, job_id: str, *, user_id: Optional[str] = None) -> None:
900899
if self.zk_job_registry:
900+
assert user_id, "user_id is required in ZkJobRegistry"
901901
self.zk_job_registry.remove_dependencies(job_id=job_id, user_id=user_id)
902902
if self.elastic_job_registry:
903-
self.elastic_job_registry.remove_dependencies(job_id=job_id)
903+
self.elastic_job_registry.remove_dependencies(job_id=job_id, user_id=user_id)
904904

905-
def set_dependency_status(
906-
self, job_id: str, user_id: str, dependency_status: str
907-
) -> None:
905+
def set_dependency_status(self, job_id: str, *, user_id: Optional[str] = None, dependency_status: str) -> None:
908906
if self.zk_job_registry:
907+
assert user_id, "user_id is required in ZkJobRegistry"
909908
self.zk_job_registry.set_dependency_status(
910909
job_id=job_id, user_id=user_id, dependency_status=dependency_status
911910
)
912911
if self.elastic_job_registry:
913912
self.elastic_job_registry.set_dependency_status(
914-
job_id=job_id, dependency_status=dependency_status
913+
job_id=job_id, user_id=user_id, dependency_status=dependency_status
915914
)
916915

917-
def set_dependency_usage(
918-
self, job_id: str, user_id: str, dependency_usage: Decimal
919-
):
916+
def set_dependency_usage(self, job_id: str, *, user_id: Optional[str] = None, dependency_usage: Decimal) -> None:
920917
if self.zk_job_registry:
918+
assert user_id, "user_id is required in ZkJobRegistry"
921919
self.zk_job_registry.set_dependency_usage(job_id=job_id, user_id=user_id, processing_units=dependency_usage)
922920
if self.elastic_job_registry:
923921
self.elastic_job_registry.set_dependency_usage(
924-
job_id=job_id, dependency_usage=dependency_usage
922+
job_id=job_id, user_id=user_id, dependency_usage=dependency_usage
925923
)
926924

927925
def set_proxy_user(self, job_id: str, user_id: str, proxy_user: str):

openeogeotrellis/job_tracker_v2.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ def _sync_job_status(
511511
stats[f"reached final status {job_metadata.status}"] += 1
512512
result_metadata = self._batch_jobs.load_results_metadata(job_id, user_id)
513513

514-
double_job_registry.remove_dependencies(job_id, user_id)
514+
double_job_registry.remove_dependencies(job_id=job_id, user_id=user_id)
515515

516516
# there can be duplicates if batch processes are recycled
517517
dependency_sources = list(set(get_deletable_dependency_sources(job_info)))

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
tests_require=tests_require,
5858
install_requires=[
5959
"openeo>=0.33.0",
60-
"openeo_driver>=0.133.0.a3.dev",
60+
"openeo_driver>=0.133.0.a5.dev",
6161
'pyspark==3.5.3; python_version>="3.8"',
6262
'pyspark>=2.3.1,<2.4.0; python_version<"3.8"',
6363
'geopyspark==0.4.9+openeo',

0 commit comments

Comments
 (0)