Skip to content

Conversation

mackenzie-grimes-noaa
Copy link
Contributor

@mackenzie-grimes-noaa mackenzie-grimes-noaa commented Feb 18, 2025

Linear Issue

IDSSE-1135

Changes

  • Consumer: copy context of Consumer to any ThreadPoolExecutor threads
    • Previously, no log messages from Consumer on_message callbacks were making it to the console due to this bug, except calls to functions like threadsafe_ack().
  • rabbitmq_utils.Rpc: nack any response message that can't be correlated with a request based on properties.headers.rpc
    • This quirk where RPC silently ignores any "unregistered" response message hid a bug in DAS. We figured it out, but this logging will surface issues like that sooner.

Explanation

N/A

@mackenzie-grimes-noaa
Copy link
Contributor Author

I created a simple usage of the Consumer class to confirm this behaves as expected.

import logging
import logging.config

from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties

from idsse.common.log_util import get_default_log_config
from idsse.common.rabbitmq_utils import (Consumer, Conn, Exch, Queue, RabbitMqParams,
                                         RabbitMqParamsAndCallback, threadsafe_ack)

logger = logging.getLogger(__name__)


class SimpleConsumer:
    """Class that runs a RabbitMQ Consumer and logs when it gets messages"""
    def __init__(self):
        conn = Conn('localhost', '/', 5672, 'guest', 'guest')
        rmq_params = RabbitMqParams(
            Exch('my_exch', 'topic'),
            Queue('my_queue', '#', True, False, False, {'x-queue-type': 'classic'})
        )
        logger.info('Starting the Runner with conn %s, params: %s', conn, rmq_params)
        self._consumer = Consumer(conn, RabbitMqParamsAndCallback(rmq_params, self.on_message))

        self._consumer.start()
        self._consumer.join()

    def on_message(self,
                   channel: BlockingChannel,
                   method: Basic.Return,
                   properties: BasicProperties,
                   body: bytes):
        """React to message being consumed from RabbitMQ queue"""
        message = body.decode()
        logger.info('Received message: %s', message)
        threadsafe_ack(channel,
                       method.delivery_tag,
                       lambda: logger.info('Message acked! %s', message))


if __name__ == '__main__':
    """Main function"""
    logging.config.dictConfig(get_default_log_config('DEBUG'))
    logging.getLogger('pika').setLevel('WARNING')

    try:
        SimpleConsumer()
    except Exception as exc:
        logger.error('Failed to start Runner. Cause: [%s] %s', type(exc), str(exc))

After I published a message {"foo": "bar"} to the made up RabbitMQ queue my_queue, it outputs the following messages to my console (notably, no log messages are missing!)

2025-02-18 22:44:40,204 idsse.common.rabbitmq_utils::Consumer INFO     None;00000000-0000-0000-0000-000000000000;_ rabbitmq_utils::run"(line 156) Start Consuming...  (to stop press CTRL+C)
2025-02-18 22:45:00,838 __main__ INFO     None;00000000-0000-0000-0000-000000000000;_ multithread_logging::on_message"(line 53) Received message: {"foo":"bar"}
2025-02-18 22:45:00,839 __main__ INFO    None;00000000-0000-0000-0000-000000000000;_ multithread_logging::<lambda>"(line 56) Message acked! {"foo":"bar"}

Geary-Layne
Geary-Layne previously approved these changes Feb 20, 2025
Copy link
Contributor

@Geary-Layne Geary-Layne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing, the solution is straight forward and should be transferable to any other thread pool executor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the thinking behind change these methods to be 'public' vs 'private'?

Copy link
Contributor Author

@mackenzie-grimes-noaa mackenzie-grimes-noaa Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to remove all the "protected access" pylint warnings from these unit tests. I don't normally like to make functions/variables public just for the sake of testing, but I think the alternative is tossing any unit tests that try to access them as being too coupled with internal logic.

If you'd rather we just leave it as it is (tests accessing private variables), and come back to this when we're fixing all the bad unit test practices, I'm ok with that too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon deeper reflection, I think this will be solved when we have full mock of RabbitMQ for unit tests and rewrite tests to use that mock, rather than read/write private class vars to simulate certain states. Changing private vars to public is only masking the problem.

@mackenzie-grimes-noaa mackenzie-grimes-noaa merged commit 691d1df into main Feb 20, 2025
2 checks passed
@mackenzie-grimes-noaa mackenzie-grimes-noaa deleted the bug/consumer-on-message-logging branch February 20, 2025 16:49
Copy link
Contributor

@Geary-Layne Geary-Layne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we should fix the test in a future task

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants