diff --git a/Collectors/DetailedCollector.py b/Collectors/DetailedCollector.py
index d2fa31e..da47834 100755
--- a/Collectors/DetailedCollector.py
+++ b/Collectors/DetailedCollector.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """
 
 A simple daemon for collecting monitoring packets from a remote XRootD
@@ -22,6 +22,7 @@ class DetailedCollector(UdpCollector.UdpCollector):
 
     DEFAULT_PORT = 9930
+    UDP_MON_PORT = 8000
 
     def __init__(self, *args, **kw):
@@ -30,12 +31,38 @@ def __init__(self, *args, **kw):
         self._servers = {}
         self._users = {}
         self._dictid_map = {}
-        self._exchange = self.config.get('AMQP', 'exchange')
         self._wlcg_exchange = self.config.get('AMQP', 'wlcg_exchange')
+        self.report_raw_data = not self.config.getboolean('AMQP', 'process_metrics')
         self.last_flush = time.time()
         self.seq_data = {}
 
+    def addCacheRecord(self, event, hostname, addr, port):
+
+        rec = {}
+        rec['timestamp'] = event.detach_t * 1000  # needs to be in ms
+
+        rec['lfn'] = event.lfn
+        rec['access_cnt'] = event.access_cnt
+        rec['attach_t'] = event.attach_t
+        rec['detach_t'] = event.detach_t
+        rec['size'] = event.size
+        rec['blk_size'] = event.blk_size
+        rec['n_blks'] = event.n_blks
+        rec['n_blks_done'] = event.n_blks_done
+        rec['b_hit'] = event.b_hit
+        rec['b_miss'] = event.b_miss
+        rec['b_bypass'] = event.b_bypass
+        rec['hostname'] = hostname
+        rec['addr'] = addr
+        rec['port'] = port
+        rec['remotes'] = event.remotes
+
+        self.publish("cache-event", rec, exchange=self._wlcg_exchange)
+
+        self.logger.debug('Publishing Cache Event: {}'.format(str(rec)))
+
+
     def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
         """
         Given information to create a record, send it up to the message queue.
@@ -57,7 +84,10 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
             s = self._servers[sid]
             rec['serverID'] = sid
             rec['server'] = s.addr
-            rec['site'] = s.site.decode('utf-8')
+            if s.site:
+                rec['site'] = s.site.decode('utf-8')
+            else:
+                rec['site'] = "unknown"
         else:
             rec['server'] = addr
             # logger.warning('server still not identified: %s',sid)
@@ -102,35 +132,38 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
         if transfer_key in self._transfers:
             f = self._transfers[transfer_key][1]
             fname = f.fileName.decode('utf-8')
+            #self.logger.debug('{}'.format(self._transfers[transfer_key]))
             rec['filename'] = fname
             rec['filesize'] = f.fileSize
-            rec['dirname1'] = "/".join(fname.split('/', 2)[:2])
-            rec['dirname2'] = "/".join(fname.split('/', 3)[:3])
-            if fname.startswith('/user'):
-                rec['logical_dirname'] = rec['dirname2']
-            elif fname.startswith('/osgconnect/public'):
-                rec['logical_dirname'] = "/".join(fname.split('/', 4)[:4])
-            elif fname.startswith('/hcc'):
-                rec['logical_dirname'] = "/".join(fname.split('/', 6)[:6])
-            elif fname.startswith('/pnfs/fnal.gov/usr'):
-                rec['logical_dirname'] = "/".join(f.fileName.decode('utf-8').split('/')[:5])
-            elif fname.startswith('/gwdata'):
-                rec['logical_dirname'] = rec['dirname2']
-            elif fname.startswith('/chtc/'):
-                rec['logical_dirname'] = '/chtc'
-            elif fname.startswith('/icecube/'):
-                rec['logical_dirname'] = '/icecube'
-
-            # Check for CMS files
-            elif fname.startswith('/store') or fname.startswith('/user/dteam'):
-                rec['logical_dirname'] = rec['dirname2']
-                lcg_record = True
-            else:
-                rec['logical_dirname'] = 'unknown directory'
+            if not self.report_raw_data:
+                rec['dirname1'] = "/".join(fname.split('/', 2)[:2])
+                rec['dirname2'] = "/".join(fname.split('/', 3)[:3])
+                if fname.startswith('/user'):
+                    rec['logical_dirname'] = rec['dirname2']
+                elif fname.startswith('/osgconnect/public'):
+                    rec['logical_dirname'] = "/".join(fname.split('/', 4)[:4])
+                elif fname.startswith('/hcc'):
+                    rec['logical_dirname'] = "/".join(fname.split('/', 6)[:6])
+                elif fname.startswith('/pnfs/fnal.gov/usr'):
+                    rec['logical_dirname'] = "/".join(f.fileName.decode('utf-8').split('/')[:5])
+                elif fname.startswith('/gwdata'):
+                    rec['logical_dirname'] = rec['dirname2']
+                elif fname.startswith('/chtc/'):
+                    rec['logical_dirname'] = '/chtc'
+                elif fname.startswith('/icecube/'):
+                    rec['logical_dirname'] = '/icecube'
+
+                # Check for CMS files
+                elif fname.startswith('/store') or fname.startswith('/user/dteam'):
+                    rec['logical_dirname'] = rec['dirname2']
+                    lcg_record = True
+                else:
+                    rec['logical_dirname'] = 'unknown directory'
         else:
             rec['filename'] = "missing directory"
             rec['filesize'] = "-1"
-            rec['logical_dirname'] = "missing directory"
+            if not self.report_raw_data:
+                rec['logical_dirname'] = "missing directory"
         rec['read'] = fileClose.read
         rec['readv'] = fileClose.readv
         rec['write'] = fileClose.write
@@ -217,7 +250,7 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime):
 
         if not lcg_record:
             self.logger.debug("OSG record to send: %s", str(rec))
-            self.publish("file-close", rec, exchange=self._exchange)
+            self.publish("file-close", rec, exchange=self._wlcg_exchange)
             self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'stashcache'})
         else:
             wlcg_packet = wlcg_converter.Convert(rec)
@@ -295,7 +328,8 @@ def process(self, data, addr, port):
                 self.seq_data[sid][str_header_code] = header.pseq
 
         if header.code == b'f':
-            # self.logger.debug("Got fstream object")
+            # TODO: break this out into a support function
+            self.logger.debug("Got fstream object")
             time_record = decoding.MonFile(data)  # first one is always TOD
             self.logger.debug(time_record)
             data = data[time_record.recSize:]
@@ -355,11 +389,33 @@ def process(self, data, addr, port):
             self.logger.debug("r - redirect stream message.")
 
         elif header.code == b't':
-            #self.logger.warning("t - stream message. Server at %s should remove 'files', 'io', and "
-            #                    "'iov' directives from the monitoring configuration.", addr)
+            self.logger.warning("t - stream message. Server at %s should remove 'files', 'io', and "
+                                "'iov' directives from the monitoring configuration.", addr)
+            self.logger.warning("{}".format(data))
             pass
 
+        elif header.code == b'g':
+            self.logger.debug('cache header')
+
+            infolen = len(data) - 4
+            mm = decoding.mapheader._make(struct.unpack("!I" + str(infolen) + "s", data))
+
+            self.logger.debug(mm)
+
+            cacheInfo = decoding.cacheInfo(mm.info)
+
+            self.logger.debug('Debug Data: {}'.format(str(cacheInfo)))
+
+            try:
+                hostname = socket.gethostbyaddr(addr)[0]
+            except:
+                hostname = 'unresolvable'
+
+            for event in cacheInfo:
+                self.addCacheRecord(event, hostname, addr, port)
+
         else:
+            self.logger.debug('Header Code is: {}'.format(header.code.decode('utf-8')))
             infolen = len(data) - 4
             mm = decoding.mapheader._make(struct.unpack("!I" + str(infolen) + "s", data))
             try:
@@ -383,8 +439,9 @@ def process(self, data, addr, port):
 
             elif header.code == b'd':
                 path = rest
-                #self.logger.warning('Path information sent (%s). Server at %s should remove "files" '
-                #                    'directive from the monitoring configuration.', path, addr)
+                self.logger.warning('Path information sent (%s). Server at %s should remove "files" '
+                                    'directive from the monitoring configuration.', path, addr)
+                self.logger.debug('{}'.format(data))
 
             elif header.code == b'i':
                 appinfo = rest
@@ -446,6 +503,7 @@ def process(self, data, addr, port):
 
             elif header.code == b'x':
                 decoding.xfrInfo(rest)
+                self.logger.debug('Header is x')
                 # transfer_key = str(sid) + "." + str(xfrInfo.fileID)
                 # if transfer_key in AllTransfers:
                 #     cur_value = AllTransfers[transfer_key]
@@ -453,6 +511,9 @@ def process(self, data, addr, port):
                 #         print "Adding xfrInfo"
                 #         print xfrInfo
 
+            else:
+
+                self.logger.debug('Header is now: {}'.format(header.code.decode('utf-8')))
 
         # Check if we have to flush the AllTransfer
         now_time = time.time()
diff --git a/Collectors/SummaryCollector.py b/Collectors/SummaryCollector.py
index bbbd06d..cfeb2e3 100755
--- a/Collectors/SummaryCollector.py
+++ b/Collectors/SummaryCollector.py
@@ -36,6 +36,7 @@ def __init__(self):
         # self.link_sfps =0  # Partial sendfile() operations.
         self.proc_usr = 0
         self.proc_sys = 0
+        self.xrootd_num = 0
         self.xrootd_err = 0
         self.xrootd_dly = 0
         self.xrootd_rdr = 0
@@ -45,10 +46,17 @@ def __init__(self):
         self.ops_readv = 0
         self.ops_sync = 0
         self.ops_write = 0
+        self.ops_getf = 0
+        self.ops_misc = 0
+        self.ops_rf = 0
+        self.ops_rs = 0
         self.lgn_num = 0
         self.lgn_af = 0
         self.lgn_au = 0
         self.lgn_ua = 0
+        self.aio_max = 0
+        self.aio_num = 0
+        self.aio_rej = 0
 
     def prnt(self):
 
@@ -68,12 +76,14 @@ def setDiff(attr_name, json_data, currState, prevState):
 
 class SummaryCollector(UdpCollector.UdpCollector):
 
     DEFAULT_PORT = 9931
-
+    UDP_MON_PORT = 8001
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._states = collections.defaultdict(lambda: collections.defaultdict(ProcessState))
+        self.report_raw_data = not self.config.getboolean('AMQP', 'process_metrics')
+
 
     def process(self, data, addr, port):
@@ -87,6 +97,9 @@ def process(self, data, addr, port):
             self.logger.exception("Unexpected error. Original data was: %s", data)
             return
 
+        self.logger.debug("Summary Data:\n{}".format(data))
+        self.logger.debug("Summary XML:\n{}".format(summary))
+
         currState = ProcessState()
         statistics = summary['statistics']  # top level
 
@@ -94,7 +107,7 @@ def process(self, data, addr, port):
         self.logger.debug("Program: %s", pgm)
         if pgm != 'xrootd':
             self.logger.warning("Program: %s should not be sending summary information. Source: %s", pgm, statistics['@src'])
-            return
+            #return
 
         tos = int(statistics['@tos'])  # Unix time when the program was started.
         tod = int(statistics['@tod'])  # Unix time when statistics gathering started.
@@ -146,6 +159,7 @@ def process(self, data, addr, port):
                 currState.proc_usr = int(st['usr']['s'])
                 self.logger.debug("proc %s", st)
             elif sw == 'xrootd':
+                currState.xrootd_num = int(st['num'])
                 currState.xrootd_err = int(st['err'])
                 currState.xrootd_dly = int(st['dly'])
                 currState.xrootd_rdr = int(st['rdr'])
@@ -156,11 +170,19 @@ def process(self, data, addr, port):
                 currState.ops_readv = int(ops['rv'])
                 currState.ops_sync = int(ops['sync'])
                 currState.ops_write = int(ops['wr'])
+                currState.ops_getf = int(ops['getf'])
+                currState.ops_misc = int(ops['misc'])
+                currState.ops_rf = int(ops['rf'])
+                currState.ops_rs = int(ops['rs'])
                 lgn = st['lgn']
                 currState.lgn_num = int(lgn['num'])
                 currState.lgn_af = int(lgn['af'])
                 currState.lgn_au = int(lgn['au'])
                 currState.lgn_ua = int(lgn['ua'])
+                aio = st['aio']
+                currState.aio_max = int(aio['max'])
+                currState.aio_num = int(aio['num'])
+                currState.aio_rej = int(aio['rej'])
                 self.logger.debug("xrootd %s", st)
             elif sw == 'sched':
                 rmq_data['sched_in_queue'] = int(st['inq'])
@@ -171,19 +193,171 @@ def process(self, data, addr, port):
                 rmq_data['sgen_as'] = int(st['as'])
                 # data['sgen_et'] = int(st['et'])  # always 0
                 rmq_data['cend'] = datetime.utcfromtimestamp(float(st['toe'])).isoformat()
+            elif sw == 'cache':
+                self.logger.debug('Cache Data: {}'.format(str(st)))
+                self.logger.debug('Cache Opened: {}'.format(st['files']['opened']))
+
+                rmq_data['cache_type'] = st['@type']
+                rmq_data['cache_preread_in'] = int(st['prerd']['in'])
+                rmq_data['cache_preread_hits'] = int(st['prerd']['hits'])
+                rmq_data['cache_preread_miss'] = int(st['prerd']['miss'])
+                rmq_data['cache_read_in'] = int(st['rd']['in'])
+                rmq_data['cache_read_out'] = int(st['rd']['out'])
+                if st['rd']['hits'][-1] == '>':
+                    # Work around XRootD (5.0.3 at least) appending a stray '>' to this value
+                    st['rd']['hits'] = st['rd']['hits'][:-1]
+                rmq_data['cache_read_hits'] = int(st['rd']['hits'])
+                rmq_data['cache_read_miss'] = int(st['rd']['miss'])
+                rmq_data['cache_read_nocache'] = int(st['pass']['#text'])
+                rmq_data['cache_read_bypass'] = int(st['pass']['cnt'])
+                rmq_data['cache_write_out'] = int(st['wr']['out'])
+                rmq_data['cache_write_in'] = int(st['wr']['updt'])
+                rmq_data['cache_write_frommem'] = int(st['saved'])
+                rmq_data['cache_purge'] = int(st['purge'])
+                rmq_data['cache_count_open'] = int(st['files']['opened'])
+                rmq_data['cache_count_closed'] = int(st['files']['closed'])
+                rmq_data['cache_count_new'] = int(st['files']['new'])
+                rmq_data['cache_size'] = int(st['store']['size'])
+                rmq_data['cache_used'] = int(st['store']['used'])
+                rmq_data['cache_minuse'] = int(st['store']['min'])
+                rmq_data['cache_maxuse'] = int(st['store']['max'])
+                rmq_data['cache_mem_size'] = int(st['mem']['size'])
+                rmq_data['cache_mem_used'] = int(st['mem']['used'])
+                rmq_data['cache_mem_queued'] = int(st['mem']['wq'])
+                rmq_data['cache_open_defertotal'] = int(st['opcl']['odefer'])
+                rmq_data['cache_open_deferopen'] = int(st['opcl']['defero'])
+                rmq_data['cache_close_defer'] = int(st['opcl']['cdefer'])
+                rmq_data['cache_close_deferopen'] = int(st['opcl']['clost'])
+
             elif sw == 'ofs':
-                pass
+
+                rmq_data['ofs_role'] = st['role']
+                rmq_data['ofs_bkgtask'] = int(st['bxq'])
+                rmq_data['ofs_delay'] = int(st['dly'])
+                rmq_data['ofs_errors'] = int(st['err'])
+                rmq_data['ofs_handles'] = int(st['han'])
+                rmq_data['ofs_count_posc'] = int(st['opp'])
+                rmq_data['ofs_count_read'] = int(st['opr'])
+                rmq_data['ofs_count_rw'] = int(st['opw'])
+                rmq_data['ofs_count_redir'] = int(st['rdr'])
+                rmq_data['ofs_count_reply'] = int(st['rep'])
+                rmq_data['ofs_count_fail'] = int(st['ser'])
+                rmq_data['ofs_count_succ'] = int(st['sok'])
+                rmq_data['ofs_count_unposc'] = int(st['ups'])
+                rmq_data['ofs_count_tpc_allow'] = int(st['tpc']['grnt'])
+                rmq_data['ofs_count_tpc_deny'] = int(st['tpc']['deny'])
+                rmq_data['ofs_count_tpc_error'] = int(st['tpc']['err'])
+                rmq_data['ofs_count_tpc_expire'] = int(st['tpc']['exp'])
+
+                #pass
                 # TODO: fixup this information
                 # print 'ofs >>>',st
 
-        if hasPrev:
+            elif sw == "buff":
+
+                rmq_data['buff_count_adj'] = int(st['adj'])
+                rmq_data['buff_count_alloc'] = int(st['buffs'])
+                rmq_data['buff_mem_size'] = int(st['mem'])
+                rmq_data['buff_count_reqs'] = int(st['reqs'])
+
+            elif sw == "poll":
+
+                rmq_data['poll_num_desc'] = int(st['att'])
+                rmq_data['poll_count_enops'] = int(st['en'])
+                rmq_data['poll_count_events'] = int(st['ev'])
+                rmq_data['poll_count_badevt'] = int(st['int'])
+
+            elif sw == "proc":
+
+                rmq_data['proc_systimesec'] = int(st['sys']['s'])
+                rmq_data['proc_systimeusec'] = int(st['sys']['us'])
+                rmq_data['proc_usrtimesec'] = int(st['usr']['s'])
+                rmq_data['proc_usrtimeusec'] = int(st['usr']['us'])
+
+            elif sw == "oss":
+
+                oss_path_num = int(st['paths'].get('#text', '0'))
+                oss_space_num = int(st['space'].get('#text', '0'))
+
+                rmq_data['oss_num_paths'] = oss_path_num
+                rmq_data['oss_num_spaces'] = oss_space_num
+
+                for i, _path in enumerate(st['paths'].get('stats', [])):
+                    rmq_data['oss_path_' + str(i) + '_total_size'] = int(_path['tot'])
+                    rmq_data['oss_path_' + str(i) + '_total_inode'] = int(_path['ino'])
+                    rmq_data['oss_path_' + str(i) + '_free_size'] = int(_path['free'])
+                    rmq_data['oss_path_' + str(i) + '_free_inode'] = int(_path['ifr'])
+                    rmq_data['oss_path_' + str(i) + '_path_logical'] = _path['lp']
+                    rmq_data['oss_path_' + str(i) + '_path_real'] = _path['rp']
+
+                for i, _space in enumerate(st['space'].get('stats', [])):
+                    rmq_data['oss_space_' + str(i) + '_total_alloc'] = int(_space['tot'])
+                    rmq_data['oss_space_' + str(i) + '_total_quota'] = int(_space['qta'])
+                    rmq_data['oss_space_' + str(i) + '_max_extant'] = int(_space['maxf'])
+                    rmq_data['oss_space_' + str(i) + '_num_fsn'] = int(_space['fsn'])
+                    rmq_data['oss_space_' + str(i) + '_free'] = int(_space['free'])
+                    rmq_data['oss_space_' + str(i) + '_name_usg'] = int(_space['usg'])
+                    rmq_data['oss_space_' + str(i) + '_name'] = _space['name']
+
+            elif sw == "cmsm":
+
+                rmq_data['cmsm_role'] = st['role']
+                rmq_data['cmsm_sel_count_t'] = st['sel']['t']
+                rmq_data['cmsm_sel_count_r'] = st['sel']['r']
+                rmq_data['cmsm_sel_count_w'] = st['sel']['w']
+
+                if 'paths' not in st:
+                    rmq_data['cmsm_num_nodes'] = 0
+                    continue
+
+                num_nodes = int(st['paths'].get('#text', '0'))
+                rmq_data['cmsm_num_nodes'] = num_nodes
+
+                for i in range(num_nodes):
+                    rmq_data['cmsm_' + str(i) + '_host'] = st['host']
+                    rmq_data['cmsm_' + str(i) + '_role'] = st['role']
+                    rmq_data['cmsm_' + str(i) + '_run_status'] = st['run']
+                    if 'ref' in st:
+                        rmq_data['cmsm_' + str(i) + '_count_ref_r'] = int(st['ref']['r'])
+                        rmq_data['cmsm_' + str(i) + '_count_ref_w'] = int(st['ref']['w'])
+                    if 'shr' in st:
+                        rmq_data['cmsm_' + str(i) + '_count_shr_req'] = int(st['shr'].get('#text', 0))
+                        rmq_data['cmsm_' + str(i) + '_count_shr_exh'] = int(st['shr'].get('use', 0))
+
+                if 'frq' in st:
+                    self.logger.warning("Need Parser for: cmsm.frq !")
+
+            elif sw == "pss":
+
+                rmq_data['pss_count_open'] = int(st['open']['#text'])
+                rmq_data['pss_count_open_err'] = int(st['open']['errs'])
+                rmq_data['pss_count_close'] = int(st['close']['#text'])
+                rmq_data['pss_count_close_err'] = int(st['close']['errs'])
+
+            elif sw == "dpmoss":
+
+                # Site is using DPM... not much to see here as of DPM 1.14.3
+                pass
+
+            else:
+                self.logger.warning("Need Parser for: {}".format(str(sw)))
+
+        if hasPrev and not self.report_raw_data:
             if currState.tod < previousState.tod:
                 self.logger.warning("UDP packet came out of order; skipping the message.")
                 return
             for attr in ['link_total', 'link_in', 'link_out', 'link_ctime', 'link_tmo',
                          'proc_usr', 'proc_sys', 'ops_open', 'ops_preread', 'ops_read',
-                         'ops_readv', 'ops_sync', 'ops_write']:
+                         'ops_readv', 'ops_sync', 'ops_write', 'aio_max', 'aio_num', 'aio_rej',
+                         'ops_getf', 'ops_rf', 'ops_rs', 'ops_misc']:
                 setDiff(attr, rmq_data, currState, previousState)
             # data['link_stall'] = currState.link_stall - previousState.link_stall
@@ -198,8 +372,26 @@ def process(self, data, addr, port):
             rmq_data['unauthenticated_successes'] = currState.lgn_ua - previousState.lgn_ua
 
             self.publish('summary', rmq_data)
+        elif self.report_raw_data:
+
+            for attr in ['link_total', 'link_in', 'link_out', 'link_ctime', 'link_tmo',
+                         'proc_usr', 'proc_sys', 'ops_open', 'ops_preread', 'ops_read',
+                         'ops_readv', 'ops_sync', 'ops_write', 'aio_max', 'aio_num', 'aio_rej',
+                         'ops_getf', 'ops_rf', 'ops_rs', 'ops_misc']:
+                rmq_data[attr] = getattr(currState, attr)
+
+            rmq_data['xrootd_errors'] = currState.xrootd_err
+            rmq_data['xrootd_delays'] = currState.xrootd_dly
+            rmq_data['xrootd_redirections'] = currState.xrootd_rdr
+            rmq_data['login_attempts'] = currState.lgn_num
+            rmq_data['authentication_failures'] = currState.lgn_af
+            rmq_data['authentication_successes'] = currState.lgn_au
+            rmq_data['unauthenticated_successes'] = currState.lgn_ua
+            self.publish('summary', rmq_data)
+
         self._states[addr][pid] = currState
 
 
 if __name__ == '__main__':
     SummaryCollector.main()
+
diff --git a/Collectors/UdpCollector.py b/Collectors/UdpCollector.py
index 711cc70..9d1a4e4 100755
--- a/Collectors/UdpCollector.py
+++ b/Collectors/UdpCollector.py
@@ -58,24 +58,30 @@ class UdpCollector(object):
 
     DEFAULT_HOST = '0.0.0.0'
     DEFAULT_PORT = None
+    UDP_MON_PORT = 8000
 
-    def __init__(self, config, bind_addr):
+    def __init__(self, config, bind_addr, udp_mon_port):
         self.channel = None
         self.bind_addr = bind_addr
         self.socks = []
         self.config = config
         self.message_q = None
         self.child_process = None
-        self.exchange = config.get('AMQP', 'exchange')
+        self.exchange = config.get('AMQP', 'wlcg_exchange')
         self.metrics_q = None
+        self.udp_mon_port = udp_mon_port
 
 
     def _create_rmq_channel(self):
         """
         Create a fresh connection to RabbitMQ
         """
         parameters = pika.URLParameters(self.config.get('AMQP', 'url'))
-        connection = pika.BlockingConnection(parameters)
+        try:
+            connection = pika.BlockingConnection(parameters)
+        except pika.exceptions.AMQPConnectionError:
+            self.logger.exception('Failed to connect to AMQP URL: "{}"'.format(self.config.get('AMQP', 'url')))
+            raise
         self.channel = connection.channel()
 
@@ -92,6 +98,7 @@ def publish(self, routing_key, record: dict, retry=True, exchange=None):
         except Exception:
             if retry:
                 self.logger.exception('Error while sending rabbitmq message; will recreate connection and retry')
+                # TODO: decide whether to buffer messages until the queue comes back
                 self._create_rmq_channel()
                 self.publish(routing_key, record, retry=False, exchange=exchange)
 
@@ -132,7 +139,7 @@ def _launch_child(self):
 
     def _launch_metrics(self):
         # Metrics process
-        self.metrics_process = multiprocessing.Process(target=self._metrics_child, args=(self.metrics_q,))
+        self.metrics_process = multiprocessing.Process(target=self._metrics_child, args=(self.metrics_q, self.udp_mon_port))
         self.metrics_process.name = "Collector metrics thread"
         self.metrics_process.daemon = True
         orig_stdout = sys.stdout
@@ -198,6 +205,8 @@ def start(self):
 
                 if sock in rlist:
                     message, addr = sock.recvfrom(65536)
+                    self.logger.debug('Message From: {}'.format(str(addr)))
+
                     self.message_q.put([message, addr[0], addr[1]])
                     n_messages += 1
                     if n_messages % 10000 == 0:
@@ -213,9 +222,19 @@ def start(self):
 
     @classmethod
    def _start_child(Collector, config, message_q, metrics_q):
-        coll = Collector(config, (Collector.DEFAULT_HOST, Collector.DEFAULT_PORT))
+        coll = Collector(config, (Collector.DEFAULT_HOST, Collector.DEFAULT_PORT), Collector.UDP_MON_PORT)
         coll._init_logging()
-        coll._create_rmq_channel()
+        try:
+            coll._create_rmq_channel()
+        except pika.exceptions.AMQPConnectionError:
+            connectionEstablished = False
+            while not connectionEstablished:
+                try:
+                    coll._create_rmq_channel()
+                    connectionEstablished = True
+                except pika.exceptions.AMQPConnectionError:
+                    time.sleep(2)
         coll.message_q = message_q
         coll.metrics_q = metrics_q
         while True:
@@ -229,10 +248,10 @@ def _start_child(Collector, config, message_q, metrics_q):
 
 
     @staticmethod
-    def _metrics_child(metrics_q):
+    def _metrics_child(metrics_q, monitor_port):
 
         # Start the prometheus HTTP client
-        start_http_server(8000)
+        start_http_server(monitor_port)
 
         missing_counter = Counter('missing_packets', 'Number of missing packets', ['host'])
         messages = Counter('messages', 'Number of messages')
         packets = Counter('packets', 'Number of packets')
@@ -318,6 +337,8 @@ def main(Collector, port=None, host=None):
         if host is None:
             host = Collector.DEFAULT_HOST
 
+        udp_mon_port = Collector.UDP_MON_PORT
+
         parser = argparse.ArgumentParser()
         parser.add_argument("config", nargs=1, help="Location of configuration file.")
         args = parser.parse_args()
@@ -325,9 +346,10 @@ def main(Collector, port=None, host=None):
         config = configparser.ConfigParser()
         config.read(args.config[0])
 
-        coll = Collector(config, bind_addr=(host, port))
+        coll = Collector(config, bind_addr=(host, port), udp_mon_port=udp_mon_port)
         try:
             coll.start()
         except KeyboardInterrupt:
             coll.orig_stderr.write("Exiting on keyboard interrupt...\n")
             sys.exit(1)
+
diff --git a/Collectors/decoding.py b/Collectors/decoding.py
index e352297..06666cd 100644
--- a/Collectors/decoding.py
+++ b/Collectors/decoding.py
@@ -1,5 +1,6 @@
 from collections import namedtuple
 
+import json
 import requests
 import struct
 import sys
@@ -10,6 +11,7 @@
 userid = namedtuple("userid", ["protocol", "username", "pid", "sid", "host"])
 authinfo = namedtuple("authinfo", ["ap", "dn", "hn", "on", "rn", "gn", "info", 'execname', 'moninfo', "inetv"])
+cacheinfo = namedtuple("cacheinfo", ["event", "lfn", "size", "blk_size", "n_blks", "n_blks_done", "access_cnt", "attach_t", "detach_t", "b_hit", "b_miss", "b_bypass", "remotes"])
 srvinfo = namedtuple("srvinfo", ["program", "version", "instance", "port", "site", "addr"])
 prginfo = namedtuple("prginfo", ["xfn", "tod", "sz", "at", "ct", "mt", "fn"])
 xfrinfo = namedtuple("xfrinfo", ["lfn", "tod", "sz", "tm", "op", "rc", "pd"])
@@ -22,7 +24,22 @@
 ops = namedtuple("ops", ["read", "readv", "write", "rsMin", "rsMax", "rsegs", "rdMin", "rdMax",
                          "rvMin", "rvMax", "wrMin", "wrMax"])
 
+
+def cacheInfo(message):
+    #print('cacheInfo message: {}'.format(message))
+    this_msg = str(message)
+    datum = []
+    #print('msg_len: {}'.format(len(this_msg.split("\\n"))))
+    for msg in this_msg.split("\\n"):
+        start_json = msg.rfind('{')
+        end_json = msg.rfind('}')
+        json_content = msg[start_json:end_json + 1]
+        #print('Json_Content: {}'.format(json_content))
+        datum.append(cacheinfo(**(json.loads(json_content))))
+    return datum
+
+
 def userInfo(message):
+    #print('userInfo message: {}'.format(message))
     c = message
     if b'/' in message:
         prot, c = message.split(b'/', 1)
@@ -42,6 +59,7 @@ def userInfo(message):
             print("serious value error: ", pid, sid, "message was:", message)
     return userid(prot, user, pi, si, host)
 
+
 def revUserInfo(useridStruct):
     return str.encode("{}/{}.{}:{}@{}".format(useridStruct.protocol, useridStruct.username, useridStruct.pid, useridStruct.sid, useridStruct.host))
diff --git a/Collectors/logging.conf b/Collectors/logging.conf
index 5d3e507..506ffd5 100644
--- a/Collectors/logging.conf
+++ b/Collectors/logging.conf
@@ -13,11 +13,11 @@ datefmt=
 
 [logger_root]
-level=INFO
+level=DEBUG
 handlers=consoleHandler
 
 [logger_SummaryCollector]
-level=INFO
+level=DEBUG
 handlers=consoleHandler, SummaryFileHandler
 qualname=SummaryCollector
 propagate=0
@@ -30,7 +30,7 @@ qualname=DetailedCollector
 propagate=0
 
 [logger_RedirectorCollector]
-level=INFO
+level=DEBUG
 handlers=consoleHandler, RedirectorFileHandler
 qualname=RedirectorCollector
 propagate=0
@@ -45,7 +45,7 @@ args=(sys.stdout,)
 
 [handler_SummaryFileHandler]
 class=FileHandler
-level=INFO
+level=DEBUG
 formatter=simpleFormatter
 args=('summary.log',)
@@ -57,6 +57,6 @@ args=('detailed.log', 'a',100000000, 5,)
 
 [handler_RedirectorFileHandler]
 class=FileHandler
-level=INFO
+level=DEBUG
 formatter=simpleFormatter
 args=('redirector.log',)
diff --git a/Dockerfile b/Dockerfile
index 8c766b4..d2b07d0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -5,8 +5,11 @@ RUN pip install -r /requirements.txt
 
 COPY Collectors /app
 COPY configs /configs
+COPY /scripts/run.sh /app/run.sh
 
 WORKDIR /app
 EXPOSE 9930/udp
+EXPOSE 9931/udp
 EXPOSE 8000/tcp
-CMD [ "/app/DetailedCollector.py", "/configs/connection.conf" ]
+EXPOSE 8001/tcp
+CMD [ "/app/run.sh" ]
diff --git a/configs/connection.conf b/configs/connection.conf
index a1ce119..663cc8f 100644
--- a/configs/connection.conf
+++ b/configs/connection.conf
@@ -1,12 +1,13 @@
 [AMQP]
-
 # Host information
-url = amqps://username:password@example.com
-
-# Exchange to write to
-exchange = xrd.detailed
+url = amqp://rabbit_user:rabbit_pass@rabbitmq.elk:5672
 
 # Exchange to write WLCG formatted
-wlcg_exchange = xrd.wlcg_format
+wlcg_exchange = xrd.detailed
+
+# Should we attempt to parse metrics here?
+# True: compute and report changes (deltas) between reports here
+# False: report absolute values and let the monitoring/graphing tool compute changes
+process_metrics = False
diff --git a/scripts/run.sh b/scripts/run.sh
new file mode 100755
index 0000000..2e3d313
--- /dev/null
+++ b/scripts/run.sh
@@ -0,0 +1,43 @@
+#!/bin/sh
+
+# Entry point to run both the detailed and summary collectors in the same Docker container
+
+DetailedLog=/var/log/xrootdcollector/DetailedCollector.log
+SummaryLog=/var/log/xrootdcollector/SummaryCollector.log
+
+mkdir -p /var/log/xrootdcollector
+
+echo "Starting Detailed Collector and pushing log files to: ${DetailedLog}"
+
+/app/DetailedCollector.py /configs/connection.conf 2>&1 | tee ${DetailedLog} &
+
+echo "Starting Summary Collector and pushing log files to: ${SummaryLog}"
+
+/app/SummaryCollector.py /configs/connection.conf 2>&1 | tee ${SummaryLog} &
+
+
+# Following the Docker documentation's recommendation for supervising several
+# long-running processes in one container: restart either collector if it dies.
+# 'ps c' reports the actual command name, which is what identifies the Python scripts.
+# Check every 20s, which is probably good enough.
+while sleep 20; do
+
+    ps acux | grep 'DetailedCollector' | grep -q -v grep
+    PROCESS_1_STATUS=$?
+    ps acux | grep 'SummaryCollector' | grep -q -v grep
+    PROCESS_2_STATUS=$?
+
+    if [ $PROCESS_1_STATUS -ne 0 ]; then
+        echo "Detailed Collector died!"
+        echo "Restarting Detailed Collector"
+        /app/DetailedCollector.py /configs/connection.conf 2>&1 | tee ${DetailedLog} &
+    fi
+
+    if [ $PROCESS_2_STATUS -ne 0 ]; then
+        echo "Summary Collector died!"
+        echo "Restarting Summary Collector"
+        /app/SummaryCollector.py /configs/connection.conf 2>&1 | tee ${SummaryLog} &
+    fi
+
+done
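
A minimal standalone sketch of the data model behind the new g-stream path (decoding.cacheInfo() feeding DetailedCollector.addCacheRecord()). The field names come from the cacheinfo namedtuple added in decoding.py; the payload below is a hypothetical sample with made-up values, and the line splitting is a simplified stand-in for the str()/"\\n" handling the patched collector actually uses:

    #!/usr/bin/env python3
    # Sketch only: mirrors the one-JSON-record-per-line parsing idea, not the collector's code path.
    import json
    from collections import namedtuple

    cacheinfo = namedtuple("cacheinfo", ["event", "lfn", "size", "blk_size", "n_blks", "n_blks_done",
                                         "access_cnt", "attach_t", "detach_t", "b_hit", "b_miss",
                                         "b_bypass", "remotes"])

    # Hypothetical g-stream info payload: one JSON object per line (sample values only).
    payload = (b'{"event": "file_close", "lfn": "/store/user/example.root", "size": 1048576, '
               b'"blk_size": 65536, "n_blks": 16, "n_blks_done": 16, "access_cnt": 1, '
               b'"attach_t": 1600000000, "detach_t": 1600000060, "b_hit": 900000, '
               b'"b_miss": 148576, "b_bypass": 0, "remotes": []}')

    records = []
    for line in payload.decode("utf-8").splitlines():
        start, end = line.find('{'), line.rfind('}')
        records.append(cacheinfo(**json.loads(line[start:end + 1])))

    for rec in records:
        # DetailedCollector.addCacheRecord() publishes one "cache-event" message per such record.
        print(rec.lfn, rec.b_hit, rec.b_miss)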