Skip to content

Commit 517da4b

Browse files
authored
Merge pull request #29 from cloudblue/feature/lite-15627-message-retrying
LITE-15627 Message retrying
2 parents b6fe658 + e34d6df commit 517da4b

27 files changed

+1125
-147
lines changed

dj_cqrs/constants.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

33
ALL_BASIC_FIELDS = '__all__'
44

@@ -20,3 +20,9 @@ class SignalType:
2020

2121

2222
NO_QUEUE = 'None'
23+
24+
DEFAULT_DEAD_MESSAGE_TTL = 864000 # 10 days
25+
DEFAULT_DELAY_QUEUE_MAX_SIZE = None # Infinite
26+
DEFAULT_CQRS_MESSAGE_TTL = 86400 # 1 day
27+
DEFAULT_CQRS_MAX_RETRIES = 30
28+
DEFAULT_CQRS_RETRY_DELAY = 2 # seconds

dj_cqrs/dataclasses.py

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

3+
from dateutil.parser import parse as dateutil_parse
4+
from django.utils import timezone
5+
36
from dj_cqrs.correlation import get_correlation_id
7+
from dj_cqrs.utils import get_expires_datetime
48

59

610
class TransportPayload:
@@ -21,17 +25,23 @@ class TransportPayload:
2125
:type previous_data: dict, optional
2226
:param correlation_id: Correlation ID of process, where this payload is used.
2327
:type correlation_id: str, optional
28+
:param retries: Current number of message retries.
29+
:type retries: int, optional
30+
:param expires: Message expiration datetime, infinite if None
31+
:type expires: datetime, optional
2432
"""
2533

2634
def __init__(
27-
self,
28-
signal_type,
29-
cqrs_id,
30-
instance_data,
31-
instance_pk,
32-
queue=None,
33-
previous_data=None,
34-
correlation_id=None,
35+
self,
36+
signal_type,
37+
cqrs_id,
38+
instance_data,
39+
instance_pk,
40+
queue=None,
41+
previous_data=None,
42+
correlation_id=None,
43+
expires=None,
44+
retries=0,
3545
):
3646
self.__signal_type = signal_type
3747
self.__cqrs_id = cqrs_id
@@ -45,6 +55,37 @@ def __init__(
4555
else:
4656
self.__correlation_id = get_correlation_id(signal_type, cqrs_id, instance_pk, queue)
4757

58+
self.__expires = expires
59+
self.__retries = retries
60+
61+
@classmethod
62+
def from_message(cls, dct):
63+
"""Builds payload from message data.
64+
65+
:param dct: Deserialized message body data.
66+
:type dct: dict
67+
:return: TransportPayload instance.
68+
:rtype: TransportPayload
69+
"""
70+
if 'expires' in dct:
71+
expires = dct['expires']
72+
if dct['expires'] is not None:
73+
expires = dateutil_parse(dct['expires'])
74+
else:
75+
# Backward compatibility for old messages otherwise they are infinite by default.
76+
expires = get_expires_datetime()
77+
78+
return cls(
79+
dct['signal_type'],
80+
dct['cqrs_id'],
81+
dct['instance_data'],
82+
dct.get('instance_pk'),
83+
previous_data=dct.get('previous_data'),
84+
correlation_id=dct.get('correlation_id'),
85+
expires=expires,
86+
retries=dct.get('retries') or 0,
87+
)
88+
4889
@property
4990
def signal_type(self):
5091
return self.__signal_type
@@ -73,18 +114,47 @@ def previous_data(self):
73114
def correlation_id(self):
74115
return self.__correlation_id
75116

117+
@property
118+
def expires(self):
119+
return self.__expires
120+
121+
@property
122+
def retries(self):
123+
return self.__retries
124+
125+
@retries.setter
126+
def retries(self, value):
127+
assert value >= 0, "Payload retries field should be 0 or positive integer."
128+
self.__retries = value
129+
76130
def to_dict(self):
77-
"""
78-
Return the payload as a dictionary.
131+
"""Return the payload as a dictionary.
79132
80133
:return: This payload.
81134
:rtype: dict
82135
"""
136+
expires = self.__expires
137+
if expires:
138+
expires = expires.replace(microsecond=0).isoformat()
139+
83140
return {
84141
'signal_type': self.__signal_type,
85142
'cqrs_id': self.__cqrs_id,
86143
'instance_data': self.__instance_data,
87144
'previous_data': self.__previous_data,
88145
'instance_pk': self.__instance_pk,
89146
'correlation_id': self.__correlation_id,
147+
'retries': self.__retries,
148+
'expires': expires,
90149
}
150+
151+
def is_expired(self):
152+
"""Checks if this payload is expired.
153+
154+
:return: True if payload is expired, False otherwise.
155+
:rtype: bool
156+
"""
157+
return (
158+
self.__expires is not None
159+
and self.__expires <= timezone.now()
160+
)

