Skip to content

Hv issue719 job manager threaded job start #736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 58 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
3335070
Issue #719 job manager WIP: start jobs in worker thread
soxofaan Feb 14, 2025
ff80f58
Issue #719/#730 Some quick-fixes to avoid hanging tests
soxofaan Feb 14, 2025
d5c65c1
post_process queue
HansVRP Feb 19, 2025
d6bf073
fix concurrency
HansVRP Feb 19, 2025
c5f47bc
fix threaded unit test
HansVRP Feb 19, 2025
3162a3f
fix final unit test
HansVRP Feb 19, 2025
ac57284
fix final unit test
HansVRP Feb 19, 2025
138ef85
propose additional unit tests
HansVRP Feb 19, 2025
d0ffda7
fix linting
HansVRP Feb 19, 2025
1e4db19
improve logging
HansVRP Feb 20, 2025
4461d03
update
HansVRP Feb 20, 2025
5f43281
fix oversight
HansVRP Feb 20, 2025
55d9def
fix oversight
HansVRP Feb 20, 2025
eeaec40
fix unit test
HansVRP Feb 20, 2025
a5d5967
simplify
HansVRP Feb 20, 2025
07adae9
test
HansVRP Feb 24, 2025
69ceb80
fix log test
HansVRP Feb 24, 2025
a6f94bc
homogeneous unit tests
HansVRP Feb 25, 2025
703d150
unit tests
HansVRP Feb 26, 2025
85ea738
split off worker thread logic
HansVRP Feb 26, 2025
8eafa54
latest updates
HansVRP Feb 28, 2025
0380b33
fix
HansVRP Feb 28, 2025
b589afb
introduce threadpool
HansVRP Feb 28, 2025
2dcb484
revise feedback
HansVRP Feb 28, 2025
d99996a
fix endless unit test
HansVRP Mar 14, 2025
eeb8ab0
fix tests
HansVRP Mar 14, 2025
da0e961
fix tests
HansVRP Mar 14, 2025
8529efc
clean up print statements
HansVRP Mar 14, 2025
c154706
clean up
HansVRP Mar 14, 2025
b3dbca4
clean up
HansVRP Mar 14, 2025
1157507
work on feedback
HansVRP Mar 17, 2025
768b2fc
fix unit tests
HansVRP Mar 17, 2025
af015ed
fix unit tests
HansVRP Mar 17, 2025
869d1c3
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
soxofaan Mar 17, 2025
fce55c8
PR #736 Code style cleanup
soxofaan Mar 17, 2025
64a4ad9
split off job class
HansVRP Mar 20, 2025
3e8a2e6
split off job class
HansVRP Mar 20, 2025
fabc346
add job manager unit test
HansVRP Mar 20, 2025
69e9dee
fix
HansVRP Mar 20, 2025
b367062
test printing caplog
HansVRP Mar 20, 2025
7ea8202
fix logging
HansVRP Mar 20, 2025
da728d1
generalize
HansVRP Mar 25, 2025
9f02ae6
status update
HansVRP Mar 26, 2025
c49feb1
atomic updates to dataframe
HansVRP Apr 3, 2025
a9c3c8b
simplification
HansVRP Apr 3, 2025
ed70d0f
add flexibility
HansVRP Apr 3, 2025
a9b1fb9
improve naming
HansVRP Apr 3, 2025
ecd04f4
clean up
HansVRP Apr 4, 2025
9fa693d
small refactor and clean up
HansVRP Apr 16, 2025
24ccda3
fixing unit tests
HansVRP Apr 17, 2025
7b6efe7
fix
HansVRP Apr 17, 2025
69147eb
don't break stac database
HansVRP Apr 17, 2025
8b51f84
fix
HansVRP Apr 17, 2025
14f1971
additional testing for processingworkerupdates
HansVRP Apr 17, 2025
48c4565
added 'integration test'
HansVRP Apr 17, 2025
0860840
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
HansVRP Apr 17, 2025
c28a2c5
always provide a task_result
HansVRP Apr 17, 2025
23ce981
docstrings
HansVRP Apr 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 150 additions & 26 deletions openeo/extra/job_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import datetime
import json
import logging
import queue
import re
import threading
import time
import warnings
from pathlib import Path
Expand All @@ -31,13 +33,15 @@
import shapely.wkt
from requests.adapters import HTTPAdapter, Retry

import openeo
from openeo import BatchJob, Connection
from openeo.internal.processes.parse import (
Parameter,
Process,
parse_remote_process_definition,
)
from openeo.rest import OpenEoApiError
from openeo.rest.auth.auth import BearerAuth
from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -223,6 +227,9 @@ def __init__(
)
self._thread = None

