Skip to content

Commit 9d37558

Browse files
committed
Start of service-oriented application mode
1 parent cc8405c commit 9d37558

File tree

7 files changed

+147
-49
lines changed

7 files changed

+147
-49
lines changed

pyrunner/core/config.py

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -81,32 +81,33 @@ def __init__(self):
8181
'exec_to_id' : { 'type': int , 'preserve': False, 'env': None, 'value': None, 'default': None },
8282

8383
# Preservable runtime params
84-
'app_version' : { 'type': str , 'preserve': True, 'env': 'APP_VERSION' , 'value': None, 'default': '0.0.0' },
85-
'app_name' : { 'type': str , 'preserve': True, 'env': 'APP_NAME' , 'value': None, 'default': "PyrunnerApp_{}".format(uuid.uuid4()) },
86-
'app_start_time' : { 'type': str , 'preserve': True, 'env': None , 'value': None, 'default': None },
87-
'app_root_dir' : { 'type': str , 'preserve': True, 'env': 'APP_ROOT_DIR' , 'value': None, 'default': None },
88-
'config_dir' : { 'type': str , 'preserve': True, 'env': 'APP_CONFIG_DIR' , 'value': None, 'default': None },
89-
'temp_dir' : { 'type': str , 'preserve': True, 'env': 'APP_TEMP_DIR' , 'value': None, 'default': None },
90-
'log_dir' : { 'type': str , 'preserve': True, 'env': 'APP_LOG_DIR' , 'value': None, 'default': None },
91-
'root_log_dir' : { 'type': str , 'preserve': True, 'env': 'APP_ROOT_LOG_DIR' , 'value': None, 'default': None },
92-
'worker_dir' : { 'type': str , 'preserve': True, 'env': 'APP_WORKER_DIR' , 'value': None, 'default': None },
93-
'nozip' : { 'type': bool, 'preserve': False, 'env': 'APP_NOZIP' , 'value': None, 'default': False },
94-
'dump_logs' : { 'type': bool, 'preserve': False, 'env': 'APP_DUMP_LOGS' , 'value': None, 'default': False },
95-
'email' : { 'type': str , 'preserve': False, 'env': 'APP_EMAIL' , 'value': None, 'default': None },
96-
'silent' : { 'type': bool, 'preserve': False, 'env': 'APP_SILENT' , 'value': None, 'default': False },
97-
'debug' : { 'type': bool, 'preserve': False, 'env': 'APP_DEBUG' , 'value': None, 'default': False },
98-
'tickrate' : { 'type': int , 'preserve': False, 'env': 'APP_TICKRATE' , 'value': None, 'default': 1 },
99-
'time_between_tasks' : { 'type': int , 'preserve': True, 'env': 'APP_TIME_BETWEEN_TASKS', 'value': None, 'default': 0 },
100-
'save_interval' : { 'type': int , 'preserve': False, 'env': 'APP_SAVE_INTERVAL' , 'value': None, 'default': 10 },
101-
'max_procs' : { 'type': int , 'preserve': False, 'env': 'APP_MAX_PROCS' , 'value': None, 'default': -1 },
102-
'log_retention' : { 'type': int , 'preserve': True, 'env': 'APP_LOG_RETENTION' , 'value': None, 'default': 30 },
103-
'dryrun' : { 'type': bool, 'preserve': False, 'env': 'APP_DRYRUN' , 'value': None, 'default': False },
104-
'email_on_fail' : { 'type': bool, 'preserve': False, 'env': 'APP_EMAIL_ON_FAIL' , 'value': None, 'default': True },
105-
'email_on_success' : { 'type': bool, 'preserve': False, 'env': 'APP_EMAIL_ON_SUCCESS' , 'value': None, 'default': True },
106-
'notify_on_fail' : { 'type': bool, 'preserve': False, 'env': 'APP_NOTIFY_ON_FAIL' , 'value': None, 'default': True },
107-
'notify_on_success' : { 'type': bool, 'preserve': False, 'env': 'APP_NOTIFY_ON_SUCCESS' , 'value': None, 'default': True },
108-
'as_service' : { 'type': bool, 'preserve': True, 'env': 'APP_AS_SERVICE' , 'value': None, 'default': False },
109-
'test_mode' : { 'type': bool, 'preserve': True, 'env': 'APP_TEST_MODE' , 'value': None, 'default': False }
84+
'app_version' : { 'type': str , 'preserve': True, 'env': 'APP_VERSION' , 'value': None, 'default': '0.0.0' },
85+
'app_name' : { 'type': str , 'preserve': True, 'env': 'APP_NAME' , 'value': None, 'default': "PyrunnerApp_{}".format(uuid.uuid4()) },
86+
'app_start_time' : { 'type': str , 'preserve': True, 'env': None , 'value': None, 'default': None },
87+
'app_root_dir' : { 'type': str , 'preserve': True, 'env': 'APP_ROOT_DIR' , 'value': None, 'default': None },
88+
'config_dir' : { 'type': str , 'preserve': True, 'env': 'APP_CONFIG_DIR' , 'value': None, 'default': None },
89+
'temp_dir' : { 'type': str , 'preserve': True, 'env': 'APP_TEMP_DIR' , 'value': None, 'default': None },
90+
'log_dir' : { 'type': str , 'preserve': True, 'env': 'APP_LOG_DIR' , 'value': None, 'default': None },
91+
'root_log_dir' : { 'type': str , 'preserve': True, 'env': 'APP_ROOT_LOG_DIR' , 'value': None, 'default': None },
92+
'worker_dir' : { 'type': str , 'preserve': True, 'env': 'APP_WORKER_DIR' , 'value': None, 'default': None },
93+
'nozip' : { 'type': bool, 'preserve': False, 'env': 'APP_NOZIP' , 'value': None, 'default': False },
94+
'dump_logs' : { 'type': bool, 'preserve': False, 'env': 'APP_DUMP_LOGS' , 'value': None, 'default': False },
95+
'email' : { 'type': str , 'preserve': False, 'env': 'APP_EMAIL' , 'value': None, 'default': None },
96+
'silent' : { 'type': bool, 'preserve': False, 'env': 'APP_SILENT' , 'value': None, 'default': False },
97+
'debug' : { 'type': bool, 'preserve': False, 'env': 'APP_DEBUG' , 'value': None, 'default': False },
98+
'tickrate' : { 'type': int , 'preserve': False, 'env': 'APP_TICKRATE' , 'value': None, 'default': 1 },
99+
'time_between_tasks' : { 'type': int , 'preserve': True, 'env': 'APP_TIME_BETWEEN_TASKS' , 'value': None, 'default': 0 },
100+
'save_interval' : { 'type': int , 'preserve': False, 'env': 'APP_SAVE_INTERVAL' , 'value': None, 'default': 10 },
101+
'max_procs' : { 'type': int , 'preserve': False, 'env': 'APP_MAX_PROCS' , 'value': None, 'default': -1 },
102+
'log_retention' : { 'type': int , 'preserve': True, 'env': 'APP_LOG_RETENTION' , 'value': None, 'default': 30 },
103+
'dryrun' : { 'type': bool, 'preserve': False, 'env': 'APP_DRYRUN' , 'value': None, 'default': False },
104+
'email_on_fail' : { 'type': bool, 'preserve': False, 'env': 'APP_EMAIL_ON_FAIL' , 'value': None, 'default': True },
105+
'email_on_success' : { 'type': bool, 'preserve': False, 'env': 'APP_EMAIL_ON_SUCCESS' , 'value': None, 'default': True },
106+
'notify_on_fail' : { 'type': bool, 'preserve': False, 'env': 'APP_NOTIFY_ON_FAIL' , 'value': None, 'default': True },
107+
'notify_on_success' : { 'type': bool, 'preserve': False, 'env': 'APP_NOTIFY_ON_SUCCESS' , 'value': None, 'default': True },
108+
'as_service' : { 'type': bool, 'preserve': True, 'env': 'APP_AS_SERVICE' , 'value': None, 'default': False },
109+
'service_exec_interval': { 'type': int , 'preserve': False, 'env': 'APP_SERVICE_EXEC_INTERVAL', 'value': None, 'default': 1 },
110+
'test_mode' : { 'type': bool, 'preserve': True, 'env': 'APP_TEST_MODE' , 'value': None, 'default': False }
110111
}
111112
self._iter_keys = None
112113

