diff --git a/mqttwarn/context.py b/mqttwarn/context.py index cfed68e1..33679906 100644 --- a/mqttwarn/context.py +++ b/mqttwarn/context.py @@ -51,6 +51,18 @@ def get_qos(self, section: str) -> int: qos = int(self.config.get(section, "qos")) return qos + def get_timeout(self, section: str) -> int: + timeout = -1 + if self.config.has_option(section, "timeout"): + timeout = int(self.config.get(section, "timeout")) + return timeout + + def get_notify_only_on_timeout(self, section: str) -> int: + notify_only_on_timeout = False + if self.config.has_option(section, "notify_only_on_timeout"): + notify_only_on_timeout = bool(self.config.get(section, "notify_only_on_timeout")) + return notify_only_on_timeout + def get_config(self, section: str, name: str) -> t.Any: value = None if self.config.has_option(section, name): diff --git a/mqttwarn/core.py b/mqttwarn/core.py index ccca83d2..7b822366 100644 --- a/mqttwarn/core.py +++ b/mqttwarn/core.py @@ -35,6 +35,7 @@ timeout, truncate, ) +from mqttwarn.topic import TopicTimeout try: import json @@ -76,6 +77,9 @@ # Instances of PeriodicThread objects ptlist: t.Dict[str, PeriodicThread] = {} +# Instances of TopicTimeout objects +topic_timeout_list: t.Dict[str, TopicTimeout] = {} + # Instances of loaded service plugins service_plugins: t.Dict[str, t.Dict[str, t.Any]] = dict() @@ -131,6 +135,8 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str, for section in context.get_sections(): topic = context.get_topic(section) qos = context.get_qos(section) + topic_timeout = context.get_timeout(section) + notify_only_on_timeout = context.get_notify_only_on_timeout(section) if topic in subscribed: continue @@ -138,6 +144,12 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str, logger.debug("Subscribing to %s (qos=%d)" % (topic, qos)) mqttc.subscribe(topic, qos) subscribed.append(topic) + if topic_timeout > 0: + logger.debug("Setting up timeout thread for %s (timeout=%d)" % (topic, topic_timeout)) + topic_timeout_list[topic] = TopicTimeout(timeout=topic_timeout, topic=topic, section=section, + notify_only_on_timeout=notify_only_on_timeout, + on_timeout=send_to_targets) + topic_timeout_list[topic].start() if cf.lwt is not None: mqttc.publish(cf.lwt, cf.lwt_alive, qos=0, retain=True) @@ -160,6 +172,9 @@ def on_disconnect(mosq: MqttClient, userdata: t.Dict[str, str], result_code: int """ Handle disconnections from the broker """ + for topic, thread in topic_timeout_list.items(): + thread.stop() + if result_code == 0: logger.info("Clean disconnection from broker") else: @@ -192,6 +207,14 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe logger.debug("Skipping retained message on %s" % topic) return + for match_topic in topic_timeout_list: + if paho.topic_matches_sub(match_topic, topic): + logger.debug("Message received, restarting timeout on %s" % match_topic) + topic_timeout_list[match_topic].restart() + # Sometimes it is only relevant if a timeout is reached or not + if topic_timeout_list[match_topic].notify_only_on_timeout: + return + # Try to find matching settings for this topic for section in context.get_sections(): # Get the topic for this section (usually the section name but optionally overridden) diff --git a/mqttwarn/topic.py b/mqttwarn/topic.py new file mode 100644 index 00000000..8c88bc16 --- /dev/null +++ b/mqttwarn/topic.py @@ -0,0 +1,76 @@ +# (c) 2014-2023 The mqttwarn developers + +import logging +import threading +import time +import typing as t + +logger = logging.getLogger(__name__) + + +class TopicTimeout(threading.Thread): + """ + A thread handling timeouts on mqtt topics + """ + + def __init__( + self, + topic: t.Optional[str] = None, + timeout: t.Optional[int] = 1, + section: t.Optional[str] = None, + notify_only_on_timeout: t.Optional[bool] = False, + on_timeout: t.Optional[t.Callable] = None + ): + threading.Thread.__init__(self) + self.topic = topic + self.timeout = timeout + self.section = section + self.notify_only_on_timeout = notify_only_on_timeout + self._last_state_timeout = True + self._on_timeout = on_timeout + self._restart_event = threading.Event(); + self._stop_event = threading.Event() + + def run(self): + logger.debug("Starting thread %s for topic %s" % (self.name, self.topic)) + # The outer loop runs until the thread receives a stop signal + # See: https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread + # The outer loop is used to reset the timeout after a message was received + while not self._stop_event.is_set(): + timeout = self.timeout + # The inner loop runs until a stop signal is received or a message is received + # It uses the same logic as the outer loop for the signal handling + while True: + if self._stop_event.is_set(): + # End the inner loop on stop signal + break + if self._restart_event.is_set(): + # When a thread receives the reset signal, a message was received before the timeout. + # End the inner loop on reset signal, but before check what to do with the message. + # If the topic notifies only about timeout / no timeout and the last state was timeout + # a notification for the OK state should be published, otherwise just restart the thread + # and the received message will be handled by mqttwarn. + if self.notify_only_on_timeout and self._last_state_timeout: + logger.debug("%s received message for topic %s before timeout" % (self.name, self.topic)) + message = "Message received for topic %s within %i" % (self.topic, self.timeout) + self._last_state_timeout = False + self._on_timeout(self.section, self.topic, message.encode('UTF-8')) + self._restart_event = threading.Event() + break + logger.debug("%s waiting... %i" % (self.name, timeout)) + time.sleep(1) + timeout = timeout - 1 + if timeout == 0: + logger.debug("%s timeout for topic %s" % (self.name, self.topic)) + message = "Timeout for topic %s after %i" % (self.topic, self.timeout) + self._last_state_timeout = True + self._on_timeout(self.section, self.topic, message.encode('UTF-8')) + break + + def restart(self): + logger.debug("Restarting timeout thread for %s (timeout %i)" % (self.topic, self.timeout)) + self._restart_event.set() + + def stop(self): + logger.debug("Stopping timeout thread for %s" % (self.topic)) + self._stop_event.set()