Hv issue719 job manager threaded job start #736

Open · wants to merge 58 commits into base: master

Commits (58, showing changes from 41 commits)
3335070
Issue #719 job manager WIP: start jobs in worker thread
soxofaan Feb 14, 2025
ff80f58
Issue #719/#730 Some quick-fixes to avoid hanging tests
soxofaan Feb 14, 2025
d5c65c1
post_process queue
HansVRP Feb 19, 2025
d6bf073
fix concurrency
HansVRP Feb 19, 2025
c5f47bc
fix threaded unit test
HansVRP Feb 19, 2025
3162a3f
fix final unit test
HansVRP Feb 19, 2025
ac57284
fix final unit test
HansVRP Feb 19, 2025
138ef85
propose additional unit tests
HansVRP Feb 19, 2025
d0ffda7
fix linting
HansVRP Feb 19, 2025
1e4db19
improve logging
HansVRP Feb 20, 2025
4461d03
update
HansVRP Feb 20, 2025
5f43281
fix oversight
HansVRP Feb 20, 2025
55d9def
fix oversight
HansVRP Feb 20, 2025
eeaec40
fix unit test
HansVRP Feb 20, 2025
a5d5967
simplify
HansVRP Feb 20, 2025
07adae9
test
HansVRP Feb 24, 2025
69ceb80
fix log test
HansVRP Feb 24, 2025
a6f94bc
homogeneous unit tests
HansVRP Feb 25, 2025
703d150
unit tests
HansVRP Feb 26, 2025
85ea738
split off worker thread logic
HansVRP Feb 26, 2025
8eafa54
lastest updates
HansVRP Feb 28, 2025
0380b33
fix
HansVRP Feb 28, 2025
b589afb
introduce threadpool
HansVRP Feb 28, 2025
2dcb484
revise feedback
HansVRP Feb 28, 2025
d99996a
fix endless unit test
HansVRP Mar 14, 2025
eeb8ab0
fix tests
HansVRP Mar 14, 2025
da0e961
fix tests
HansVRP Mar 14, 2025
8529efc
clean up print statements
HansVRP Mar 14, 2025
c154706
clean up
HansVRP Mar 14, 2025
b3dbca4
clean up
HansVRP Mar 14, 2025
1157507
work on feedback
HansVRP Mar 17, 2025
768b2fc
fix unit tests
HansVRP Mar 17, 2025
af015ed
fix unit tests
HansVRP Mar 17, 2025
869d1c3
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
soxofaan Mar 17, 2025
fce55c8
PR #736 Code style cleanup
soxofaan Mar 17, 2025
64a4ad9
split off job class
HansVRP Mar 20, 2025
3e8a2e6
split off job class
HansVRP Mar 20, 2025
fabc346
add job manager unit test
HansVRP Mar 20, 2025
69e9dee
fix
HansVRP Mar 20, 2025
b367062
test printing caplog
HansVRP Mar 20, 2025
7ea8202
fix logging
HansVRP Mar 20, 2025
da728d1
generalize
HansVRP Mar 25, 2025
9f02ae6
status update
HansVRP Mar 26, 2025
c49feb1
atomic updates to dataframe
HansVRP Apr 3, 2025
a9c3c8b
simplification
HansVRP Apr 3, 2025
ed70d0f
add flexibility
HansVRP Apr 3, 2025
a9b1fb9
improve naming
HansVRP Apr 3, 2025
ecd04f4
clean up
HansVRP Apr 4, 2025
9fa693d
small refactor and clean up
HansVRP Apr 16, 2025
24ccda3
fixing unit tests
HansVRP Apr 17, 2025
7b6efe7
fix
HansVRP Apr 17, 2025
69147eb
don't break stac database
HansVRP Apr 17, 2025
8b51f84
fix
HansVRP Apr 17, 2025
14f1971
additional testing for processingworkerupdates
HansVRP Apr 17, 2025
48c4565
added 'integration test'
HansVRP Apr 17, 2025
0860840
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
HansVRP Apr 17, 2025
c28a2c5
always provide a task_resukt
HansVRP Apr 17, 2025
23ce981
docstrings
HansVRP Apr 18, 2025
126 changes: 106 additions & 20 deletions openeo/extra/job_management/__init__.py
@@ -32,12 +32,14 @@
from requests.adapters import HTTPAdapter, Retry

from openeo import BatchJob, Connection
from openeo.extra.job_management._thread_worker import _JobManagerWorkerThreadPool, _JobStartTask
from openeo.internal.processes.parse import (
Parameter,
Process,
parse_remote_process_definition,
)
from openeo.rest import OpenEoApiError
from openeo.rest.auth.auth import BearerAuth
from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339

_log = logging.getLogger(__name__)
@@ -186,6 +188,7 @@ def start_job(

# Expected columns in the job DB dataframes.
# TODO: make this part of public API when settled?
# TODO: move non-official statuses to a separate column (not_started, queued_for_start)
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
"id": _ColumnProperties(dtype="str"),
"backend_name": _ColumnProperties(dtype="str"),
@@ -222,6 +225,7 @@ def __init__(
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
self._thread = None
self._worker_pool = None

def add_backend(
self,
@@ -358,21 +362,27 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
_log.info(f"Resuming `run_jobs` from existing {job_db}")

self._stop_thread = False
self._worker_pool = _JobManagerWorkerThreadPool()

def run_loop():

# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

while (
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued", "queued_for_start", "running"]
).values()
)
> 0
and not self._stop_thread
):
self._job_update_loop(job_db=job_db, start_job=start_job)
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

# Show current stats and sleep
_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
# Do sequence of micro-sleeps to allow for quick thread exit
for _ in range(int(max(1, self.poll_sleep))):
time.sleep(1)
if self._stop_thread:
@@ -391,6 +401,8 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):

.. versionadded:: 0.32.0
"""
self._worker_pool.shutdown()

if self._thread is not None:
self._stop_thread = True
if timeout_seconds is _UNSET:
@@ -493,7 +505,16 @@ def run_jobs(
# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._worker_pool = _JobManagerWorkerThreadPool()

while (
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued", "queued_for_start", "running"]
).values()
)
> 0
):
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

@@ -502,6 +523,10 @@ def run_jobs(
time.sleep(self.poll_sleep)
stats["sleep"] += 1


# TODO: run post-processing once more after shutdown to ensure completion?
self._worker_pool.shutdown()

return stats

def _job_update_loop(
@@ -524,7 +549,7 @@ def _job_update_loop(
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created", "queued", "running"])
running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"])
stats["job_db get_by_status"] += 1
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
@@ -541,7 +566,13 @@ def _job_update_loop(
stats["job_db persist"] += 1
total_added += 1

# Act on jobs
# Process launched jobs
worker_pool_output = self._worker_pool.process_futures()
self._postprocess_futures(worker_pool_output, not_started, stats)
job_db.persist(not_started)
stats["job_db persist"] += 1

# TODO: move this back closer to the `_track_statuses` call above, once job done/error handling is also handled in threads?
for job, row in jobs_done:
self.on_job_done(job, row)

@@ -551,7 +582,6 @@ def _job_update_loop(
for job, row in jobs_cancel:
self.on_job_cancel(job, row)


def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs

@@ -598,21 +628,31 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
df.loc[i, "start_time"] = rfc3339.utcnow()
if job:
df.loc[i, "id"] = job.job_id
_log.info(f"Job created: {job.job_id}")
with ignore_connection_errors(context="get status"):
status = job.status()
stats["job get status"] += 1
df.loc[i, "status"] = status
if status == "created":
# start job if not yet done by callback
try:
job.start()
stats["job start"] += 1
df.loc[i, "status"] = job.status()
stats["job get status"] += 1
job_con = job.connection
task = _JobStartTask(
root_url=job_con.root_url,
bearer_token=job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
job_id=job.job_id
)
_log.info(f"Submitting task {task} to thread pool")
self._worker_pool.submit_task(task)

stats["job_queued_for_start"] += 1
# TODO: find a better status name; it's confusing that a start_job call moves the job to created, then queued_for_start, then queued, then running.
# Perhaps submitted_to_backend?
df.loc[i, "status"] = "queued_for_start"
except OpenEoApiError as e:
_log.error(e)
df.loc[i, "status"] = "start_failed"
stats["job start error"] += 1
_log.info(f"Failed submitting task {task} to thread pool with error: {e}")
df.loc[i, "status"] = "queued_for_start_failed"
stats["job queued for start failed"] += 1
else:
# TODO: what is this "skipping" about actually?
df.loc[i, "status"] = "skipped"
@@ -668,25 +708,72 @@ def on_job_cancel(self, job: BatchJob, row):
"""
pass

def _postprocess_futures(self, worker_pool_results, df, stats):
"""
Processes completed tasks from the worker thread pool and updates job statuses in the job dataframe.

This method iterates over the results of completed tasks from the worker pool (e.g., job start attempts).
For each task, it updates the job's status in the DataFrame based on whether the start was successful
(and the job moved to the "queued" status) or whether the job start failed.

:param worker_pool_results:
A list of tuples, where each tuple contains:
- task: An instance of `_JobStartTask` representing the job start task.
- result: A tuple containing:
- job_id (str): The ID of the job associated with the task.
- success (bool): Whether the task succeeded.
- data (str): the resulting status or an error message.

:param df:
The DataFrame that specifies the jobs and tracks their statuses. The status of jobs in the DataFrame is updated based on the task results.

:param stats:
The dictionary for tracking statistics related to job processing. The statistics are updated based on the task results.

"""

for task, result in worker_pool_results:
job_id, success, data = result

# Find rows with matching job id and a status of 'queued_for_start'
idx = df.index[(df["id"] == job_id) & (df["status"] == "queued_for_start")]

if not idx.empty:
# Set new status based on task success
new_status = "queued" if success else "start_failed"
df.loc[idx, "status"] = new_status
_log.info(f"Updated job {job_id} status to {new_status} in dataframe.")
else:
_log.warning(f"No entry for job {job_id} with status 'queued_for_start' found in dataframe, passing on.")

# Log details and update statistics
if isinstance(task, _JobStartTask) and success:
_log.info(f"Job {job_id} started successfully with status: {data}")
# TODO: would it be better to count stats["job queued"] += 1 here?
stats["job start"] += 1
elif isinstance(task, _JobStartTask) and not success:
_log.info(f"Job {job_id} start failed with exception: {data}")
# TODO: would it be better to count stats["job queued failed"] += 1 here?
stats["job start failed"] += 1

def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
try:
# Ensure running start time is valid
job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

# Parse the current time into a datetime object with timezone info
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

# Calculate the elapsed time between job start and now
elapsed = current_time - job_running_start_time

if elapsed > self._cancel_running_job_after:

_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()

except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

@@ -715,7 +802,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
"""
stats = stats if stats is not None else collections.defaultdict(int)

active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
active = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"]).copy()

jobs_done = []
jobs_error = []
Expand Down Expand Up @@ -749,7 +836,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
stats["job canceled"] += 1
jobs_cancel.append((the_job, active.loc[i]))

if previous_status in {"created", "queued"} and new_status == "running":
if previous_status in {"created", "queued", "queued_for_start"} and new_status == "running":
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()

@@ -782,7 +869,6 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =

return jobs_done, jobs_error, jobs_cancel


def _format_usage_stat(job_metadata: dict, field: str) -> str:
value = deep_get(job_metadata, "usage", field, "value", default=0)
unit = deep_get(job_metadata, "usage", field, "unit", default="")
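Note (illustration, not part of the diff): as far as this diff shows, the threaded start logic stays internal to MultiBackendJobManager, so callers keep driving it through run_jobs() or start_job_thread() as before; only the bookkeeping changes, with rows now passing through "queued_for_start" between "created" and "queued". A minimal usage sketch under that assumption follows; the backend URL, collection id, dates and CSV path are placeholders.

import openeo
import pandas as pd
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager

connection = openeo.connect("https://openeo.example.org").authenticate_oidc()

def start_job(row: pd.Series, connection: openeo.Connection, **kwargs) -> openeo.BatchJob:
    # Build a (not yet started) batch job per dataframe row;
    # the manager starts it asynchronously via the worker thread pool.
    cube = connection.load_collection(
        "SENTINEL2_L2A",
        temporal_extent=[row["start_date"], row["end_date"]],
    )
    return cube.create_job(title=f"job {row['start_date']}/{row['end_date']}")

manager = MultiBackendJobManager(poll_sleep=30)
manager.add_backend("example", connection=connection, parallel_jobs=2)

df = pd.DataFrame({"start_date": ["2024-01-01"], "end_date": ["2024-02-01"]})
job_db = CsvJobDatabase(path="jobs.csv").initialize_from_df(df)

# Rows now move: not_started -> created -> queued_for_start -> queued -> running -> finished
stats = manager.run_jobs(start_job=start_job, job_db=job_db)
print(stats)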
95 changes: 95 additions & 0 deletions openeo/extra/job_management/_thread_worker.py
@@ -0,0 +1,95 @@
import concurrent.futures
import dataclasses
import logging
from typing import List, Optional, Tuple

import openeo


_log = logging.getLogger(__name__)

@dataclasses.dataclass
class _JobStartTask:
root_url: str
bearer_token: Optional[str]
job_id: str

def __post_init__(self):
"""Validates task parameters upon initialization."""
if not isinstance(self.root_url, str) or not self.root_url.strip():
raise ValueError(f"root_url must be a non-empty string, got {self.root_url!r}")
if self.bearer_token is not None and (not isinstance(self.bearer_token, str) or not self.bearer_token.strip()):
raise ValueError(f"bearer_token must be a non-empty string or None, got {self.bearer_token!r}")
if not isinstance(self.job_id, str) or not self.job_id.strip():
raise ValueError(f"job_id must be a non-empty string, got {self.job_id!r}")

def execute(self) -> Tuple[str, bool, str]:
"""Executes the job start task and returns the result."""
try:
conn = openeo.connect(self.root_url)
if self.bearer_token:
conn.authenticate_bearer_token(self.bearer_token)
job = conn.job(self.job_id)
job.start()
status = job.status()
return (self.job_id, True, status)
except Exception as e:
return (self.job_id, False, str(e))

class _JobManagerWorkerThreadPool:
"""
A worker thread pool for processing job management tasks asynchronously.

This thread pool is designed to handle tasks such as starting jobs on a backend.
It uses a `ThreadPoolExecutor` to process tasks concurrently and tracks their results
using futures.


:ivar _executor: The `concurrent.futures.ThreadPoolExecutor` used to manage the worker threads.
:ivar _futures: List of `concurrent.futures.Future` objects for tasks that have been submitted but not yet processed.
"""

def __init__(self, max_workers: int = 2):
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
self._futures: List[concurrent.futures.Future] = []

def submit_task(self, task) -> None:
"""Submits a task to the thread pool."""
_log.info(f"Submitting task {task} to thread pool")
future = self._executor.submit(task.execute)
future.task = task
self._futures.append(future)

def process_futures(self) -> List[Tuple]:
"""Processes completed futures and returns tuples of (task, result)."""
if not self._futures:
return []

done, _ = concurrent.futures.wait(
self._futures,
timeout=0,
return_when=concurrent.futures.FIRST_COMPLETED,
)

results = []
for future in done:
try:
result = future.result()
results.append((future.task, result)) # Return the task and its result
except Exception as e:
_log.exception(f"Unexpected error processing future: {e}")
results.append((future.task, str(e)))
self._futures.remove(future)

return results

def shutdown(self):
"""Shuts down the thread pool, warning about unprocessed futures."""
_log.info("Shutting down worker thread pool")
if self._futures:
_log.warning(f"Shutting down with {len(self._futures)} unprocessed futures")
self._executor.shutdown(wait=True)
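
Note (illustration, not part of the diff): the module above is private, but the call pattern the job manager follows (submit a _JobStartTask from _launch_job, poll process_futures() in _job_update_loop, shutdown() when the loop ends) can be sketched as below; the backend URL, bearer token and job id are placeholders.

import time
from openeo.extra.job_management._thread_worker import (
    _JobManagerWorkerThreadPool,
    _JobStartTask,
)

pool = _JobManagerWorkerThreadPool(max_workers=2)

# Queue a start request: the worker thread opens its own connection with the
# given token, calls job.start() and reports the resulting backend status.
task = _JobStartTask(
    root_url="https://openeo.example.org",
    bearer_token="oidc/provider/<token>",  # placeholder; None for unauthenticated backends
    job_id="j-2403-abc123",
)
pool.submit_task(task)

# Poll non-blockingly for completed futures (process_futures uses timeout=0),
# the same way _job_update_loop does once per loop iteration.
results = []
while not results:
    time.sleep(1)
    results = pool.process_futures()

for finished_task, result in results:
    # In the normal case, result is the (job_id, success, data) tuple from _JobStartTask.execute;
    # data is the backend status on success, or an error message on failure.
    job_id, success, data = result
    print(f"{job_id}: {'queued' if success else 'start_failed'} ({data})")

pool.shutdown()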