@@ -214,6 +215,26 @@ def __contains__(self, key):
214215
"""
215216
return key in self._attr
216217

218+
def is_set(self, key):
219+
"""
220+
Determines if key is set either by env var or manually set variable.
221+
222+
Args:
223+
key (str): the key name for which to check if is set
224+
225+
Returns:
226+
Boolean indicating whether or not key is set. False if relying on default value, True otherwise.
227+
"""
228+
229+
detl = self._attr.get(key)
230+
if not detl:
231+
raise KeyError('Config object does not store key: {}'.format(key))
232+
233+
if detl['value'] is None and detl['env'] is None:
234+
return False
235+
else:
236+
return True
237+
217238
def items(self, only_preserve=True):
218239
"""
219240
Converts the Config object into a simple dictionary containing only simple key:value pairs.

pyrunner/core/engine.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import pyrunner.core.constants as constants
1818
from pyrunner.core.config import Config
1919
from pyrunner.core.context import Context
20-
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PULSE, SIG_RESTART
20+
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PULSE, SIG_REVIVE
2121
from multiprocessing import Manager
2222

2323
import sys, time
@@ -96,9 +96,15 @@ def initiate(self, **kwargs):
9696
self._abort_all_workers()
9797
return -1
9898

99-
# Check for restart signals; restart failed nodes, if any
100-
if signal_handler.consume(SIG_RESTART):
101-
pass
99+
# Check for revive signals; revive failed nodes, if any
100+
if signal_handler.consume(SIG_REVIVE):
101+
for node in self.register.failed_nodes.copy():
102+
node.revive()
103+
self.register.failed_nodes.remove(node)
104+
self.register.pending_nodes.add(node)
105+
for node in self.register.defaulted_nodes.copy():
106+
self.register.defaulted_nodes.remove(node)
107+
self.register.pending_nodes.add(node)
102108

