-
Notifications
You must be signed in to change notification settings - Fork 1
bug: idsse-1135: Consumer callbacks not logging #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
I created a simple usage of the Consumer class to confirm this behaves as expected. import logging
import logging.config
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
from idsse.common.log_util import get_default_log_config
from idsse.common.rabbitmq_utils import (Consumer, Conn, Exch, Queue, RabbitMqParams,
RabbitMqParamsAndCallback, threadsafe_ack)
logger = logging.getLogger(__name__)
class SimpleConsumer:
"""Class that runs a RabbitMQ Consumer and logs when it gets messages"""
def __init__(self):
conn = Conn('localhost', '/', 5672, 'guest', 'guest')
rmq_params = RabbitMqParams(
Exch('my_exch', 'topic'),
Queue('my_queue', '#', True, False, False, {'x-queue-type': 'classic'})
)
logger.info('Starting the Runner with conn %s, params: %s', conn, rmq_params)
self._consumer = Consumer(conn, RabbitMqParamsAndCallback(rmq_params, self.on_message))
self._consumer.start()
self._consumer.join()
def on_message(self,
channel: BlockingChannel,
method: Basic.Return,
properties: BasicProperties,
body: bytes):
"""React to message being consumed from RabbitMQ queue"""
message = body.decode()
logger.info('Received message: %s', message)
threadsafe_ack(channel,
method.delivery_tag,
lambda: logger.info('Message acked! %s', message))
if __name__ == '__main__':
"""Main function"""
logging.config.dictConfig(get_default_log_config('DEBUG'))
logging.getLogger('pika').setLevel('WARNING')
try:
SimpleConsumer()
except Exception as exc:
logger.error('Failed to start Runner. Cause: [%s] %s', type(exc), str(exc)) After I published a message
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing, the solution is straight forward and should be transferable to any other thread pool executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was the thinking behind change these methods to be 'public' vs 'private'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to remove all the "protected access" pylint warnings from these unit tests. I don't normally like to make functions/variables public just for the sake of testing, but I think the alternative is tossing any unit tests that try to access them as being too coupled with internal logic.
If you'd rather we just leave it as it is (tests accessing private variables), and come back to this when we're fixing all the bad unit test practices, I'm ok with that too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Upon deeper reflection, I think this will be solved when we have full mock of RabbitMQ for unit tests and rewrite tests to use that mock, rather than read/write private class vars to simulate certain states. Changing private vars to public is only masking the problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we should fix the test in a future task
Linear Issue
IDSSE-1135
Changes
Consumer
: copycontext
of Consumer to any ThreadPoolExecutor threadson_message
callbacks were making it to the console due to this bug, except calls to functions likethreadsafe_ack()
.rabbitmq_utils.Rpc
: nack any response message that can't be correlated with a request based onproperties.headers.rpc
Explanation
N/A