diff --git a/memphis/consumer.py b/memphis/consumer.py index dd66996..97d944c 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -43,6 +43,7 @@ def __init__( self.context = {} self.dls_messages = [] self.dls_current_index = 0 + self.t_consume = None self.dls_callback_func = None self.t_dls = asyncio.create_task(self.__consume_dls()) @@ -191,4 +192,4 @@ async def destroy(self): map_key = internal_station_name + "_" + self.consumer_name.lower() del self.connection.consumers_map[map_key] except Exception as e: - raise MemphisError(str(e)) from e \ No newline at end of file + raise MemphisError(str(e)) from e diff --git a/memphis/memphis.py b/memphis/memphis.py index 7cf1db8..eab135a 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -505,6 +505,7 @@ async def consumer( real_name = consumer_name.lower() if generate_random_suffix: consumer_name = self.__generateRandomSuffix(consumer_name) + real_name += "_" + consumer_name.split("_")[-1] cg = consumer_name if not consumer_group else consumer_group if start_consume_from_sequence <= 0: