Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:

# send request to external RMQ service, providing the queue where it should respond
properties = BasicProperties(content_type='application/json',
correlation_id=request_id,
headers={'rpc': request_id},
reply_to=self._queue.name)

# add future to dict where callback can retrieve it and set result
Expand All @@ -376,7 +376,8 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:
# block until callback runs (we'll know when the future's result has been changed)
return request_future.result(timeout=self._timeout)
except TimeoutError:
logger.warning('Timed out waiting for response. correlation_id: %s', request_id)
# logger.warning('Timed out waiting for response. correlation_id: %s', request_id)
logger.warning('Timed out waiting for response. rpc request_id: %s', request_id)
self._pending_requests.pop(request_id) # stop tracking request Future
return None
except Exception as exc: # pylint: disable=broad-exception-caught
Expand Down Expand Up @@ -415,7 +416,7 @@ def _response_callback(
method.routing_key, properties.content_type, str(body, encoding='utf-8'))

# remove future from pending list. we will update result shortly
request_future = self._pending_requests.pop(properties.correlation_id)
request_future = self._pending_requests.pop(properties.headers['rpc'])

# messages sent through RabbitMQ Direct reply-to are auto acked
is_direct_reply = str(method.routing_key).startswith(DIRECT_REPLY_QUEUE)
Expand Down
3 changes: 1 addition & 2 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,8 @@ def test_send_request_works_without_calling_start(rpc_thread: Rpc,
def mock_blocking_publish(*_args, **_kwargs):
# build mock message from imaginary external service
method = Method('', 123)
props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID)
props = BasicProperties(content_type='application/json', headers={'rpc': EXAMPLE_UUID})
body = bytes(json.dumps(example_message), encoding='utf-8')

rpc_thread._response_callback(mock_channel, method, props, body)

monkeypatch.setattr('idsse.common.rabbitmq_utils._blocking_publish',
Expand Down
Loading