Skip to content

Commit b6fe658

Browse files
authored
Merge pull request #27 from cloudblue/feature-2/LITE-17209
LITE-17209 Added support for passing of Correlation ID in CQRS payload
2 parents 569866d + fd2ac96 commit b6fe658

File tree

8 files changed

+111
-15
lines changed

8 files changed

+111
-15
lines changed

dj_cqrs/correlation.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
from django.conf import settings
4+
5+
6+
_correlation_function = getattr(settings, 'CQRS', {}).get('master', {}).get('correlation_function')
7+
if _correlation_function and (not callable(_correlation_function)):
8+
raise AttributeError('CQRS correlation_function must be callable.')
9+
10+
11+
def get_correlation_id(signal_type, cqrs_id, instance_pk, queue):
12+
"""
13+
:param signal_type: Type of the signal for this message.
14+
:type signal_type: dj_cqrs.constants.SignalType
15+
:param cqrs_id: The unique CQRS identifier of the model.
16+
:type cqrs_id: str
17+
:param instance_pk: Primary key of the instance.
18+
:param queue: Queue to synchronize, defaults to None
19+
:type queue: str, optional
20+
"""
21+
if _correlation_function:
22+
return _correlation_function(signal_type, cqrs_id, instance_pk, queue)

dj_cqrs/dataclasses.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
from dj_cqrs.correlation import get_correlation_id
24

35

46
class TransportPayload:
@@ -11,23 +13,38 @@ class TransportPayload:
1113
:param instance_data: Serialized data of the instance that
1214
generates the event.
1315
:type instance_data: dict
14-
:param instance_pk: Primary key of the instance
15-
:param queue: Queue to syncronize, defaults to None
16+
:param instance_pk: Primary key of the instance.
17+
:param queue: Queue to synchronize, defaults to None.
1618
:type queue: str, optional
1719
:param previous_data: Previous values for fields tracked for changes,
18-
defaults to None
20+
defaults to None.
1921
:type previous_data: dict, optional
22+
:param correlation_id: Correlation ID of process, where this payload is used.
23+
:type correlation_id: str, optional
2024
"""
2125

22-
def __init__(self, signal_type, cqrs_id, instance_data, instance_pk, queue=None,
23-
previous_data=None):
26+
def __init__(
27+
self,
28+
signal_type,
29+
cqrs_id,
30+
instance_data,
31+
instance_pk,
32+
queue=None,
33+
previous_data=None,
34+
correlation_id=None,
35+
):
2436
self.__signal_type = signal_type
2537
self.__cqrs_id = cqrs_id
2638
self.__instance_data = instance_data
2739
self.__instance_pk = instance_pk
2840
self.__queue = queue
2941
self.__previous_data = previous_data
3042

43+
if correlation_id:
44+
self.__correlation_id = correlation_id
45+
else:
46+
self.__correlation_id = get_correlation_id(signal_type, cqrs_id, instance_pk, queue)
47+
3148
@property
3249
def signal_type(self):
3350
return self.__signal_type
@@ -52,6 +69,10 @@ def queue(self):
5269
def previous_data(self):
5370
return self.__previous_data
5471

72+
@property
73+
def correlation_id(self):
74+
return self.__correlation_id
75+
5576
def to_dict(self):
5677
"""
5778
Return the payload as a dictionary.
@@ -65,4 +86,5 @@ def to_dict(self):
6586
'instance_data': self.__instance_data,
6687
'previous_data': self.__previous_data,
6788
'instance_pk': self.__instance_pk,
89+
'correlation_id': self.__correlation_id,
6890
}

dj_cqrs/transport/kombu.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

33
import logging
44

@@ -124,8 +124,12 @@ def _consume_message(cls, body, message):
124124
return
125125

126126
payload = TransportPayload(
127-
dct['signal_type'], dct['cqrs_id'], dct['instance_data'], dct.get('instance_pk'),
127+
dct['signal_type'],
128+
dct['cqrs_id'],
129+
dct['instance_data'],
130+
dct.get('instance_pk'),
128131
previous_data=dct.get('previous_data'),
132+
correlation_id=dct.get('correlation_id'),
129133
)
130134

131135
cls.log_consumed(payload)

dj_cqrs/transport/rabbit_mq.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

33
import logging
44
import time
@@ -108,8 +108,12 @@ def _consume_message(cls, ch, method, properties, body):
108108
return
109109

110110
payload = TransportPayload(
111-
dct['signal_type'], dct['cqrs_id'], dct['instance_data'], dct.get('instance_pk'),
111+
dct['signal_type'],
112+
dct['cqrs_id'],
113+
dct['instance_data'],
114+
dct.get('instance_pk'),
112115
previous_data=dct.get('previous_data'),
116+
correlation_id=dct.get('correlation_id'),
113117
)
114118

115119
cls.log_consumed(payload)

tests/test_controller.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

33
import pytest
44

