Skip to content

Commit 555f72e

Browse files
authored
Merge pull request #10 from maxipavlovic/feature/LITE-17018
LITE-17018 Commands for check and sync of deleted objects in Replica
2 parents b101c10 + 3d1fbc7 commit 555f72e

File tree

7 files changed

+467
-0
lines changed

7 files changed

+467
-0
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=
146146
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync
147147
```
148148

149+
* If it's important to check sync and clean up deleted objects within replica service in K8S:
150+
```bash
151+
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_diff_replica --cqrs-id=author |
152+
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_deleted_diff_master |
153+
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_sync_replica
154+
```
155+
149156
Development
150157
===========
151158

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
import sys
4+
5+
import ujson
6+
7+
from django.core.management.base import BaseCommand, CommandError
8+
from django.db import connection
9+
10+
from dj_cqrs.registries import MasterRegistry
11+
12+
13+
GET_NON_EXISTING_PKS_SQL_TEMPLATE = """
14+
SELECT t.pk
15+
FROM (
16+
WITH t0(pk) AS (
17+
VALUES {values}
18+
)
19+
SELECT *
20+
FROM t0
21+
) t
22+
LEFT JOIN {table} m ON m.{pk_field} = t.pk
23+
WHERE m.{pk_field} IS NULL
24+
"""
25+
26+
27+
class Command(BaseCommand):
28+
help = 'Diff of deleted CQRS models pks from master diff stream.'
29+
30+
@classmethod
31+
def serialize_out(cls, package):
32+
return ujson.dumps(package)
33+
34+
@classmethod
35+
def deserialize_in(cls, package_line):
36+
return ujson.loads(package_line)
37+
38+
def handle(self, *args, **options):
39+
with sys.stdin as f:
40+
first_line = f.readline()
41+
model = self._get_model(first_line)
42+
self.stdout.write(first_line.strip())
43+
44+
with connection.cursor() as cursor:
45+
for package_line in f:
46+
master_data = self.deserialize_in(package_line)
47+
48+
sql = GET_NON_EXISTING_PKS_SQL_TEMPLATE.format(
49+
values=','.join(["({})".format(pk) for pk in master_data]),
50+
table=model._meta.db_table,
51+
pk_field=model._meta.pk.attname,
52+
)
53+
54+
cursor.execute(sql)
55+
diff_ids = [r[0] for r in cursor.fetchall()]
56+
if diff_ids:
57+
self.stdout.write(self.serialize_out(diff_ids))
58+
self.stderr.write('PK to delete: {}'.format(str(diff_ids)))
59+
60+
@staticmethod
61+
def _get_model(first_line):
62+
cqrs_id = first_line.split(',')[0]
63+
model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
64+
65+
if not model:
66+
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
67+
68+
return model
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
import ujson
4+
from django.core.exceptions import FieldError
5+
from django.core.management.base import BaseCommand, CommandError
6+
from django.utils.timezone import now
7+
8+
from dj_cqrs.management.commands.utils import batch_qs
9+
from dj_cqrs.registries import ReplicaRegistry
10+
11+
12+
class Command(BaseCommand):
13+
help = 'Streaming diff of CQRS model pks from replica service to check for deleted objects.'
14+
15+
@classmethod
16+
def serialize_package(cls, package):
17+
return ujson.dumps(package)
18+
19+
def add_arguments(self, parser):
20+
parser.add_argument(
21+
'--cqrs-id', '-cid',
22+
help='CQRS_ID of the replica model',
23+
type=str,
24+
required=True,
25+
)
26+
parser.add_argument(
27+
'--filter', '-f',
28+
help='Filter kwargs',
29+
type=str,
30+
default=None,
31+
)
32+
parser.add_argument(
33+
'--batch', '-b',
34+
help='Batch size',
35+
type=int,
36+
default=10000,
37+
)
38+
39+
def handle(self, *args, **options):
40+
model = self._get_model(options)
41+
batch_size = self._get_batch_size(options)
42+
43+
qs = model._default_manager.values().order_by()
44+
if options['filter']:
45+
try:
46+
kwargs = ujson.loads(options['filter'])
47+
if not isinstance(kwargs, dict):
48+
raise ValueError
49+
except ValueError:
50+
raise CommandError('Bad filter kwargs!')
51+
52+
try:
53+
qs = qs.filter(**kwargs)
54+
except FieldError as e:
55+
raise CommandError('Bad filter kwargs! {}'.format(str(e)))
56+
57+
if not qs.exists():
58+
self.stderr.write('No objects found for filter!')
59+
return
60+
61+
current_dt = now()
62+
self.stdout.write('{},{}'.format(model.CQRS_ID, str(current_dt)))
63+
64+
for bqs in batch_qs(qs.values_list('pk', flat=True), batch_size=batch_size):
65+
self.stdout.write(self.serialize_package(list(bqs)))
66+
67+
@staticmethod
68+
def _get_model(options):
69+
cqrs_id = options['cqrs_id']
70+
model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
71+
72+
if not model:
73+
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
74+
75+
return model
76+
77+
@staticmethod
78+
def _get_batch_size(options):
79+
return options['batch']
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
import sys
4+
5+
import ujson
6+
7+
from django.core.management.base import BaseCommand, CommandError
8+
from django.db import DatabaseError
9+
10+
from dj_cqrs.registries import ReplicaRegistry
11+
12+
13+
class Command(BaseCommand):
14+
help = 'Diff for deleted objects synchronizer from CQRS master stream.'
15+
16+
@classmethod
17+
def deserialize_in(cls, package_line):
18+
return ujson.loads(package_line)
19+
20+
def handle(self, *args, **options):
21+
with sys.stdin as f:
22+
first_line = f.readline().strip()
23+
model = self._get_model(first_line)
24+
25+
for pks_line in f:
26+
try:
27+
model._default_manager.filter(
28+
pk__in=self.deserialize_in(pks_line.strip()),
29+
).delete()
30+
except DatabaseError as e:
31+
print(str(e), file=sys.stderr)
32+
33+
@staticmethod
34+
def _get_model(first_line):
35+
cqrs_id = first_line.split(',')[0]
36+
model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
37+
38+
if not model:
39+
raise CommandError('Wrong CQRS ID: {}!'.format(cqrs_id))
40+
41+
return model
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
import sys
4+
from io import StringIO
5+
6+
import pytest
7+
from django.core.management import CommandError, call_command
8+
from django.utils.timezone import now
9+
10+
from tests.dj_master.models import Author
11+
from tests.dj_replica.models import AuthorRef
12+
13+
14+
COMMAND_NAME = 'cqrs_deleted_diff_master'
15+
16+
17+
def test_bad_cqrs_id(mocker):
18+
mocker.patch.object(sys, 'stdin', StringIO('invalid,datetime\n'))
19+
20+
with pytest.raises(CommandError) as e:
21+
call_command(COMMAND_NAME)
22+
23+
assert 'Wrong CQRS ID: invalid!' in str(e)
24+
25+
26+
@pytest.mark.django_db
27+
def test_no_rows(mocker, capsys):
28+
mocker.patch.object(sys, 'stdin', StringIO('author,datetime\n'))
29+
call_command(COMMAND_NAME)
30+
31+
captured = capsys.readouterr()
32+
assert not captured.err
33+
assert 'author,datetime\n' == captured.out
34+
35+
36+
def replica_master_pipe(capsys, mocker, *args):
37+
call_command('cqrs_deleted_diff_replica', '--cqrs-id=author', *args)
38+
captured = capsys.readouterr()
39+
mocker.patch.object(sys, 'stdin', StringIO(captured.out))
40+
call_command(COMMAND_NAME)
41+
return capsys.readouterr()
42+
43+
44+
@pytest.mark.django_db
45+
def test_first_row(capsys, mocker):
46+
AuthorRef.objects.create(name='author', id=1, cqrs_revision=0, cqrs_updated=now())
47+
48+
captured = replica_master_pipe(capsys, mocker)
49+
50+
first_row = captured.out.split('\n')[0]
51+
assert '{},'.format(Author.CQRS_ID) in first_row
52+
53+
54+
@pytest.mark.django_db
55+
def test_all_synced(capsys, mocker):
56+
mocker.patch('dj_cqrs.controller.producer.produce')
57+
58+
Author.objects.create(id=1, name='name')
59+
AuthorRef.objects.create(id=1, name='name', cqrs_revision=0, cqrs_updated=now())
60+
61+
captured = replica_master_pipe(capsys, mocker)
62+
assert not captured.err
63+
64+
65+
@pytest.mark.django_db
66+
def test_sync_for_all(mocker, capsys):
67+
for i in range(3):
68+
AuthorRef.objects.create(name=str(i), id=i, cqrs_revision=i, cqrs_updated=now())
69+
70+
captured = replica_master_pipe(capsys, mocker)
71+
for err_text in ('PK to delete', '0', '1', '2'):
72+
assert err_text in captured.err
73+
74+
second_row = captured.out.split('\n')[1]
75+
assert '[0,1,2]' in second_row
76+
77+
78+
@pytest.mark.django_db
79+
def test_partial_sync(mocker, capsys):
80+
mocker.patch('dj_cqrs.controller.producer.produce')
81+
82+
Author.objects.create(id=1, name='name')
83+
AuthorRef.objects.create(id=1, name='name', cqrs_revision=0, cqrs_updated=now())
84+
AuthorRef.objects.create(id=2, name='2', cqrs_revision=0, cqrs_updated=now())
85+
86+
captured = replica_master_pipe(capsys, mocker)
87+
assert 'PK to delete' in captured.err
88+
assert '2' in captured.err
89+
90+
second_row = captured.out.split('\n')[1]
91+
assert '[2]' in second_row
92+
93+
94+
@pytest.mark.django_db
95+
def test_sync_batch(mocker, capsys):
96+
mocker.patch('dj_cqrs.controller.producer.produce')
97+
98+
for i in range(3):
99+
AuthorRef.objects.create(id=i, name=str(i), cqrs_revision=0, cqrs_updated=now())
100+
Author.objects.create(id=1, name='name')
101+
102+
captured = replica_master_pipe(capsys, mocker, '--batch=1')
103+
for err_text in ('PK to delete', '0', '2'):
104+
assert err_text in captured.err
105+
106+
second_row = captured.out.split('\n')[1]
107+
assert '[0]' in second_row
108+
109+
third_row = captured.out.split('\n')[2]
110+
assert '[2]' in third_row
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Copyright © 2021 Ingram Micro Inc. All rights reserved.
2+
3+
import ujson
4+
5+
import pytest
6+
from django.core.management import CommandError, call_command
7+
from django.utils.timezone import now
8+
9+
from tests.dj_replica.models import AuthorRef
10+
11+
12+
COMMAND_NAME = 'cqrs_deleted_diff_replica'
13+
14+
15+
def test_no_cqrs_id():
16+
with pytest.raises(CommandError):
17+
call_command(COMMAND_NAME)
18+
19+
20+
def test_bad_cqrs_id():
21+
with pytest.raises(CommandError) as e:
22+
call_command(COMMAND_NAME, '--cqrs-id=invalid')
23+
24+
assert 'Wrong CQRS ID: invalid!' in str(e)
25+
26+
27+
@pytest.mark.django_db
28+
def test_first_row(capsys):
29+
AuthorRef.objects.create(name='author', id=1, cqrs_revision=0, cqrs_updated=now())
30+
31+
call_command(COMMAND_NAME, '--cqrs-id=author')
32+
33+
captured = capsys.readouterr()
34+
assert '{},'.format(AuthorRef.CQRS_ID) in captured.out
35+
36+
37+
@pytest.mark.django_db
38+
def test_objects_less_than_batch(capsys):
39+
AuthorRef.objects.create(name='author', id=1, cqrs_revision=0, cqrs_updated=now())
40+
41+
call_command(COMMAND_NAME, '--cqrs-id=author')
42+
43+
captured = capsys.readouterr()
44+
out_lines = captured.out.split('\n')
45+
46+
assert ujson.loads(out_lines[1]) == [1]
47+
48+
49+
@pytest.mark.django_db
50+
def test_objects_more_than_batch(capsys):
51+
for i in range(3):
52+
AuthorRef.objects.create(name=str(i), id=i, cqrs_revision=i, cqrs_updated=now())
53+
54+
call_command(COMMAND_NAME, '--cqrs-id=author', '--batch=2')
55+
56+
captured = capsys.readouterr()
57+
out_lines = captured.out.split('\n')
58+
assert ujson.loads(out_lines[1]) == [0, 1]
59+
assert ujson.loads(out_lines[2]) == [2]
60+
61+
62+
@pytest.mark.django_db
63+
def test_filter_no_objects(capsys):
64+
call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id__in": [1, 2]}')
65+
66+
captured = capsys.readouterr()
67+
assert 'No objects found for filter!' in captured.err
68+
69+
70+
@pytest.mark.django_db
71+
def test_objects_are_filtered(capsys):
72+
for i in range(2):
73+
AuthorRef.objects.create(name=str(i), id=i, cqrs_revision=i, cqrs_updated=now())
74+
75+
call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id__in": [1, 3]}')
76+
77+
captured = capsys.readouterr()
78+
out_lines = captured.out.split('\n')
79+
80+
assert ujson.loads(out_lines[1]) == [1]

0 commit comments

Comments
 (0)