diff --git a/src/instana/__init__.py b/src/instana/__init__.py index 7f870b6a..647b6253 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -173,7 +173,7 @@ def boot_agent(): # grpcio, # noqa: F401 logging, # noqa: F401 mysqlclient, # noqa: F401 - # pika, # noqa: F401 + pika, # noqa: F401 pep0249, # noqa: F401 psycopg2, # noqa: F401 pymongo, # noqa: F401 diff --git a/src/instana/instrumentation/pika.py b/src/instana/instrumentation/pika.py index cc9478cb..c8c74a5d 100644 --- a/src/instana/instrumentation/pika.py +++ b/src/instana/instrumentation/pika.py @@ -2,167 +2,261 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2021 - - -import wrapt -import opentracing -import types - -from ..log import logger -from ..singletons import tracer -from ..util.traceutils import get_tracer_tuple, tracing_is_off - try: - import pika - - - def _extract_broker_tags(span, conn): - span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) - - - def _extract_publisher_tags(span, conn, exchange, routing_key): - _extract_broker_tags(span, conn) - - span.set_tag("sort", "publish") - span.set_tag("key", routing_key) - span.set_tag("exchange", exchange) - - - def _extract_consumer_tags(span, conn, queue): - _extract_broker_tags(span, conn) + import types + from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterator, + Optional, + Tuple, + Union, + ) - span.set_tag("sort", "consume") - span.set_tag("queue", queue) - - - @wrapt.patch_function_wrapper('pika.channel', 'Channel.basic_publish') - def basic_publish_with_instana(wrapped, instance, args, kwargs): - def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs): + import pika + import wrapt + + from instana.log import logger + from instana.propagators.format import Format + from instana.singletons import tracer + from instana.util.traceutils import get_tracer_tuple, tracing_is_off + + if TYPE_CHECKING: + import pika.adapters.blocking_connection + import pika.channel + import pika.connection + + from instana.span.span import InstanaSpan + + def _extract_broker_attributes( + span: "InstanaSpan", conn: pika.connection.Connection + ) -> None: + span.set_attribute("address", f"{conn.params.host}:{conn.params.port}") + + def _extract_publisher_attributes( + span: "InstanaSpan", + conn: pika.connection.Connection, + exchange: str, + routing_key: str, + ) -> None: + _extract_broker_attributes(span, conn) + + span.set_attribute("sort", "publish") + span.set_attribute("key", routing_key) + span.set_attribute("exchange", exchange) + + def _extract_consumer_tags( + span: "InstanaSpan", conn: pika.connection.Connection, queue: str + ) -> None: + _extract_broker_attributes(span, conn) + + span.set_attribute("sort", "consume") + span.set_attribute("queue", queue) + + @wrapt.patch_function_wrapper("pika.channel", "Channel.basic_publish") + def basic_publish_with_instana( + wrapped: Callable[..., pika.channel.Channel.basic_publish], + instance: pika.channel.Channel, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: + def _bind_args( + exchange: str, + routing_key: str, + body: str, + properties: Optional[object] = None, + *args: object, + **kwargs: object, + ) -> Tuple[object, ...]: return (exchange, routing_key, body, properties, args, kwargs) - tracer, parent_span, _ = get_tracer_tuple() - + # If we're not tracing, just return if tracing_is_off(): return wrapped(*args, **kwargs) - (exchange, routing_key, body, properties, args, kwargs) = (_bind_args(*args, **kwargs)) + tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None + + (exchange, routing_key, body, properties, args, kwargs) = _bind_args( + *args, **kwargs + ) - with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + with tracer.start_as_current_span( + "rabbitmq", span_context=parent_context + ) as span: try: - _extract_publisher_tags(scope.span, - conn=instance.connection, - routing_key=routing_key, - exchange=exchange) - except: - logger.debug("publish_with_instana: ", exc_info=True) + _extract_publisher_attributes( + span, + conn=instance.connection, + routing_key=routing_key, + exchange=exchange, + ) + except Exception: + logger.debug("pika publish_with_instana error: ", exc_info=True) # context propagation properties = properties or pika.BasicProperties() properties.headers = properties.headers or {} - tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, properties.headers, - disable_w3c_trace_context=True) + tracer.inject( + span.context, + Format.HTTP_HEADERS, + properties.headers, + disable_w3c_trace_context=True, + ) args = (exchange, routing_key, body, properties) + args try: rv = wrapped(*args, **kwargs) - except Exception as e: - scope.span.log_exception(e) - raise + except Exception as exc: + span.record_exception(exc) else: return rv - - def basic_get_with_instana(wrapped, instance, args, kwargs): - def _bind_args(*args, **kwargs): + def basic_get_with_instana( + wrapped: Callable[ + ..., + Union[pika.channel.Channel.basic_get, pika.channel.Channel.basic_consume], + ], + instance: pika.channel.Channel, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: + def _bind_args(*args: object, **kwargs: object) -> Tuple[object, ...]: args = list(args) - queue = kwargs.pop('queue', None) or args.pop(0) - callback = kwargs.pop('callback', None) or kwargs.pop('on_message_callback', None) or args.pop(0) + queue = kwargs.pop("queue", None) or args.pop(0) + callback = ( + kwargs.pop("callback", None) + or kwargs.pop("on_message_callback", None) + or args.pop(0) + ) return (queue, callback, tuple(args), kwargs) queue, callback, args, kwargs = _bind_args(*args, **kwargs) - def _cb_wrapper(channel, method, properties, body): - parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers, - disable_w3c_trace_context=True) - - with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + def _cb_wrapper( + channel: pika.channel.Channel, + method: pika.spec.Basic, + properties: pika.BasicProperties, + body: str, + ) -> None: + parent_context = tracer.extract( + Format.HTTP_HEADERS, properties.headers, disable_w3c_trace_context=True + ) + + with tracer.start_as_current_span( + "rabbitmq", span_context=parent_context + ) as span: try: - _extract_consumer_tags(scope.span, - conn=instance.connection, - queue=queue) - except: - logger.debug("basic_get_with_instana: ", exc_info=True) + _extract_consumer_tags(span, conn=instance.connection, queue=queue) + except Exception: + logger.debug("pika basic_get_with_instana error: ", exc_info=True) try: callback(channel, method, properties, body) - except Exception as e: - scope.span.log_exception(e) - raise + except Exception as exc: + span.record_exception(exc) args = (queue, _cb_wrapper) + args return wrapped(*args, **kwargs) - @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume') - def basic_consume_with_instana(wrapped, instance, args, kwargs): - def _bind_args(queue, on_message_callback, *args, **kwargs): + @wrapt.patch_function_wrapper( + "pika.adapters.blocking_connection", "BlockingChannel.basic_consume" + ) + def basic_consume_with_instana( + wrapped: Callable[ + ..., pika.adapters.blocking_connection.BlockingChannel.basic_consume + ], + instance: pika.adapters.blocking_connection.BlockingChannel, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: + def _bind_args( + queue: str, + on_message_callback: object, + *args: object, + **kwargs: object, + ) -> Tuple[object, ...]: return (queue, on_message_callback, args, kwargs) queue, on_message_callback, args, kwargs = _bind_args(*args, **kwargs) - def _cb_wrapper(channel, method, properties, body): - parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers, - disable_w3c_trace_context=True) - - with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + def _cb_wrapper( + channel: pika.channel.Channel, + method: pika.spec.Basic, + properties: pika.BasicProperties, + body: str, + ) -> None: + parent_context = tracer.extract( + Format.HTTP_HEADERS, properties.headers, disable_w3c_trace_context=True + ) + + with tracer.start_as_current_span( + "rabbitmq", span_context=parent_context + ) as span: try: - _extract_consumer_tags(scope.span, - conn=instance.connection._impl, - queue=queue) - except: - logger.debug("basic_consume_with_instana: ", exc_info=True) + _extract_consumer_tags( + span, conn=instance.connection._impl, queue=queue + ) + except Exception: + logger.debug( + "pika basic_consume_with_instana error:", exc_info=True + ) try: on_message_callback(channel, method, properties, body) - except Exception as e: - scope.span.log_exception(e) - raise + except Exception as exc: + span.record_exception(exc) args = (queue, _cb_wrapper) + args return wrapped(*args, **kwargs) - - @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.consume') - def consume_with_instana(wrapped, instance, args, kwargs): - def _bind_args(queue, *args, **kwargs): + @wrapt.patch_function_wrapper( + "pika.adapters.blocking_connection", "BlockingChannel.consume" + ) + def consume_with_instana( + wrapped: Callable[..., pika.adapters.blocking_connection.BlockingChannel], + instance: pika.adapters.blocking_connection.BlockingChannel, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: + def _bind_args( + queue: str, *args: object, **kwargs: object + ) -> Tuple[object, ...]: return (queue, args, kwargs) - (queue, args, kwargs) = (_bind_args(*args, **kwargs)) + (queue, args, kwargs) = _bind_args(*args, **kwargs) - def _consume(gen): - for yilded in gen: + def _consume(gen: Iterator[object]) -> object: + for yielded in gen: # Bypass the delivery created due to inactivity timeout - if yilded is None or not any(yilded): - yield yilded + if not yielded or not any(yielded): + yield yielded continue - (method_frame, properties, body) = yilded + (method_frame, properties, body) = yielded - parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers, - disable_w3c_trace_context=True) - with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + parent_context = tracer.extract( + Format.HTTP_HEADERS, + properties.headers, + disable_w3c_trace_context=True, + ) + with tracer.start_as_current_span( + "rabbitmq", span_context=parent_context + ) as span: try: - _extract_consumer_tags(scope.span, - conn=instance.connection._impl, - queue=queue) - except: + _extract_consumer_tags( + span, conn=instance.connection._impl, queue=queue + ) + except Exception: logger.debug("consume_with_instana: ", exc_info=True) try: - yield yilded - except Exception as e: - scope.span.log_exception(e) - raise + yield yielded + except Exception as exc: + span.record_exception(exc) args = (queue,) + args res = wrapped(*args, **kwargs) @@ -172,20 +266,31 @@ def _consume(gen): else: return res - - @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.__init__') - def _BlockingChannel___init__(wrapped, instance, args, kwargs): + @wrapt.patch_function_wrapper( + "pika.adapters.blocking_connection", "BlockingChannel.__init__" + ) + def _BlockingChannel___init__( + wrapped: Callable[ + ..., pika.adapters.blocking_connection.BlockingChannel.__init__ + ], + instance: pika.adapters.blocking_connection.BlockingChannel, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: ret = wrapped(*args, **kwargs) - impl = getattr(instance, '_impl', None) + impl = getattr(instance, "_impl", None) - if impl and hasattr(impl.basic_consume, '__wrapped__'): + if impl and hasattr(impl.basic_consume, "__wrapped__"): impl.basic_consume = impl.basic_consume.__wrapped__ return ret - - wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana) - wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_consume', basic_get_with_instana) + wrapt.wrap_function_wrapper( + "pika.channel", "Channel.basic_get", basic_get_with_instana + ) + wrapt.wrap_function_wrapper( + "pika.channel", "Channel.basic_consume", basic_get_with_instana + ) logger.debug("Instrumenting pika") except ImportError: diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py index 887f4bf4..093c36cd 100644 --- a/tests/clients/test_pika.py +++ b/tests/clients/test_pika.py @@ -1,328 +1,177 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2021 -import unittest import threading import time +from typing import Generator, Optional -import pika import mock +import pika +import pika.adapters.blocking_connection +import pika.channel +import pika.spec +import pytest from instana.singletons import agent, tracer -class _TestPika(unittest.TestCase): +class _TestPika: @staticmethod - @mock.patch('pika.connection.Connection') - def _create_connection(connection_class_mock=None): + @mock.patch("pika.connection.Connection") + def _create_connection(connection_class_mock=None) -> object: return connection_class_mock() - def _create_obj(self): + def _create_obj(self) -> NotImplementedError: raise NotImplementedError() - def setUp(self): - self.recorder = tracer.recorder + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """SetUp and TearDown""" + # setup + # Clear all spans before a test run + self.recorder = tracer.span_processor self.recorder.clear_spans() self.connection = self._create_connection() self._on_openok_callback = mock.Mock() self.obj = self._create_obj() - - def tearDown(self): + yield + # teardown del self.connection del self._on_openok_callback del self.obj + # Ensure that allow_exit_as_root has the default value agent.options.allow_exit_as_root = False -class TestPikaChannel(_TestPika): - def _create_obj(self): - return pika.channel.Channel(self.connection, 1, self._on_openok_callback) - - @mock.patch('pika.spec.Basic.Publish') - @mock.patch('pika.channel.Channel._send_method') - def test_basic_publish(self, send_method, _unused): - self.obj._set_state(self.obj.OPEN) - - with tracer.start_active_span("testing"): - self.obj.basic_publish("test.exchange", "test.queue", "Hello!") - - spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - - rabbitmq_span = spans[0] - test_span = spans[1] - - self.assertIsNone(tracer.active_span) - - # Same traceId - self.assertEqual(test_span.t, rabbitmq_span.t) - - # Parent relationships - self.assertEqual(rabbitmq_span.p, test_span.s) - - # Error logging - self.assertIsNone(test_span.ec) - self.assertIsNone(rabbitmq_span.ec) - - # Span tags - self.assertEqual("test.exchange", rabbitmq_span.data["rabbitmq"]["exchange"]) - self.assertEqual('publish', rabbitmq_span.data["rabbitmq"]["sort"]) - self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) - self.assertIsNotNone(rabbitmq_span.stack) - self.assertTrue(type(rabbitmq_span.stack) is list) - self.assertGreater(len(rabbitmq_span.stack), 0) - - send_method.assert_called_once_with( - pika.spec.Basic.Publish( - exchange="test.exchange", - routing_key="test.queue"), (pika.spec.BasicProperties(headers={ - "X-INSTANA-T": rabbitmq_span.t, - "X-INSTANA-S": rabbitmq_span.s, - "X-INSTANA-L": "1" - }), b"Hello!")) - - @mock.patch('pika.spec.Basic.Publish') - @mock.patch('pika.channel.Channel._send_method') - def test_basic_publish_as_root_exit_span(self, send_method, _unused): - agent.options.allow_exit_as_root = True - self.obj._set_state(self.obj.OPEN) - self.obj.basic_publish("test.exchange", "test.queue", "Hello!") - - spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) - - rabbitmq_span = spans[0] - - self.assertIsNone(tracer.active_span) +class TestPikaBlockingChannel(_TestPika): + @mock.patch("pika.channel.Channel", spec=pika.channel.Channel) + def _create_obj( + self, channel_impl: mock.MagicMock + ) -> pika.adapters.blocking_connection.BlockingChannel: + self.impl = channel_impl() + self.impl.channel_number = 1 - # Parent relationships - self.assertIsNone(rabbitmq_span.p, None) + return pika.adapters.blocking_connection.BlockingChannel( + self.impl, self.connection + ) - # Error logging - self.assertIsNone(rabbitmq_span.ec) + def _generate_delivery( + self, consumer_tag: str, properties: pika.BasicProperties, body: str + ) -> None: + from pika.adapters.blocking_connection import _ConsumerDeliveryEvt - # Span tags - self.assertEqual("test.exchange", rabbitmq_span.data["rabbitmq"]["exchange"]) - self.assertEqual('publish', rabbitmq_span.data["rabbitmq"]["sort"]) - self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) - self.assertIsNotNone(rabbitmq_span.stack) - self.assertTrue(type(rabbitmq_span.stack) is list) - self.assertGreater(len(rabbitmq_span.stack), 0) + # Wait until queue consumer is initialized + while self.obj._queue_consumer_generator is None: + time.sleep(0.25) - send_method.assert_called_once_with( - pika.spec.Basic.Publish( - exchange="test.exchange", - routing_key="test.queue"), (pika.spec.BasicProperties(headers={ - "X-INSTANA-T": rabbitmq_span.t, - "X-INSTANA-S": rabbitmq_span.s, - "X-INSTANA-L": "1" - }), b"Hello!")) - - @mock.patch('pika.spec.Basic.Publish') - @mock.patch('pika.channel.Channel._send_method') - def test_basic_publish_with_headers(self, send_method, _unused): - self.obj._set_state(self.obj.OPEN) + method = pika.spec.Basic.Deliver(consumer_tag=consumer_tag) + self.obj._on_consumer_generator_event( + _ConsumerDeliveryEvt(method, properties, body) + ) - with tracer.start_active_span("testing"): - self.obj.basic_publish("test.exchange", - "test.queue", - "Hello!", - pika.BasicProperties(headers={ - "X-Custom-1": "test" - })) + def test_consume(self) -> None: + consumed_deliveries = [] - spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + def __consume() -> None: + for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): + # Skip deliveries generated due to inactivity + if delivery is not None and any(delivery): + consumed_deliveries.append(delivery) - rabbitmq_span = spans[0] - test_span = spans[1] + break - send_method.assert_called_once_with( - pika.spec.Basic.Publish( - exchange="test.exchange", - routing_key="test.queue"), (pika.spec.BasicProperties(headers={ - "X-Custom-1": "test", - "X-INSTANA-T": rabbitmq_span.t, - "X-INSTANA-S": rabbitmq_span.s, - "X-INSTANA-L": "1" - }), b"Hello!")) - - @mock.patch('pika.spec.Basic.Get') - def test_basic_get(self, _unused): - self.obj._set_state(self.obj.OPEN) + consumer_tag = "test.consumer" - body = "Hello!" - properties = pika.BasicProperties() + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + self.impl._consumers = {} - method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) - header_frame = pika.frame.Header(1, len(body), properties) + t = threading.Thread(target=__consume) + t.start() - cb = mock.Mock() + self._generate_delivery(consumer_tag, pika.BasicProperties(), "Hello!") - self.obj.basic_get("test.queue", cb) - self.obj._on_getok(method_frame, header_frame, body) + t.join(timeout=5.0) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 rabbitmq_span = spans[0] - self.assertIsNone(tracer.active_span) - # A new span has been started - self.assertIsNotNone(rabbitmq_span.t) - self.assertIsNone(rabbitmq_span.p) - self.assertIsNotNone(rabbitmq_span.s) + assert rabbitmq_span.t + assert not rabbitmq_span.p + assert rabbitmq_span.s # Error logging - self.assertIsNone(rabbitmq_span.ec) + assert not rabbitmq_span.ec # Span tags - self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) - self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) - self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) - self.assertIsNotNone(rabbitmq_span.stack) - self.assertTrue(type(rabbitmq_span.stack) is list) - self.assertGreater(len(rabbitmq_span.stack), 0) - - cb.assert_called_once_with(self.obj, pika.spec.Basic.GetOk, properties, body) - - @mock.patch('pika.spec.Basic.Get') - def test_basic_get_with_trace_context(self, _unused): - self.obj._set_state(self.obj.OPEN) - - body = "Hello!" - properties = pika.BasicProperties(headers={ - "X-INSTANA-T": "0000000000000001", - "X-INSTANA-S": "0000000000000002", - "X-INSTANA-L": "1" - }) - - method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) - header_frame = pika.frame.Header(1, len(body), properties) - - cb = mock.Mock() - - self.obj.basic_get("test.queue", cb) - self.obj._on_getok(method_frame, header_frame, body) - - spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) - - rabbitmq_span = spans[0] - - self.assertIsNone(tracer.active_span) - - # Trace context propagation - self.assertEqual("0000000000000001", rabbitmq_span.t) - self.assertEqual("0000000000000002", rabbitmq_span.p) - - # A new span has been started - self.assertIsNotNone(rabbitmq_span.s) - self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) - - @mock.patch('pika.spec.Basic.Consume') - def test_basic_consume(self, _unused): - self.obj._set_state(self.obj.OPEN) + assert not rabbitmq_span.data["rabbitmq"]["exchange"] + assert rabbitmq_span.data["rabbitmq"]["sort"] == "consume" + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["queue"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 - body = "Hello!" - properties = pika.BasicProperties() - - method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) - header_frame = pika.frame.Header(1, len(body), properties) - - cb = mock.Mock() - - self.obj.basic_consume("test.queue", cb, consumer_tag="test") - self.obj._on_deliver(method_frame, header_frame, body) - - spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) - - rabbitmq_span = spans[0] + assert len(consumed_deliveries) == 1 - self.assertIsNone(tracer.active_span) - - # A new span has been started - self.assertIsNotNone(rabbitmq_span.t) - self.assertIsNone(rabbitmq_span.p) - self.assertIsNotNone(rabbitmq_span.s) + def test_consume_with_trace_context(self) -> None: + consumed_deliveries = [] - # Error logging - self.assertIsNone(rabbitmq_span.ec) + def __consume(): + for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): + # Skip deliveries generated due to inactivity + if delivery is not None and any(delivery): + consumed_deliveries.append(delivery) + break - # Span tags - self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) - self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) - self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) - self.assertIsNotNone(rabbitmq_span.stack) - self.assertTrue(type(rabbitmq_span.stack) is list) - self.assertGreater(len(rabbitmq_span.stack), 0) + consumer_tag = "test.consumer" - cb.assert_called_once_with(self.obj, method_frame.method, properties, body) + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + self.impl._consumers = {} - @mock.patch('pika.spec.Basic.Consume') - def test_basic_consume_with_trace_context(self, _unused): - self.obj._set_state(self.obj.OPEN) + t = threading.Thread(target=__consume) + t.start() - body = "Hello!" - properties = pika.BasicProperties(headers={ + instana_headers = { "X-INSTANA-T": "0000000000000001", "X-INSTANA-S": "0000000000000002", - "X-INSTANA-L": "1" - }) - - method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) - header_frame = pika.frame.Header(1, len(body), properties) + "X-INSTANA-L": "1", + } + self._generate_delivery( + consumer_tag, + pika.BasicProperties(headers=instana_headers), + "Hello!", + ) - cb = mock.Mock() - - self.obj.basic_consume(queue="test.queue", on_message_callback=cb, consumer_tag="test") - self.obj._on_deliver(method_frame, header_frame, body) + t.join(timeout=5.0) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 rabbitmq_span = spans[0] - self.assertIsNone(tracer.active_span) - # Trace context propagation - self.assertEqual("0000000000000001", rabbitmq_span.t) - self.assertEqual("0000000000000002", rabbitmq_span.p) + assert rabbitmq_span.t == int(instana_headers["X-INSTANA-T"]) + assert rabbitmq_span.p == int(instana_headers["X-INSTANA-S"]) # A new span has been started - self.assertIsNotNone(rabbitmq_span.s) - self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + assert rabbitmq_span.s + assert rabbitmq_span.p != rabbitmq_span.s + def test_consume_with_not_GeneratorType(self, mocker) -> None: + mocker.patch( + "instana.instrumentation.pika.isinstance", + return_value=False, + ) -class TestPikaBlockingChannel(_TestPika): - @mock.patch('pika.channel.Channel', spec=pika.channel.Channel) - def _create_obj(self, channel_impl): - self.impl = channel_impl() - self.impl.channel_number = 1 - - return pika.adapters.blocking_connection.BlockingChannel(self.impl, self.connection) - - def _generate_delivery(self, consumer_tag, properties, body): - from pika.adapters.blocking_connection import _ConsumerDeliveryEvt - - # Wait until queue consumer is initialized - while self.obj._queue_consumer_generator is None: - time.sleep(0.25) - - method = pika.spec.Basic.Deliver(consumer_tag=consumer_tag) - self.obj._on_consumer_generator_event(_ConsumerDeliveryEvt(method, properties, body)) - - def test_consume(self): consumed_deliveries = [] - def __consume(): + def __consume() -> None: for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): # Skip deliveries generated due to inactivity if delivery is not None and any(delivery): @@ -344,35 +193,17 @@ def __consume(): t.join(timeout=5.0) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) - - rabbitmq_span = spans[0] - - self.assertIsNone(tracer.active_span) + assert len(spans) == 0 - # A new span has been started - self.assertIsNotNone(rabbitmq_span.t) - self.assertIsNone(rabbitmq_span.p) - self.assertIsNotNone(rabbitmq_span.s) - - # Error logging - self.assertIsNone(rabbitmq_span.ec) + def test_consume_with_any_yielded(self, mocker) -> None: + mocker.patch( + "instana.instrumentation.pika.any", + return_value=False, + ) - # Span tags - self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) - self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) - self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) - self.assertIsNotNone(rabbitmq_span.stack) - self.assertTrue(type(rabbitmq_span.stack) is list) - self.assertGreater(len(rabbitmq_span.stack), 0) - - self.assertEqual(1, len(consumed_deliveries)) - - def test_consume_with_trace_context(self): consumed_deliveries = [] - def __consume(): + def __consume() -> None: for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): # Skip deliveries generated due to inactivity if delivery is not None and any(delivery): @@ -389,51 +220,45 @@ def __consume(): t = threading.Thread(target=__consume) t.start() - self._generate_delivery(consumer_tag, pika.BasicProperties(headers={ - "X-INSTANA-T": "0000000000000001", - "X-INSTANA-S": "0000000000000002", - "X-INSTANA-L": "1" - }), "Hello!") + self._generate_delivery(consumer_tag, pika.BasicProperties(), "Hello!") t.join(timeout=5.0) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) - - rabbitmq_span = spans[0] - - self.assertIsNone(tracer.active_span) - - # Trace context propagation - self.assertEqual("0000000000000001", rabbitmq_span.t) - self.assertEqual("0000000000000002", rabbitmq_span.p) - - # A new span has been started - self.assertIsNotNone(rabbitmq_span.s) - self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + assert len(spans) == 0 class TestPikaBlockingChannelBlockingConnection(_TestPika): - @mock.patch('pika.adapters.blocking_connection.BlockingConnection', autospec=True) - def _create_connection(self, connection=None): + @mock.patch("pika.adapters.blocking_connection.BlockingConnection", autospec=True) + def _create_connection(self, connection: Optional[mock.MagicMock] = None) -> object: connection._impl = mock.create_autospec(pika.connection.Connection) connection._impl.params = pika.connection.Parameters() return connection - @mock.patch('pika.channel.Channel', spec=pika.channel.Channel) - def _create_obj(self, channel_impl): + @mock.patch("pika.channel.Channel", spec=pika.channel.Channel) + def _create_obj( + self, channel_impl: mock.MagicMock + ) -> pika.adapters.blocking_connection.BlockingChannel: self.impl = channel_impl() self.impl.channel_number = 1 - return pika.adapters.blocking_connection.BlockingChannel(self.impl, self.connection) + return pika.adapters.blocking_connection.BlockingChannel( + self.impl, self.connection + ) - def _generate_delivery(self, method, properties, body): + def _generate_delivery( + self, + method: pika.spec.Basic.Deliver, + properties: pika.BasicProperties, + body: str, + ) -> None: from pika.adapters.blocking_connection import _ConsumerDeliveryEvt + evt = _ConsumerDeliveryEvt(method, properties, body) self.obj._add_pending_event(evt) self.obj._dispatch_events() - def test_basic_consume(self): + def test_basic_consume(self) -> None: consumer_tag = "test.consumer" self.impl.basic_consume.return_value = consumer_tag @@ -449,28 +274,26 @@ def test_basic_consume(self): self._generate_delivery(method, properties, body) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 rabbitmq_span = spans[0] - self.assertIsNone(tracer.active_span) - # A new span has been started - self.assertIsNotNone(rabbitmq_span.t) - self.assertIsNone(rabbitmq_span.p) - self.assertIsNotNone(rabbitmq_span.s) + assert rabbitmq_span.t + assert not rabbitmq_span.p + assert rabbitmq_span.s # Error logging - self.assertIsNone(rabbitmq_span.ec) + assert not rabbitmq_span.ec # Span tags - self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) - self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) - self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) - self.assertIsNotNone(rabbitmq_span.stack) - self.assertTrue(type(rabbitmq_span.stack) is list) - self.assertGreater(len(rabbitmq_span.stack), 0) + assert not rabbitmq_span.data["rabbitmq"]["exchange"] + assert rabbitmq_span.data["rabbitmq"]["sort"] == "consume" + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["queue"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 cb.assert_called_once_with(self.obj, method, properties, body) @@ -485,25 +308,320 @@ def test_basic_consume_with_trace_context(self): self.obj.basic_consume(queue="test.queue", on_message_callback=cb) body = "Hello!" - properties = pika.BasicProperties(headers={ + instana_headers = { "X-INSTANA-T": "0000000000000001", "X-INSTANA-S": "0000000000000002", - "X-INSTANA-L": "1" - }) + "X-INSTANA-L": "1", + } + properties = pika.BasicProperties(headers=instana_headers) method = pika.spec.Basic.Deliver(consumer_tag) self._generate_delivery(method, properties, body) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 + + rabbitmq_span = spans[0] + + # Trace context propagation + assert rabbitmq_span.t == int(instana_headers["X-INSTANA-T"]) + assert rabbitmq_span.p == int(instana_headers["X-INSTANA-S"]) + + # A new span has been started + assert rabbitmq_span.s + assert rabbitmq_span.p != rabbitmq_span.s + + +class TestPikaChannel(_TestPika): + def _create_obj(self) -> pika.channel.Channel: + return pika.channel.Channel(self.connection, 1, self._on_openok_callback) + + @mock.patch("pika.spec.Basic.Publish") + @mock.patch("pika.channel.Channel._send_method") + def test_basic_publish(self, send_method, _unused) -> None: + self.obj._set_state(self.obj.OPEN) + + with tracer.start_as_current_span("testing"): + self.obj.basic_publish("test.exchange", "test.queue", "Hello!") + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + rabbitmq_span = spans[0] + test_span = spans[1] + + # Same traceId + assert test_span.t == rabbitmq_span.t + + # Parent relationships + assert rabbitmq_span.p == test_span.s + + # Error logging + assert not test_span.ec + assert not rabbitmq_span.ec + + # Span tags + assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" + assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish" + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + + send_method.assert_called_once_with( + pika.spec.Basic.Publish(exchange="test.exchange", routing_key="test.queue"), + ( + pika.spec.BasicProperties( + headers={ + "X-INSTANA-T": str(rabbitmq_span.t), + "X-INSTANA-S": str(rabbitmq_span.s), + "X-INSTANA-L": "1", + } + ), + b"Hello!", + ), + ) + + @mock.patch("pika.spec.Basic.Publish") + @mock.patch("pika.channel.Channel._send_method") + def test_basic_publish_as_root_exit_span(self, send_method, _unused) -> None: + agent.options.allow_exit_as_root = True + self.obj._set_state(self.obj.OPEN) + self.obj.basic_publish("test.exchange", "test.queue", "Hello!") + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + rabbitmq_span = spans[0] + + # Parent relationships + assert not rabbitmq_span.p + + # Error logging + assert not rabbitmq_span.ec + + # Span tags + assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" + assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish" + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + + send_method.assert_called_once_with( + pika.spec.Basic.Publish(exchange="test.exchange", routing_key="test.queue"), + ( + pika.spec.BasicProperties( + headers={ + "X-INSTANA-T": str(rabbitmq_span.t), + "X-INSTANA-S": str(rabbitmq_span.s), + "X-INSTANA-L": "1", + } + ), + b"Hello!", + ), + ) + + @mock.patch("pika.spec.Basic.Publish") + @mock.patch("pika.channel.Channel._send_method") + def test_basic_publish_with_headers(self, send_method, _unused) -> None: + self.obj._set_state(self.obj.OPEN) + + with tracer.start_as_current_span("testing"): + self.obj.basic_publish( + "test.exchange", + "test.queue", + "Hello!", + pika.BasicProperties(headers={"X-Custom-1": "test"}), + ) + + spans = self.recorder.queued_spans() + assert len(spans) == 2 rabbitmq_span = spans[0] - self.assertIsNone(tracer.active_span) + send_method.assert_called_once_with( + pika.spec.Basic.Publish(exchange="test.exchange", routing_key="test.queue"), + ( + pika.spec.BasicProperties( + headers={ + "X-Custom-1": "test", + "X-INSTANA-T": str(rabbitmq_span.t), + "X-INSTANA-S": str(rabbitmq_span.s), + "X-INSTANA-L": "1", + } + ), + b"Hello!", + ), + ) + + @mock.patch("pika.spec.Basic.Publish") + @mock.patch("pika.channel.Channel._send_method") + def test_basic_publish_tracing_off(self, send_method, _unused, mocker) -> None: + mocker.patch( + "instana.instrumentation.pika.tracing_is_off", + return_value=True, + ) + + self.obj._set_state(self.obj.OPEN) + + with tracer.start_as_current_span("testing"): + self.obj.basic_publish("test.exchange", "test.queue", "Hello!") + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + # Span names are not "rabbitmq" + for span in spans: + assert span.n != "rabbitmq" + + @mock.patch("pika.spec.Basic.Get") + def test_basic_get(self, _unused) -> None: + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.BasicProperties() + + method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_get("test.queue", cb) + self.obj._on_getok(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + rabbitmq_span = spans[0] + + # A new span has been started + assert rabbitmq_span.t + assert not rabbitmq_span.p + assert rabbitmq_span.s + + # Error logging + assert not rabbitmq_span.ec + + # Span tags + assert not rabbitmq_span.data["rabbitmq"]["exchange"] + assert rabbitmq_span.data["rabbitmq"]["sort"] == "consume" + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["queue"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + + cb.assert_called_once_with(self.obj, pika.spec.Basic.GetOk, properties, body) + + @mock.patch("pika.spec.Basic.Get") + def test_basic_get_with_trace_context(self, _unused) -> None: + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + instana_headers = { + "X-INSTANA-T": "0000000000000001", + "X-INSTANA-S": "0000000000000002", + "X-INSTANA-L": "1", + } + properties = pika.BasicProperties(headers=instana_headers) + + method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_get("test.queue", cb) + self.obj._on_getok(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + rabbitmq_span = spans[0] + + # Trace context propagation + assert rabbitmq_span.t == int(instana_headers["X-INSTANA-T"]) + assert rabbitmq_span.p == int(instana_headers["X-INSTANA-S"]) + + # A new span has been started + assert rabbitmq_span.s + assert rabbitmq_span.p != rabbitmq_span.s + + @mock.patch("pika.spec.Basic.Consume") + def test_basic_consume(self, _unused) -> None: + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.BasicProperties() + + method_frame = pika.frame.Method( + 1, pika.spec.Basic.Deliver(consumer_tag="test") + ) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_consume("test.queue", cb, consumer_tag="test") + self.obj._on_deliver(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + rabbitmq_span = spans[0] + + # A new span has been started + assert rabbitmq_span.t + assert not rabbitmq_span.p + assert rabbitmq_span.s + + # Error logging + assert not rabbitmq_span.ec + + # Span tags + assert not rabbitmq_span.data["rabbitmq"]["exchange"] + assert rabbitmq_span.data["rabbitmq"]["sort"] == "consume" + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["queue"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + + cb.assert_called_once_with(self.obj, method_frame.method, properties, body) + + @mock.patch("pika.spec.Basic.Consume") + def test_basic_consume_with_trace_context(self, _unused) -> None: + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + instana_headers = { + "X-INSTANA-T": "0000000000000001", + "X-INSTANA-S": "0000000000000002", + "X-INSTANA-L": "1", + } + properties = pika.BasicProperties(headers=instana_headers) + + method_frame = pika.frame.Method( + 1, pika.spec.Basic.Deliver(consumer_tag="test") + ) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_consume( + queue="test.queue", on_message_callback=cb, consumer_tag="test" + ) + self.obj._on_deliver(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + assert len(spans) == 1 + + rabbitmq_span = spans[0] # Trace context propagation - self.assertEqual("0000000000000001", rabbitmq_span.t) - self.assertEqual("0000000000000002", rabbitmq_span.p) + assert rabbitmq_span.t == int(instana_headers["X-INSTANA-T"]) + assert rabbitmq_span.p == int(instana_headers["X-INSTANA-S"]) # A new span has been started - self.assertIsNotNone(rabbitmq_span.s) - self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + assert rabbitmq_span.s + assert rabbitmq_span.p != rabbitmq_span.s diff --git a/tests/conftest.py b/tests/conftest.py index 660f1245..e544b69d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,7 +38,6 @@ # TODO: remove the following entries as the migration of the instrumentation # codes are finalised. collect_ignore_glob.append("*clients/test_google*") -collect_ignore_glob.append("*clients/test_pika*") collect_ignore_glob.append("*clients/test_sql*") collect_ignore_glob.append("*frameworks/test_celery*")