Skip to content

Introduce a warning timeout for MQTT topics #697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mqttwarn/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ def get_qos(self, section: str) -> int:
qos = int(self.config.get(section, "qos"))
return qos

def get_timeout(self, section: str) -> int:
timeout = -1
if self.config.has_option(section, "timeout"):
timeout = int(self.config.get(section, "timeout"))
return timeout

def get_notify_only_on_timeout(self, section: str) -> int:
notify_only_on_timeout = False
if self.config.has_option(section, "notify_only_on_timeout"):
notify_only_on_timeout = bool(self.config.get(section, "notify_only_on_timeout"))
return notify_only_on_timeout

def get_config(self, section: str, name: str) -> t.Any:
value = None
if self.config.has_option(section, name):
Expand Down
23 changes: 23 additions & 0 deletions mqttwarn/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
timeout,
truncate,
)
from mqttwarn.topic import TopicTimeout

try:
import json
Expand Down Expand Up @@ -76,6 +77,9 @@
# Instances of PeriodicThread objects
ptlist: t.Dict[str, PeriodicThread] = {}

# Instances of TopicTimeout objects
topic_timeout_list: t.Dict[str, TopicTimeout] = {}

# Instances of loaded service plugins
service_plugins: t.Dict[str, t.Dict[str, t.Any]] = dict()

Expand Down Expand Up @@ -131,13 +135,21 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str,
for section in context.get_sections():
topic = context.get_topic(section)
qos = context.get_qos(section)
topic_timeout = context.get_timeout(section)
notify_only_on_timeout = context.get_notify_only_on_timeout(section)

if topic in subscribed:
continue

logger.debug("Subscribing to %s (qos=%d)" % (topic, qos))
mqttc.subscribe(topic, qos)
subscribed.append(topic)
if topic_timeout > 0:
logger.debug("Setting up timeout thread for %s (timeout=%d)" % (topic, topic_timeout))
topic_timeout_list[topic] = TopicTimeout(timeout=topic_timeout, topic=topic, section=section,
notify_only_on_timeout=notify_only_on_timeout,
on_timeout=send_to_targets)
topic_timeout_list[topic].start()

if cf.lwt is not None:
mqttc.publish(cf.lwt, cf.lwt_alive, qos=0, retain=True)
Expand All @@ -160,6 +172,9 @@ def on_disconnect(mosq: MqttClient, userdata: t.Dict[str, str], result_code: int
"""
Handle disconnections from the broker
"""
for topic, thread in topic_timeout_list.items():
thread.stop()

if result_code == 0:
logger.info("Clean disconnection from broker")
else:
Expand Down Expand Up @@ -192,6 +207,14 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe
logger.debug("Skipping retained message on %s" % topic)
return

for match_topic in topic_timeout_list:
if paho.topic_matches_sub(match_topic, topic):
logger.debug("Message received, restarting timeout on %s" % match_topic)
topic_timeout_list[match_topic].restart()
# Sometimes it is only relevant if a timeout is reached or not
if topic_timeout_list[match_topic].notify_only_on_timeout:
return

# Try to find matching settings for this topic
for section in context.get_sections():
# Get the topic for this section (usually the section name but optionally overridden)
Expand Down
76 changes: 76 additions & 0 deletions mqttwarn/topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# (c) 2014-2023 The mqttwarn developers

import logging
import threading
import time
import typing as t

logger = logging.getLogger(__name__)


class TopicTimeout(threading.Thread):
"""
A thread handling timeouts on mqtt topics
"""

def __init__(
self,
topic: t.Optional[str] = None,
timeout: t.Optional[int] = 1,
section: t.Optional[str] = None,
notify_only_on_timeout: t.Optional[bool] = False,
on_timeout: t.Optional[t.Callable] = None
):
threading.Thread.__init__(self)
self.topic = topic
self.timeout = timeout
self.section = section
self.notify_only_on_timeout = notify_only_on_timeout
self._last_state_timeout = True
self._on_timeout = on_timeout
self._restart_event = threading.Event();
self._stop_event = threading.Event()

def run(self):
logger.debug("Starting thread %s for topic %s" % (self.name, self.topic))
# The outer loop runs until the thread receives a stop signal
# See: https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread
# The outer loop is used to reset the timeout after a message was received
while not self._stop_event.is_set():
timeout = self.timeout
# The inner loop runs until a stop signal is received or a message is received
# It uses the same logic as the outer loop for the signal handling
while True:
if self._stop_event.is_set():
# End the inner loop on stop signal
break
if self._restart_event.is_set():
# When a thread receives the reset signal, a message was received before the timeout.
# End the inner loop on reset signal, but before check what to do with the message.
# If the topic notifies only about timeout / no timeout and the last state was timeout
# a notification for the OK state should be published, otherwise just restart the thread
# and the received message will be handled by mqttwarn.
if self.notify_only_on_timeout and self._last_state_timeout:
logger.debug("%s received message for topic %s before timeout" % (self.name, self.topic))
message = "Message received for topic %s within %i" % (self.topic, self.timeout)
self._last_state_timeout = False
self._on_timeout(self.section, self.topic, message.encode('UTF-8'))
self._restart_event = threading.Event()
break
logger.debug("%s waiting... %i" % (self.name, timeout))
time.sleep(1)
timeout = timeout - 1
if timeout == 0:
logger.debug("%s timeout for topic %s" % (self.name, self.topic))
message = "Timeout for topic %s after %i" % (self.topic, self.timeout)
self._last_state_timeout = True
self._on_timeout(self.section, self.topic, message.encode('UTF-8'))
break

def restart(self):
logger.debug("Restarting timeout thread for %s (timeout %i)" % (self.topic, self.timeout))
self._restart_event.set()

def stop(self):
logger.debug("Stopping timeout thread for %s" % (self.topic))
self._stop_event.set()
Loading