@@ -22,11 +22,49 @@ def __str__(self):
22
22
23
23
class Handler (CommandHandler ):
24
24
25
- def delete (self , instance , object_type ):
26
- if self .options ['dry_run' ]:
27
- self .print_success (f'skipping deletion of { object_type } { instance } ' )
28
- else :
29
- instance .delete ()
25
+ def delete_by_batch (self , queryset , object_type , batch_size ):
26
+ # When many rows are involved, looping over each one is very slow.
27
+ # Running a single delete() against all of them consumes a lot of memory
28
+ # and does not delete anything if it fails mid-way. Hence, we batch.
29
+ #
30
+ # Django's delete() method already batches deletions in groups of 100
31
+ # rows. These batches are wrapped within transactions. It does not seem
32
+ # to be designed to allow disabling the transaction or tweaking the
33
+ # batch size.
34
+ # https://github.com/django/django/blob/main/django/db/models/sql/subqueries.py#L26
35
+ # https://github.com/django/django/blob/main/django/db/models/deletion.py#L454
36
+ # Also, it does not seem to do anything to reduce memory consumption.
37
+ #
38
+ # In our case, we don't need the deletions to be transactional. If we
39
+ # die in the middle, it's fine if some rows are deleted and some are
40
+ # not. We can remove the remaining rows next time we run. That's better
41
+ # than waiting forever, to fail and to have to start from scratch next
42
+ # time.
43
+ type_name = f'stac_api.{ object_type .__name__ } '
44
+ total = queryset .count ()
45
+ deleted_count = 0
46
+ while deleted_count < total :
47
+ # We cannot just call queryset[:batch_size].delete() because DELETE
48
+ # does not support LIMIT/OFFSET. So instead we extract the ids
49
+ # then we'll build a new QuerySet to DELETE them.
50
+ ids = queryset .values ('id' )[:batch_size ]
51
+ expected_deletions = len (ids )
52
+ if expected_deletions == 0 :
53
+ break
54
+ dry_run_prefix = ''
55
+ if self .options ['dry_run' ]:
56
+ dry_run_prefix = '[dry run]: '
57
+ deleted_objs = {}
58
+ actual_deletions = expected_deletions
59
+ else :
60
+ (_ , deleted_objs ) = object_type .objects .filter (id__in = ids ).delete ()
61
+ actual_deletions = deleted_objs .get (type_name , 0 )
62
+ deleted_count += actual_deletions
63
+ self .print_success (
64
+ f'{ dry_run_prefix } Deleted { deleted_count } /{ total } { type_name } .'
65
+ f' In this batch: { actual_deletions } /{ expected_deletions } .'
66
+ f' All objects in this batch: { deleted_objs } .'
67
+ )
30
68
31
69
def _raise_if_too_many_deletions (self , max_deletions , max_deletions_pct , items_count ):
32
70
if items_count > max_deletions :
@@ -42,18 +80,20 @@ def _raise_if_too_many_deletions(self, max_deletions, max_deletions_pct, items_c
42
80
43
81
def run (self ):
44
82
self .print_success ('running command to remove expired items' )
83
+ batch_size = self .options ['batch_size' ]
45
84
min_age_hours = self .options ['min_age_hours' ]
46
85
max_deletions = self .options ['max_deletions' ]
47
86
max_deletions_pct = self .options ['max_deletions_percentage' ]
48
87
self .print_warning (
49
88
f"deleting no more than { max_deletions } or "
50
89
f"{ max_deletions_pct } %% items expired for longer"
51
- f" than { min_age_hours } hours"
90
+ f" than { min_age_hours } hours, { batch_size } at a time "
52
91
)
53
92
54
93
expiration = timezone .now () - timedelta (hours = min_age_hours )
55
94
56
- items = Item .objects .filter (properties_expires__lte = expiration )
95
+ items = Item .objects .filter (properties_expires__lte = expiration
96
+ ).order_by ('properties_expires' )
57
97
items_count = items .count ()
58
98
59
99
self ._raise_if_too_many_deletions (max_deletions , max_deletions_pct , items_count )
@@ -69,8 +109,8 @@ def run(self):
69
109
"WARNING: There were still pending asset uploads for expired items. "
70
110
"These were likely stale, so we aborted them"
71
111
)
72
- self .delete (assets , 'assets' )
73
- self .delete (items , 'items' )
112
+ self .delete_by_batch (assets , Asset , batch_size )
113
+ self .delete_by_batch (items , Item , batch_size )
74
114
75
115
if self .options ['dry_run' ]:
76
116
self .print_success (f'[dry run] would have removed { items_count } expired items' )
@@ -105,6 +145,13 @@ def add_arguments(self, parser: CommandParser) -> None:
105
145
action = 'store_true' ,
106
146
help = 'Simulate deleting items, without actually deleting them'
107
147
)
148
+ default_batch_size = settings .DELETE_EXPIRED_ITEMS_BATCH_SIZE
149
+ parser .add_argument (
150
+ '--batch-size' ,
151
+ type = 'positive_int' ,
152
+ default = default_batch_size ,
153
+ help = f"How many rows to delete at a time ({ default_batch_size } )"
154
+ )
108
155
default_min_age = settings .DELETE_EXPIRED_ITEMS_OLDER_THAN_HOURS
109
156
parser .add_argument (
110
157
'--min-age-hours' ,
0 commit comments