Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Collectors/DetailedCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ def addRecord(self, sid, userID, fileClose, timestamp, addr, openTime, fileToClo
if 'filename' not in rec or rec['filename'] == "missing directory":
self.metrics_q.put({'type': 'failed filename', 'count': 1})

if not lcg_record:
if not self.wlcg and not lcg_record:
self.logger.debug("OSG record to send: %s", str(rec))
self.publish("file-close", rec, exchange=self._exchange)
self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'stashcache'})
else:
wlcg_packet = wlcg_converter.Convert(rec)
wlcg_packet = wlcg_converter.Convert(rec, self.metadata_producer, self.metadata_type)
self.logger.debug("WLCG record to send: %s", str(wlcg_packet))
self.publish("file-close", wlcg_packet, exchange=self._wlcg_exchange)
self.metrics_q.put({'type': 'message sent', 'count': 1, 'message_type': 'wlcg'})
Expand Down
172 changes: 132 additions & 40 deletions Collectors/UdpCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import multiprocessing
import os
import queue
import random
import socket
import sys
import time
Expand All @@ -24,6 +25,8 @@
from prometheus_client import start_http_server, Counter, Gauge

import pika
import stomp
from stomp_helper import get_stomp_connection_objects


class _LoggerWriter(object):
Expand Down Expand Up @@ -68,8 +71,28 @@ def __init__(self, config, bind_addr):
self.config = config
self.message_q = None
self.child_process = None
self.exchange = config.get('AMQP', 'exchange')
self.metrics_q = None
self.wlcg = config.get('DEFAULT', 'wlcg')

self.metadata_producer = config.get('WLCG', 'producer')
self.metadata_type = config.get('WLCG', 'type')

self.protocol = config.get('DEFAULT', 'protocol')
if self.protocol == 'AMQP':
self.exchange = config.get('AMQP', 'exchange')
elif self.protocol == 'STOMP':
self.topic = config.get('STOMP', 'topic')
self.connections = []
else:
self.logger.exception('Error while loading configuration, selected protocol is not valid')


def _create_mq_channel(self):
    """Create the message-bus channel matching the configured protocol.

    Dispatches to the RabbitMQ (AMQP) or ActiveMQ (STOMP) setup helper
    based on ``self.protocol``; any other value is a no-op here (an
    invalid protocol is already reported during __init__).
    """
    proto = self.protocol
    if proto == 'AMQP':
        self._create_rmq_channel()
    elif proto == 'STOMP':
        self._create_amq_channel()