self._work_queue = queue.Queue()
self._result_queue = queue.Queue()

def add_backend(
self,
name: str,
Expand Down Expand Up @@ -364,19 +371,30 @@ def run_loop():
# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

while (
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
and not self._stop_thread
):
self._job_update_loop(job_db=job_db, start_job=start_job)
# TODO: multiple workers instead of a single one? Work with thread pool?
worker_thread = _JobManagerWorkerThread(work_queue=self._work_queue, result_queue=self._result_queue)
worker_thread.start()

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

# Show current stats and sleep
_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
# Do sequence of micro-sleeps to allow for quick thread exit
for _ in range(int(max(1, self.poll_sleep))):
time.sleep(1)
if self._stop_thread:
break
time.sleep(self.poll_sleep)
stats["sleep"] += 1

worker_thread.stop_event.set()
worker_thread.join()

_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
# Do sequence of micro-sleeps to allow for quick thread exit
for _ in range(int(max(1, self.poll_sleep))):
time.sleep(1)
if self._stop_thread:
break



self._thread = Thread(target=run_loop)
self._thread.start()
Expand Down Expand Up @@ -493,6 +511,10 @@ def run_jobs(
# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

# TODO: multiple workers instead of a single one? Work with thread pool?
worker_thread = _JobManagerWorkerThread(work_queue=self._work_queue, result_queue=self._result_queue)
worker_thread.start()

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1
Expand All @@ -502,6 +524,9 @@ def run_jobs(
time.sleep(self.poll_sleep)
stats["sleep"] += 1

worker_thread.stop_event.set()
worker_thread.join()

return stats

def _job_update_loop(
Expand All @@ -517,11 +542,7 @@ def _job_update_loop(

stats = stats if stats is not None else collections.defaultdict(int)

with ignore_connection_errors(context="get statuses"):
jobs_done, jobs_error, jobs_cancel = self._track_statuses(job_db, stats=stats)
stats["track_statuses"] += 1

not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
not_started = job_db.get_by_status(statuses=["not_started"], max=50).copy()
if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created", "queued", "running"])
Expand All @@ -535,13 +556,23 @@ def _job_update_loop(
to_add = self.backends[backend_name].parallel_jobs - backend_load
for i in not_started.index[total_added : total_added + to_add]:
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
stats["job launch"] += 1

job_db.persist(not_started.loc[i : i + 1])

stats["job launch"] += 1
stats["job_db persist"] += 1
total_added += 1

# Act on jobs
# TODO: move this back closer to the `_track_statuses` call above, once job done/error handling is also handled in threads?

while not self._result_queue.empty():
full_frame = job_db.get_by_status(statuses=["not_started", "created", "queued"]).copy()
self._process_result_queue(job_db, full_frame, stats)

with ignore_connection_errors(context="get statuses"):
jobs_done, jobs_error, jobs_cancel = self._track_statuses(job_db, stats=stats)
stats["track_statuses"] += 1

for job, row in jobs_done:
self.on_job_done(job, row)

Expand All @@ -551,6 +582,45 @@ def _job_update_loop(
for job, row in jobs_cancel:
self.on_job_cancel(job, row)


def _process_result_queue(self, job_db: JobDatabaseInterface, dataframe: pd.DataFrame, stats: Optional[dict] = None):
    """
    Drain the result queue of worker-thread outcomes, update the status of the
    corresponding job rows in ``dataframe``, and persist the changed rows.

    :param job_db: The job database interface used to persist changes.
    :param dataframe: DataFrame with the job rows that may be updated
        (expected to have "id" and "status" columns).
    :param stats: Optional dictionary to track statistics.
    """
    # Guard against stats=None: without this, the default argument would
    # crash on the first `stats[...] += 1` below (same guard convention
    # as in `_job_update_loop`).
    stats = stats if stats is not None else collections.defaultdict(int)

    while True:
        try:
            work_result = self._result_queue.get_nowait()
        except queue.Empty:
            break

        if isinstance(work_result, tuple) and len(work_result) == 2:
            # Payload is a (job_id, success, data) triple produced by
            # `_JobManagerWorkerThread._start_job`: `data` is the backend-reported
            # status on success, or the repr of the exception on failure.
            job_id, success, data = work_result[1]

            # Find the row(s) in the dataframe that match the job_id
            idx = dataframe.index[dataframe["id"] == job_id]

            if idx.empty:
                _log.error(f"Job ID {job_id} not found in the job dataframe.")
                continue

            if success:
                dataframe.loc[idx, "status"] = data
                stats["job start"] += 1
            else:
                dataframe.loc[idx, "status"] = "start_failed"
                stats["job start failed"] += 1

            # Persist only the updated rows
            job_db.persist(dataframe.loc[idx])
            _log.info(f"Updated job {job_id} to status {dataframe.loc[idx].iloc[0]['status']}")

        else:
            _log.error(f"Unexpected work result: {work_result}")


def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs
Expand Down Expand Up @@ -584,7 +654,7 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
connection = self._get_connection(backend_name, resilient=True)

stats["start_job call"] += 1
job = start_job(
job: BatchJob = start_job(
row=row,
connection_provider=self._get_connection,
connection=connection,
Expand All @@ -605,14 +675,23 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
if status == "created":
# start job if not yet done by callback
try:
job.start()
stats["job start"] += 1
job_con = job.connection
self._work_queue.put(
(
_JobManagerWorkerThread.WORK_TYPE_START_JOB,
(
job_con.root_url,
job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
job.job_id,
),
)
)
stats["job queued for start"] += 1
df.loc[i, "status"] = job.status()
stats["job get status"] += 1
except OpenEoApiError as e:
_log.error(e)
df.loc[i, "status"] = "start_failed"
stats["job start error"] += 1
df.loc[i, "status"] = "queued_start_failed"
stats["job queued for start error"] += 1
else:
# TODO: what is this "skipping" about actually?
df.loc[i, "status"] = "skipped"
Expand Down Expand Up @@ -673,20 +752,20 @@ def _cancel_prolonged_job(self, job: BatchJob, row):
try:
# Ensure running start time is valid
job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

# Parse the current time into a datetime object with timezone info
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

# Calculate the elapsed time between job start and now
elapsed = current_time - job_running_start_time

if elapsed > self._cancel_running_job_after:

_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()

except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

Expand Down Expand Up @@ -783,6 +862,51 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
return jobs_done, jobs_error, jobs_cancel


class _JobManagerWorkerThread(threading.Thread):
    """
    Worker thread that consumes "start job" tasks from a work queue,
    executes them against an openEO backend, and reports the outcome
    (success with the resulting job status, or failure with the error repr)
    on a result queue.
    """

    WORK_TYPE_START_JOB = "start_job"

    def __init__(self, work_queue: queue.Queue, result_queue: queue.Queue):
        super().__init__(daemon=True)
        self.work_queue = work_queue
        self.result_queue = result_queue
        self.stop_event = threading.Event()
        # Timeout (seconds) for blocking work-queue reads; this also bounds
        # how quickly the thread reacts to `stop_event` being set.
        self.polling_time = 5
        # TODO: add customization options for timeout/sleep?

    def run(self):
        _log.info("Worker thread started, waiting for tasks")
        while not self.stop_event.is_set():
            try:
                work_type, work_args = self.work_queue.get(timeout=self.polling_time)
            except queue.Empty:
                # `get(timeout=...)` already waited `polling_time` seconds,
                # so no additional sleep is needed here (an extra sleep would
                # double idle latency and delay reaction to `stop_event`).
                _log.debug("No tasks for worker thread")
                continue
            # Fix: original logged a broken string literal instead of an f-string.
            _log.info(f"Received task: {work_type} with args: {work_args}")
            if work_type == self.WORK_TYPE_START_JOB:
                self._start_job(work_args)
            else:
                raise ValueError(f"Unknown work item: {work_type!r}")

    def _start_job(self, work_args: tuple):
        """
        Start a single batch job and report the outcome on the result queue.

        :param work_args: tuple of (backend root URL, bearer token or None, job id).
        """
        root_url, bearer, job_id = work_args
        _log.info(f"Starting job {job_id} on backend: {root_url}")
        try:
            # Build a fresh connection in this thread (connections are not
            # shared with the manager thread).
            connection = openeo.connect(url=root_url)
            if bearer:
                _log.info(f"Authenticating with bearer token for job {job_id}")
                connection.authenticate_bearer_token(bearer_token=bearer)

            job = connection.job(job_id)
            job.start()
            status = job.status()
            _log.info(f"Job {job_id} started successfully. Status: {status}")
        except Exception as e:
            self.result_queue.put(item=(self.WORK_TYPE_START_JOB, (job_id, False, repr(e))))
            _log.error(f"Error while starting job {job_id}: {e}")
        else:
            self.result_queue.put(item=(self.WORK_TYPE_START_JOB, (job_id, True, status)))


def _format_usage_stat(job_metadata: dict, field: str) -> str:
value = deep_get(job_metadata, "usage", field, "value", default=0)
unit = deep_get(job_metadata, "usage", field, "unit", default="")
Expand Down
Loading
Loading