Hv issue719 job manager threaded job start #736

Open · wants to merge 58 commits into base: master

Commits (58)
3335070
Issue #719 job manager WIP: start jobs in worker thread
soxofaan Feb 14, 2025
ff80f58
Issue #719/#730 Some quick-fixes to avoid hanging tests
soxofaan Feb 14, 2025
d5c65c1
post_process queue
HansVRP Feb 19, 2025
d6bf073
fix concurrency
HansVRP Feb 19, 2025
c5f47bc
fix threaded unit test
HansVRP Feb 19, 2025
3162a3f
fix final unit test
HansVRP Feb 19, 2025
ac57284
fix final unit test
HansVRP Feb 19, 2025
138ef85
propose additional unit tests
HansVRP Feb 19, 2025
d0ffda7
fix linting
HansVRP Feb 19, 2025
1e4db19
improve logging
HansVRP Feb 20, 2025
4461d03
update
HansVRP Feb 20, 2025
5f43281
fix oversight
HansVRP Feb 20, 2025
55d9def
fix oversight
HansVRP Feb 20, 2025
eeaec40
fix unit test
HansVRP Feb 20, 2025
a5d5967
simplify
HansVRP Feb 20, 2025
07adae9
test
HansVRP Feb 24, 2025
69ceb80
fix log test
HansVRP Feb 24, 2025
a6f94bc
homogeneous unit tests
HansVRP Feb 25, 2025
703d150
unit tests
HansVRP Feb 26, 2025
85ea738
split off worker thread logic
HansVRP Feb 26, 2025
8eafa54
latest updates
HansVRP Feb 28, 2025
0380b33
fix
HansVRP Feb 28, 2025
b589afb
introduce threadpool
HansVRP Feb 28, 2025
2dcb484
revise feedback
HansVRP Feb 28, 2025
d99996a
fix endless unit test
HansVRP Mar 14, 2025
eeb8ab0
fix tests
HansVRP Mar 14, 2025
da0e961
fix tests
HansVRP Mar 14, 2025
8529efc
clean up print statements
HansVRP Mar 14, 2025
c154706
clean up
HansVRP Mar 14, 2025
b3dbca4
clean up
HansVRP Mar 14, 2025
1157507
work on feedback
HansVRP Mar 17, 2025
768b2fc
fix unit tests
HansVRP Mar 17, 2025
af015ed
fix unit tests
HansVRP Mar 17, 2025
869d1c3
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
soxofaan Mar 17, 2025
fce55c8
PR #736 Code style cleanup
soxofaan Mar 17, 2025
64a4ad9
split off job class
HansVRP Mar 20, 2025
3e8a2e6
split off job class
HansVRP Mar 20, 2025
fabc346
add job manager unit test
HansVRP Mar 20, 2025
69e9dee
fix
HansVRP Mar 20, 2025
b367062
test printing caplog
HansVRP Mar 20, 2025
7ea8202
fix logging
HansVRP Mar 20, 2025
da728d1
generalize
HansVRP Mar 25, 2025
9f02ae6
status update
HansVRP Mar 26, 2025
c49feb1
atomic updates to dataframe
HansVRP Apr 3, 2025
a9c3c8b
simplification
HansVRP Apr 3, 2025
ed70d0f
add flexibility
HansVRP Apr 3, 2025
a9b1fb9
improve naming
HansVRP Apr 3, 2025
ecd04f4
clean up
HansVRP Apr 4, 2025
9fa693d
small refactor and clean up
HansVRP Apr 16, 2025
24ccda3
fixing unit tests
HansVRP Apr 17, 2025
7b6efe7
fix
HansVRP Apr 17, 2025
69147eb
don't break stac database
HansVRP Apr 17, 2025
8b51f84
fix
HansVRP Apr 17, 2025
14f1971
additional testing for processingworkerupdates
HansVRP Apr 17, 2025
48c4565
added 'integration test'
HansVRP Apr 17, 2025
0860840
Merge remote-tracking branch 'origin/master' into hv_issue719-job-man…
HansVRP Apr 17, 2025
c28a2c5
always provide a task_result
HansVRP Apr 17, 2025
23ce981
docstrings
HansVRP Apr 18, 2025
184 changes: 164 additions & 20 deletions openeo/extra/job_management/__init__.py
@@ -32,12 +32,16 @@
from requests.adapters import HTTPAdapter, Retry

from openeo import BatchJob, Connection
from openeo.extra.job_management._thread_worker import (
    _JobManagerWorkerThreadPool,
    _JobStartTask,
)

from openeo.internal.processes.parse import (
Parameter,
Process,
parse_remote_process_definition,
)
from openeo.rest import OpenEoApiError
from openeo.rest.auth.auth import BearerAuth
from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339

_log = logging.getLogger(__name__)
@@ -105,6 +109,7 @@ def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
"""
...


def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
raise NotImplementedError("No 'start_job' callable provided")

@@ -186,6 +191,7 @@ def start_job(

# Expected columns in the job DB dataframes.
# TODO: make this part of public API when settled?
# TODO: move non-official statuses to a separate column (not_started, queued_for_start)
_COLUMN_REQUIREMENTS: Mapping[str, _ColumnProperties] = {
"id": _ColumnProperties(dtype="str"),
"backend_name": _ColumnProperties(dtype="str"),
@@ -222,6 +228,7 @@ def __init__(
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
self._thread = None
self._worker_pool = None

def add_backend(
self,
@@ -358,21 +365,27 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
_log.info(f"Resuming `run_jobs` from existing {job_db}")

self._stop_thread = False
self._worker_pool = _JobManagerWorkerThreadPool()

def run_loop():

# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

while (
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued", "queued_for_start", "running"]
).values()
)
> 0
and not self._stop_thread
):
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

# Show current stats and sleep
_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
# Do sequence of micro-sleeps to allow for quick thread exit
for _ in range(int(max(1, self.poll_sleep))):
time.sleep(1)
if self._stop_thread:
@@ -391,6 +404,8 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):

.. versionadded:: 0.32.0
"""
self._worker_pool.shutdown()

if self._thread is not None:
self._stop_thread = True
if timeout_seconds is _UNSET:
@@ -493,7 +508,16 @@ def run_jobs(
# TODO: support user-provided `stats`
stats = collections.defaultdict(int)

self._worker_pool = _JobManagerWorkerThreadPool()

while (
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued_for_start", "queued", "running"]
).values()
)
> 0
):
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1

@@ -502,6 +526,10 @@
time.sleep(self.poll_sleep)
stats["sleep"] += 1


        # TODO: run post-processing once more after shutdown to ensure completion?
self._worker_pool.shutdown()
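        # (One possible shape for the TODO above, as an assumption rather than
        # part of this change: calling
        #     self._process_threadworker_updates(self._worker_pool, job_db, stats)
        # once more after `shutdown()` would drain results that only completed
        # while the pool was shutting down.)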

return stats

def _job_update_loop(
@@ -524,7 +552,7 @@
not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"])
stats["job_db get_by_status"] += 1
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
@@ -541,7 +569,9 @@
stats["job_db persist"] += 1
total_added += 1

# Act on jobs
self._process_threadworker_updates(self._worker_pool, job_db, stats)

# TODO: move this back closer to the `_track_statuses` call above, once job done/error handling is also handled in threads?
for job, row in jobs_done:
self.on_job_done(job, row)

@@ -551,7 +581,6 @@
for job, row in jobs_cancel:
self.on_job_cancel(job, row)
def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs

@@ -598,26 +627,92 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
df.loc[i, "start_time"] = rfc3339.utcnow()
if job:
df.loc[i, "id"] = job.job_id
_log.info(f"Job created: {job.job_id}")
with ignore_connection_errors(context="get status"):
status = job.status()
stats["job get status"] += 1
df.loc[i, "status"] = status
if status == "created":
# start job if not yet done by callback
try:
job_con = job.connection
task = _JobStartTask(
root_url=job_con.root_url,
bearer_token=job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
job_id=job.job_id,
)
_log.info(f"Submitting task {task} to thread pool")
self._worker_pool.submit_task(task)

stats["job_queued_for_start"] += 1
df.loc[i, "status"] = "queued_for_start"
except OpenEoApiError as e:
_log.info(f"Failed submitting task {task} to thread pool with error: {e}")
df.loc[i, "status"] = "queued_for_start_failed"
stats["job queued for start failed"] += 1
else:
# TODO: what is this "skipping" about actually?
df.loc[i, "status"] = "skipped"
stats["start_job skipped"] += 1

def _process_threadworker_updates(
self,
worker_pool: _JobManagerWorkerThreadPool,
job_db: JobDatabaseInterface,
stats: dict
) -> None:
"""Processes asynchronous job updates from worker threads and applies them to the job database and statistics.

This wrapper function is responsible for:
1. Collecting completed results from the worker thread pool
2. applying database updates for each job result
3. applying statistics updates
4. Handles errors with comprehensive logging

:param worker_pool:
Thread pool instance managing the asynchronous job operations.
Should provide a `process_futures()` method returning completed job results.

:param job_db:
Job database implementing the :py:class:`JobDatabaseInterface` interface.
Used to persist job status updates and metadata.
Must support the `_update_row(job_id: str, updates: dict)` method.

:param stats:
Dictionary tracking operational statistics that will be updated in-place.
Expected to handle string keys with integer values.
Statistics will be updated with counts from completed job results.

:return:
None: All updates are applied in-place to the job_db and stats parameters.
.
"""
        results = worker_pool.process_futures()
        stats_updates = collections.defaultdict(int)

        for result in results:
            try:
                # Handle job database updates
                if result.db_update:
                    _log.debug(f"Processing update for job {result.job_id}")
                    job_db._update_row(job_id=result.job_id, updates=result.db_update)

                # Aggregate statistics updates
                if result.stats_update:
                    for key, count in result.stats_update.items():
                        stats_updates[key] += int(count)

            except Exception as e:
                _log.error(f"Failed to apply updates for job {result.job_id}: {e}")

        # Apply the aggregated statistics updates
        for key, count in stats_updates.items():
            stats[key] = stats.get(key, 0) + count

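    # Illustrative example (assumption): given a completed result like
    #     _TaskResult(job_id="job-123", db_update={"status": "queued"},
    #                 stats_update={"job start": 1})
    # this method calls `job_db._update_row(job_id="job-123",
    # updates={"status": "queued"})` and afterwards increments
    # `stats["job start"]` by 1. Failures are logged per result, so one bad
    # result does not block the remaining updates.
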
def on_job_done(self, job: BatchJob, row):
"""
Handles jobs that have finished. Can be overridden to provide custom behaviour.
@@ -673,20 +768,19 @@ def _cancel_prolonged_job(self, job: BatchJob, row):
try:
# Ensure running start time is valid
job_running_start_time = rfc3339.parse_datetime(row.get("running_start_time"), with_timezone=True)

# Parse the current time into a datetime object with timezone info
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

# Calculate the elapsed time between job start and now
elapsed = current_time - job_running_start_time

if elapsed > self._cancel_running_job_after:

_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()

except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

@@ -715,7 +809,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
"""
stats = stats if stats is not None else collections.defaultdict(int)

active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
active = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"]).copy()

jobs_done = []
jobs_error = []
Expand Down Expand Up @@ -749,7 +843,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
stats["job canceled"] += 1
jobs_cancel.append((the_job, active.loc[i]))

if previous_status in {"created", "queued"} and new_status == "running":
if previous_status in {"created", "queued", "queued_for_start"} and new_status == "running":
stats["job started running"] += 1
active.loc[i, "running_start_time"] = rfc3339.utcnow()

@@ -782,7 +876,6 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =

return jobs_done, jobs_error, jobs_cancel


def _format_usage_stat(job_metadata: dict, field: str) -> str:
value = deep_get(job_metadata, "usage", field, "value", default=0)
unit = deep_get(job_metadata, "usage", field, "unit", default="")
@@ -877,6 +970,55 @@ def _merge_into_df(self, df: pd.DataFrame):
else:
self._df = df

def _update_row(self, job_id: str, updates: dict):
"""
        Apply the updates provided in the given dictionary to the dataframe row corresponding to the given job_id.

        :param job_id: id of the job whose row to update.
        :param updates: dictionary mapping column names to updated values.

        :return: None. The updates are applied to the dataframe in place and persisted.
        """
if self._df is None:
raise ValueError("Job database not initialized")

# Create boolean mask for target row
mask = self._df["id"] == job_id
match_count = mask.sum()

# Handle row identification issues
        # TODO: make this more robust, e.g. by falling back on the row index?
if match_count == 0:
_log.error(f"Job {job_id!r} not found in database")
return
if match_count > 1:
_log.error(f"Duplicate job ID {job_id!r} found in database")
return

# Get valid columns
valid_columns = set(self._df.columns)
filtered_updates = {}

        # Validate update keys
for key, value in updates.items():
if key in valid_columns:
filtered_updates[key] = value
else:
_log.warning(f"Ignoring invalid column {key!r} in update for job {job_id}")

# Bulk update
if not filtered_updates:
return
try:
# Update all columns in a single operation
self._df.loc[mask, list(filtered_updates.keys())] = list(filtered_updates.values())
self.persist(self._df)
        except Exception as e:
            _log.error(f"Failed to persist row update for job {job_id}: {e}")
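
# Illustrative usage sketch (assumption: `db` is a CsvJobDatabase or
# ParquetJobDatabase whose dataframe has a row with id "job-123"):
#
#     db._update_row(job_id="job-123", updates={"status": "queued_for_start"})
#
# Unknown columns in `updates` are logged and dropped; a missing or duplicated
# job id is logged as an error and the update is skipped.
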
class CsvJobDatabase(FullDataFrameJobDatabase):
"""
@@ -932,6 +1074,8 @@ def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
self.df.to_csv(self.path, index=False)




class ParquetJobDatabase(FullDataFrameJobDatabase):
"""