dj_cqrs/delay.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
from queue import PriorityQueue, Full
4+
5+
from django.utils import timezone
6+
7+
8+
class DelayMessage:
9+
"""Delay message.
10+
11+
:param delivery_tag: The server-assigned and channel-specific delivery tag.
12+
:type delivery_tag: int
13+
:param payload: Transport payload.
14+
:type payload: dj_cqrs.dataclasses.TransportPayload
15+
:param eta: Time after which the message should be requeued.
16+
:type eta: datetime.datetime
17+
"""
18+
19+
def __init__(self, delivery_tag, payload, eta):
20+
self.delivery_tag = delivery_tag
21+
self.payload = payload
22+
self.eta = eta
23+
24+
25+
class DelayQueue:
26+
"""Queue for delay messages."""
27+
28+
def __init__(self, max_size=None):
29+
if max_size is not None:
30+
assert max_size > 0, "Delay queue max_size should be positive integer."
31+
32+
self._max_size = max_size
33+
self._queue = PriorityQueue()
34+
35+
def get(self):
36+
"""
37+
:rtype: DelayMessage
38+
"""
39+
*_, delay_message = self._queue.get()
40+
return delay_message
41+
42+
def get_ready(self):
43+
"""Returns messages with expired ETA.
44+
45+
:return: delayed messages generator
46+
:rtype: typing.Generator[DelayMessage]
47+
"""
48+
while self.qsize():
49+
delay_message = self.get()
50+
if delay_message.eta > timezone.now():
51+
# Queue is ordered by message ETA.
52+
# Remaining messages should wait longer, we don't check them.
53+
self.put(delay_message)
54+
break
55+
56+
yield delay_message
57+
58+
def put(self, delay_message):
59+
"""Adds message to queue.
60+
61+
:param delay_message: DelayMessage instance.
62+
:type delay_message: DelayMessage
63+
"""
64+
assert isinstance(delay_message, DelayMessage)
65+
if self.full():
66+
raise Full("Delay queue is full")
67+
68+
self._queue.put((
69+
delay_message.eta.timestamp(),
70+
delay_message.delivery_tag,
71+
delay_message,
72+
))
73+
74+
def qsize(self):
75+
return self._queue.qsize()
76+
77+
def full(self):
78+
return (
79+
self._max_size is not None
80+
and self.qsize() >= self._max_size
81+
)

dj_cqrs/mixins.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
22

3+
import logging
4+
35
from django.conf import settings
46
from django.db import router, transaction
57
from django.db.models import DateField, DateTimeField, F, IntegerField, Manager, Model
@@ -8,6 +10,8 @@
810

911
from dj_cqrs.constants import (
1012
ALL_BASIC_FIELDS,
13+
DEFAULT_CQRS_MAX_RETRIES,
14+
DEFAULT_CQRS_RETRY_DELAY,
1115
FIELDS_TRACKER_FIELD_NAME,
1216
TRACKED_FIELDS_ATTR_NAME,
1317
)
@@ -16,6 +20,9 @@
1620
from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update
1721

1822

23+
logger = logging.getLogger('django-cqrs')
24+
25+
1926
class RawMasterMixin(Model):
2027

