diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 8887829c..f0eb8b97 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -359,7 +359,7 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None: # send request to external RMQ service, providing the queue where it should respond properties = BasicProperties(content_type='application/json', - correlation_id=request_id, + headers={'rpc': request_id}, reply_to=self._queue.name) # add future to dict where callback can retrieve it and set result @@ -376,7 +376,8 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None: # block until callback runs (we'll know when the future's result has been changed) return request_future.result(timeout=self._timeout) except TimeoutError: - logger.warning('Timed out waiting for response. correlation_id: %s', request_id) + # logger.warning('Timed out waiting for response. correlation_id: %s', request_id) + logger.warning('Timed out waiting for response. rpc request_id: %s', request_id) self._pending_requests.pop(request_id) # stop tracking request Future return None except Exception as exc: # pylint: disable=broad-exception-caught @@ -415,7 +416,7 @@ def _response_callback( method.routing_key, properties.content_type, str(body, encoding='utf-8')) # remove future from pending list. we will update result shortly - request_future = self._pending_requests.pop(properties.correlation_id) + request_future = self._pending_requests.pop(properties.headers['rpc']) # messages sent through RabbitMQ Direct reply-to are auto acked is_direct_reply = str(method.routing_key).startswith(DIRECT_REPLY_QUEUE) diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index 5effb30c..c1163db7 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -296,9 +296,8 @@ def test_send_request_works_without_calling_start(rpc_thread: Rpc, def mock_blocking_publish(*_args, **_kwargs): # build mock message from imaginary external service method = Method('', 123) - props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID) + props = BasicProperties(content_type='application/json', headers={'rpc': EXAMPLE_UUID}) body = bytes(json.dumps(example_message), encoding='utf-8') - rpc_thread._response_callback(mock_channel, method, props, body) monkeypatch.setattr('idsse.common.rabbitmq_utils._blocking_publish',