Commit 7a30776
load results metadata from persisted results_metadata_uri (#1308)

* load results metadata from persisted results_metadata_uri #1255
* fix failing tests #1255
* test persisting and loading from results_metadata_uri #1255
* retry reading from NFS and log attempts #1255
* allow for debugging #1255
* test utility functions #1255
* avoid patching results metadata into job entity #1255
* adapt version and CHANGELOG #1255
* cleanup #1255
* configurable retry settings to avoid delay in tests #1255
1 parent 4517eb2 commit 7a30776
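
In short: when a batch job is launched, the backend now persists the URI of the job's results metadata file (job_metadata.json) in the job registry, and reads the metadata back through a fallback chain when results are requested. A minimal sketch of that fallback order, simplified from load_results_metadata in the backend.py diff below (registry access and logging elided; the loader callables are stand-ins):

def load_results_metadata(job_dict: dict, load_from_uri, load_from_job_dir) -> dict:
    # 1. Preferred: the persisted results_metadata_uri (file:// or s3://).
    if "results_metadata_uri" in job_dict:
        metadata = load_from_uri(job_dict["results_metadata_uri"])
        if metadata:
            return metadata
    # 2. Fallback: the metadata blob stored in the job registry (older jobs).
    if job_dict.get("results_metadata"):
        return job_dict["results_metadata"]
    # 3. Last resort: the conventional job_metadata.json in the job output dir.
    return load_from_job_dir()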

13 files changed: +433 −90 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,8 @@ without compromising stable operations.
 ## In progress: 0.68.0
 
 - Experimental support for unified asset keys in job results STAC items by means of the "stac-version-experimental" job option ([#1111](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1111), [Open-EO/openeo-geotrellis-extensions#402](https://github.com/Open-EO/openeo-geotrellis-extensions/issues/402))
+- Avoid workaround with EJR to obtain job results metadata in the context of a failover ([#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255))
+- Avoid 413 "Payload Too Large" response from EJR upon job results metadata update ([#1200](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1200))
 
 
 ## 0.67.0

openeogeotrellis/_version.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.68.0a1"
+__version__ = "0.68.0a2"

openeogeotrellis/backend.py

Lines changed: 106 additions & 41 deletions
@@ -32,6 +32,7 @@
 import pkg_resources
 import pystac
 import requests
+import reretry
 import shapely.geometry.base
 from deprecated import deprecated
 from geopyspark import LayerType, Pyramid, TiledRasterLayer
@@ -96,6 +97,7 @@
     k8s_get_batch_job_cfg_secret_name,
     truncate_user_id_k8s,
 )
+from openeogeotrellis.integrations.s3proxy.asset_urls import PresignedS3AssetUrls
 from openeogeotrellis.integrations.stac import ResilientStacIO
 from openeogeotrellis.integrations.traefik import Traefik
 from openeogeotrellis.integrations.yarn_jobrunner import YARNBatchJobRunner
@@ -2082,6 +2084,12 @@ def as_boolean_arg(job_option_key: str, default_value: str) -> str:
             )
             log.info(f"mapped job_id {job_id} to application ID {spark_app_id}")
             dbl_registry.set_application_id(job_id=job_id, user_id=user_id, application_id=spark_app_id)
+            dbl_registry.set_results_metadata_uri(
+                job_id=job_id,
+                user_id=user_id,
+                results_metadata_uri=f"s3://{bucket}/{str(job_work_dir).strip('/')}/{JOB_METADATA_FILENAME}",
+            )
+
         status_response = {}
         retry = 0
         while "status" not in status_response and retry < 10:
@@ -2112,9 +2120,24 @@ def as_boolean_arg(job_option_key: str, default_value: str) -> str:
             runner = YARNBatchJobRunner(principal=self._principal, key_tab=self._key_tab)
             runner.set_default_sentinel_hub_credentials(self._default_sentinel_hub_client_id,self._default_sentinel_hub_client_secret)
             vault_token = None if sentinel_hub_client_alias == 'default' else get_vault_token(sentinel_hub_client_alias)
-            application_id = runner.run_job(job_info, job_id, job_work_dir = self.get_job_work_dir(job_id=job_id), log=log, user_id=user_id, api_version=api_version,proxy_user=proxy_user or job_info.get('proxy_user',None), vault_token=vault_token)
+            job_work_dir = self.get_job_work_dir(job_id=job_id)
+            application_id = runner.run_job(
+                job_info,
+                job_id,
+                job_work_dir=job_work_dir,
+                log=log,
+                user_id=user_id,
+                api_version=api_version,
+                proxy_user=proxy_user or job_info.get("proxy_user", None),
+                vault_token=vault_token,
+            )
             with self._double_job_registry as dbl_registry:
                 dbl_registry.set_application_id(job_id=job_id, user_id=user_id, application_id=application_id)
+                dbl_registry.set_results_metadata_uri(
+                    job_id=job_id,
+                    user_id=user_id,
+                    results_metadata_uri=f"file://{job_work_dir}/{JOB_METADATA_FILENAME}",
+                )
                 dbl_registry.set_status(job_id=job_id, user_id=user_id, status=JOB_STATUS.QUEUED)
 
 
@@ -2564,31 +2587,14 @@ def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
 
         :return: A mapping between a filename and a dict containing information about that file.
         """
-        job_info = self.get_job_info(job_id=job_id, user_id=user_id)
-        if job_info.status != JOB_STATUS.FINISHED:
-            raise JobNotFinishedException
-
-        job_dir = self.get_job_output_dir(job_id=job_id)
+        with self._double_job_registry as registry:
+            job_dict = registry.get_job(job_id=job_id, user_id=user_id)
 
-        results_metadata = None
+        if job_dict["status"] != JOB_STATUS.FINISHED:
+            raise JobNotFinishedException
 
-        if logger.isEnabledFor(logging.DEBUG) and not ConfigParams().use_object_storage:
-            # debug/assert what looks like some kind of NFS latency on Terrascope
-            debuggable_results_metadata = self.load_results_metadata(job_id=job_id, user_id=user_id)
-            if debuggable_results_metadata:  # otherwise, will have logged a warning elsewhere
-                logger.debug(f"successfully loaded results metadata {debuggable_results_metadata}", extra={"job_id": job_id})
+        results_metadata = self.load_results_metadata(job_id, user_id, job_dict)
 
-        try:
-            with self._double_job_registry as registry:
-                job_dict = registry.elastic_job_registry.get_job(job_id, user_id=user_id)
-                if "results_metadata" in job_dict:
-                    results_metadata = job_dict["results_metadata"]
-        except Exception as e:
-            logger.warning(
-                "Could not retrieve result metadata from job tracker %s", e, exc_info=True, extra={"job_id": job_id}
-            )
-        if results_metadata is None or len(results_metadata) == 0:
-            results_metadata = self.load_results_metadata(job_id, user_id)
         out_assets = results_metadata.get("assets", {})
         out_metadata = out_assets.get("out", {})
         bands = [Band(*properties) for properties in out_metadata.get("bands", [])]
@@ -2610,6 +2616,8 @@ def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
         # container that ran the job can already be gone.
         # We only want to apply the cases below when we effectively have a job directory:
         # it should exists and should be a directory.
+        job_dir = self.get_job_output_dir(job_id=job_id)
+
         if job_dir.is_dir():
             if os.path.isfile(job_dir / 'out'):
                 results_dict['out'] = {
@@ -2662,32 +2670,89 @@ def get_result_assets(self, job_id: str, user_id: str) -> Dict[str, dict]:
     def get_results_metadata_path(self, job_id: str) -> Path:
         return self.get_job_output_dir(job_id) / JOB_METADATA_FILENAME
 
-    def load_results_metadata(self, job_id: str, user_id: str) -> dict:
+    def load_results_metadata(self, job_id: str, user_id: str, job_dict: dict = None) -> dict:
+        if job_dict is None:
+            with self._double_job_registry as registry:
+                job_dict = registry.get_job(job_id=job_id, user_id=user_id)
+
+        results_metadata = None
+
+        if "results_metadata_uri" in job_dict:
+            results_metadata = self._load_results_metadata_from_file(job_id, job_dict["results_metadata_uri"])  # TODO: expose a getter?
+
+        if not results_metadata and "results_metadata" in job_dict:
+            logger.debug("Loading results metadata from job registry", extra={"job_id": job_id})
+            results_metadata = job_dict["results_metadata"]
+
+        if not results_metadata:
+            results_metadata = self._load_results_metadata_from_file(job_id, results_metadata_uri=None)
+
+        return results_metadata
+
+    def _load_results_metadata_from_file(self, job_id: str, results_metadata_uri: Optional[str]) -> dict:
         """
-        Reads the metadata json file from the job directory and returns it.
+        Reads the metadata json file either from the job directory or an explicit URI and returns it.
         """
 
-        metadata_file = self.get_results_metadata_path(job_id=job_id)
-
-        if ConfigParams().use_object_storage:
+        def try_get_results_metadata_from_object_storage(path: Union[Path, str], bucket: Optional[str]) -> dict:
             try:
-                contents = get_s3_file_contents(str(metadata_file))
+                contents = get_s3_file_contents(path, bucket)
                 return json.loads(contents)
             except Exception:
                 logger.warning(
-                    "Could not retrieve result metadata from object storage %s",
-                    metadata_file, exc_info=True,
-                    extra={'job_id': job_id})
+                    "Could not retrieve result metadata from object storage %s in bucket %s",
+                    path,
+                    bucket or "[default]",
+                    exc_info=True,
+                    stack_info=True,
+                    extra={"job_id": job_id},
+                )
 
-        try:
-            with open(metadata_file) as f:
-                return json.load(f)
-        except FileNotFoundError:
-            logger.warning("Could not derive result metadata from %s", metadata_file, exc_info=True,
-                           stack_info=True,
-                           extra={'job_id': job_id})
-
-        return {}
+            return {}
+
+        def try_get_results_metadata_from_disk(path: Union[Path, str]) -> dict:
+            @reretry.retry(
+                exceptions=FileNotFoundError,
+                logger=logger,
+                **get_backend_config().read_results_metadata_file_retry_settings,
+            )
+            def read_results_metadata_file():
+                with open(path) as f:
+                    return json.load(f)
+
+            try:
+                return read_results_metadata_file()
+            except FileNotFoundError:
+                logger.warning(
+                    "Could not derive result metadata from %s",
+                    path,
+                    exc_info=True,
+                    stack_info=True,
+                    extra={"job_id": job_id},
+                )
+
+            return {}
+
+        if results_metadata_uri:
+            logger.debug("Loading results metadata from %s", results_metadata_uri, extra={"job_id": job_id})
+            uri_parts = urlparse(results_metadata_uri)
+
+            if uri_parts.scheme == "file":
+                return try_get_results_metadata_from_disk(uri_parts.path)
+            elif uri_parts.scheme == "s3":
+                bucket, key = PresignedS3AssetUrls.get_bucket_key_from_uri(results_metadata_uri)
+                return try_get_results_metadata_from_object_storage(key, bucket)
+            else:
+                raise NotImplementedError(results_metadata_uri)
+
+        metadata_file = self.get_results_metadata_path(job_id=job_id)
+
+        logger.debug("Loading results metadata from %s", metadata_file, extra={"job_id": job_id})
+
+        if ConfigParams().use_object_storage:
+            return try_get_results_metadata_from_object_storage(metadata_file, bucket=None)
+
+        return try_get_results_metadata_from_disk(metadata_file)
 
     def _get_providers(self, job_id: str, user_id: str) -> List[dict]:
         results_metadata = self.load_results_metadata(job_id, user_id)
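
The new _load_results_metadata_from_file dispatches on the scheme of the persisted URI. A standalone sketch of that dispatch, assuming simple s3:// URIs (the commit itself delegates the bucket/key split to PresignedS3AssetUrls.get_bucket_key_from_uri; the example URIs are illustrative):

from urllib.parse import urlparse

def split_results_metadata_uri(uri: str):
    parts = urlparse(uri)
    if parts.scheme == "file":
        return "disk", parts.path
    elif parts.scheme == "s3":
        # for plain s3://bucket/key URIs this matches what
        # PresignedS3AssetUrls.get_bucket_key_from_uri returns
        return "s3", (parts.netloc, parts.path.lstrip("/"))
    raise NotImplementedError(uri)

assert split_results_metadata_uri("file:///data/jobs/j-123/job_metadata.json") == ("disk", "/data/jobs/j-123/job_metadata.json")
assert split_results_metadata_uri("s3://my-bucket/jobs/j-123/job_metadata.json") == ("s3", ("my-bucket", "jobs/j-123/job_metadata.json"))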

openeogeotrellis/config/config.py

Lines changed: 2 additions & 0 deletions
@@ -308,3 +308,5 @@ class GpsBackendConfig(OpenEoBackendConfig):
     freeipa_default_credentials_info: Optional[dict] = None
 
     supports_async_tasks: bool = not _is_kube_deploy
+
+    read_results_metadata_file_retry_settings: dict = attrs.Factory(lambda: dict(tries=1))  # fail immediately
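
The default dict(tries=1) makes metadata reads fail immediately, which keeps tests fast. The dict is splatted into reretry.retry in backend.py, so its keyword arguments (tries, delay, backoff, ...) apply. A sketch of a deployment override to ride out NFS latency (the values are illustrative, not recommendations; constructing the attrs-based config directly is an assumption for the example):

from openeogeotrellis.config.config import GpsBackendConfig

config = GpsBackendConfig(
    # retry up to 5 times, waiting 2, 4, 8 and 16 seconds between attempts
    read_results_metadata_file_retry_settings=dict(tries=5, delay=2, backoff=2),
)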

openeogeotrellis/job_registry.py

Lines changed: 14 additions & 0 deletions
@@ -154,6 +154,9 @@ def set_dependencies(self, job_id: str, user_id: str, dependencies: List[Dict[st
     def remove_dependencies(self, job_id: str, user_id: str):
         self.patch(job_id, user_id, dependencies=None, dependency_status=None)
 
+    def set_results_metadata_uri(self, job_id: str, user_id: str, results_metadata_uri: str) -> None:
+        self.patch(job_id, user_id, results_metadata_uri=results_metadata_uri)
+
     def patch(
         self, job_id: str, user_id: str, auto_mark_done: bool = True, **kwargs
     ) -> None:
@@ -959,6 +962,17 @@ def set_application_id(self, job_id: str, *, user_id: Optional[str] = None, appl
         if self.elastic_job_registry:
             self.elastic_job_registry.set_application_id(job_id=job_id, user_id=user_id, application_id=application_id)
 
+    def set_results_metadata_uri(self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str):
+        if self.zk_job_registry:
+            assert user_id, "user_id is required in ZkJobRegistry"
+            self.zk_job_registry.set_results_metadata_uri(
+                job_id=job_id, user_id=user_id, results_metadata_uri=results_metadata_uri
+            )
+        if self.elastic_job_registry:
+            self.elastic_job_registry.set_results_metadata_uri(
+                job_id=job_id, user_id=user_id, results_metadata_uri=results_metadata_uri
+            )
+
     def mark_ongoing(self, job_id: str, user_id: str) -> None:
         # TODO #863/#1123 can this method be eliminated (e.g. integrate it directly in ZkJobRegistry.set_status)?
         if self.zk_job_registry:

openeogeotrellis/job_tracker_v2.py

Lines changed: 10 additions & 10 deletions
@@ -429,6 +429,7 @@ def update_statuses(self, fail_fast: bool = False) -> None:
                 "job_options",
                 "dependencies",
                 "dependency_usage",
+                "results_metadata_uri",
             ],
             max_age=3 * 30,
             max_updated_ago=14,
@@ -578,21 +579,26 @@ def _sync_job_status(
                    _log.debug(f"job_costs: calculated {job_costs}")
                    stats["job_costs: calculated"] += 1
                    stats[f"job_costs: nonzero={isinstance(job_costs, float) and job_costs>0}"] += 1
-                    # TODO: skip patching the job znode and read from this file directly?
                except Exception as e:
                    log.exception(f"Failed to calculate job costs: {e}")
                    stats["job_costs: failed"] += 1
                    job_costs = None
 
                total_usage = dict_merge_recursive(job_metadata.usage.to_dict(), result_metadata.get("usage", {}))
-                try:
+
+                def set_results_metadata(results_metadata: dict):
+                    include_all_results_metadata = "results_metadata_uri" not in job_info
+
                    double_job_registry.set_results_metadata(
                        job_id=job_id,
                        user_id=user_id,
                        costs=job_costs,
                        usage=to_jsonable(dict(total_usage)),
-                        results_metadata=to_jsonable(result_metadata),
+                        results_metadata=to_jsonable(results_metadata) if include_all_results_metadata else None,
                    )
+
+                try:
+                    set_results_metadata(result_metadata)
                except EjrApiResponseError as e:
                    if e.status_code == 413:
                        log.warning(
@@ -606,13 +612,7 @@ def _sync_job_status(
                        if not result_metadata["links"]:
                            del result_metadata["links"]
 
-                        double_job_registry.set_results_metadata(
-                            job_id=job_id,
-                            user_id=user_id,
-                            costs=job_costs,
-                            usage=to_jsonable(dict(total_usage)),
-                            results_metadata=to_jsonable(result_metadata),
-                        )
+                        set_results_metadata(result_metadata)
                    else:
                        raise
 
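
The extracted set_results_metadata helper also changes what is patched into the job entity: once a results_metadata_uri is persisted, the tracker no longer pushes the full metadata document to EJR, which is what triggered the 413 "Payload Too Large" responses (#1200). A self-contained restatement of that rule (function name is illustrative):

def results_metadata_payload(job_info: dict, result_metadata: dict):
    # A persisted URI means the metadata can be read back from the file itself,
    # so the (potentially large) document is not patched into the job entity.
    if "results_metadata_uri" in job_info:
        return None
    # Older jobs without a URI still get the full document, as before.
    return result_metadata

assert results_metadata_payload({"results_metadata_uri": "s3://b/k"}, {"assets": {}}) is None
assert results_metadata_payload({}, {"assets": {}}) == {"assets": {}}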

openeogeotrellis/utils.py

Lines changed: 4 additions & 5 deletions
@@ -275,15 +275,14 @@ def s3_client():
     return s3_client
 
 
-def get_s3_file_contents(filename: Union[os.PathLike,str]) -> str:
-    """Get contents of a text file from the S3 bucket.
-
-    The bucket is set in ConfigParams().s3_bucket_name
+def get_s3_file_contents(filename: Union[os.PathLike, str], bucket: Optional[str] = None) -> str:
+    """
+    Get contents of a text file in an S3 bucket; the bucket defaults to ConfigParams().s3_bucket_name.
     """
     # TODO: move this to openeodriver.integrations.s3?
     s3_instance = s3_client()
     s3_file_object = s3_instance.get_object(
-        Bucket=get_backend_config().s3_bucket_name,
+        Bucket=bucket or get_backend_config().s3_bucket_name,
         Key=str(filename).strip("/"),
     )
     body = s3_file_object["Body"]
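
A usage sketch of the extended signature (the bucket and key values are illustrative); omitting bucket keeps the old behaviour of reading from the configured default bucket:

import json

from openeogeotrellis.utils import get_s3_file_contents

metadata = json.loads(get_s3_file_contents("jobs/j-123/job_metadata.json", bucket="my-bucket"))
fallback = json.loads(get_s3_file_contents("jobs/j-123/job_metadata.json"))  # default bucket from backend config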

tests/integrations/test_s3_proxy.py

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+from openeogeotrellis.integrations.s3proxy.asset_urls import PresignedS3AssetUrls
+
+
+def test_get_bucket_key_from_uri():
+    bucket, key = PresignedS3AssetUrls.get_bucket_key_from_uri("s3://my-bucket/my/key")
+
+    assert bucket == "my-bucket"
+    assert key == "my/key"