103109
# Poll running nodes for completion/failure
104110
for node in self.register.running_nodes.copy():

pyrunner/core/node.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ def __init__(self, id=-1, name=None):
4848
self._proc = None
4949
self._context = None
5050

51+
# Service execution mode properties
52+
self._as_service = False
53+
self._exec_interval = 1
54+
5155
self._module = None
5256
self._worker = None
5357
self._worker_instance = None
@@ -69,6 +73,10 @@ def __lt__(self, other):
6973
def is_runnable(self):
7074
return time.time() >= self._wait_until
7175

76+
def revive(self):
77+
self._attempts = 0
78+
self._wait_until = time.time() + self._exec_interval
79+
7280
def execute(self):
7381
"""
7482
Spawns a new process via the `run` method of defined Worker class.
@@ -94,7 +102,7 @@ def execute(self):
94102
raise TypeError('{}.{} is not an extension of pyrunner.Worker'.format(self.module, self.worker))
95103

96104
# Launch the "run" method of the provided Worker under a new process.
97-
self._worker_instance = self.worker_class(self.context, self.logfile, self.argv)
105+
self._worker_instance = self.worker_class(self.context, self.logfile, self.argv, self.as_service)
98106
self._proc = multiprocessing.Process(target=self._worker_instance.protected_run, daemon=False)
99107
self._proc.start()
100108
except Exception as e:
@@ -339,4 +347,22 @@ def child_nodes(self):
339347

340348
@property
341349
def worker_class(self):
342-
return getattr(importlib.import_module(self.module), self.worker)
350+
return getattr(importlib.import_module(self.module), self.worker)
351+
352+
@property
353+
def as_service(self):
354+
return getattr(self, '_as_service', False)
355+
@as_service.setter
356+
def as_service(self, value):
357+
self._as_service = bool(value)
358+
return self
359+
360+
@property
361+
def exec_interval(self):
362+
return getattr(self, '_exec_interval', 0)
363+
@exec_interval.setter
364+
def exec_interval(self, value):
365+
if int(value) < 0:
366+
raise ValueError('exec_interval must be >= 0')
367+
self._exec_interval = int(value)
368+
return self

pyrunner/core/pyrunner.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from pyrunner.core.engine import ExecutionEngine
2929
from pyrunner.core.config import Config
3030
from pyrunner.core.register import NodeRegister
31-
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE, SIG_PULSE
31+
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_REVIVE, SIG_PULSE
3232
from pyrunner.version import __version__
3333

3434
from pyrunner.notification import Notification
@@ -83,6 +83,11 @@ def load_proc_file(self, proc_file, restart=False):
8383
if not self.register or not isinstance(self.register, NodeRegister):
8484
return False
8585

86+
# Update nodes to run in service mode, if app running as service
87+
for node in self.register.all_nodes:
88+
node.as_service = True
89+
node.exec_interval = self.config['service_exec_interval']
90+
8691
return True
8792

8893
@property
@@ -150,6 +155,12 @@ def exec_from(self, id) : return self.register.exec_from(id)
150155
def exec_disable(self, id_list) : return self.register.exec_disable(id_list)
151156

152157
def prepare(self):
158+
# Deprecation messages
159+
if self.config.is_set('email_on_fail'):
160+
print('Warning: --email-on-fail option (and APP_EMAIL_ON_FAIL env var) will be deprecated in version 6. Use --notify-on-fail option (APP_NOTIFY_ON_FAIL) instead.')
161+
if self.config.is_set('email_on_success'):
162+
print('Warning: --email-on-success option (and APP_EMAIL_ON_SUCCESS env var) will be deprecated in version 6. Use --notify-on-success option (APP_EMAIL_ON_SUCCESS) instead.')
163+
153164
# Initialize NodeRegister
154165
if self.config['restart']:
155166
self.load_state()
@@ -348,7 +359,7 @@ def is_restartable(self):
348359
return True
349360

350361
def parse_args(self, run_getopts=True):
351-
abort = False
362+
abort, revive = False, False
352363

353364
opt_list = 'c:l:n:e:x:N:D:A:t:drhiv'
354365
longopt_list = [
@@ -360,7 +371,8 @@ def parse_args(self, run_getopts=True):
360371
'to=', 'from=', 'descendants=', 'ancestors=',
361372
'norun=', 'exec-only=', 'exec-proc-name=',
362373
'max-procs=', 'serde=', 'exec-loop-interval=',
363-
'notify-on-fail=', 'notify-on-success=', 'as-service'
374+
'notify-on-fail=', 'notify-on-success=', 'as-service',
375+
'service-exec-interval=', 'revive'
364376
]
365377

366378
if run_getopts:
@@ -424,8 +436,12 @@ def parse_args(self, run_getopts=True):
424436
self.config['allow_duplicate_jobs'] = True
425437
elif opt in ['--exec-proc-name']:
426438
self.config['exec_proc_name'] = arg
439+
elif opt == '--service-exec-interval':
440+
self.config['service_exec_interval'] = int(arg)
427441
elif opt == '--as-service':
428442
self.config['as_service'] = True
443+
elif opt == '--revive':
444+
revive = True
429445
elif opt == '--abort':
430446
abort = True
431447
elif opt == '--silent':
@@ -455,6 +471,11 @@ def parse_args(self, run_getopts=True):
455471
self.signal_handler.emit(SIG_ABORT)
456472
sys.exit(0)
457473

474+
if revive:
475+
print('Submitting REVIVE signal to running job for: {}'.format(self.config['app_name']))
476+
self.signal_handler.emit(SIG_REVIVE)
477+
sys.exit(0)
478+
458479
# Check if restart is possible (ctllog/ctx files exist)
459480
if self.config['restart'] and not self.is_restartable():
460481
self.config['restart'] = False

pyrunner/core/signal.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
SIG_ABORT = 'sig.abort'
2020
SIG_PAUSE = 'sig.pause'
2121
SIG_PULSE = 'sig.pulse'
22-
SIG_RESTART = 'sig.restart'
22+
SIG_REVIVE = 'sig.revive'
2323

24-
_valid_signals = (SIG_ABORT, SIG_PAUSE, SIG_PULSE, SIG_RESTART)
24+
_valid_signals = (SIG_ABORT, SIG_PAUSE, SIG_PULSE, SIG_REVIVE)
2525

2626
class SignalHandler:
2727

pyrunner/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '6.0.0'
1+
__version__ = '5.3.0'

pyrunner/worker/abstract.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
#
1515
# SPDX-License-Identifier: Apache-2.0
1616

17-
import traceback, sys
17+
import traceback, sys, time
1818
import multiprocessing.sharedctypes
1919

2020
import pyrunner.logger.file as lg
@@ -33,12 +33,15 @@ class Worker(ABC):
3333
- on_exit()
3434
"""
3535

