From 4f1c084f1f24ea311a79cbe504bf8d6c4679c7e3 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Thu, 24 Mar 2022 16:50:38 +0100 Subject: [PATCH 01/11] Add first version for STOMP connection --- Collectors/UdpCollector.py | 203 +++++++++++++++++++++++++++++-------- README.md | 10 +- connection.sample.conf | 17 ++++ requirements.txt | 1 + 4 files changed, 182 insertions(+), 49 deletions(-) diff --git a/Collectors/UdpCollector.py b/Collectors/UdpCollector.py index 077e479..af3795e 100755 --- a/Collectors/UdpCollector.py +++ b/Collectors/UdpCollector.py @@ -14,6 +14,7 @@ import multiprocessing import os import queue +import random import socket import sys import time @@ -24,6 +25,7 @@ from prometheus_client import start_http_server, Counter, Gauge import pika +import stomp class _LoggerWriter(object): @@ -54,6 +56,32 @@ def flush(self): """ # self.level() +class StompyListener(stomp.ConnectionListener): + def __init__(self, message_q, metrics_q): + self.message_q = message_q + self.metrics_q = metrics_q + + def on_error(self, frame): + self.logger.error('Received an error "%s"' % frame.body) + + def on_message(self, headers, body): + # Parse the JSON message + loaded_json = json.loads(body) + + # Base64 decode the data + message = base64.standard_b64decode(loaded_json['data']) + + # Send to message queue + # Get the address and port + addr = loaded_json['remote'].rsplit(":", 1) + self.message_q.put([message, addr[0], addr[1]]) + + # Update the number of messages received on the bus to the metrics_q + self.metrics_q.put({'type': 'pushed messages', 'count': 1}) + + def on_disconnected(self): + self.logger.debug('disconnected') + class UdpCollector(object): @@ -68,9 +96,26 @@ def __init__(self, config, bind_addr): self.config = config self.message_q = None self.child_process = None - self.exchange = config.get('AMQP', 'exchange') self.metrics_q = None + self.protocol = config.get("protocol") + if self.protocol == 'AMQP': + self.exchange = config.get('AMQP', 'exchange') + elif self.protocol == 'STOMP': + self.UDP_queue = config.get('STOMP', 'UDPQueue') + self.topic = config.get('STOMP', 'topic') + self.connections = [] + else: + self.logger.exception('Error while loading configuration, selected protocol is not valid') + + + def _create_mq_channel(self): + if self.protocol == 'AMQP': + self._create_rmq_channel() + elif self.protocol == 'STOMP': + self._create_amq_channel() + + def _create_rmq_channel(self): """ Create a fresh connection to RabbitMQ @@ -84,22 +129,89 @@ def _create_rmq_channel(self): self.logger.exception('Error while connecting rabbitmq message;') print(e) + def _get_stomp_connection_objects(self): + connections = [] + try: + # Following advice from messaging team URL should be translated into all possible + # hosts behind the alias + mb_alias = self.config.get('STOMP', 'url') + port = self.config.get('STOMP', 'port') + + # Get the list of IPs behind the alias + hosts = socket.gethostbyname_ex(mb_alias)[2] + + host_and_ports = map(lambda x: (x, port), hosts) + username = self.config.get('STOMP', 'username') + password = self.config.get('STOMP', 'password') + + # Create a connection to each broker available + for host_and_port in host_and_ports: + connection = stomp.Connection(host_and_ports=[host_and_port]) + connection.set_listener('StompyListener', StompyListener([], [])) + self.connections.append(connection) + + except Exception as e: + self.logger.exception('Error while connecting to AMQ message;') + print(e) + + return connections + + + def _create_amq_channel(self): + """ + 
Create a fresh connection to AMQ + """ + for connection in self.connections: + if connection.is_connected(): + connection.disconnect() + # Clean up the connection arrays + self.connections = [] + self.connections = self._get_stomp_connection_objects() + def publish(self, routing_key, record: dict, retry=True, exchange=None): if exchange is None: exchange = self.exchange - try: - self.channel.basic_publish(exchange, - routing_key, - json.dumps(record), - pika.BasicProperties(content_type='application/json', - delivery_mode=pika.spec.TRANSIENT_DELIVERY_MODE)) - except Exception: - if retry: - self.logger.exception('Error while sending rabbitmq message; will recreate connection and retry') - self._create_rmq_channel() - self.publish(routing_key, record, retry=False, exchange=exchange) + if self.protocol == 'AMQP': + try: + self.channel.basic_publish(exchange, + routing_key, + json.dumps(record), + pika.BasicProperties(content_type='application/json', + delivery_mode=pika.spec.TRANSIENT_DELIVERY_MODE)) + except Exception: + if retry: + self.logger.exception('Error while sending rabbitmq message; will recreate connection and retry') + self._create_rmq_channel() + self.publish(routing_key, record, retry=False, exchange=exchange) + elif self.protocol == 'STOMP': + try: + random.choice(self.connection).send(self.topic, + json.dumps(record), + headers={'content_type': 'application/json'}) + except Exception: + if retry: + sent = False + for connection in self.connections: + if sent: + # Message sent we are done + break + else: + try: + connection.send(self.topic, + json.dumps(record), + headers={'content_type': 'application/json'}) + sent = True + except Exception: + # Just keep trying other connections + pass + # Recreate the connections anyhow to keep them healthy + self._create_amq_channel() + # If it was not sent at all We better retry + if not sent: + self.logger.exception('Error while sending amq message; will recreate connection and retry') + self.publish(routing_key, record, retry=False, exchange=exchange) def _init_logging(self): @@ -242,7 +354,7 @@ def start(self): def _start_child(Collector, config, message_q, metrics_q): coll = Collector(config, (Collector.DEFAULT_HOST, Collector.DEFAULT_PORT)) coll._init_logging() - coll._create_rmq_channel() + coll._create_mq_channel() coll.message_q = message_q coll.metrics_q = metrics_q while True: @@ -257,41 +369,52 @@ def _start_child(Collector, config, message_q, metrics_q): @classmethod def _bus_child(self, config, message_q: multiprocessing.Queue, metrics_q: multiprocessing.Queue): - # Setup the connection to the message bus - parameters = pika.URLParameters(config.get('AMQP', 'url')) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() + if self.protocol == 'AMQP': + # Setup the connection to the message bus + parameters = pika.URLParameters(config.get('AMQP', 'url')) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() - def on_message(channel, method, properties, body): - # Parse the JSON message - loaded_json = json.loads(body) + def on_message(channel, method, properties, body): + # Parse the JSON message + loaded_json = json.loads(body) - # Base64 decode the data - message = base64.standard_b64decode(loaded_json['data']) + # Base64 decode the data + message = base64.standard_b64decode(loaded_json['data']) - # Send to message queue - # Get the address and port - addr = loaded_json['remote'].rsplit(":", 1) - message_q.put([message, addr[0], addr[1]]) - 
channel.basic_ack(method.delivery_tag) + # Send to message queue + # Get the address and port + addr = loaded_json['remote'].rsplit(":", 1) + message_q.put([message, addr[0], addr[1]]) + channel.basic_ack(method.delivery_tag) - # Update the number of messages received on the bus to the metrics_q - metrics_q.put({'type': 'pushed messages', 'count': 1}) + # Update the number of messages received on the bus to the metrics_q + metrics_q.put({'type': 'pushed messages', 'count': 1}) - channel.basic_qos(prefetch_count=1000) - channel.basic_consume(config.get("Pushed", "push_queue"), on_message) + channel.basic_qos(prefetch_count=1000) + channel.basic_consume(config.get("Pushed", "push_queue"), on_message) - try: - channel.start_consuming() - except Exception as ex: - channel.stop_consuming() + try: + channel.start_consuming() + except Exception as ex: + channel.stop_consuming() + connection.close() + raise ex connection.close() - raise ex - connection.close() - - + elif self.protocol == 'STOMP': + connections = self._get_stomp_connection_objects() + for connection in connections: + try: + connection.subscribe(self.UDP_queue, + id='XrootDCollector') + except Exception as ex: + if connection.is_connected(): + connection.close() + raise ex + + if connection.is_connected(): + connection.close() - @staticmethod def _metrics_child(metrics_q): diff --git a/README.md b/README.md index 735817b..8b982ad 100644 --- a/README.md +++ b/README.md @@ -10,15 +10,7 @@ record into a AMQP-based message bus. Configuration ------------- -The DetailedCollector needs the a configuration in order to connect to the AMQP message bus. - - [AMQP] - - # Host information - url = amqps://username:password@example.com - - # Exchange to write to - exchange = xrd.detailed +The DetailedCollector needs the a configuration in order to connect to the MQ message bus, please check connection.sample.conf file in this repository. This file is named `connection.conf` and should be in the Collectors directory or deployed with docker volumes, as shown below. 
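
As a reading aid for the configuration described above, here is a condensed, illustrative sketch (not taken verbatim from the patch) of how the [STOMP] settings in connection.conf are consumed by the new code: the broker alias is resolved to every host behind it, one stomp.py connection is opened per broker with a listener attached, and each connection subscribes to the shoveler queue. The alias, port and credentials below are placeholder assumptions; the queue name and subscription id come from connection.sample.conf and the collector code, and connect/ack/error handling is simplified.

    # Sketch only -- placeholder broker alias, port and credentials.
    import socket
    import stomp

    class Listener(stomp.ConnectionListener):
        def on_message(self, frame):
            print("received:", frame.body)   # the collector parses the JSON and queues it

    alias, port = "mb.example.org", 61613            # from the [STOMP] url/port settings (assumed values)
    hosts = socket.gethostbyname_ex(alias)[2]        # all brokers behind the alias

    connections = []
    for host in hosts:
        conn = stomp.Connection(host_and_ports=[(host, port)])
        conn.set_listener("listener", Listener())
        conn.connect(username="user", passcode="pass", wait=True)
        conn.subscribe("/queue/monitprod.xrootd.shoveler", id="XrootDCollector")
        connections.append(conn)
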
diff --git a/connection.sample.conf b/connection.sample.conf index e0856df..5f16c09 100644 --- a/connection.sample.conf +++ b/connection.sample.conf @@ -1,3 +1,5 @@ +protocol = STOMP + [AMQP] @@ -31,3 +33,18 @@ wlcg_exchange_tpc = xrd.tpc.wlcg enable = true push_queue = xrd.push +[STOMP] + +# Host information +url = mb.cern.ch +port = port + +# User information +username = username +password = password + +# Topic to write to +topic = /topic/xrootd.transfer + +# Queue to subscribe for XrootD UDP messages +UDPQueue = /queue/monitprod.xrootd.shoveler diff --git a/requirements.txt b/requirements.txt index 4a075cd..0267f3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ requests six xmltodict prometheus_client +stomp.py \ No newline at end of file From e71e4cca3b530d44a3b7424bddae7de304aa84a7 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Tue, 5 Apr 2022 15:05:41 +0200 Subject: [PATCH 02/11] Add self healing to stomp subscription --- Collectors/UdpCollector.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/Collectors/UdpCollector.py b/Collectors/UdpCollector.py index af3795e..6552038 100755 --- a/Collectors/UdpCollector.py +++ b/Collectors/UdpCollector.py @@ -97,6 +97,7 @@ def __init__(self, config, bind_addr): self.message_q = None self.child_process = None self.metrics_q = None + self.wlcg = config.get("wlcg") self.protocol = config.get("protocol") if self.protocol == 'AMQP': @@ -129,7 +130,12 @@ def _create_rmq_channel(self): self.logger.exception('Error while connecting rabbitmq message;') print(e) - def _get_stomp_connection_objects(self): + def _get_stomp_connection_objects(self, connections=[]): + # If we had connection already, make sure to close them + for connection in connections: + if connection.is_connected(): + connection.close() + connections = [] try: # Following advice from messaging team URL should be translated into all possible @@ -402,18 +408,19 @@ def on_message(channel, method, properties, body): raise ex connection.close() elif self.protocol == 'STOMP': - connections = self._get_stomp_connection_objects() - for connection in connections: - try: - connection.subscribe(self.UDP_queue, - id='XrootDCollector') - except Exception as ex: - if connection.is_connected(): - connection.close() - raise ex - - if connection.is_connected(): - connection.close() + subscribed = False + + while(True): + if not subscribed: + connections = self._get_stomp_connection_objects(connections) + for connection in connections: + connection.subscribe(self.UDP_queue, + id='XrootDCollector') + subscribed = True + # Check every 5 minutes if all the connection are alive, otherwise reconnect + time.sleep(300) + if not all(connection.is_connected() for connection in connections): + subscribed = False @staticmethod From 0120c615d2e73d6f3e5e290ec516943c374193f6 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Tue, 5 Apr 2022 15:06:16 +0200 Subject: [PATCH 03/11] Add configuration parameter for WLCG mode --- Collectors/DetailedCollector.py | 2 +- connection.sample.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Collectors/DetailedCollector.py b/Collectors/DetailedCollector.py index 3a82a4e..6a9a1e9 100755 --- a/Collectors/DetailedCollector.py +++ b/Collectors/DetailedCollector.py @@ -273,7 +273,7 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime, fileToClo if 'filename' not in rec or rec['filename'] == "missing directory": self.metrics_q.put({'type': 'failed filename', 'count': 
1}) - if not lcg_record: + if not self.wlcg and not lcg_record: self.logger.debug("OSG record to send: %s", str(rec)) self.publish("file-close", rec, exchange=self._exchange) self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'stashcache'}) diff --git a/connection.sample.conf b/connection.sample.conf index 5f16c09..40d86a0 100644 --- a/connection.sample.conf +++ b/connection.sample.conf @@ -1,8 +1,8 @@ protocol = STOMP +wlcg = true [AMQP] - # Host information url = amqps://username:password@example.com From 376184c6597e6d1170e5763a267a206ca6764d6d Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Wed, 6 Apr 2022 14:45:14 +0200 Subject: [PATCH 04/11] Add stomp helper to arrange code better --- Collectors/UdpCollector.py | 97 ++++++++++---------------------------- Collectors/stomp_helper.py | 57 ++++++++++++++++++++++ 2 files changed, 83 insertions(+), 71 deletions(-) create mode 100644 Collectors/stomp_helper.py diff --git a/Collectors/UdpCollector.py b/Collectors/UdpCollector.py index 6552038..406c3a5 100755 --- a/Collectors/UdpCollector.py +++ b/Collectors/UdpCollector.py @@ -26,6 +26,7 @@ import pika import stomp +from stomp_helper import get_stomp_connection_objects class _LoggerWriter(object): @@ -56,32 +57,6 @@ def flush(self): """ # self.level() -class StompyListener(stomp.ConnectionListener): - def __init__(self, message_q, metrics_q): - self.message_q = message_q - self.metrics_q = metrics_q - - def on_error(self, frame): - self.logger.error('Received an error "%s"' % frame.body) - - def on_message(self, headers, body): - # Parse the JSON message - loaded_json = json.loads(body) - - # Base64 decode the data - message = base64.standard_b64decode(loaded_json['data']) - - # Send to message queue - # Get the address and port - addr = loaded_json['remote'].rsplit(":", 1) - self.message_q.put([message, addr[0], addr[1]]) - - # Update the number of messages received on the bus to the metrics_q - self.metrics_q.put({'type': 'pushed messages', 'count': 1}) - - def on_disconnected(self): - self.logger.debug('disconnected') - class UdpCollector(object): @@ -97,13 +72,12 @@ def __init__(self, config, bind_addr): self.message_q = None self.child_process = None self.metrics_q = None - self.wlcg = config.get("wlcg") + self.wlcg = config.get('DEFAULT', 'wlcg') - self.protocol = config.get("protocol") + self.protocol = config.get('DEFAULT', 'protocol') if self.protocol == 'AMQP': self.exchange = config.get('AMQP', 'exchange') elif self.protocol == 'STOMP': - self.UDP_queue = config.get('STOMP', 'UDPQueue') self.topic = config.get('STOMP', 'topic') self.connections = [] else: @@ -130,49 +104,21 @@ def _create_rmq_channel(self): self.logger.exception('Error while connecting rabbitmq message;') print(e) - def _get_stomp_connection_objects(self, connections=[]): - # If we had connection already, make sure to close them - for connection in connections: - if connection.is_connected(): - connection.close() - - connections = [] - try: - # Following advice from messaging team URL should be translated into all possible - # hosts behind the alias - mb_alias = self.config.get('STOMP', 'url') - port = self.config.get('STOMP', 'port') - - # Get the list of IPs behind the alias - hosts = socket.gethostbyname_ex(mb_alias)[2] - - host_and_ports = map(lambda x: (x, port), hosts) - username = self.config.get('STOMP', 'username') - password = self.config.get('STOMP', 'password') - - # Create a connection to each broker available - for host_and_port in host_and_ports: - connection = 
stomp.Connection(host_and_ports=[host_and_port]) - connection.set_listener('StompyListener', StompyListener([], [])) - self.connections.append(connection) - - except Exception as e: - self.logger.exception('Error while connecting to AMQ message;') - print(e) - - return connections - def _create_amq_channel(self): """ Create a fresh connection to AMQ """ - for connection in self.connections: - if connection.is_connected(): - connection.disconnect() - # Clean up the connection arrays - self.connections = [] - self.connections = self._get_stomp_connection_objects() + try: + mb_alias = self.config.get('STOMP', 'url') + port = self.config.get('STOMP', 'port') + username = self.config.get('STOMP', 'username') + password = self.config.get('STOMP', 'password') + self.connections = get_stomp_connection_objects(mb_alias, port, + username, password, + self.connections) + except Exception: + self.logger("Something bad happened") def publish(self, routing_key, record: dict, retry=True, exchange=None): @@ -375,7 +321,8 @@ def _start_child(Collector, config, message_q, metrics_q): @classmethod def _bus_child(self, config, message_q: multiprocessing.Queue, metrics_q: multiprocessing.Queue): - if self.protocol == 'AMQP': + protocol = config.get('DEFAULT', 'protocol') + if protocol == 'AMQP': # Setup the connection to the message bus parameters = pika.URLParameters(config.get('AMQP', 'url')) connection = pika.BlockingConnection(parameters) @@ -407,14 +354,22 @@ def on_message(channel, method, properties, body): connection.close() raise ex connection.close() - elif self.protocol == 'STOMP': + elif protocol == 'STOMP': + mb_alias = config.get('STOMP', 'url') + port = config.get('STOMP', 'port') + username = config.get('STOMP', 'username') + password = config.get('STOMP', 'password') subscribed = False + connections = [] while(True): if not subscribed: - connections = self._get_stomp_connection_objects(connections) + connections = get_stomp_connection_objects(mb_alias, port, + username, password, + connections, + message_q, metrics_q) for connection in connections: - connection.subscribe(self.UDP_queue, + connection.subscribe(config.get('STOMP', 'UDPQueue'), id='XrootDCollector') subscribed = True # Check every 5 minutes if all the connection are alive, otherwise reconnect diff --git a/Collectors/stomp_helper.py b/Collectors/stomp_helper.py new file mode 100644 index 0000000..3935beb --- /dev/null +++ b/Collectors/stomp_helper.py @@ -0,0 +1,57 @@ +import base64 +import json +import logging +import multiprocessing +import socket +import stomp + + +class StompyListener(stomp.ConnectionListener): + def __init__(self, message_q: multiprocessing.Queue, metrics_q: multiprocessing.Queue): + self.message_q = message_q + self.metrics_q = metrics_q + + def on_error(self, frame): + logging.error('Received an error "%s"' % frame.body) + + def on_message(self, headers, body): + # Parse the JSON message + loaded_json = json.loads(body) + + # Base64 decode the data + message = base64.standard_b64decode(loaded_json['data']) + + # Send to message queue + # Get the address and port + addr = loaded_json['remote'].rsplit(":", 1) + self.message_q.put([message, addr[0], addr[1]]) + + # Update the number of messages received on the bus to the metrics_q + self.metrics_q.put({'type': 'pushed messages', 'count': 1}) + + def on_disconnected(self): + logging.debug('disconnected') + + +def get_stomp_connection_objects(mb_alias, port, username, password, + connections=[], message_q=[], metrics_q=[]): + # If we had connection already, make 
sure to close them + for connection in connections: + if connection.is_connected(): + connection.close() + + connections = [] + # Following advice from messaging team URL should be translated into all possible + # hosts behind the alias + # Get the list of IPs behind the alias + hosts = socket.gethostbyname_ex(mb_alias)[2] + + host_and_ports = map(lambda x: (x, port), hosts) + # Create a connection to each broker available + for host_and_port in host_and_ports: + connection = stomp.Connection(host_and_ports=[host_and_port]) + connection.set_listener('StompyListener', StompyListener(message_q, metrics_q)) + connection.connect(username=username, passcode=password, wait=True) + connections.append(connection) + + return connections \ No newline at end of file From 6e91487bbba457e0eb431c7c87521409063b4dbe Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Wed, 6 Apr 2022 14:48:09 +0200 Subject: [PATCH 05/11] Update Dockerfile for it to work with stomp.py dependencies --- Dockerfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Dockerfile b/Dockerfile index 683d9e0..9018482 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,8 @@ FROM python:3-alpine COPY requirements.txt / +RUN apk update && apk add python3-dev gcc libc-dev libffi-dev +RUN python3 -m pip install --upgrade pip setuptools wheel RUN pip install -r /requirements.txt COPY Collectors /app From 1fa95c46f50f45f5816d98946e2803940e4d6072 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Tue, 26 Apr 2022 11:51:43 +0200 Subject: [PATCH 06/11] Change logs folder to be able to mount it in Kubernetes --- Collectors/logging.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Collectors/logging.conf b/Collectors/logging.conf index 5d3e507..53c929b 100644 --- a/Collectors/logging.conf +++ b/Collectors/logging.conf @@ -47,16 +47,16 @@ args=(sys.stdout,) class=FileHandler level=INFO formatter=simpleFormatter -args=('summary.log',) +args=('logs/summary.log',) [handler_DetailedFileHandler] class=handlers.RotatingFileHandler level=DEBUG formatter=simpleFormatter -args=('detailed.log', 'a',100000000, 5,) +args=('logs/detailed.log', 'a',100000000, 5,) [handler_RedirectorFileHandler] class=FileHandler level=INFO formatter=simpleFormatter -args=('redirector.log',) +args=('logs/redirector.log',) From a448bfeb9a0ed26f0e25aae9c00f768a1f93c01e Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Tue, 26 Apr 2022 12:01:31 +0200 Subject: [PATCH 07/11] Update stomp helper to match newest version interface --- Collectors/stomp_helper.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Collectors/stomp_helper.py b/Collectors/stomp_helper.py index 3935beb..822af57 100644 --- a/Collectors/stomp_helper.py +++ b/Collectors/stomp_helper.py @@ -14,9 +14,9 @@ def __init__(self, message_q: multiprocessing.Queue, metrics_q: multiprocessing. 
def on_error(self, frame): logging.error('Received an error "%s"' % frame.body) - def on_message(self, headers, body): + def on_message(self, frame): # Parse the JSON message - loaded_json = json.loads(body) + loaded_json = json.loads(frame.body) # Base64 decode the data message = base64.standard_b64decode(loaded_json['data']) @@ -54,4 +54,4 @@ def get_stomp_connection_objects(mb_alias, port, username, password, connection.connect(username=username, passcode=password, wait=True) connections.append(connection) - return connections \ No newline at end of file + return connections From 800611c80a9721037ef3506d02192f762a58b63d Mon Sep 17 00:00:00 2001 From: Borja Garrido Bear Date: Thu, 5 May 2022 11:14:01 +0200 Subject: [PATCH 08/11] Add some extra debugging --- Collectors/UdpCollector.py | 6 +++++- Collectors/stomp_helper.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Collectors/UdpCollector.py b/Collectors/UdpCollector.py index 406c3a5..114c44d 100755 --- a/Collectors/UdpCollector.py +++ b/Collectors/UdpCollector.py @@ -114,11 +114,15 @@ def _create_amq_channel(self): port = self.config.get('STOMP', 'port') username = self.config.get('STOMP', 'username') password = self.config.get('STOMP', 'password') + self.logger.debug("Instantiating connection to {}, with user {}".format( + mb_alias, username + )) self.connections = get_stomp_connection_objects(mb_alias, port, username, password, self.connections) + self.logger.debug("Amount of opened connections: {}".format(len(self.connections))) except Exception: - self.logger("Something bad happened") + self.logger.exception("Something bad happened") def publish(self, routing_key, record: dict, retry=True, exchange=None): diff --git a/Collectors/stomp_helper.py b/Collectors/stomp_helper.py index 822af57..d3f64ed 100644 --- a/Collectors/stomp_helper.py +++ b/Collectors/stomp_helper.py @@ -38,7 +38,7 @@ def get_stomp_connection_objects(mb_alias, port, username, password, # If we had connection already, make sure to close them for connection in connections: if connection.is_connected(): - connection.close() + connection.disconnect() connections = [] # Following advice from messaging team URL should be translated into all possible From be0d9850820121eae16e936caa6a3802e2422355 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Thu, 23 Jun 2022 16:08:35 +0200 Subject: [PATCH 09/11] Make producer and type configurable for WLCG records --- Collectors/DetailedCollector.py | 2 +- Collectors/UdpCollector.py | 3 +++ Collectors/wlcg_converter.py | 8 ++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Collectors/DetailedCollector.py b/Collectors/DetailedCollector.py index 6a9a1e9..5b01b11 100755 --- a/Collectors/DetailedCollector.py +++ b/Collectors/DetailedCollector.py @@ -278,7 +278,7 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime, fileToClo self.publish("file-close", rec, exchange=self._exchange) self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'stashcache'}) else: - wlcg_packet = wlcg_converter.Convert(rec) + wlcg_packet = wlcg_converter.Convert(rec, self.metadata_producer, self.metadata_type) self.logger.debug("WLCG record to send: %s", str(wlcg_packet)) self.publish("file-close", wlcg_packet, exchange=self._wlcg_exchange) self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'wlcg'}) diff --git a/Collectors/UdpCollector.py b/Collectors/UdpCollector.py index 114c44d..a1e0162 100755 --- a/Collectors/UdpCollector.py +++ b/Collectors/UdpCollector.py @@ 
-74,6 +74,9 @@ def __init__(self, config, bind_addr): self.metrics_q = None self.wlcg = config.get('DEFAULT', 'wlcg') + self.metadata_producer = config.get('WLCG', 'producer') + self.metadata_type = config.get('WLCG', 'type') + self.protocol = config.get('DEFAULT', 'protocol') if self.protocol == 'AMQP': self.exchange = config.get('AMQP', 'exchange') diff --git a/Collectors/wlcg_converter.py b/Collectors/wlcg_converter.py index 87e21be..d2e6def 100644 --- a/Collectors/wlcg_converter.py +++ b/Collectors/wlcg_converter.py @@ -37,7 +37,7 @@ import socket import urllib.parse -def Convert(source_record): +def Convert(source_record, metadata_producer, metadata_type): """ Convert to the WLCG format documented here: https://twiki.cern.ch/twiki/bin/view/Main/GenericFileMonitoring @@ -172,9 +172,9 @@ def Convert(source_record): # Add the metadata to_return["metadata"] = { - "producer": "cms", - "type": "aaa-ng", - "timestamp": int(round(time.time()*1000)), + "producer": metadata_producer, + "type": metadata_type, + "timestamp": to_return['end_time'], "type_prefix": "raw", "host": socket.gethostname(), "_id": to_return['unique_id'] From ae54353cab57a5edb74db1e5d6b18b5afebf5bc7 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Thu, 8 Jun 2023 10:06:40 +0200 Subject: [PATCH 10/11] Add HasFileCloseMsg to WLCG converter --- Collectors/wlcg_converter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Collectors/wlcg_converter.py b/Collectors/wlcg_converter.py index d2e6def..92fe36c 100644 --- a/Collectors/wlcg_converter.py +++ b/Collectors/wlcg_converter.py @@ -169,7 +169,10 @@ def Convert(source_record, metadata_producer, metadata_type): to_return['CRAB_Workflow'] = split_appinfo[1].split("/")[-1] except: pass - + + if 'HasFileCloseMsg' in source_record: + to_return['HasFileCloseMsg'] = source_record['HasFileCloseMsg'] + # Add the metadata to_return["metadata"] = { "producer": metadata_producer, From 0b216d4050e5299e392beae5d643858c1f073755 Mon Sep 17 00:00:00 2001 From: Borja Garrido Date: Thu, 8 Jun 2023 10:23:50 +0200 Subject: [PATCH 11/11] Add wlcg metadata configuration example --- connection.sample.conf | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/connection.sample.conf b/connection.sample.conf index 40d86a0..8f86108 100644 --- a/connection.sample.conf +++ b/connection.sample.conf @@ -1,5 +1,9 @@ protocol = STOMP -wlcg = true +wlcg = false + +[WLCG] +producer = cms +type = aaa-ng [AMQP]
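
For reference, a condensed, illustrative sketch of the publish-with-failover behaviour this series introduces for STOMP (not verbatim collector code): a record is sent to a randomly chosen broker connection, every other connection is tried if that fails, and the connection pool is rebuilt and the publish retried once if no broker accepts the message. Here connections, topic and reconnect stand in for the collector's connection list, the [STOMP] topic and its _create_amq_channel() logic; logging and metrics updates are omitted.

    # Sketch only -- simplified failover publish.
    import json
    import random

    def publish(connections, topic, record, reconnect, retry=True):
        body = json.dumps(record)
        try:
            # Normal path: spread load by picking one broker connection at random.
            random.choice(connections).send(topic, body,
                                            headers={'content_type': 'application/json'})
            return
        except Exception:
            # Failover: try each remaining broker connection in turn.
            for conn in connections:
                try:
                    conn.send(topic, body,
                              headers={'content_type': 'application/json'})
                    return
                except Exception:
                    continue
        # No broker accepted the message: rebuild the connections and retry once.
        connections[:] = reconnect()
        if retry:
            publish(connections, topic, record, reconnect, retry=False)
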