
Commit 7f7e307

Merge pull request #61 from shinken-monitoring/clean_main_module
Clean and simplification in main module
2 parents 0880de2 + 4b5a272

3 files changed: +61 -186 lines

module/module.py

Lines changed: 59 additions & 184 deletions
@@ -47,6 +47,7 @@ def get_instance(plugin):
 
 #############################################################################
 
+import errno
 import select
 import socket
 import os
@@ -104,7 +105,6 @@ def __init__(self, modconf):
         # We can be in a scheduler. If so, we keep a link to it to speed up regenerator phase
         self.scheduler = None
         self.plugins = []
-        self.use_threads = (getattr(modconf, 'use_threads', '0') == '1')
         self.host = getattr(modconf, 'host', '127.0.0.1')
         if self.host == '*':
             self.host = '0.0.0.0'
@@ -149,7 +149,10 @@ def __init__(self, modconf):
         # We need to have our regenerator now because it will need to load
         # data from scheduler before main() if in scheduler of course
         self.rg = LiveStatusRegenerator(self.service_authorization_strict, self.group_authorization_strict)
-        self.client_connections = {} # keys will be socket of client, values are LiveStatusClientThread instances
+
+        self.client_connections = {} # keys will be socket of client,
+                                     # values are LiveStatusClientThread instances
+
         self.db = None
         self.listeners = []
         self._listening_thread = threading.Thread(target=self._listening_thread_run)
@@ -236,8 +239,6 @@ def main(self):
         self.rg.register_cache(self.query_cache)
 
         try:
-            #import cProfile
-            #cProfile.runctx('''self.do_main()''', globals(), locals(),'/tmp/livestatus.profile')
             self.do_main()
         except Exception, exp:
             msg = Message(id=0, type='ICrash', data={
@@ -276,125 +277,15 @@ def do_main(self):
         if self.db.max_logs_age > 0:
             self.db.log_db_do_archive()
 
-        # We ill protect the operations on
-        # the non read+write with a lock and
-        # 2 int
-        self.global_lock = threading.RLock()
-        self.nb_readers = 0
-        self.nb_writers = 0
-
-        self.data_thread = None
-
-        # Check if some og the required directories exist
-        #if not os.path.exists(bottle.TEMPLATE_PATH[0]):
-        # logger.error('The view path do not exist at %s' % bottle.TEMPLATE_PATH)
-        # sys.exit(2)
-
         self.load_plugins()
-
-        if self.use_threads:
-            # Launch the data thread"
-            logger.info("[Livestatus Broker] Starting Livestatus application")
-            self.data_thread = threading.Thread(None, self.manage_brok_thread, 'datathread')
-            self.data_thread.start()
-            self.lql_thread = threading.Thread(None, self.manage_lql_thread, 'lqlthread')
-            self.lql_thread.start()
-            self.data_thread.join()
-            self.lql_thread.join()
-        else:
-            self.manage_lql_thread()
-
-
-    # It's the thread function that will get broks
-    # and update data. Will lock the whole thing
-    # while updating
-    def manage_brok_thread(self):
-        logger.info("[Livestatus Broker] Data thread started")
-        while True:
-            l = self.to_q.get()
-            for b in l:
-                # Un-serialize the brok data
-                b.prepare()
-                # For updating, we cannot do it while
-                # answer queries, so wait for no readers
-                self.wait_for_no_readers()
-                try:
-                    logger.debug("[Livestatus Broker] Got data lock, manage brok")
-                    self.rg.manage_brok(b)
-                    for mod in self.modules_manager.get_internal_instances():
-                        try:
-                            mod.manage_brok(b)
-                        except Exception, exp:
-                            logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
-                            logger.warning("[%s] The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
-                            logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
-                            logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
-                            self.modules_manager.set_to_restart(mod)
-                except Exception, exp:
-                    msg = Message(id=0, type='ICrash', data={
-                        'name': self.get_name(),
-                        'exception': exp,
-                        'trace': traceback.format_exc()
-                    })
-                    self.from_q.put(msg)
-                    # wait 2 sec so we know that the broker got our message, and die
-                    time.sleep(2)
-                    # No need to raise here, we are in a thread, exit!
-                    os._exit(2)
-                #finally:
-                # We can remove us as a writer from now. It's NOT an atomic operation
-                # so we REALLY not need a lock here (yes, I try without and I got
-                # a not so accurate value there....)
-                self.global_lock.acquire()
-                self.nb_writers -= 1
-                self.global_lock.release()
+        self.main_thread_run()
 
     # Here we will load all plugins (pages) under the webui/plugins
     # directory. Each one can have a page, views and htdocs dir that we must
    # route correctly
     def load_plugins(self):
         pass
 
-    # It will say if we can launch a page rendering or not.
-    # We can only if there is no writer running from now
-    def wait_for_no_writers(self):
-        while True:
-            self.global_lock.acquire()
-            # We will be able to run
-            if self.nb_writers == 0:
-                # Ok, we can run, register us as readers
-                self.nb_readers += 1
-                self.global_lock.release()
-                break
-            # Oups, a writer is in progress. We must wait a bit
-            self.global_lock.release()
-            # Before checking again, we should wait a bit
-            # like 1ms
-            time.sleep(0.001)
-
-    # It will say if we can launch a brok management or not
-    # We can only if there is no readers running from now
-    def wait_for_no_readers(self):
-        start = time.time()
-        while True:
-            self.global_lock.acquire()
-            # We will be able to run
-            if self.nb_readers == 0:
-                # Ok, we can run, register us as writers
-                self.nb_writers += 1
-                self.global_lock.release()
-                break
-            # Ok, we cannot run now, wait a bit
-            self.global_lock.release()
-            # Before checking again, we should wait a bit
-            # like 1ms
-            time.sleep(0.001)
-            # We should warn if we cannot update broks
-            # for more than 30s because it can be not good
-            if time.time() - start > 30:
-                logger.warning("[Livestatus Broker] WARNING: we are in lock/read since more than 30s!")
-                start = time.time()
-
     def manage_brok(self, brok):
         """We use this method mostly for the unit tests"""
         brok.prepare()
@@ -418,6 +309,7 @@ def do_stop(self):
         for client in self.client_connections.values():
             assert isinstance(client, LiveStatusClientThread)
             client.join()
+        self.client_connections.clear()
         if self._listening_thread:
             self._listening_thread.join()
         # inputs must be closed after listening_thread
@@ -428,7 +320,6 @@ def do_stop(self):
         except Exception as err:
             logger.warning('Error on db close: %s' % err)
 
-
     def create_listeners(self):
         backlog = 5
         if self.port:
@@ -459,43 +350,34 @@ def create_listeners(self):
             self.listeners.append(sock)
             logger.info("[Livestatus Broker] listening on unix socket: %s" % str(self.socket))
 
-
     def _listening_thread_run(self):
         while not self.interrupted:
-            # Check for pending livestatus requests
+            # Check for pending livestatus new connection..
             inputready, _, exceptready = select.select(self.listeners, [], [], 1)
 
             if len(exceptready) > 0:
-                pass
-
-            if len(inputready) > 0:
-                for s in inputready:
-                    # We will identify sockets by their filehandle number
-                    # during the rest of this loop
-                    #socketid = s.fileno()
-                    if s in self.listeners:
-                        # handle the server socket
-                        sock, address = s.accept()
-                        if isinstance(address, tuple):
-                            client_ip, _ = address
-                            if self.allowed_hosts and client_ip not in self.allowed_hosts:
-                                logger.warning("[Livestatus Broker] Connection attempt from illegal ip address %s" % str(client_ip))
-                                full_safe_close(sock)
-                                continue
-
-                        new_client = self.client_connections[sock] = LiveStatusClientThread(sock, address, self)
-                        new_client.start()
-                        self.livestatus.count_event('connections')
-
-                # end for s in inputready:
+                pass # TODO ?
+
+            for s in inputready:
+                # handle the server socket
+                sock, address = s.accept()
+                if isinstance(address, tuple):
+                    client_ip, _ = address
+                    if self.allowed_hosts and client_ip not in self.allowed_hosts:
+                        logger.warning("[Livestatus Broker] Connection attempt from illegal ip address %s" % str(client_ip))
+                        full_safe_close(sock)
+                        continue
+
+                new_client = self.client_connections[sock] = LiveStatusClientThread(sock, address, self)
+                new_client.start()
+                self.livestatus.count_event('connections')
+            # end for s in inputready:
 
             # At the end of this loop we probably will discard connections
             kick_connections = []
             for sock, client in self.client_connections.items():
                 assert isinstance(client, LiveStatusClientThread)
-                if client.is_alive():
-                    pass
-                else:
+                if not client.is_alive():
                     kick_connections.append(sock)
 
             for sock in kick_connections:
@@ -504,56 +386,49 @@ def _listening_thread_run(self):
     # It's the thread function that will get broks
     # and update data. Will lock the whole thing
    # while updating
-    def manage_lql_thread(self):
+    def main_thread_run(self):
         logger.info("[Livestatus Broker] Livestatus query thread started")
-        self.db.open() # make sure to open the db in this thread..
+        self.db.open()  # make sure to open the db in this thread..
         # This is the main object of this broker where the action takes place
         self.livestatus = LiveStatus(self.datamgr, self.query_cache, self.db, self.pnp_path, self.from_q)
         self.create_listeners()
         self._listening_thread.start()
 
+        db_commit_next_time = time.time()
+
         while not self.interrupted:
-            if self.use_threads:
-                self.wait_for_no_writers()
-                self.livestatus.counters.calc_rate()
-            else:
-                try:
-                    l = self.to_q.get(True, 1)
-                    for b in l:
-                        # Un-serialize the brok data
-                        b.prepare()
-                        self.rg.manage_brok(b)
-                        for mod in self.modules_manager.get_internal_instances():
-                            try:
-                                mod.manage_brok(b)
-                            except Exception, exp:
-                                logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
-                                logger.warning("[%s] Warning: The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
-                                logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
-                                logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
-                                self.modules_manager.set_to_restart(mod)
-                except Queue.Empty:
-                    self.livestatus.counters.calc_rate()
-                except IOError, e:
-                    if hasattr(os, 'errno') and e.errno != os.errno.EINTR:
-                        raise
-                except Exception, exp:
-                    logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
-                    logger.error("[%s] Warning: The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
-                    logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
-                    logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
+            now = time.time()
+
+            self.livestatus.counters.calc_rate()
+
+            if db_commit_next_time < now:
+                db_commit_next_time = now + 3 # only commit every ~3 secs
+                self.db.commit_and_rotate_log_db()
+
+            try:
+                l = self.to_q.get(True, 1)
+            except IOError as err:
+                if err.errno != os.errno.EINTR:
                     raise
+            except Queue.Empty:
+                pass
+            else:
+                for b in l:
+                    b.prepare() # Un-serialize the brok data
+                    self.rg.manage_brok(b)
+                    for mod in self.modules_manager.get_internal_instances():
+                        try:
+                            mod.manage_brok(b)
+                        except Exception as err:
+                            logger.exception(
+                                "[%s] Warning: The mod %s raise an exception: %s,"
+                                "I'm tagging it to restart later",
+                                self.name, mod.get_name(), err)
+                            self.modules_manager.set_to_restart(mod)
 
-            # Commit log broks to the database
-            self.db.commit_and_rotate_log_db()
+            # just to have eventually more broks accumulated
+            # in our input queue:
+            time.sleep(0.1)
 
         # end: while not self.interrupted:
         self.do_stop()
-
-    def write_protocol(self, request=None, response=None, sent=0):
-        if self.debug_queries:
-            if request is not None:
-                print "REQUEST>>>>>\n" + request + "\n\n"
-            if response is not None:
-                print "RESPONSE<<<<\n" + response + "\n"
-            print "RESPONSE SENT<<<<\n %s \n\n" % sent

test/test_livestatus_allowedhosts.py

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ def init_livestatus(self, conf):
 
         # NOTE: function is blocking, so must be launched in a thread
         #self.livestatus_broker.do_main()
-        self.lql_thread = threading.Thread(None, self.livestatus_broker.manage_lql_thread, 'lqlthread')
+        self.lql_thread = threading.Thread(None, self.livestatus_broker.main_thread_run, 'lqlthread')
         self.lql_thread.start()
         # wait for thread to init
         time.sleep(3)

test/test_wait_query.py

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ def init_livestatus(self, conf):
         self.sched.fill_initial_broks('Default-Broker')
         self.update_broker()
         # execute the livestatus by starting a dedicated thread to run the manage_lql_thread function:
-        self.lql_thread = threading.Thread(target=self.livestatus_broker.manage_lql_thread, name='lqlthread')
+        self.lql_thread = threading.Thread(target=self.livestatus_broker.main_thread_run, name='lqlthread')
         self.lql_thread.start()
         t0 = time.time()
         # give some time for the thread to init and creates its listener socket(s) :
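
Both test fixtures only rename the thread target. Since main_thread_run() blocks until self.interrupted is set and then calls do_stop(), a harness can drive it along these lines; this is a hedged sketch, assuming a broker object that exposes main_thread_run and an interrupted attribute as in the diff above (the helper names are illustrative, not part of the test suite).

# Sketch of the launch/stop pattern used by the tests.
import threading
import time


def start_broker(broker, settle=3):
    t = threading.Thread(target=broker.main_thread_run, name='lqlthread')
    t.start()
    time.sleep(settle)             # give the loop time to create its listeners
    return t


def stop_broker(broker, thread):
    broker.interrupted = True      # 'while not self.interrupted' exits
    thread.join()                  # main_thread_run() calls do_stop() on the way out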