def _create_rmq_channel(self):
"""
Expand All @@ -85,21 +108,69 @@ def _create_rmq_channel(self):
print(e)


def _create_amq_channel(self):
    """
    (Re)create STOMP connections to every broker behind the configured alias.

    Reads the broker alias, port, and credentials from the [STOMP] config
    section and replaces ``self.connections`` with one connection per
    resolved broker host.  Failures are logged (with traceback), not raised,
    so a broker outage does not kill the collector child.
    """
    try:
        mb_alias = self.config.get('STOMP', 'url')
        port = self.config.get('STOMP', 'port')
        username = self.config.get('STOMP', 'username')
        password = self.config.get('STOMP', 'password')
        # Lazy %-style args: the message is only formatted when DEBUG is on.
        self.logger.debug("Instantiating connection to %s, with user %s",
                          mb_alias, username)
        self.connections = get_stomp_connection_objects(mb_alias, port,
                                                        username, password,
                                                        self.connections)
        self.logger.debug("Amount of opened connections: %d",
                          len(self.connections))
    except Exception:
        self.logger.exception("Something bad happened")


def publish(self, routing_key, record: dict, retry=True, exchange=None):
    """
    Publish a single JSON record to the configured message bus.

    Two transports are supported, selected by ``self.protocol``:
    AMQP (pika, publish to *exchange* with *routing_key*) and STOMP
    (stomp.py, send to ``self.topic`` on a randomly chosen broker
    connection).

    :param routing_key: AMQP routing key for the outgoing message.
    :param record: JSON-serializable payload.
    :param retry: when True, recreate the connection(s) and retry once
        after a send failure; the retry itself never retries again, so a
        persistently failing message is dropped (best-effort delivery).
    :param exchange: AMQP exchange to publish to; defaults to
        ``self.exchange``.
    """
    if exchange is None:
        exchange = self.exchange

    if self.protocol == 'AMQP':
        try:
            self.channel.basic_publish(exchange,
                                       routing_key,
                                       json.dumps(record),
                                       pika.BasicProperties(content_type='application/json',
                                                            delivery_mode=pika.spec.TRANSIENT_DELIVERY_MODE))
        except Exception:
            if retry:
                self.logger.exception('Error while sending rabbitmq message; will recreate connection and retry')
                self._create_rmq_channel()
                self.publish(routing_key, record, retry=False, exchange=exchange)
    elif self.protocol == 'STOMP':
        try:
            # BUG FIX: the attribute is ``self.connections`` (plural, set up
            # in __init__/_create_amq_channel); ``self.connection`` does not
            # exist and always raised AttributeError here.
            random.choice(self.connections).send(self.topic,
                                                 json.dumps(record),
                                                 headers={'content_type': 'application/json'})
        except Exception:
            if retry:
                sent = False
                # Try every remaining broker connection before giving up.
                for connection in self.connections:
                    try:
                        connection.send(self.topic,
                                        json.dumps(record),
                                        headers={'content_type': 'application/json'})
                        sent = True
                        break
                    except Exception:
                        # Just keep trying other connections
                        pass
                # Recreate the connections anyhow to keep them healthy
                self._create_amq_channel()
                # If it was not sent at all, we had better retry once.
                if not sent:
                    self.logger.exception('Error while sending amq message; will recreate connection and retry')
                    self.publish(routing_key, record, retry=False, exchange=exchange)


def _init_logging(self):
Expand Down Expand Up @@ -242,7 +313,7 @@ def start(self):
def _start_child(Collector, config, message_q, metrics_q):
coll = Collector(config, (Collector.DEFAULT_HOST, Collector.DEFAULT_PORT))
coll._init_logging()
coll._create_rmq_channel()
coll._create_mq_channel()
coll.message_q = message_q
coll.metrics_q = metrics_q
while True:
Expand All @@ -257,41 +328,62 @@ def _start_child(Collector, config, message_q, metrics_q):

@classmethod
def _bus_child(cls, config, message_q: multiprocessing.Queue, metrics_q: multiprocessing.Queue):
    """
    Child-process entry point that drains "pushed" messages from the bus
    into *message_q* (and counts them on *metrics_q*).

    The transport is chosen by the ``[DEFAULT] protocol`` setting:

    * AMQP  — blocking pika consumer on the ``[Pushed] push_queue`` queue;
      runs until the connection dies, then re-raises.
    * STOMP — subscribes a StompyListener on every broker behind the
      configured alias and loops forever, re-subscribing whenever any
      connection drops.

    NOTE(review): this span in the reviewed diff interleaved the pre- and
    post-change hunk lines; this body is the reconstructed post-change
    version with the duplicated stale lines removed.
    """
    protocol = config.get('DEFAULT', 'protocol')
    if protocol == 'AMQP':
        # Setup the connection to the message bus
        parameters = pika.URLParameters(config.get('AMQP', 'url'))
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()

        def on_message(channel, method, properties, body):
            # Parse the JSON envelope
            loaded_json = json.loads(body)

            # Base64 decode the payload
            message = base64.standard_b64decode(loaded_json['data'])

            # Split "host:port" from the right so remotes containing
            # extra colons still yield the port correctly.
            addr = loaded_json['remote'].rsplit(":", 1)
            message_q.put([message, addr[0], addr[1]])
            channel.basic_ack(method.delivery_tag)

            # Update the number of messages received on the bus
            metrics_q.put({'type': 'pushed messages', 'count': 1})

        channel.basic_qos(prefetch_count=1000)
        channel.basic_consume(config.get("Pushed", "push_queue"), on_message)

        try:
            channel.start_consuming()
        except Exception as ex:
            channel.stop_consuming()
            connection.close()
            raise ex
        connection.close()

    elif protocol == 'STOMP':
        mb_alias = config.get('STOMP', 'url')
        port = config.get('STOMP', 'port')
        username = config.get('STOMP', 'username')
        password = config.get('STOMP', 'password')
        subscribed = False
        connections = []

        while True:
            if not subscribed:
                connections = get_stomp_connection_objects(mb_alias, port,
                                                           username, password,
                                                           connections,
                                                           message_q, metrics_q)
                for connection in connections:
                    connection.subscribe(config.get('STOMP', 'UDPQueue'),
                                         id='XrootDCollector')
                subscribed = True
            # Check every 5 minutes that all connections are alive;
            # otherwise reconnect on the next pass.
            time.sleep(300)
            if not all(connection.is_connected() for connection in connections):
                subscribed = False



@staticmethod
def _metrics_child(metrics_q):
Expand Down
6 changes: 3 additions & 3 deletions Collectors/logging.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ args=(sys.stdout,)
class=FileHandler
level=INFO
formatter=simpleFormatter
args=('summary.log',)
args=('logs/summary.log',)

[handler_DetailedFileHandler]
class=handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=('detailed.log', 'a',100000000, 5,)
args=('logs/detailed.log', 'a',100000000, 5,)

[handler_RedirectorFileHandler]
class=FileHandler
level=INFO
formatter=simpleFormatter
args=('redirector.log',)
args=('logs/redirector.log',)
57 changes: 57 additions & 0 deletions Collectors/stomp_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import base64
import json
import logging
import multiprocessing
import socket
import stomp


class StompyListener(stomp.ConnectionListener):
    """STOMP listener that forwards pushed payloads to the collector queues."""

    def __init__(self, message_q: multiprocessing.Queue, metrics_q: multiprocessing.Queue):
        # Queue receiving decoded payloads for the collector child.
        self.message_q = message_q
        # Queue receiving per-message counters for the metrics child.
        self.metrics_q = metrics_q

    def on_error(self, frame):
        # Lazy %-args instead of eager '%' formatting: the string is only
        # built if the record is actually emitted.
        logging.error('Received an error "%s"', frame.body)

    def on_message(self, frame):
        """Decode one pushed message and enqueue it with its source address."""
        # Parse the JSON envelope
        loaded_json = json.loads(frame.body)

        # Base64 decode the payload
        message = base64.standard_b64decode(loaded_json['data'])

        # Split "host:port" from the right so remotes containing extra
        # colons still yield the port correctly.
        addr = loaded_json['remote'].rsplit(":", 1)
        self.message_q.put([message, addr[0], addr[1]])

        # Update the number of messages received on the bus
        self.metrics_q.put({'type': 'pushed messages', 'count': 1})

    def on_disconnected(self):
        logging.debug('disconnected')


def get_stomp_connection_objects(mb_alias, port, username, password,
                                 connections=None, message_q=None, metrics_q=None):
    """
    Open one STOMP connection per broker host behind *mb_alias*.

    Any previously opened connections passed via *connections* are
    disconnected first.  Each new connection is given a StompyListener
    wired to *message_q* / *metrics_q*.

    :param mb_alias: DNS alias that resolves to one or more broker hosts.
    :param port: broker port to connect to.
    :param username: STOMP login.
    :param password: STOMP passcode.
    :param connections: previously opened connections to close, if any.
    :param message_q: queue that decoded payloads are pushed onto
        (only needed when the connections will consume messages).
    :param metrics_q: queue that per-message counters are pushed onto.
    :return: list of freshly connected stomp.Connection objects.
    """
    # BUG FIX: the defaults were mutable lists ([]), which Python shares
    # across all calls of the function; use None sentinels instead.
    if connections is None:
        connections = []

    # If we had connections already, make sure to close them
    for connection in connections:
        if connection.is_connected():
            connection.disconnect()

    connections = []
    # Following advice from the messaging team, the alias should be
    # translated into all possible hosts behind it, so we connect to
    # every broker rather than just one.
    hosts = socket.gethostbyname_ex(mb_alias)[2]

    host_and_ports = [(host, port) for host in hosts]
    # Create a connection to each broker available
    for host_and_port in host_and_ports:
        connection = stomp.Connection(host_and_ports=[host_and_port])
        connection.set_listener('StompyListener', StompyListener(message_q, metrics_q))
        connection.connect(username=username, passcode=password, wait=True)
        connections.append(connection)

    return connections
13 changes: 8 additions & 5 deletions Collectors/wlcg_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import socket
import urllib.parse

def Convert(source_record):
def Convert(source_record, metadata_producer, metadata_type):
"""
Convert to the WLCG format documented here:
https://twiki.cern.ch/twiki/bin/view/Main/GenericFileMonitoring
Expand Down Expand Up @@ -169,12 +169,15 @@ def Convert(source_record):
to_return['CRAB_Workflow'] = split_appinfo[1].split("/")[-1]
except:
pass


if 'HasFileCloseMsg' in source_record:
to_return['HasFileCloseMsg'] = source_record['HasFileCloseMsg']

# Add the metadata
to_return["metadata"] = {
"producer": "cms",
"type": "aaa-ng",
"timestamp": int(round(time.time()*1000)),
"producer": metadata_producer,
"type": metadata_type,
"timestamp": to_return['end_time'],
"type_prefix": "raw",
"host": socket.gethostname(),
"_id": to_return['unique_id']
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
FROM python:3-alpine

COPY requirements.txt /
RUN apk update && apk add python3-dev gcc libc-dev libffi-dev
RUN python3 -m pip install --upgrade pip setuptools wheel
RUN pip install -r /requirements.txt

COPY Collectors /app
Expand Down
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,7 @@ record into a AMQP-based message bus.
Configuration
-------------

The DetailedCollector needs the a configuration in order to connect to the AMQP message bus.

[AMQP]

# Host information
url = amqps://username:password@example.com

# Exchange to write to
exchange = xrd.detailed
The DetailedCollector needs a configuration file in order to connect to the MQ message bus; please check the connection.sample.conf file in this repository.

This file is named `connection.conf` and should be in the Collectors directory or deployed with docker volumes, as shown below.

Expand Down
Loading