Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:

# send request to external RMQ service, providing the queue where it should respond
properties = BasicProperties(content_type='application/json',
correlation_id=request_id,
#correlation_id=request_id,
headers={'rpc': request_id},
reply_to=self._queue.name)

# add future to dict where callback can retrieve it and set result
Expand All @@ -376,7 +377,8 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:
# block until callback runs (we'll know when the future's result has been changed)
return request_future.result(timeout=self._timeout)
except TimeoutError:
logger.warning('Timed out waiting for response. correlation_id: %s', request_id)
# logger.warning('Timed out waiting for response. correlation_id: %s', request_id)
logger.warning('Timed out waiting for response. rpc request_id: %s', request_id)
self._pending_requests.pop(request_id) # stop tracking request Future
return None
except Exception as exc: # pylint: disable=broad-exception-caught
Expand Down Expand Up @@ -415,7 +417,8 @@ def _response_callback(
method.routing_key, properties.content_type, str(body, encoding='utf-8'))

# remove future from pending list. we will update result shortly
request_future = self._pending_requests.pop(properties.correlation_id)
# request_future = self._pending_requests.pop(properties.correlation_id)
request_future = self._pending_requests.pop(properties.headers['rpc'])

# messages sent through RabbitMQ Direct reply-to are auto acked
is_direct_reply = str(method.routing_key).startswith(DIRECT_REPLY_QUEUE)
Expand Down
4 changes: 2 additions & 2 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ def test_send_request_works_without_calling_start(rpc_thread: Rpc,
def mock_blocking_publish(*_args, **_kwargs):
# build mock message from imaginary external service
method = Method('', 123)
props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID)
#props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID)
props = BasicProperties(content_type='application/json', headers={'rpc': EXAMPLE_UUID})
body = bytes(json.dumps(example_message), encoding='utf-8')

rpc_thread._response_callback(mock_channel, method, props, body)

monkeypatch.setattr('idsse.common.rabbitmq_utils._blocking_publish',
Expand Down
Loading