3
3
4
4
from django .conf import settings
5
5
from django .core .management .base import CommandParser
6
+ from django .core .paginator import Paginator
6
7
from django .utils import timezone
7
8
8
9
from stac_api .models .general import BaseAssetUpload
@@ -22,11 +23,53 @@ def __str__(self):
22
23
23
24
class Handler (CommandHandler ):
24
25
25
- def delete (self , instance , object_type ):
26
- if self .options ['dry_run' ]:
27
- self .print_success (f'skipping deletion of { object_type } { instance } ' )
28
- else :
29
- instance .delete ()
26
    def delete_by_batch(self, queryset, object_type, batch_size):
        """Delete every row of ``queryset`` in batches of ``batch_size`` rows.

        Progress is reported through ``self.print_success`` after each batch.
        When ``self.options['dry_run']`` is set, nothing is deleted and the
        per-batch report counts every row of the batch as "deleted".

        Args:
            queryset: The QuerySet of rows to delete. Rows are removed in the
                order of this QuerySet (see the reversal trick below).
            object_type: The model class the queryset selects from; used both
                to issue the batched DELETE and to label progress output.
            batch_size: Maximum number of rows removed per DELETE statement.
        """
        # When many rows are involved, looping over each one is very slow.
        # Running a single delete() against all of them consumes a lot of memory
        # and does not delete anything if it fails mid-way. Hence, we batch.
        #
        # Django's delete() method already batches deletions in groups of 100
        # rows. These batches are wrapped within transactions. It does not seem
        # to be designed to allow disabling the transaction or tweaking the
        # batch size.
        # https://github.com/django/django/blob/main/django/db/models/sql/subqueries.py#L26
        # https://github.com/django/django/blob/main/django/db/models/deletion.py#L454
        # Also, it does not seem to do anything to reduce memory consumption.
        #
        # In our case, we don't need the deletions to be transactional. If we
        # die in the middle, it's fine if some rows are deleted and some are
        # not. We can remove the remaining rows next time we run. That's better
        # than waiting forever, to fail and to have to start from scratch next
        # time.
        # Key format used by QuerySet.delete()'s per-model counters, e.g.
        # 'stac_api.Item'.
        type_name = f'stac_api.{object_type.__name__}'
        deleted_count = 0
        # We delete rows as we iterate over them. This only works if we iterate
        # from the end to the beginning. But we also want to delete the objects
        # in the order of the QuerySet we received. Hence, we first reverse
        # the QuerySet then we reverse the iterator.
        queryset = queryset.reverse()
        paginator = Paginator(queryset, batch_size)
        for page_number in reversed(paginator.page_range):
            page = paginator.page(page_number)
            # We cannot just call page.object_list.delete() because DELETE
            # does not support LIMIT/OFFSET. So instead we extract the ids
            # then we'll build a new QuerySet to DELETE them.
            ids = page.object_list.values('id')
            # len() evaluates the page's queryset here, fixing the batch
            # membership before the DELETE below runs.
            expected_deletions = len(ids)
            dry_run_prefix = ''
            if self.options['dry_run']:
                dry_run_prefix = '[dry run]: '
                deleted_objs = {}
                # Pretend everything in the batch was deleted so the dry-run
                # report mirrors a real run.
                actual_deletions = expected_deletions
            else:
                # delete() returns (total, {label: count, ...}); we only care
                # about the per-model breakdown for reporting.
                (_, deleted_objs) = object_type.objects.filter(id__in=ids).delete()
                # NOTE: cascaded deletions of related models are counted in
                # deleted_objs but not in actual_deletions.
                actual_deletions = deleted_objs.get(type_name, 0)
            deleted_count += actual_deletions
            self.print_success(
                f'{dry_run_prefix}Deleted {deleted_count}/{paginator.count} {type_name}.'
                f' In this batch: {actual_deletions}/{expected_deletions}.'
                f' All objects in this batch: {deleted_objs}.'
            )
30
73
31
74
def _raise_if_too_many_deletions (self , max_deletions , max_deletions_pct , items_count ):
32
75
if items_count > max_deletions :
@@ -42,18 +85,20 @@ def _raise_if_too_many_deletions(self, max_deletions, max_deletions_pct, items_c
42
85
43
86
def run (self ):
44
87
self .print_success ('running command to remove expired items' )
88
+ batch_size = self .options ['batch_size' ]
45
89
min_age_hours = self .options ['min_age_hours' ]
46
90
max_deletions = self .options ['max_deletions' ]
47
91
max_deletions_pct = self .options ['max_deletions_percentage' ]
48
92
self .print_warning (
49
93
f"deleting no more than { max_deletions } or "
50
94
f"{ max_deletions_pct } %% items expired for longer"
51
- f" than { min_age_hours } hours"
95
+ f" than { min_age_hours } hours, { batch_size } at a time "
52
96
)
53
97
54
98
expiration = timezone .now () - timedelta (hours = min_age_hours )
55
99
56
- items = Item .objects .filter (properties_expires__lte = expiration )
100
+ items = Item .objects .filter (properties_expires__lte = expiration
101
+ ).order_by ('properties_expires' )
57
102
items_count = items .count ()
58
103
59
104
self ._raise_if_too_many_deletions (max_deletions , max_deletions_pct , items_count )
@@ -69,8 +114,8 @@ def run(self):
69
114
"WARNING: There were still pending asset uploads for expired items. "
70
115
"These were likely stale, so we aborted them"
71
116
)
72
- self .delete (assets , 'assets' )
73
- self .delete (items , 'items' )
117
+ self .delete_by_batch (assets , Asset , batch_size )
118
+ self .delete_by_batch (items , Item , batch_size )
74
119
75
120
if self .options ['dry_run' ]:
76
121
self .print_success (f'[dry run] would have removed { items_count } expired items' )
@@ -105,6 +150,13 @@ def add_arguments(self, parser: CommandParser) -> None:
105
150
action = 'store_true' ,
106
151
help = 'Simulate deleting items, without actually deleting them'
107
152
)
153
+ default_batch_size = settings .DELETE_EXPIRED_ITEMS_BATCH_SIZE
154
+ parser .add_argument (
155
+ '--batch-size' ,
156
+ type = 'positive_int' ,
157
+ default = default_batch_size ,
158
+ help = f"How many rows to delete at a time ({ default_batch_size } )"
159
+ )
108
160
default_min_age = settings .DELETE_EXPIRED_ITEMS_OLDER_THAN_HOURS
109
161
parser .add_argument (
110
162
'--min-age-hours' ,
0 commit comments