10
10
11
11
@dataclass
12
12
class _TaskResult :
13
- """Container for task results with optional components"""
13
+ """
14
+ Container for the result of a task execution.
15
+ Used to communicate the outcome of job-related tasks.
16
+
17
+ :param job_id:
18
+ The ID of the job this result is associated with.
19
+
20
+ :param db_update:
21
+ Optional dictionary describing updates to apply to a job database,
22
+ such as status changes. Defaults to an empty dict.
23
+
24
+ :param stats_update:
25
+ Optional dictionary capturing statistical counters or metrics,
26
+ e.g., number of successful starts or errors. Defaults to an empty dict.
27
+ """
14
28
job_id : str # Mandatory
15
29
db_update : Dict [str , Any ] = field (default_factory = dict ) # Optional
16
30
stats_update : Dict [str , int ] = field (default_factory = dict ) # Optional
17
31
18
32
class Task (ABC ):
19
- """Abstract base class for asynchronous tasks with safe update generation"""
33
+ """
34
+ Abstract base class for asynchronous tasks.
35
+
36
+ A task encapsulates a unit of work, typically executed asynchronously,
37
+ and returns a `_TaskResult` with job-related metadata and updates.
38
+
39
+ Implementations must override the `execute` method to define the task logic.
40
+ """
20
41
21
42
@abstractmethod
22
43
def execute (self ) -> _TaskResult :
@@ -26,12 +47,33 @@ def execute(self) -> _TaskResult:
26
47
@dataclass
27
48
class _JobStartTask (Task ):
28
49
"""
29
- A task for starting jobs asynchronously.
50
+ Task for starting a backend job asynchronously.
51
+
52
+ Connects to an OpenEO backend using the provided URL and optional token,
53
+ retrieves the specified job, and attempts to start it.
54
+
55
+ Usage example:
56
+
57
+ .. code-block:: python
58
+
59
+ task = _JobStartTask(
60
+ job_id="1234",
61
+ root_url="https://openeo.test",
62
+ bearer_token="secret"
63
+ )
64
+ result = task.execute()
30
65
31
- Attributes:
32
- root_url (str): The URL of the backend.
33
- bearer_token (Optional[str]): An optional token for authentication.
34
- job_id (str): The identifier of the job to start.
66
+ :param job_id:
67
+ Identifier of the job to start on the backend.
68
+
69
+ :param root_url:
70
+ The root URL of the OpenEO backend to connect to.
71
+
72
+ :param bearer_token:
73
+ Optional Bearer token used for authentication.
74
+
75
+ :raises ValueError:
76
+ If any of the input parameters are invalid (e.g., empty strings).
35
77
"""
36
78
job_id : str
37
79
root_url : str
@@ -48,7 +90,16 @@ def __post_init__(self) -> None:
48
90
raise ValueError (f"job_id must be a non-empty string, got { self .job_id !r} " )
49
91
50
92
def execute (self ) -> _TaskResult :
51
- """Executes the job start task and returns a JobStartResult."""
93
+ """
94
+ Executes the job start process using the OpenEO connection.
95
+
96
+ Authenticates if a bearer token is provided, retrieves the job by ID,
97
+ and attempts to start it.
98
+
99
+ :returns:
100
+ A `_TaskResult` with status and statistics metadata, indicating
101
+ success or failure of the job start.
102
+ """
52
103
try :
53
104
conn = openeo .connect (self .root_url )
54
105
if self .bearer_token :
@@ -70,21 +121,42 @@ def execute(self) -> _TaskResult:
70
121
71
122
class _JobManagerWorkerThreadPool :
72
123
"""
73
- Manages a thread pool for executing tasks asynchronously and handles postprocessing.
124
+ Thread pool-based worker that manages the execution of asynchronous tasks.
125
+
126
+ Internally wraps a `ThreadPoolExecutor` and manages submission,
127
+ tracking, and result processing of tasks.
128
+
129
+ :param max_workers:
130
+ Maximum number of concurrent threads to use for execution.
131
+ Defaults to 2.
74
132
"""
75
133
def __init__ (self , max_workers : int = 2 ):
76
134
self ._executor = concurrent .futures .ThreadPoolExecutor (max_workers = max_workers )
77
135
self ._future_task_pairs : List [Tuple [concurrent .futures .Future , Task ]] = []
78
136
79
137
def submit_task (self , task : Task ) -> None :
80
138
"""
81
- ubmits a tasks to the internal Threadpool executor and keeps.
139
+ Submit a task to the thread pool executor.
140
+
141
+ Tasks are scheduled for asynchronous execution and tracked
142
+ internally to allow later processing of their results.
143
+
144
+ :param task:
145
+ An instance of `Task` to be executed.
82
146
"""
83
147
future = self ._executor .submit (task .execute )
84
148
self ._future_task_pairs .append ((future , task )) # Track pairs
85
149
86
150
def process_futures (self ) -> List [ _TaskResult ]:
151
+ """
152
+ Process and retrieve results from completed tasks.
153
+
154
+ This method checks which futures have finished without blocking,
155
+ collects their results.
87
156
157
+ :returns:
158
+ A list of `_TaskResult` objects from completed tasks.
159
+ """
88
160
results = []
89
161
to_keep = []
90
162
0 commit comments