36-
def __init__(self, context, logfile, argv):
36+
def __init__(self, context, logfile, argv, as_service, service_exec_interval=1):
3737
self.context = context
3838
self._retcode = multiprocessing.sharedctypes.Value('i', 0)
3939
self.logfile = logfile
4040
self.logger = None
4141
self.argv = argv
42+
self._as_service = as_service
43+
self._service_exec_interval = service_exec_interval
44+
4245
return
4346

4447
def cleanup(self):
@@ -65,14 +68,28 @@ def protected_run(self):
6568
sys.stdout = self.logger.logfile_handle
6669
sys.stderr = self.logger.logfile_handle
6770

71+
# ON START
72+
try:
73+
self.retcode = self.on_start() or self.retcode
74+
except NotImplementedError:
75+
pass
76+
except Exception as e:
77+
self.logger.error('Uncaught Exception from Worker Thread (ON_START)')
78+
self.logger.error(str(e))
79+
self.logger.error(traceback.format_exc())
80+
self.retcode = 902
81+
6882
# RUN
6983
try:
70-
self.retcode = self.run() or self.retcode
84+
while True:
85+
self.retcode = self.run() or self.retcode
86+
if not self._as_service: break
87+
time.sleep(self._service_exec_interval)
7188
except Exception as e:
7289
self.logger.error("Uncaught Exception from Worker Thread (RUN)")
7390
self.logger.error(str(e))
7491
self.logger.error(traceback.format_exc())
75-
self.retcode = 901
92+
self.retcode = 903
7693

