Skip to content

Commit 9587438

Browse files
committed
Proof of concept simple job progress estimation
ref: Open-EO/openeo-geopyspark-driver#772
1 parent eabdff6 commit 9587438

File tree

6 files changed

+107
-2
lines changed

6 files changed

+107
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and start a new "In Progress" section above it.
2121

2222
## In progress
2323

24+
- Add `simple_job_progress_estimation` config for simple job progress estimation ([Open-EO/openeo-geopyspark-driver#772](https://github.com/Open-EO/openeo-geopyspark-driver/issues/772))
25+
2426

2527
## 0.126.0
2628

openeo_driver/backend.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from openeo_driver.processes import ProcessRegistry
3636
from openeo_driver.users import User
3737
from openeo_driver.users.oidc import OidcProvider
38+
from openeo_driver.util.date_math import simple_job_progress_estimation
39+
from openeo_driver.util.logging import just_log_exceptions
3840
from openeo_driver.utils import read_json, dict_item, EvalEnv, extract_namedtuple_fields_from_dict, \
3941
get_package_versions
4042

@@ -300,6 +302,7 @@ class BatchJobMetadata(NamedTuple):
300302
job_options: Optional[dict] = None
301303
title: Optional[str] = None
302304
description: Optional[str] = None
305+
# Progress as percent value: 0.0 (just started) - 100.0 (fully completed)
303306
progress: Optional[float] = None
304307
updated: Optional[datetime] = None
305308
plan: Optional[str] = None
@@ -375,8 +378,21 @@ def to_api_dict(self, full=True, api_version: ComparableVersion = None) -> dict:
375378
result["created"] = rfc3339.datetime(self.created) if self.created else None
376379
result["updated"] = rfc3339.datetime(self.updated) if self.updated else None
377380

381+
progress = self.progress
382+
if (
383+
self.status == JOB_STATUS.RUNNING
384+
and self.started
385+
and progress is None
386+
and get_backend_config().simple_job_progress_estimation
387+
):
388+
# TODO: is there a cleaner place to inject this fallback progress estimation?
389+
with just_log_exceptions(log=logger, name="simple_job_progress_estimation"):
390+
progress = 100 * simple_job_progress_estimation(
391+
started=self.started, average_run_time=get_backend_config().simple_job_progress_estimation
392+
)
378393
# Clamp "progress" for certain "status" values according to the spec.
379-
result["progress"] = {"created": 0, "queued": 0, "finished": 100}.get(self.status, self.progress)
394+
progress = {"created": 0, "queued": 0, "finished": 100}.get(self.status, progress)
395+
result["progress"] = progress
380396

381397
if full:
382398
usage = self.usage or {}

openeo_driver/config/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ class OpenEoBackendConfig:
7676

7777
ejr_retry_settings: dict = attrs.Factory(lambda: dict(tries=4, delay=2, backoff=2))
7878

79+
"Experimental: simple job progress fallback estimation. Specify average batch job completion time (wall clock) in seconds."
80+
simple_job_progress_estimation: Optional[float] = None
81+
7982

8083
def check_config_definition(config_class: Type[OpenEoBackendConfig]):
8184
"""

openeo_driver/util/date_math.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,33 @@ def month_shift(
2727
except ValueError:
2828
# Handle month overflow (e.g clip Feb 31 to 28)
2929
return month_shift(d=d.replace(day=1), months=months + 1) - dt.timedelta(days=1)
30+
31+
32+
def simple_job_progress_estimation(started: dt.datetime, average_run_time: float) -> float:
    """
    Simple progress estimation,
    assuming job run time is distributed exponentially (with lambda = 1 / average run time)

    - estimated remaining run time = average run time
      (note that this is regardless of current run time so far,
      this is a mathematical consequence of assuming an exponential distribution)
    - estimated total run time = current run time + average run time
    - estimated progress = current run time / (current run time + average run time)

    :param started: start time of the job.
        Naive values are interpreted as UTC; timezone-aware values (with any UTC offset)
        are compared against the current UTC time as-is.
    :param average_run_time: average run time of jobs in seconds
    :return: progress as a fraction in range [0, 1].
        Returns 0.0 when ``started`` is not in the past
        or when ``average_run_time`` is not positive.
    """
    # TODO: also support string input?

    # A datetime is only "aware" when its tzinfo actually yields a UTC offset:
    # a tzinfo whose `utcoffset()` returns None still counts as naive,
    # and subtracting it from an aware datetime would raise a TypeError.
    if started.tzinfo is None or started.utcoffset() is None:
        # Interpret naive as UTC
        started = started.replace(tzinfo=dt.timezone.utc)

    now = dt.datetime.now(tz=dt.timezone.utc)
    elapsed = (now - started).total_seconds()
    if elapsed <= 0 or average_run_time <= 0:
        return 0.0
    return elapsed / (elapsed + average_run_time)

tests/test_views.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import pystac.validation.stac_validator
1717
import pytest
1818
import re_assert
19+
import time_machine
1920
import werkzeug.exceptions
2021
from openeo.utils.version import ComparableVersion
2122

@@ -1186,6 +1187,7 @@ def _fresh_job_registry(next_job_id="job-1234", output_root: Optional[Path] = No
11861187
status='running',
11871188
process={'process_graph': {'foo': {'process_id': 'foo', 'arguments': {}}}},
11881189
created=datetime(2017, 1, 1, 9, 32, 12),
1190+
started=datetime(2017, 1, 1, 12, 0, 0),
11891191
),
11901192
(TEST_USER, '53c71345-09b4-46b4-b6b0-03fd6fe1f199'): BatchJobMetadata(
11911193
id='53c71345-09b4-46b4-b6b0-03fd6fe1f199',
@@ -1459,6 +1461,18 @@ def test_get_job_info_invalid(self, api):
14591461
resp = api.get('/jobs/deadbeef-f00', headers=self.AUTH_HEADER).assert_error(404, "JobNotFound")
14601462
assert resp.json["message"] == "The batch job 'deadbeef-f00' does not exist."
14611463

1464+
@pytest.mark.parametrize("backend_config_overrides", [{"simple_job_progress_estimation": 600}])
def test_get_job_info_simple_job_progress_estimation(self, api100, backend_config_overrides):
    """
    Job info of a running job without explicit progress
    should report the fallback progress estimate
    when `simple_job_progress_estimation` is configured (here: 600 seconds).
    """
    # NOTE(review): presumably this job is the "running" fixture with `started` at 12:00,
    # so at 12:05 the estimate is 300 / (300 + 600), about 33.3% -- confirm against `_fresh_job_registry`.
    frozen_now = datetime(2017, 1, 1, 12, 5, 0)
    job_id = "07024ee9-7847-4b8a-b260-6c879a2b3cdc"
    expected = {
        "id": "07024ee9-7847-4b8a-b260-6c879a2b3cdc",
        "status": "running",
        "progress": pytest.approx(33.33, abs=0.1),
        "created": "2017-01-01T09:32:12Z",
        "process": {"process_graph": {"foo": {"process_id": "foo", "arguments": {}}}},
    }
    with self._fresh_job_registry(), time_machine.travel(frozen_now):
        resp = api100.get("/jobs/" + job_id, headers=self.AUTH_HEADER)
        assert resp.assert_status_code(200).json == expected
1475+
14621476
def test_list_user_jobs_100(self, api100):
14631477
with self._fresh_job_registry():
14641478
resp = api100.get('/jobs', headers=self.AUTH_HEADER)

tests/util/test_date_math.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import datetime as dt
22

33
import pandas as pd
4+
import pytest
5+
import time_machine
46

5-
from openeo_driver.util.date_math import month_shift
7+
from openeo_driver.util.date_math import month_shift, simple_job_progress_estimation
68

79

810
def test_month_shift_date():
@@ -63,3 +65,41 @@ def test_month_shift_pandas_timestamp():
6365
def test_month_shift_overflow_pandas_timestamp():
6466
assert month_shift(pd.to_datetime("2022-01-31"), months=1) == pd.Timestamp(2022, 2, 28)
6567
assert month_shift(pd.to_datetime("2022-03-31"), months=-1) == pd.Timestamp(2022, 2, 28)
68+
69+
70+
@time_machine.travel("2024-12-06T12:00:00+00")
@pytest.mark.parametrize("tzinfo", [None, dt.timezone.utc])
def test_simple_job_progress_estimation_basic(tzinfo):
    """
    Check estimates for naive and explicit-UTC start times,
    with "now" frozen at 2024-12-06 12:00 UTC and an average run time of 600 seconds.
    """
    cases = [
        # Started 1 second ago
        (dt.datetime(2024, 12, 6, 11, 59, 59, tzinfo=tzinfo), 0.0),
        # Started 5 minutes ago
        (dt.datetime(2024, 12, 6, 11, 55, tzinfo=tzinfo), 0.33),
        # Long overdue
        (dt.datetime(2024, 12, 5, tzinfo=tzinfo), 1.0),
    ]
    for started, expected in cases:
        estimate = simple_job_progress_estimation(started, average_run_time=600)
        assert estimate == pytest.approx(expected, abs=0.01)
94+
95+
96+
@time_machine.travel("2024-12-06T12:00:00+00")
def test_simple_job_progress_estimation_negative():
    """A start time later than the (frozen) current time must give zero progress."""
    started_in_future = dt.datetime(2024, 12, 8)
    estimate = simple_job_progress_estimation(started=started_in_future, average_run_time=600)
    assert estimate == 0.0

0 commit comments

Comments
 (0)