From 94a76766309602b2109ab2f4626e6e1cb292cd84 Mon Sep 17 00:00:00 2001 From: Varsha GS Date: Mon, 22 Sep 2025 11:39:49 +0530 Subject: [PATCH 1/2] fix(aio-pika): implement `_bind_args` method to fetch values from both args and kwargs Signed-off-by: Varsha GS --- src/instana/instrumentation/aio_pika.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/instana/instrumentation/aio_pika.py b/src/instana/instrumentation/aio_pika.py index ef16dfa9..2dcb6ad3 100644 --- a/src/instana/instrumentation/aio_pika.py +++ b/src/instana/instrumentation/aio_pika.py @@ -43,18 +43,26 @@ async def publish_with_instana( ) -> Optional["ConfirmationFrameType"]: if tracing_is_off(): return await wrapped(*args, **kwargs) - + tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None + def _bind_args( + message: Type["AbstractMessage"], + routing_key: str, + *args: object, + **kwargs: object, + ) -> Tuple[object, ...]: + return (message, routing_key, args, kwargs) + + (message, routing_key, args, kwargs) = _bind_args( + *args, **kwargs + ) + with tracer.start_as_current_span( "rabbitmq", span_context=parent_context ) as span: connection = instance.channel._connection - message = kwargs["message"] if kwargs.get("message") else args[0] - routing_key = ( - kwargs["routing_key"] if kwargs.get("routing_key") else args[1] - ) _extract_span_attributes( span, connection, "publish", routing_key, instance.name @@ -66,6 +74,9 @@ async def publish_with_instana( message.properties.headers, disable_w3c_trace_context=True, ) + + args = (message, routing_key) + args + try: response = await wrapped(*args, **kwargs) except Exception as exc: From 40cc8d9c872c7c2828a9971e49494e853f5e684c Mon Sep 17 00:00:00 2001 From: Varsha GS Date: Wed, 24 Sep 2025 14:45:58 +0530 Subject: [PATCH 2/2] test(aio-pika): verify publish works with an empty `routing_key` Signed-off-by: Varsha GS --- src/instana/instrumentation/aio_pika.py | 2 +- tests/clients/test_aio_pika.py | 55 +++++++++---------------- 2 files changed, 20 insertions(+), 37 deletions(-) diff --git a/src/instana/instrumentation/aio_pika.py b/src/instana/instrumentation/aio_pika.py index 2dcb6ad3..a47e09f7 100644 --- a/src/instana/instrumentation/aio_pika.py +++ b/src/instana/instrumentation/aio_pika.py @@ -43,7 +43,7 @@ async def publish_with_instana( ) -> Optional["ConfirmationFrameType"]: if tracing_is_off(): return await wrapped(*args, **kwargs) - + tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None diff --git a/tests/clients/test_aio_pika.py b/tests/clients/test_aio_pika.py index 4071e568..20e97618 100644 --- a/tests/clients/test_aio_pika.py +++ b/tests/clients/test_aio_pika.py @@ -56,6 +56,9 @@ async def publish_message(self, params_combination: str = "both_args") -> None: elif params_combination == "arg_kwarg": args = (message,) kwargs = {"routing_key": queue_name} + elif params_combination == "arg_kwarg_empty_key": + args = (message,) + kwargs = {"routing_key": ""} else: # params_combination == "both_args" args = (message, queue_name) @@ -102,6 +105,15 @@ async def on_message(msg): await queue.consume(on_message) await asyncio.sleep(1) # Wait to ensure the message is processed + def assert_span_info(self, rabbitmq_span: "ReadableSpan", sort: str, key: str = "test.queue") -> None: + assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" + assert rabbitmq_span.data["rabbitmq"]["sort"] == sort + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["key"] == key + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + @pytest.mark.parametrize( "params_combination", ["both_args", "both_kwargs", "arg_kwarg"], @@ -127,13 +139,8 @@ def test_basic_publish(self, params_combination) -> None: assert not rabbitmq_span.ec # Span attributes - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish" - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 + key = "" if params_combination == "arg_kwarg_empty_key" else self.queue_name + self.assert_span_info(rabbitmq_span, "publish", key) def test_basic_publish_as_root_exit_span(self) -> None: agent.options.allow_exit_as_root = True @@ -151,13 +158,7 @@ def test_basic_publish_as_root_exit_span(self) -> None: assert not rabbitmq_span.ec # Span attributes - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish" - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 + self.assert_span_info(rabbitmq_span, "publish") @pytest.mark.parametrize( "connect_method", @@ -189,17 +190,8 @@ def test_basic_consume(self, connect_method) -> None: assert not test_span.ec # Span attributes - def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None: - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == sort - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 - - assert_span_info(rabbitmq_publisher_span, "publish") - assert_span_info(rabbitmq_consumer_span, "consume") + self.assert_span_info(rabbitmq_publisher_span, "publish") + self.assert_span_info(rabbitmq_consumer_span, "consume") @pytest.mark.parametrize( "connect_method", @@ -231,14 +223,5 @@ def test_consume_with_exception(self, connect_method) -> None: assert not test_span.ec # Span attributes - def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None: - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == sort - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 - - assert_span_info(rabbitmq_publisher_span, "publish") - assert_span_info(rabbitmq_consumer_span, "consume") + self.assert_span_info(rabbitmq_publisher_span, "publish") + self.assert_span_info(rabbitmq_consumer_span, "consume")