7794
if not self.retcode:
7895
# ON SUCCESS
@@ -84,7 +101,7 @@ def protected_run(self):
84101
self.logger.error('Uncaught Exception from Worker Thread (ON_SUCCESS)')
85102
self.logger.error(str(e))
86103
self.logger.error(traceback.format_exc())
87-
self.retcode = 902
104+
self.retcode = 904
88105
else:
89106
# ON FAIL
90107
try:
@@ -95,25 +112,32 @@ def protected_run(self):
95112
self.logger.error('Uncaught Exception from Worker Thread (ON_FAIL)')
96113
self.logger.error(str(e))
97114
self.logger.error(traceback.format_exc())
98-
self.retcode = 903
115+
self.retcode = 905
99116

100117
# ON EXIT
101118
try:
102-
self.retcode = self.on_exit() or self.retcode
119+
self.retcode = self.on_destroy() or self.retcode
103120
except NotImplementedError:
104121
pass
105122
except Exception as e:
106-
self.logger.error('Uncaught Exception from Worker Thread (ON_EXIT)')
123+
self.logger.error('Uncaught Exception from Worker Thread (ON_DESTROY)')
107124
self.logger.error(str(e))
108125
self.logger.error(traceback.format_exc())
109-
self.retcode = 904
126+
self.retcode = 906
110127

111128
self.logger.close()
112129
self.logger = None
113130

114131
return
115132

116133
# To be implemented in user-defined workers.
134+
def on_start(self):
135+
"""
136+
Optional lifecycle method. Is only executed when the worker is started/restarted.
137+
This part of the lifecycle is redundant to the run() method unless app is run as a service.
138+
"""
139+
raise NotImplementedError('Method "on_start" is not implemented')
140+
117141
@abstractmethod
118142
def run(self):
119143
"""
@@ -136,10 +160,10 @@ def on_fail(self):
136160
"""
137161
raise NotImplementedError('Method "on_fail" is not implemented')
138162

139-
def on_exit(self):
163+
def on_destroy(self):
140164
"""
141165
Optional lifecycle method. Is always executed, if implemented, but always
142166
after on_success() or on_fail().
143167
"""
144-
raise NotImplementedError('Method "on_exit" is not implemented')
168+
raise NotImplementedError('Method "on_destroy" is not implemented')
145169

0 commit comments

Comments
 (0)