Skip to content

Commit e214de6

Browse files
authored
Merge pull request #30 from cloudblue/lite-18256-requeue-fix
LITE-18058 Fix for requeuing: local replica routing key is used instead of global one
2 parents 517da4b + 027db8f commit e214de6

File tree

5 files changed

+46
-3
lines changed

5 files changed

+46
-3
lines changed

dj_cqrs/controller/consumer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

3+
import copy
34
import logging
45
from contextlib import ExitStack
56

@@ -16,6 +17,7 @@ def consume(payload):
1617
1718
:param dj_cqrs.dataclasses.TransportPayload payload: Consumed payload from master service.
1819
"""
20+
payload = copy.deepcopy(payload)
1921
return route_signal_to_replica_model(
2022
payload.signal_type, payload.cqrs_id, payload.instance_data,
2123
previous_data=payload.previous_data,

dj_cqrs/transport/rabbit_mq.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ def _add_to_dead_letter_queue(cls, channel, payload):
196196
@classmethod
197197
def _requeue_message(cls, channel, delivery_tag, payload):
198198
payload.retries += 1
199+
payload.is_requeue = True
200+
199201
cls.produce(payload)
200202
cls._nack(channel, delivery_tag)
201203
cls.log_requeued(payload)
@@ -230,6 +232,9 @@ def _get_produced_message_routing_key(cls, payload):
230232
elif getattr(payload, 'is_dead_letter', False):
231233
dead_letter_queue_name = cls._get_consumer_settings()[-1]
232234
routing_key = 'cqrs.{}.{}'.format(dead_letter_queue_name, routing_key)
235+
elif getattr(payload, 'is_requeue', False):
236+
queue = cls._get_consumer_settings()[0]
237+
routing_key = 'cqrs.{}.{}'.format(queue, routing_key)
233238

234239
return routing_key
235240

@@ -249,7 +254,7 @@ def _get_consumer_rmq_objects(
249254
for cqrs_id, replica_model in ReplicaRegistry.models.items():
250255
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id)
251256

252-
# Every service must have specific SYNC routes
257+
# Every service must have specific SYNC or requeue routes
253258
channel.queue_bind(
254259
exchange=exchange,
255260
queue=queue_name,

integration_tests/tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def clean_rabbit_transport_connection():
3737
@pytest.fixture
3838
def replica_channel(settings):
3939
if current_transport is not RabbitMQTransport:
40-
pytest.skip("Dead letter queue is implemented only for RabbitMQTransport.")
40+
pytest.skip("Replica channel is implemented only for RabbitMQTransport.")
4141

4242
connection = BlockingConnection(
4343
parameters=URLParameters(settings.CQRS['url']),

tests/test_controller.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,31 @@ def test_consumer(mocker):
3232
factory_mock.assert_called_once_with('a', 'b', {}, previous_data={'e': 'f'})
3333

3434

35+
def test_changed_payload_data_during_consume(mocker):
36+
def change_data(*args, **kwargs):
37+
instance_data = args[2]
38+
instance_data['instance_key'] = 'changed instance'
39+
kwargs['previous_data']['previous_key'] = 'changed previous'
40+
41+
factory_mock = mocker.patch(
42+
'dj_cqrs.controller.consumer.route_signal_to_replica_model',
43+
side_effect=change_data,
44+
)
45+
46+
payload = TransportPayload(
47+
SignalType.SAVE,
48+
cqrs_id='b',
49+
instance_data={'instance_key': 'initial instance'},
50+
instance_pk='c',
51+
previous_data={'previous_key': 'initial previous'},
52+
)
53+
consume(payload)
54+
55+
assert factory_mock.call_count == 1
56+
assert payload.instance_data == {'instance_key': 'initial instance'}
57+
assert payload.previous_data == {'previous_key': 'initial previous'}
58+
59+
3560
@pytest.mark.django_db(transaction=True)
3661
def test_route_signal_to_replica_model_with_db(django_assert_num_queries):
3762
with django_assert_num_queries(1):

tests/test_transport/test_rabbit_mq.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,16 @@ def test_get_produced_message_routing_key_dead_letter(settings):
464464
assert routing_key == 'cqrs.dead_letter_replica.CQRS_ID'
465465

466466

467+
def test_get_produced_message_routing_key_requeue(settings):
468+
settings.CQRS['queue'] = 'replica'
469+
payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {}, None)
470+
payload.is_requeue = True
471+
472+
routing_key = PublicRabbitMQTransport.get_produced_message_routing_key(payload)
473+
474+
assert routing_key == 'cqrs.replica.CQRS_ID'
475+
476+
467477
def test_process_delay_messages(mocker, caplog):
468478
channel = mocker.MagicMock()
469479
produce = mocker.patch('dj_cqrs.transport.rabbit_mq.RabbitMQTransport.produce')
@@ -483,6 +493,7 @@ def test_process_delay_messages(mocker, caplog):
483493
produce_payload = produce.call_args[0][0]
484494
assert produce_payload is payload
485495
assert produce_payload.retries == 1
496+
assert getattr(produce_payload, 'is_requeue', False)
486497

487498
assert 'CQRS is requeued: pk = 1 (CQRS_ID)' in caplog.text
488499

0 commit comments

Comments
 (0)