@@ -19,6 +19,7 @@ def test_producer(mocker):
1919
'instance_data': {},
2020
'instance_pk': 'c',
2121
'previous_data': {'e': 'f'},
22+
'correlation_id': None,
2223
}
2324

2425

tests/test_correlation.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
from importlib import import_module, reload
4+
5+
import pytest
6+
7+
from dj_cqrs.correlation import get_correlation_id
8+
9+
10+
def test_default_correlation():
11+
assert get_correlation_id(None, None, None, None) is None
12+
13+
14+
def test_wrong_correlation_type_in_settings(settings):
15+
previous_cqrs_settings = settings.CQRS
16+
settings.CQRS = {'master': {'correlation_function': 1}}
17+
18+
with pytest.raises(AttributeError) as e:
19+
reload(import_module('dj_cqrs.correlation'))
20+
21+
assert str(e.value) == 'CQRS correlation_function must be callable.'
22+
23+
settings.CQRS = previous_cqrs_settings
24+
reload(import_module('dj_cqrs.correlation'))
25+
26+
27+
def test_custom_correlation(settings):
28+
previous_cqrs_settings = settings.CQRS
29+
settings.CQRS = {'master': {'correlation_function': lambda *args: '1q2w3e'}}
30+
31+
reload(import_module('dj_cqrs.correlation'))
32+
assert get_correlation_id(None, None, None, None) == '1q2w3e'
33+
34+
settings.CQRS = previous_cqrs_settings
35+
reload(import_module('dj_cqrs.correlation'))

tests/test_transport/test_kombu.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

33
import logging
44
import ujson
@@ -146,6 +146,7 @@ def test_produce_message_ok(mocker):
146146
'instance_data': {},
147147
'instance_pk': 'id',
148148
'previous_data': {'e': 'f'},
149+
'correlation_id': None,
149150
}
150151

151152
assert prepare_message_args[2] == 'text/plain'
@@ -174,6 +175,7 @@ def test_produce_sync_message_no_queue(mocker):
174175
'instance_data': {},
175176
'instance_pk': None,
176177
'previous_data': None,
178+
'correlation_id': None,
177179
}
178180
assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
179181

@@ -195,6 +197,7 @@ def test_produce_sync_message_queue(mocker):
195197
'instance_data': {},
196198
'instance_pk': 'id',
197199
'previous_data': None,
200+
'correlation_id': None,
198201
}
199202
assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id'
200203

@@ -206,7 +209,7 @@ def test_consume_message_ack(mocker, caplog):
206209

207210
PublicKombuTransport.consume_message(
208211
'{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
209-
'"instance_pk":1, "previous_data":{}}',
212+
'"instance_pk":1, "previous_data":{}, "correlation_id":"zyx"}',
210213
message_mock,
211214
)
212215

@@ -219,6 +222,7 @@ def test_consume_message_ack(mocker, caplog):
219222
assert payload.instance_data == {}
220223
assert payload.previous_data == {}
221224
assert payload.pk == 1
225+
assert payload.correlation_id == 'zyx'
222226

223227
assert 'CQRS is received: pk = 1 (cqrs_id).' in caplog.text
224228
assert 'CQRS is applied: pk = 1 (cqrs_id).' in caplog.text

tests/test_transport/test_rabbit_mq.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

33
import logging
44
import ujson
@@ -182,6 +182,7 @@ def test_produce_message_ok(mocker):
182182
'instance_data': {},
183183
'instance_pk': 'id',
184184
'previous_data': {'e': 'f'},
185+
'correlation_id': None,
185186
}
186187
assert basic_publish_kwargs['exchange'] == 'exchange'
187188
assert basic_publish_kwargs['mandatory']
@@ -204,6 +205,7 @@ def test_produce_sync_message_no_queue(mocker):
204205
'instance_data': {},
205206
'instance_pk': None,
206207
'previous_data': None,
208+
'correlation_id': None,
207209
}
208210
assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
209211

@@ -222,6 +224,7 @@ def test_produce_sync_message_queue(mocker):
222224
'instance_data': {},
223225
'instance_pk': 'id',
224226
'previous_data': None,
227+
'correlation_id': None,
225228
}
226229
assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id'
227230

@@ -259,7 +262,7 @@ def test_consume_message_ack(mocker, caplog):
259262
mocker.MagicMock(),
260263
None,
261264
'{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
262-
'"instance_pk":1, "previous_data":{}}',
265+
'"instance_pk":1, "previous_data":{}, "correlation_id":"abc"}',
263266
)
264267

265268
assert consumer_mock.call_count == 1
@@ -270,6 +273,7 @@ def test_consume_message_ack(mocker, caplog):
270273
assert payload.instance_data == {}
271274
assert payload.previous_data == {}
272275
assert payload.pk == 1
276+
assert payload.correlation_id == 'abc'
273277

274278
assert 'CQRS is received: pk = 1 (cqrs_id).' in caplog.text
275279
assert 'CQRS is applied: pk = 1 (cqrs_id).' in caplog.text

0 commit comments

Comments
 (0)