Skip to content

Commit 7db2d96

Browse files
committed
✨ New Job and JobPool classes
xtl.jobs.jobs:Job - Reimplementation of xtl.automate.jobs:Job - Jobs are no longer bound to the execution of a BatchFile, rather they can also run Python code - Batch execution to be implemented in the future - Each job has its own logging.Logger associated with it, which can be configured xtl.jobs.pools:JobPool - New class for executing Jobs with concurrency limits and locks - Job instances are generated and configured directly by JobPool with the .submit() method - The running tasks can request an asyncio.Lock() - Job submission and execution to the pool is handled within a context manager - Results from the running jobs can either be aggregated or streamed xtl.config.settings:AutomateSettings - New `job_digits` option for storing the length of the Job IDs xtl.math.uuid:UUIDFactory - Small utility for creating UUIDs with non-standard alphabets or generating randomized strings
1 parent e6f10e1 commit 7db2d96

File tree

5 files changed

+747
-0
lines changed

5 files changed

+747
-0
lines changed

src/xtl/config/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class AutomateSettings(Settings):
8787
"""
8888

8989
# Model attributes
90+
job_digits: int = Option(default=5, desc='Number of digits to use for generation'
91+
'of random job IDs')
9092
compute_site: ComputeSite = Option(default=ComputeSite.LOCAL)
9193
permissions: AutomatePermissionsSettings = Option(
9294
default=AutomatePermissionsSettings())

src/xtl/jobs/__init__.py

Whitespace-only changes.

src/xtl/jobs/jobs.py

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
import abc
2+
from dataclasses import dataclass
3+
import logging
4+
from pathlib import Path
5+
from typing import Any, ClassVar, Generic, Optional, Type, TypeVar, TYPE_CHECKING, get_args, Sequence
6+
7+
if TYPE_CHECKING:
8+
from xtl.jobs.pools import JobPool
9+
10+
from xtl import settings, Logger
11+
from xtl.math.uuid import UUIDFactory
12+
13+
14+
# Factory for short random strings; used below to mint unique job IDs.
# NOTE(review): the name shadows the stdlib `uuid` module — consider renaming.
uuid = UUIDFactory()
# Module-level logger for diagnostics not tied to a specific job instance.
logger_ = Logger(__name__)
16+
17+
18+
@dataclass
19+
class JobResults:
20+
"""
21+
Dataclass to hold the results of a single job.
22+
"""
23+
job_id: str
24+
"""The unique identifier of the job."""
25+
26+
data: Any | None
27+
"""Optional data returned by the job."""
28+
29+
error: Any | None
30+
"""Error that occured during job execution, if any."""
31+
32+
@property
33+
def success(self) -> bool:
34+
"""
35+
Whether the job completed successfully without errors.
36+
"""
37+
return self.error is None
38+
39+
40+
@dataclass
class JobConfig:
    """
    Base class for passing configuration to a job.

    Subclasses add their own dataclass fields; the bare base carries no data.
    """


# Type variable bound to JobConfig, so Job[...] parameters must be config classes.
JobConfigType = TypeVar('JobConfigType', bound=JobConfig)
49+
50+
51+
class _DummyPool:
52+
"""
53+
A dummy context manager for job execution outside of a pool.
54+
"""
55+
async def __aenter__(self):
56+
return None
57+
58+
async def __aexit__(self, exc_type, exc_val, exc_tb):
59+
return None
60+
61+
62+
class Job(abc.ABC, Generic[JobConfigType]):
    """
    Abstract base class for asynchronous job execution.

    Subclasses implement :meth:`_execute` and may declare their configuration
    type through the generic parameter, e.g. ``class MyJob(Job[MyConfig])``.
    Each instance owns a unique ID and a dedicated ``logging.Logger``, and may
    be attached to a ``JobPool`` for concurrency-limited execution.
    """

    _registry: ClassVar[dict[str, 'Job']] = {}
    """Registry of all alive jobs of this class."""

    # Note that this class variable is updated in __init_subclass__ when the
    # subclass is defined with a generic parameter, e.g., Job[Config].
    _config_class: ClassVar[Type[JobConfig]] = JobConfig
    """The configuration class for this job type."""

    # Defaults consumed by _configure_logger when no overrides are given
    _logging_level: ClassVar[int] = logging.INFO
    _logging_fmt: ClassVar[str] = '[%(name)s @ %(asctime)s]: %(message)s'
    _logging_fmt_date: ClassVar[str] = '%H:%M:%S'
    # NOTE(review): evaluated once at import time — the log directory is the
    # CWD of the importing process, not of the job. Confirm this is intended.
    _logging_directory: ClassVar[Path] = Path.cwd()

    _job_prefix: ClassVar[str] = 'xtl_job'

    def __init__(self, job_id: str | None = None,
                 logger: Optional[logging.Logger] = None):
        """
        Abstract base class for asynchronous jobs execution.

        :param job_id: Optional, a unique identifier for the job. If not provided,
            a unique ID will be generated.
        :param logger: Optional, a custom logger for the job. If not provided,
            a logger will be created using `job_id`.
        """
        # Create a unique job ID
        self._job_id = str(job_id) if job_id else \
            uuid.random(length=settings.automate.job_digits)
        while self._job_id in self._registry:
            # Regenerate on collision. NOTE(review): a caller-supplied job_id
            # that collides is silently replaced with a random one.
            logger_.debug('Regenerating job ID: %s', self._job_id)
            self._job_id = uuid.random(length=settings.automate.job_digits)

        # Attach a logger
        self._logger = logger or self.get_logger(self.job_id)

        # Register the job in the class registry (keeps a strong reference,
        # so jobs stay alive until removed from the registry)
        self.__class__._registry[self._job_id] = self

        # Job state
        self._is_running = False
        self._is_complete = False
        self._error: Exception | None = None

        # Pool integration
        self._pool: Optional['JobPool'] = None

        # Initialize config
        self._config: JobConfigType | None = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        # Detect a generic parameter on the subclass, e.g. Job[MyConfig]
        config_class = None
        for base in getattr(cls, '__orig_bases__', ()):
            # Only parameterized generics expose __origin__ (Job[X] -> Job)
            if getattr(base, '__origin__', None) is Job:
                args = get_args(base)
                if args:
                    # The first type argument is the JobConfig type
                    config_class = args[0]
                    break

        if config_class is not None:
            if not issubclass(config_class, JobConfig):
                # FIX: the original message interpolated JobConfigType.__name__,
                # which is always the literal 'JobConfigType'; report the
                # offending class instead.
                raise TypeError(f'{config_class.__name__} must be a subclass '
                                f'of {JobConfig.__name__}')
            # Update the _config_class of the subclass with the new config type
            cls._config_class = config_class
        else:
            # Default to JobConfig if no specific type is provided
            cls._config_class = JobConfig

    def configure(self, config: JobConfigType) -> None:
        """
        Pass configuration to the job.

        :raises TypeError: If `config` is not an instance of this job's
            configuration class.
        """
        config_class = self.__class__._config_class
        if not isinstance(config, config_class):
            raise TypeError(f'Expected config of type {config_class.__name__}, '
                            f'got {type(config).__name__}')
        self._config = config

    @property
    def config(self) -> JobConfigType | None:
        """
        Get the configuration of the job.
        """
        return self._config

    @classmethod
    def with_config(cls, config: JobConfigType = None, **kwargs) -> \
            'Job[JobConfigType]':
        """
        Create a preconfigured job instance.

        :param config: Optional configuration; when None the job is created
            unconfigured.
        :param kwargs: `job_id` and `logger` are forwarded to the constructor.
        :raises TypeError: If a non-None `config` has the wrong type.
        """
        # FIX: the original validated `config` unconditionally, so the
        # documented `config=None` default always raised TypeError and the
        # `if config is not None` branch below was unreachable.
        if config is not None and not isinstance(config, cls._config_class):
            raise TypeError(f'Expected config of type {cls._config_class.__name__}, '
                            f'got {type(config).__name__}')

        # Extract init parameters
        job_id = kwargs.pop('job_id', None)
        logger = kwargs.pop('logger', None)

        # Create job instance
        job = cls(job_id=job_id, logger=logger)

        # Set configuration if provided
        if config is not None:
            job.configure(config)

        return job

    @classmethod
    def map(cls, configs: Sequence[JobConfigType] | JobConfigType = None) -> \
            tuple['Job[JobConfigType]', ...]:
        """
        Map a list of configurations to job instances, one job per config.

        :raises TypeError: If `configs` is neither a sequence nor a single
            config instance.
        """
        if not isinstance(configs, Sequence):
            if not isinstance(configs, cls._config_class):
                raise TypeError(f'Expected a sequence of {cls._config_class.__name__}, '
                                f'got {type(configs).__name__}')
            # Promote a single config to a one-element tuple
            configs = (configs,)
        return tuple(cls.with_config(config) for config in configs)

    @property
    def job_id(self) -> str:
        """
        Get the unique identifier of the job.
        """
        return self._job_id

    @property
    def is_running(self) -> bool:
        """
        Check if the job is currently running.
        """
        return self._is_running

    @property
    def is_complete(self) -> bool:
        """
        Check if the job execution has completed.
        """
        return self._is_complete

    @property
    def pool(self) -> Optional['JobPool']:
        """
        Get the job pool associated with this job, if any.
        """
        return self._pool

    @pool.setter
    def pool(self, pool: Optional['JobPool']) -> None:
        # Avoid circular import by checking class name instead of importing
        # JobPool (duck check: any class literally named 'JobPool' passes)
        if pool is not None and pool.__class__.__name__ != 'JobPool':
            raise TypeError(f'Expected a JobPool instance, '
                            f'got {type(pool).__name__}')
        self._pool = pool

    def __del__(self) -> None:
        # FIX: the original unconditional `del` raised KeyError when the
        # entry had already been removed or replaced; pop() is tolerant.
        # Note: the registry holds a strong reference to every job, so
        # finalization normally only happens at interpreter shutdown.
        self.__class__._registry.pop(self._job_id, None)

    @abc.abstractmethod
    async def _execute(self) -> Any | None:
        """
        The actual task of the job. Needs to be implemented by subclasses.

        :returns: Optional, any data to be included in the JobResults.
        """
        ...

    async def run(self) -> JobResults | None:
        """
        Run the job asynchronously.

        :returns: JobResults describing the outcome (error recorded in
            `JobResults.error` on failure), or None when the job is already
            running or has already completed.
        """
        if self._is_running:
            self._logger.warning("Job %s is already running", self._job_id)
            return None

        if self._is_complete:
            self._logger.warning("Job %s has already completed", self._job_id)
            return None

        result = None
        try:
            self._is_running = True
            self._logger.info('Launching job...')
            result = await self._execute()
            self._is_complete = True
            self._logger.info('Job completed successfully')
        except Exception as e:
            # Swallow the exception here; it is surfaced via JobResults.error
            self._error = e
            self._logger.exception('Job failed due to an exception')
        finally:
            self._is_running = False
            if self._error:
                self._logger.info('Job aborted successfully')

        return JobResults(job_id=self._job_id, data=result, error=self._error)

    async def lock(self):
        """
        Obtain an async context manager guarding a critical section.

        :returns: The pool's shared lock when the job runs inside a pool,
            otherwise a no-op _DummyPool.
        """
        if self._pool is None:
            # If no pool is set, return a dummy pool that does nothing
            return _DummyPool()
        # Use the pool's lock
        return self._pool.request_lock()

    @property
    def logger(self) -> logging.Logger:
        """
        Get the logger associated with this job.
        """
        return self._logger

    @classmethod
    def get_logger(cls, job_id: str, **kwargs) -> logging.Logger:
        """
        Get or create a logger for the job with the given ID.
        """
        # Recover the logger of a live job with this ID
        if job_id in cls._registry:
            return cls._registry[job_id].logger

        logger = logging.getLogger(job_id)
        # FIX: logging.getLogger returns the same object for the same name;
        # the original reconfigured it on every call, stacking duplicate
        # handlers and emitting each record multiple times. Configure once.
        if not logger.handlers:
            cls._configure_logger(logger, **kwargs)

        return logger

    @classmethod
    def _configure_logger(cls, logger: logging.Logger, level: int = None,
                          fmt: str = None, datefmt: str = None,
                          path: str | Path = None,
                          stream_handler: bool = True,
                          file_handler: bool = False) -> None:
        """
        Configure the logger for the job (mutated in place).

        :param level: Log level; defaults to cls._logging_level.
        :param fmt: Record format; defaults to cls._logging_fmt.
        :param datefmt: Timestamp format; defaults to cls._logging_fmt_date.
        :param path: Log-file path; defaults to a per-job file inside
            cls._logging_directory (only used when file_handler=True).
        :param stream_handler: Attach a stdout StreamHandler.
        :param file_handler: Attach an appending, UTF-8 FileHandler.
        """
        # Prevent propagation so job output does not reach the root logger
        logger.propagate = False

        # Set log level
        level = level or cls._logging_level
        logger.setLevel(level)

        # Create formatter
        fmt = fmt or cls._logging_fmt
        datefmt = datefmt or cls._logging_fmt_date
        formatter = logging.Formatter(fmt=fmt, datefmt=datefmt)

        # Create and attach handlers
        if stream_handler:
            import sys
            stream = logging.StreamHandler(stream=sys.stdout)
            stream.setFormatter(formatter)
            logger.addHandler(stream)
        if file_handler:
            log = path or cls._logging_directory / f'{cls._job_prefix}_{logger.name}.log'
            file = logging.FileHandler(filename=log, mode='a', encoding='utf-8')
            file.setFormatter(formatter)
            logger.addHandler(file)

0 commit comments

Comments
 (0)