
Commit f49afaf

Revert "Revert "Revert "get results metadata from disk/object storage (#1260)"""

This reverts commit efd754a.

1 parent b63819c

14 files changed: +128 −537 lines

CHANGELOG.md

Lines changed: 0 additions & 2 deletions

@@ -15,8 +15,6 @@ without compromising stable operations.
 
 ## In progress: 0.67.0
 
-- Avoid workaround with EJR to obtain job results metadata in the context of a failover ([#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255))
-- Avoid 413 "Payload Too Large" response from EJR upon job results metadata update ([#1200](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1200))
 - Configurable usage of `async_task` ([eu-cdse/openeo-cdse-infra#387](https://github.com/eu-cdse/openeo-cdse-infra/issues/387))
 - Add job option "omit-derived-from-links" to omit "derived_from" links in batch job results metadata ([ESA-WEED-project/eo_processing#175](https://github.com/ESA-WEED-project/eo_processing/issues/175))
 - Better freeIPA configurability for proxy user lookup ([#1261](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1261), eu-cdse/openeo-cdse-infra#626)

openeogeotrellis/_version.py

Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-__version__ = "0.67.0a6"
+__version__ = "0.67.0a7"

openeogeotrellis/backend.py

Lines changed: 15 additions & 102 deletions

@@ -32,7 +32,6 @@
 import pkg_resources
 import pystac
 import requests
-import reretry
 import shapely.geometry.base
 from deprecated import deprecated
 from geopyspark import LayerType, Pyramid, TiledRasterLayer

@@ -97,7 +96,6 @@
     k8s_get_batch_job_cfg_secret_name,
     truncate_user_id_k8s,
 )
-from openeogeotrellis.integrations.s3proxy.asset_urls import PresignedS3AssetUrls
 from openeogeotrellis.integrations.stac import ResilientStacIO
 from openeogeotrellis.integrations.traefik import Traefik
 from openeogeotrellis.integrations.yarn_jobrunner import YARNBatchJobRunner
@@ -2079,12 +2077,6 @@ def as_boolean_arg(job_option_key: str, default_value: str) -> str:
         )
         log.info(f"mapped job_id {job_id} to application ID {spark_app_id}")
         dbl_registry.set_application_id(job_id=job_id, user_id=user_id, application_id=spark_app_id)
-        dbl_registry.set_results_metadata_uri(
-            job_id=job_id,
-            user_id=user_id,
-            results_metadata_uri=f"s3://{bucket}/{str(job_work_dir).strip('/')}/{JOB_METADATA_FILENAME}",
-        )
-
         status_response = {}
         retry = 0
         while "status" not in status_response and retry < 10:
@@ -2115,24 +2107,9 @@ def as_boolean_arg(job_option_key: str, default_value: str) -> str:
         runner = YARNBatchJobRunner(principal=self._principal, key_tab=self._key_tab)
         runner.set_default_sentinel_hub_credentials(self._default_sentinel_hub_client_id,self._default_sentinel_hub_client_secret)
         vault_token = None if sentinel_hub_client_alias == 'default' else get_vault_token(sentinel_hub_client_alias)
-        job_work_dir = self.get_job_work_dir(job_id=job_id)
-        application_id = runner.run_job(
-            job_info,
-            job_id,
-            job_work_dir=job_work_dir,
-            log=log,
-            user_id=user_id,
-            api_version=api_version,
-            proxy_user=proxy_user or job_info.get("proxy_user", None),
-            vault_token=vault_token,
-        )
+        application_id = runner.run_job(job_info, job_id, job_work_dir = self.get_job_work_dir(job_id=job_id), log=log, user_id=user_id, api_version=api_version,proxy_user=proxy_user or job_info.get('proxy_user',None), vault_token=vault_token)
         with self._double_job_registry as dbl_registry:
             dbl_registry.set_application_id(job_id=job_id, user_id=user_id, application_id=application_id)
-            dbl_registry.set_results_metadata_uri(
-                job_id=job_id,
-                user_id=user_id,
-                results_metadata_uri=f"file://{job_work_dir}/{JOB_METADATA_FILENAME}",
-            )
             dbl_registry.set_status(job_id=job_id, user_id=user_id, status=JOB_STATUS.QUEUED)
 
 
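For context on what the two deleted set_results_metadata_uri registrations wrote: the Kubernetes branch recorded an s3:// URI, the YARN branch a file:// URI, both pointing at the job's metadata file in its work directory. A standalone sketch of the two URI shapes (bucket and work dir are made-up example values, and JOB_METADATA_FILENAME is assumed to be "job_metadata.json"):

from pathlib import Path

JOB_METADATA_FILENAME = "job_metadata.json"  # assumed value of the driver's constant

bucket = "openeo-data"                            # example deployment bucket
job_work_dir = Path("/data/jobs/j-20240101-abc")  # example per-job work dir

# Kubernetes: object storage URI; the leading "/" is stripped to form the key.
k8s_uri = f"s3://{bucket}/{str(job_work_dir).strip('/')}/{JOB_METADATA_FILENAME}"
# YARN: plain file URI on shared disk.
yarn_uri = f"file://{job_work_dir}/{JOB_METADATA_FILENAME}"

print(k8s_uri)   # s3://openeo-data/data/jobs/j-20240101-abc/job_metadata.json
print(yarn_uri)  # file:///data/jobs/j-20240101-abc/job_metadata.json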
@@ -2582,29 +2559,24 @@ def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
 
         :return: A mapping between a filename and a dict containing information about that file.
         """
-        with self._double_job_registry as registry:
-            job_dict = registry.get_job(job_id=job_id, user_id=user_id)
-
-        if job_dict["status"] != JOB_STATUS.FINISHED:
+        job_info = self.get_job_info(job_id=job_id, user_id=user_id)
+        if job_info.status != JOB_STATUS.FINISHED:
             raise JobNotFinishedException
 
         job_dir = self.get_job_output_dir(job_id=job_id)
 
-        results_metadata = self._load_results_metadata_from_uri(job_dict.get("results_metadata_uri"), job_id)  # TODO: expose a getter?
-        if not results_metadata:
-            try:
-                logger.debug(f"Loading results metadata from job registry", extra={"job_id": job_id})
-                with self._double_job_registry as registry:
-                    job_dict = registry.elastic_job_registry.get_job(job_id, user_id=user_id)
-                if "results_metadata" in job_dict:
-                    results_metadata = job_dict["results_metadata"]
-            except Exception as e:
-                logger.warning(
-                    "Could not retrieve result metadata from job registry %s", e, exc_info=True, extra={"job_id": job_id}
-                )
-        if not results_metadata:
+        results_metadata = None
+        try:
+            with self._double_job_registry as registry:
+                job_dict = registry.elastic_job_registry.get_job(job_id, user_id=user_id)
+            if "results_metadata" in job_dict:
+                results_metadata = job_dict["results_metadata"]
+        except Exception as e:
+            logger.warning(
+                "Could not retrieve result metadata from job tracker %s", e, exc_info=True, extra={"job_id": job_id}
+            )
+        if results_metadata is None or len(results_metadata) == 0:
             results_metadata = self.load_results_metadata(job_id, user_id)
-
         out_assets = results_metadata.get("assets", {})
         out_metadata = out_assets.get("out", {})
         bands = [Band(*properties) for properties in out_metadata.get("bands", [])]
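The net effect of this hunk: get_result_assets asks the Elastic Job Registry for the metadata blob first and falls back to reading the metadata file from disk or object storage, instead of resolving a stored results_metadata_uri. A minimal sketch of the restored lookup order (get_from_ejr and load_from_storage are illustrative stand-ins for the registry call and load_results_metadata):

from typing import Callable, Optional

def fetch_results_metadata(
    get_from_ejr: Callable[[], dict],
    load_from_storage: Callable[[], dict],
) -> dict:
    metadata: Optional[dict] = None
    try:
        # First choice: the metadata blob stored in the Elastic Job Registry.
        metadata = get_from_ejr().get("results_metadata")
    except Exception:
        pass  # an unreachable EJR is not fatal; fall through to storage
    if metadata is None or len(metadata) == 0:
        # Fallback: read the job metadata file from disk / object storage.
        metadata = load_from_storage()
    return metadata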
@@ -2682,21 +2654,12 @@ def load_results_metadata(self, job_id: str, user_id: str) -> dict:
         """
         Reads the metadata json file from the job directory and returns it.
         """
-        with self._double_job_registry as registry:
-            job_dict = registry.get_job(job_id=job_id, user_id=user_id)
-
-        results_metadata = self._load_results_metadata_from_uri(job_dict.get("results_metadata_uri"), job_id)  # TODO: expose a getter?
-        if results_metadata is not None:
-            return results_metadata
 
         metadata_file = self.get_results_metadata_path(job_id=job_id)
 
         if ConfigParams().use_object_storage:
             try:
-                logger.debug(
-                    f"Loading results metadata from object storage at {metadata_file}", extra={"job_id": job_id}
-                )
-                contents = get_s3_file_contents(path=str(metadata_file))
+                contents = get_s3_file_contents(str(metadata_file))
                 return json.loads(contents)
             except Exception:
                 logger.warning(

@@ -2705,7 +2668,6 @@ def load_results_metadata(self, job_id: str, user_id: str) -> dict:
                     extra={'job_id': job_id})
 
         try:
-            logger.debug(f"Loading results metadata from file at {metadata_file}", extra={"job_id": job_id})
             with open(metadata_file) as f:
                 return json.load(f)
         except FileNotFoundError:

@@ -2714,55 +2676,6 @@ def load_results_metadata(self, job_id: str, user_id: str) -> dict:
 
         return {}
 
-    @staticmethod
-    @reretry.retry(exceptions=FileNotFoundError, tries=5, delay=1, backoff=2, logger=logger)
-    def _load_results_metadata_from_file(metadata_file: Path):
-        with open(metadata_file) as f:
-            return json.load(f)
-
-    @staticmethod
-    def _load_results_metadata_from_uri(results_metadata_uri: Optional[str], job_id: str) -> Optional[dict]:
-        # TODO: reduce code duplication with load_results_metadata
-        import botocore.exceptions
-
-        if results_metadata_uri is None:
-            return None
-
-        logger.debug(f"Loading results metadata from URI {results_metadata_uri}", extra={"job_id": job_id})
-
-        uri_parts = urlparse(results_metadata_uri)
-
-        if uri_parts.scheme == "file":
-            file_path = Path(uri_parts.path)
-            try:
-                return GpsBatchJobs._load_results_metadata_from_file(file_path)
-            except FileNotFoundError:
-                logger.debug(
-                    f"File with results metadata {file_path} does not exist; this is expected and not "
-                    f"an error if the batch job did not have the chance to write it yet.",
-                    exc_info=True,
-                    extra={"job_id": job_id},
-                )
-                return None
-
-        if uri_parts.scheme == "s3":
-            bucket, key = PresignedS3AssetUrls.get_bucket_key_from_uri(results_metadata_uri)
-            try:
-                return json.loads(get_s3_file_contents(key, bucket))
-            except botocore.exceptions.ClientError as e:
-                if e.response["Error"]["Code"] != "NoSuchKey":
-                    raise
-
-                logger.debug(
-                    f"Object with results metadata {key} does not exist in bucket {bucket}; this is "
-                    f"expected and not an error if the batch job did not have the chance to write it yet.",
-                    exc_info=True,
-                    extra={"job_id": job_id},
-                )
-                return None
-
-        raise ValueError(f"Unsupported results metadata URI: {results_metadata_uri}")
-
     def _get_providers(self, job_id: str, user_id: str) -> List[dict]:
         results_metadata = self.load_results_metadata(job_id, user_id)
         return results_metadata.get("providers", [])
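The deleted _load_results_metadata_from_file helper (together with the import reretry dropped at the top of this file) papered over the window in which the metadata file has not been written yet. For reference, the retry pattern in isolation (a standalone sketch, not driver code):

import json
import logging
from pathlib import Path

import reretry  # third-party retry decorator; this revert removes its use

logger = logging.getLogger(__name__)

# Retry up to 5 times on FileNotFoundError, starting at a 1 s delay and
# doubling it each time (1 + 2 + 4 + 8 = 15 s of waiting before the last try).
@reretry.retry(exceptions=FileNotFoundError, tries=5, delay=1, backoff=2, logger=logger)
def load_metadata(metadata_file: Path) -> dict:
    with open(metadata_file) as f:
        return json.load(f)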

openeogeotrellis/deploy/batch_job.py

Lines changed: 3 additions & 1 deletion

@@ -44,7 +44,9 @@
 from shapely.geometry import mapping
 
 from openeogeotrellis._version import __version__
-from openeogeotrellis.backend import GeoPySparkBackendImplementation
+from openeogeotrellis.backend import (
+    GeoPySparkBackendImplementation,
+)
 from openeogeotrellis.collect_unique_process_ids_visitor import (
     CollectUniqueProcessIdsVisitor,
 )

openeogeotrellis/job_registry.py

Lines changed: 9 additions & 82 deletions

@@ -1,12 +1,11 @@
 from __future__ import annotations
 import contextlib
-from copy import deepcopy
+import datetime as dt
 import json
 import logging
 import random
 from datetime import datetime, timedelta
 from decimal import Decimal
-from pathlib import Path
 from typing import Any, List, Dict, Callable, Union, Optional, Iterator, Tuple
 
 import kazoo

@@ -155,9 +154,6 @@ def set_dependencies(self, job_id: str, user_id: str, dependencies: List[Dict[st
     def remove_dependencies(self, job_id: str, user_id: str):
         self.patch(job_id, user_id, dependencies=None, dependency_status=None)
 
-    def set_results_metadata_uri(self, job_id: str, user_id: str, results_metadata_uri: str) -> None:
-        self.patch(job_id, user_id, results_metadata_uri=results_metadata_uri)
-
     def patch(
         self, job_id: str, user_id: str, auto_mark_done: bool = True, **kwargs
     ) -> None:
@@ -627,15 +623,15 @@ def create_job(
             "api_version": api_version,
             "job_options": job_options,
         }
-        return deepcopy(self.db[job_id])
+        return self.db[job_id]
 
     def get_job(self, job_id: str, *, user_id: Optional[str] = None) -> JobDict:
         job = self.db.get(job_id)
 
         if not job or (user_id is not None and job['user_id'] != user_id):
             raise JobNotFoundException(job_id=job_id)
 
-        return deepcopy(job)
+        return job
 
     def delete_job(self, job_id: str, *, user_id: Optional[str] = None) -> None:
         self.get_job(job_id=job_id, user_id=user_id)  # will raise on job not found

@@ -644,7 +640,7 @@ def delete_job(self, job_id: str, *, user_id: Optional[str] = None) -> None:
     def _update(self, job_id: str, **kwargs) -> JobDict:
         assert job_id in self.db
         self.db[job_id].update(**kwargs)
-        return deepcopy(self.db[job_id])
+        return self.db[job_id]
 
     def set_status(
         self,

@@ -695,10 +691,7 @@ def set_results_metadata(
         usage: dict,
         results_metadata: Optional[Dict[str, Any]] = None,
     ) -> None:
-        if results_metadata:
-            self._update(job_id=job_id, costs=costs, usage=usage, results_metadata=results_metadata)
-        else:
-            self._update(job_id=job_id, costs=costs, usage=usage)
+        self._update(job_id=job_id, costs=costs, usage=usage, results_metadata=results_metadata)
 
     def set_results_metadata_uri(
         self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str

@@ -714,7 +707,7 @@ def list_user_jobs(
         request_parameters: Optional[dict] = None,
         # TODO #959 settle on returning just `JobListing` and eliminate other options/code paths.
     ) -> Union[JobListing, List[JobDict]]:
-        jobs = [deepcopy(job) for job in self.db.values() if job["user_id"] == user_id]
+        jobs = [job for job in self.db.values() if job["user_id"] == user_id]
         if limit:
             pagination_param = "page"
             page_number = int((request_parameters or {}).get(pagination_param, 0))

@@ -741,7 +734,7 @@ def list_active_jobs(
         active = [JOB_STATUS.CREATED, JOB_STATUS.QUEUED, JOB_STATUS.RUNNING]
         # TODO: implement support for max_age, max_updated_ago, fields
         return [
-            deepcopy(job)
+            job
             for job in self.db.values()
             if job["status"] in active and (not require_application_id or job.get("application_id") is not None)
         ]
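Dropping deepcopy means InMemoryJobRegistry now hands out live references to its internal job dicts. A toy illustration of the behavioural difference (not driver code):

from copy import deepcopy

db = {"j-1": {"status": "created"}}

# After this revert: the caller gets a live reference, so mutations leak back.
live = db["j-1"]
live["status"] = "canceled"
assert db["j-1"]["status"] == "canceled"

# Before this revert: the caller got an isolated snapshot.
snapshot = deepcopy(db["j-1"])
snapshot["status"] = "finished"
assert db["j-1"]["status"] == "canceled"  # registry state untouched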
@@ -874,64 +867,10 @@ def get_job_metadata(self, job_id: str, user_id: str) -> BatchJobMetadata:
         with contextlib.suppress(JobNotFoundException):
             ejr_job_info = self.elastic_job_registry.get_job(job_id=job_id, user_id=user_id)
 
-        # TODO: replace with getter once introduced?
-        results_metadata = self._load_results_metadata_from_uri(
-            ejr_job_info.get("results_metadata_uri"), job_id
-        )
-        if results_metadata is not None:
-            ejr_job_info["results_metadata"] = results_metadata
-
         self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job_info, ejr_job_info=ejr_job_info)
         job_metadata = zk_job_info_to_metadata(zk_job_info) if zk_job_info else ejr_job_info_to_metadata(ejr_job_info)
         return job_metadata
 
-    @staticmethod
-    def _load_results_metadata_from_uri(results_metadata_uri: Optional[str], job_id: str) -> Optional[dict]:
-        # TODO: reduce code duplication with openeogeotrellis.backend.GpsBatchJobs._load_results_metadata_from_uri
-        from openeogeotrellis.integrations.s3proxy.asset_urls import PresignedS3AssetUrls
-        from openeogeotrellis.utils import get_s3_file_contents
-        import botocore.exceptions
-        from urllib.parse import urlparse
-
-        if results_metadata_uri is None:
-            return None
-
-        _log.debug(f"Loading results metadata from URI {results_metadata_uri}", extra={"job_id": job_id})
-
-        uri_parts = urlparse(results_metadata_uri)
-
-        if uri_parts.scheme == "file":
-            file_path = Path(uri_parts.path)
-            try:
-                with open(file_path) as f:
-                    return json.load(f)
-            except FileNotFoundError:
-                _log.debug(
-                    f"File with results metadata {file_path} does not exist; this is expected and not "
-                    f"an error if the batch job did not have the chance to write it yet.",
-                    exc_info=True,
-                    extra={"job_id": job_id},
-                )
-                return None
-
-        if uri_parts.scheme == "s3":
-            bucket, key = PresignedS3AssetUrls.get_bucket_key_from_uri(results_metadata_uri)
-            try:
-                return json.loads(get_s3_file_contents(key, bucket))
-            except botocore.exceptions.ClientError as e:
-                if e.response["Error"]["Code"] != "NoSuchKey":
-                    raise
-
-                _log.debug(
-                    f"Object with results metadata {key} does not exist in bucket {bucket}; this is "
-                    f"expected and not an error if the batch job did not have the chance to write it yet.",
-                    exc_info=True,
-                    extra={"job_id": job_id},
-                )
-                return None
-
-        raise ValueError(f"Unsupported results metadata URI: {results_metadata_uri}")
-
     def _check_zk_ejr_job_info(self, job_id: str, zk_job_info: Union[dict, None], ejr_job_info: Union[dict, None]):
         # TODO #236/#498 For now: compare job metadata between Zk and EJR
         fields = ["job_id", "status", "created"]
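For reference, the deleted _load_results_metadata_from_uri helpers here and in backend.py shared the same scheme dispatch: file:// reads from local disk, s3:// from object storage, and a missing file or object counts as a soft miss (None) because the batch job may simply not have written its metadata yet. A condensed standalone sketch (plain boto3 stands in for the driver's get_s3_file_contents/PresignedS3AssetUrls helpers):

import json
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

import boto3
import botocore.exceptions

def load_metadata_from_uri(results_metadata_uri: Optional[str]) -> Optional[dict]:
    if results_metadata_uri is None:
        return None
    uri_parts = urlparse(results_metadata_uri)
    if uri_parts.scheme == "file":
        try:
            return json.loads(Path(uri_parts.path).read_text())
        except FileNotFoundError:
            return None  # metadata not written yet: soft miss
    if uri_parts.scheme == "s3":
        bucket, key = uri_parts.netloc, uri_parts.path.lstrip("/")
        try:
            body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
            return json.loads(body.read())
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] != "NoSuchKey":
                raise
            return None  # object not written yet: same soft miss
    raise ValueError(f"Unsupported results metadata URI: {results_metadata_uri}")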
@@ -1020,17 +959,6 @@ def set_application_id(self, job_id: str, *, user_id: Optional[str] = None, appl
         if self.elastic_job_registry:
             self.elastic_job_registry.set_application_id(job_id=job_id, user_id=user_id, application_id=application_id)
 
-    def set_results_metadata_uri(self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str):
-        if self.zk_job_registry:
-            assert user_id, "user_id is required in ZkJobRegistry"
-            self.zk_job_registry.set_results_metadata_uri(
-                job_id=job_id, user_id=user_id, results_metadata_uri=results_metadata_uri
-            )
-        if self.elastic_job_registry:
-            self.elastic_job_registry.set_results_metadata_uri(
-                job_id=job_id, user_id=user_id, results_metadata_uri=results_metadata_uri
-            )
-
     def mark_ongoing(self, job_id: str, user_id: str) -> None:
         # TODO #863/#1123 can this method be eliminated (e.g. integrate it directly in ZkJobRegistry.set_status)?
         if self.zk_job_registry:

@@ -1099,9 +1027,8 @@ def set_results_metadata(
     ) -> None:
         if self.zk_job_registry:
             assert user_id, "user_id is required in ZkJobRegistry"
-            self.zk_job_registry.patch(
-                job_id=job_id, user_id=user_id, **dict(results_metadata or {}, costs=costs, usage=usage)
-            )
+            self.zk_job_registry.patch(job_id=job_id, user_id=user_id,
+                                       **dict(results_metadata, costs=costs, usage=usage))
 
         if self.elastic_job_registry:
             self.elastic_job_registry.set_results_metadata(
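The restored one-liner folds the metadata dict and the cost/usage fields into a single set of patch kwargs. A toy illustration; note that, unlike the deleted `results_metadata or {}` variant, it assumes results_metadata is an actual dict (dict(None, ...) would raise TypeError):

results_metadata = {"assets": {"out": {}}, "links": []}

# dict(mapping, **extra) copies the mapping and merges in the keyword args:
patch_kwargs = dict(results_metadata, costs=1.23, usage={"cpu": {"value": 4, "unit": "cpu-seconds"}})
assert patch_kwargs == {
    "assets": {"out": {}},
    "links": [],
    "costs": 1.23,
    "usage": {"cpu": {"value": 4, "unit": "cpu-seconds"}},
}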