2128
"""Base class for MasterMixin. **Users shouldn't use this
@@ -404,3 +411,55 @@ def cqrs_delete(cls, master_data):
404411
raise NotImplementedError
405412

406413
return cls.cqrs.delete_instance(master_data)
414+
415+
@staticmethod
416+
def should_retry_cqrs(current_retry, exception=None):
417+
"""Checks if we should retry the message after current attempt.
418+
419+
:param current_retry: Current number of message retries.
420+
:type current_retry: int
421+
:param exception: Exception instance raised during message consume.
422+
:type exception: Exception, optional
423+
:return: True if message should be retried, False otherwise.
424+
:rtype: bool
425+
"""
426+
replica_settings = settings.CQRS.get('replica', {})
427+
if 'CQRS_MAX_RETRIES' in replica_settings and replica_settings['CQRS_MAX_RETRIES'] is None:
428+
# Infinite
429+
return True
430+
431+
min_value = 0
432+
max_retries = replica_settings.get('CQRS_MAX_RETRIES', DEFAULT_CQRS_MAX_RETRIES)
433+
if not isinstance(max_retries, int) or max_retries < min_value:
434+
logger.warning(
435+
"Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s",
436+
max_retries, DEFAULT_CQRS_MAX_RETRIES,
437+
)
438+
max_retries = DEFAULT_CQRS_MAX_RETRIES
439+
440+
return current_retry < max_retries
441+
442+
@staticmethod
443+
def get_cqrs_retry_delay(current_retry):
444+
"""Returns number of seconds to wait before requeuing the message.
445+
446+
:param current_retry: Current number of message retries.
447+
:type current_retry: int
448+
:return: Delay in seconds.
449+
:rtype: int
450+
"""
451+
retry_delay = (
452+
settings.CQRS
453+
.get('replica', {})
454+
.get('CQRS_RETRY_DELAY', DEFAULT_CQRS_RETRY_DELAY)
455+
)
456+
457+
min_value = 1
458+
if not isinstance(retry_delay, int) or retry_delay < min_value:
459+
logger.warning(
460+
"Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s",
461+
retry_delay, DEFAULT_CQRS_RETRY_DELAY,
462+
)
463+
retry_delay = DEFAULT_CQRS_RETRY_DELAY
464+
465+
return retry_delay

dj_cqrs/signals.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from dj_cqrs.controller import producer
88
from dj_cqrs.constants import SignalType
99
from dj_cqrs.dataclasses import TransportPayload
10-
10+
from dj_cqrs.utils import get_expires_datetime
1111

1212
post_bulk_create = Signal(providing_args=['instances', 'using'])
1313
"""
@@ -73,6 +73,7 @@ def post_save(cls, sender, **kwargs):
7373
instance.pk,
7474
queue,
7575
previous_data,
76+
expires=get_expires_datetime(),
7677
)
7778
producer.produce(payload)
7879

@@ -107,7 +108,13 @@ def post_delete(cls, sender, **kwargs):
107108

108109
signal_type = SignalType.DELETE
109110

110-
payload = TransportPayload(signal_type, sender.CQRS_ID, instance_data, instance.pk)
111+
payload = TransportPayload(
112+
signal_type,
113+
sender.CQRS_ID,
114+
instance_data,
115+
instance.pk,
116+
expires=get_expires_datetime(),
117+
)
111118
# Delete is always in transaction!
112119
transaction.on_commit(lambda: producer.produce(payload))
113120

dj_cqrs/transport/kombu.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,19 @@ def produce(cls, payload):
111111
def _consume_message(cls, body, message):
112112
try:
113113
dct = ujson.loads(body)
114-
for key in ('signal_type', 'cqrs_id', 'instance_data'):
115-
if key not in dct:
116-
raise ValueError
117-
118-
if 'instance_pk' not in dct:
119-
logger.warning('CQRS deprecated package structure.')
120-
121114
except ValueError:
122115
logger.error("CQRS couldn't be parsed: {}.".format(body))
123116
message.reject()
124117
return
125118

119+
required_keys = {'instance_pk', 'signal_type', 'cqrs_id', 'instance_data'}
120+
for key in required_keys:
121+
if key not in dct:
122+
msg = "CQRS couldn't proceed, %s isn't found in body: %s."
123+
logger.error(msg, key, body)
124+
message.reject()
125+
return
126+
126127
payload = TransportPayload(
127128
dct['signal_type'],
128129
dct['cqrs_id'],

0 commit comments

Comments
 (0)