From f4c5dadfe61ad080bf03c3db76bcc644b1783988 Mon Sep 17 00:00:00 2001
From: Hans Vanrompay
Date: Tue, 14 Jan 2025 16:54:58 +0100
Subject: [PATCH 1/3] threading of download

---
 openeo/extra/job_management/__init__.py | 48 +++++++++++++++++++------
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py
index 41b8fdfd3..56525cd67 100644
--- a/openeo/extra/job_management/__init__.py
+++ b/openeo/extra/job_management/__init__.py
@@ -21,7 +21,7 @@
     Optional,
     Union,
 )
-
+import os
 import numpy
 import pandas as pd
 import requests
@@ -492,7 +492,7 @@ def run_jobs(
         # TODO: support user-provided `stats`
         stats = collections.defaultdict(int)
 
-        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
+        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running", "downloading"]).values()) > 0:
             self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
             stats["run_jobs loop"] += 1
 
@@ -523,7 +523,7 @@ def _job_update_loop(
         not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
         if len(not_started) > 0:
             # Check number of jobs running at each backend
-            running = job_db.get_by_status(statuses=["created", "queued", "running"])
+            running = job_db.get_by_status(statuses=["created", "queued", "running"]) #TODO I believe we need to get downloading out?
             stats["job_db get_by_status"] += 1
             per_backend = running.groupby("backend_name").size().to_dict()
             _log.info(f"Running per backend: {per_backend}")
@@ -606,27 +606,44 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
                 df.loc[i, "status"] = "skipped"
                 stats["start_job skipped"] += 1
 
+
     def on_job_done(self, job: BatchJob, row):
         """
         Handles jobs that have finished. Can be overridden to provide custom behaviour.
 
         Default implementation downloads the results into a folder containing the title.
+        Default implementation runs the download in a separate thread.
 
         :param job: The job that has finished.
         :param row: DataFrame row containing the job's metadata.
         """
-        # TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
-
         job_metadata = job.describe()
         job_dir = self.get_job_dir(job.job_id)
         metadata_path = self.get_job_metadata_path(job.job_id)
-
         self.ensure_job_dir_exists(job.job_id)
-        job.get_results().download_files(target=job_dir)
+
+        # Start download in a separate thread
+        downloader = Thread(target=lambda: (
+            self._job_download(job, job_dir, row)  # Invoke the download logic directly
+        ))
+        downloader.start()
+
+        # Write the job metadata to a file
         with metadata_path.open("w", encoding="utf-8") as f:
             json.dump(job_metadata, f, ensure_ascii=False)
 
+    def _job_download(self, job, job_dir, row):
+        """
+        Download the job's results and update the job status after the download completes.
+        """
+        try:
+            # Start downloading the job's results
+            job.get_results().download_files(target=job_dir)
+
+        except Exception as e:
+            # If the download fails, set the status to 'error'
+            _log.error(f"Error downloading job {job.job_id}: {e}")
+
     def on_job_error(self, job: BatchJob, row):
         """
         Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
@@ -696,6 +713,7 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
         if not job_dir.exists():
             job_dir.mkdir(parents=True)
 
+
     def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
         """
         Tracks status (and stats) of running jobs (in place).
@@ -703,7 +721,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
         """
         stats = stats if stats is not None else collections.defaultdict(int)
 
-        active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
+        active = job_db.get_by_status(statuses=["created", "queued", "running", "downloading"]).copy()
         for i in active.index:
             job_id = active.loc[i, "id"]
             backend_name = active.loc[i, "backend_name"]
@@ -720,10 +738,20 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                     f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
                 )
 
-                if new_status == "finished":
-                    stats["job finished"] += 1
+
+                #---------------------------------------
+
+                if new_status == "finished" and previous_status != "downloading":
+                    new_status = "downloading"
                     self.on_job_done(the_job, active.loc[i])
 
+                if previous_status == "downloading":
+                    if self.get_job_metadata_path(job_id).exists():
+                        new_status = "finished"
+                        stats["job finished"] += 1
+                    else:
+                        new_status = "downloading"
+
                 if previous_status != "error" and new_status == "error":
                     stats["job failed"] += 1
                     self.on_job_error(the_job, active.loc[i])

From 789b83cf7955a11fefdd4bfce469b335e07976eb Mon Sep 17 00:00:00 2001
From: Hans Vanrompay
Date: Wed, 15 Jan 2025 20:43:18 +0100
Subject: [PATCH 2/3] refactoring

---
 openeo/extra/job_management/__init__.py | 50 ++++++++++++------------
 1 file changed, 25 insertions(+), 25 deletions(-)

diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py
index 56525cd67..a0596fcf7 100644
--- a/openeo/extra/job_management/__init__.py
+++ b/openeo/extra/job_management/__init__.py
@@ -21,7 +21,7 @@
     Optional,
     Union,
 )
-import os
+
 import numpy
 import pandas as pd
 import requests
@@ -492,7 +492,7 @@ def run_jobs(
         # TODO: support user-provided `stats`
         stats = collections.defaultdict(int)
 
-        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running", "downloading"]).values()) > 0:
+        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
             self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
             stats["run_jobs loop"] += 1
 
@@ -523,7 +523,7 @@ def _job_update_loop(
         not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
         if len(not_started) > 0:
             # Check number of jobs running at each backend
-            running = job_db.get_by_status(statuses=["created", "queued", "running"]) #TODO I believe we need to get downloading out?
+            running = job_db.get_by_status(statuses=["created", "queued", "running"])
             stats["job_db get_by_status"] += 1
             per_backend = running.groupby("backend_name").size().to_dict()
             _log.info(f"Running per backend: {per_backend}")
@@ -617,21 +617,32 @@ def on_job_done(self, job: BatchJob, row):
         :param job: The job that has finished.
         :param row: DataFrame row containing the job's metadata.
         """
+        _log.info(f"Job {job.job_id} completed. Preparing to handle completion.")
+
         job_metadata = job.describe()
         job_dir = self.get_job_dir(job.job_id)
         metadata_path = self.get_job_metadata_path(job.job_id)
         self.ensure_job_dir_exists(job.job_id)
 
-        # Start download in a separate thread
-        downloader = Thread(target=lambda: (
-            self._job_download(job, job_dir, row)  # Invoke the download logic directly
-        ))
-        downloader.start()
-
-        # Write the job metadata to a file
+        # Save metadata
+        _log.info(f"Saving metadata for job {job.job_id} to {metadata_path}")
         with metadata_path.open("w", encoding="utf-8") as f:
             json.dump(job_metadata, f, ensure_ascii=False)
 
+        # Define download logic inline
+        def download_task():
+            try:
+                _log.info(f"Starting download for job {job.job_id} to directory {job_dir}")
+                job.get_results().download_files(target=job_dir)
+                _log.info(f"Successfully downloaded job {job.job_id} results to {job_dir}")
+            except Exception as e:
+                _log.error(f"Error downloading job {job.job_id}: {e}")
+
+        # Start the download in a separate thread
+        _log.info(f"Starting download thread for job {job.job_id}")
+        downloader = Thread(target=download_task, daemon=True)
+        downloader.start()
+
     def _job_download(self, job, job_dir, row):
         """
         Download the job's results and update the job status after the download completes.
@@ -713,7 +724,6 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
         if not job_dir.exists():
             job_dir.mkdir(parents=True)
 
-
     def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
         """
         Tracks status (and stats) of running jobs (in place).
@@ -721,7 +731,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
         """
         stats = stats if stats is not None else collections.defaultdict(int)
 
-        active = job_db.get_by_status(statuses=["created", "queued", "running", "downloading"]).copy()
+        active = job_db.get_by_status(statuses=["created", "queued", "running"]).copy()
         for i in active.index:
             job_id = active.loc[i, "id"]
             backend_name = active.loc[i, "backend_name"]
@@ -738,20 +748,10 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                     f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
                 )
 
-
-                #---------------------------------------
-
-                if new_status == "finished" and previous_status != "downloading":
-                    new_status = "downloading"
-                    self.on_job_done(the_job, active.loc[i])
-
-                if previous_status == "downloading":
-                    if self.get_job_metadata_path(job_id).exists():
-                        new_status = "finished"
-                        stats["job finished"] += 1
-                    else:
-                        new_status = "downloading"
+                if new_status == "finished":
+                    stats["job finished"] += 1
+                    self.on_job_done(the_job, active.loc[i])
 
                 if previous_status != "error" and new_status == "error":
                     stats["job failed"] += 1
                     self.on_job_error(the_job, active.loc[i])

From 18428085622528528862efc61a540bf32aacab59 Mon Sep 17 00:00:00 2001
From: Hans Vanrompay
Date: Wed, 15 Jan 2025 20:46:40 +0100
Subject: [PATCH 3/3] refactoring

---
 openeo/extra/job_management/__init__.py | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py
index a0596fcf7..464f8c4f5 100644
--- a/openeo/extra/job_management/__init__.py
+++ b/openeo/extra/job_management/__init__.py
@@ -643,18 +643,6 @@ def download_task():
         downloader = Thread(target=download_task, daemon=True)
         downloader.start()
 
-    def _job_download(self, job, job_dir, row):
-        """
-        Download the job's results and update the job status after the download completes.
-        """
-        try:
-            # Start downloading the job's results
-            job.get_results().download_files(target=job_dir)
-
-        except Exception as e:
-            # If the download fails, set the status to 'error'
-            _log.error(f"Error downloading job {job.job_id}: {e}")
-
     def on_job_error(self, job: BatchJob, row):
         """
         Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.