From 76c4e263abf4b538cd87cfa4b6a8b6b46e5b91b3 Mon Sep 17 00:00:00 2001 From: "chandr-andr (Kiselev Aleksandr)" Date: Thu, 20 Apr 2023 00:06:16 +0400 Subject: [PATCH] Added new code to fix consumer destroy --- memphis/consumer.py | 3 ++- memphis/memphis.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index dd66996..97d944c 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -43,6 +43,7 @@ def __init__( self.context = {} self.dls_messages = [] self.dls_current_index = 0 + self.t_consume = None self.dls_callback_func = None self.t_dls = asyncio.create_task(self.__consume_dls()) @@ -191,4 +192,4 @@ async def destroy(self): map_key = internal_station_name + "_" + self.consumer_name.lower() del self.connection.consumers_map[map_key] except Exception as e: - raise MemphisError(str(e)) from e \ No newline at end of file + raise MemphisError(str(e)) from e diff --git a/memphis/memphis.py b/memphis/memphis.py index 7cf1db8..eab135a 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -505,6 +505,7 @@ async def consumer( real_name = consumer_name.lower() if generate_random_suffix: consumer_name = self.__generateRandomSuffix(consumer_name) + real_name += "_" + consumer_name.split("_")[-1] cg = consumer_name if not consumer_group else consumer_group if start_consume_from_sequence <= 0: