Skip to content

Commit 579494f

Browse files
committed
fix: Move set_data_type outside transaction, to avoid locking collection row.
The logic was also incorrect, in that we should never store the data format if the format is unsupported or the file is empty.
1 parent 2616aa0 commit 579494f

File tree

5 files changed

+68
-56
lines changed

5 files changed

+68
-56
lines changed

process/exceptions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ class InvalidFormError(KingfisherProcessError, ValueError):
1111

1212

1313
class EmptyFormatError(KingfisherProcessError):
14-
"""Raised if a collection file's data type contains no data."""
14+
"""Raised if a collection file contains no data."""
1515

1616

1717
class UnsupportedFormatError(KingfisherProcessError):
18-
"""Raised if a collection file's data type is not supported."""
18+
"""Raised if a collection file's format is unsupported."""

process/management/commands/file_worker.py

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
create_note,
3333
create_step,
3434
decorator,
35+
delete_step,
3536
deleting_step,
3637
get_or_create,
3738
)
@@ -64,15 +65,9 @@ def handle(self, *args, **options):
6465
def finish(collection_id, collection_file_id, exception):
6566
# If a duplicate message is received causing an IntegrityError or similar, we still want to create the next step,
6667
# in case it was not created the first time. deleting_step() will delete any duplicate steps.
67-
if settings.ENABLE_CHECKER and not isinstance(
68-
exception,
69-
# See the try/except block in the callback() function of the file_worker worker.
70-
EmptyFormatError
71-
| FileNotFoundError
72-
| UnknownFormatError
73-
| UnsupportedFormatError
74-
| ijson.common.IncompleteJSONError,
75-
):
68+
#
69+
# See the try/except block in the callback() function of the file_worker worker.
70+
if settings.ENABLE_CHECKER and not isinstance(exception, FileNotFoundError | ijson.common.IncompleteJSONError):
7671
create_step(ProcessingStep.Name.CHECK, collection_id, collection_file_id=collection_file_id)
7772

7873

@@ -87,6 +82,25 @@ def callback(client_state, channel, method, properties, input_message):
8782
return
8883

8984
try:
85+
# Detect and save the data_type before the transaction, to avoid locking collection rows during process_file().
86+
#
87+
# NOTE: Kingfisher Process assumes a single format for all collection files. Unknown, unsupported, or empty
88+
# formats are detected only for the first collection files processed.
89+
try:
90+
set_data_type(collection, collection_file)
91+
except (UnknownFormatError, UnsupportedFormatError): # UnknownFormatError is raised by detect_format()
92+
logger.exception("Source %s yields an unknown or unsupported format, skipping", collection.source_id)
93+
delete_step(ProcessingStep.Name.LOAD, collection_file_id=collection_file_id)
94+
create_note(collection, ERROR, f"Source {collection.source_id} yields unknown format", data=input_message)
95+
nack(client_state, channel, method.delivery_tag, requeue=False)
96+
return
97+
except EmptyFormatError as e:
98+
# Don't log a message, since sources with empty packages also have non-empty packages.
99+
delete_step(ProcessingStep.Name.LOAD, collection_file_id=collection_file_id)
100+
create_note(collection, CollectionNote.Level.WARNING, str(e), data=input_message)
101+
nack(client_state, channel, method.delivery_tag, requeue=False)
102+
return
103+
90104
for attempt in range(1, MAX_ATTEMPTS + 1):
91105
try:
92106
with (
@@ -113,26 +127,18 @@ def callback(client_state, channel, method, properties, input_message):
113127
publish(client_state, channel, message, routing_key)
114128

115129
if upgraded_collection_file_id:
116-
# The deleting_step() context manager sets upgraded_collection_file_id only if successful, so we don't need
117-
# to create this step in the finish() function.
130+
# The deleting_step() context manager sets upgraded_collection_file_id only if successful, so we can create
131+
# this step here instead of in the finish() function.
118132
if settings.ENABLE_CHECKER:
119133
create_step(ProcessingStep.Name.CHECK, collection_id, collection_file_id=upgraded_collection_file_id)
120134

121135
message = {"collection_id": collection_id, "collection_file_id": upgraded_collection_file_id}
122136
publish(client_state, channel, message, routing_key)
123-
# "Expected" errors.
124-
except EmptyFormatError as e: # raised by process_file()
125-
create_note(collection, CollectionNote.Level.WARNING, str(e), data=input_message)
126-
nack(client_state, channel, method.delivery_tag, requeue=False)
127137
# Irrecoverable errors. Discard the message to allow other messages to be processed.
128138
except FileNotFoundError: # raised by detect_format() or open()
129139
logger.exception("%s has disappeared, skipping", collection_file.filename)
130140
create_note(collection, ERROR, f"{collection_file.filename} has disappeared", data=input_message)
131141
nack(client_state, channel, method.delivery_tag, requeue=False)
132-
except (UnknownFormatError, UnsupportedFormatError): # raised by detect_format() or process_file()
133-
logger.exception("Source %s yields an unknown or unsupported format, skipping", collection.source_id)
134-
create_note(collection, ERROR, f"Source {collection.source_id} yields unknown format", data=input_message)
135-
nack(client_state, channel, method.delivery_tag, requeue=False)
136142
except ijson.common.IncompleteJSONError: # raised by ijson.parse()
137143
logger.exception("Source %s yields invalid JSON, skipping", collection.source_id)
138144
create_note(collection, ERROR, f"Source {collection.source_id} yields invalid JSON", data=input_message)
@@ -151,20 +157,9 @@ def process_file(collection_file) -> int | None:
151157
:param collection_file: collection file for which releases should be checked
152158
:returns: upgraded collection file id or None (if there is no upgrade planned)
153159
"""
154-
data_type = _get_data_type(collection_file)
155-
156-
data_format = data_type["format"]
157-
158-
# https://github.com/open-contracting/kingfisher-collect/issues/1012
159-
if data_format == Format.empty_package:
160-
raise EmptyFormatError(f"Empty format '{data_format}' for file {collection_file}.")
161-
if data_format not in SUPPORTED_FORMATS:
162-
raise UnsupportedFormatError(
163-
f"Unsupported format '{data_format}' for file {collection_file}. "
164-
f"Must be one of: {', '.join(sorted(SUPPORTED_FORMATS))}."
165-
)
160+
data_type = collection_file.collection.data_type
166161

167-
if data_format == Format.compiled_release:
162+
if data_type["format"] == Format.compiled_release:
168163
package = None
169164
else:
170165
package = _read_package_data_from_file(collection_file.filename, data_type)
@@ -188,12 +183,21 @@ def process_file(collection_file) -> int | None:
188183
return None
189184

190185

191-
def _get_data_type(collection_file):
192-
collection = collection_file.collection
186+
def set_data_type(collection, collection_file):
193187
if not collection.data_type:
194188
detected_format, is_concatenated, is_array = detect_format(
195189
collection_file.filename, additional_prefixes=("extensions",)
196190
)
191+
192+
# https://github.com/open-contracting/kingfisher-collect/issues/1012
193+
if detected_format == Format.empty_package:
194+
raise EmptyFormatError(f"Empty format '{detected_format}' for file {collection_file}.")
195+
if detected_format not in SUPPORTED_FORMATS:
196+
raise UnsupportedFormatError(
197+
f"Unsupported format '{detected_format}' for file {collection_file}. "
198+
f"Must be one of: {', '.join(sorted(SUPPORTED_FORMATS))}."
199+
)
200+
197201
data_type = {
198202
"format": detected_format,
199203
"concatenated": is_concatenated,
@@ -207,8 +211,6 @@ def _get_data_type(collection_file):
207211
upgraded_collection.data_type = data_type
208212
upgraded_collection.save(update_fields=["data_type"])
209213

210-
return collection.data_type
211-
212214

213215
class ControlCodesFilter:
214216
def __init__(self, file):

process/management/commands/finisher.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ def callback(client_state, channel, method, properties, input_message):
4949
return
5050

5151
if completable(collection):
52-
# COUNT first, before any UPDATE, to avoid locking rows in between UPDATEs.
52+
# COUNT first, before any UPDATE, to avoid locking the collection row in between UPDATEs.
5353
counts = _count_releases_and_records(collection)
54+
# If not for the lock, we'd COUNT only if `updated`, to save time on simultaneous messages.
5455
if upgraded_collection := collection.get_upgraded_collection():
5556
upgraded_collection_counts = _count_releases_and_records(upgraded_collection)
5657
else:

process/util.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,11 @@
1010
import simplejson as json
1111
from django.conf import settings
1212
from django.db import IntegrityError, connections, transaction
13-
from ocdskit.exceptions import UnknownFormatError
1413
from yapw.clients import AsyncConsumer, Blocking
1514
from yapw.decorators import decorate
1615
from yapw.methods import add_callback_threadsafe, nack
1716

18-
from process.exceptions import AlreadyExists, EmptyFormatError, InvalidFormError, UnsupportedFormatError
17+
from process.exceptions import AlreadyExists, InvalidFormError
1918
from process.models import Collection, CollectionFile, CollectionNote, ProcessingStep, Record
2019

2120
logger = logging.getLogger(__name__)
@@ -131,10 +130,7 @@ def deleting_step(*args, **kwargs):
131130
InvalidFormError,
132131
IntegrityError,
133132
# See the try/except block in the callback() function of the file_worker worker.
134-
EmptyFormatError,
135133
FileNotFoundError,
136-
UnknownFormatError,
137-
UnsupportedFormatError,
138134
ijson.common.IncompleteJSONError,
139135
) as exception:
140136
delete_step(*args, **kwargs, exception=exception)

tests/processors/test_process_file.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from ocdskit.exceptions import UnknownFormatError
33

44
from process.exceptions import EmptyFormatError, UnsupportedFormatError
5-
from process.management.commands.file_worker import process_file
5+
from process.management.commands.file_worker import process_file, set_data_type
66
from process.models import CollectionFile, PackageData, Release
77
from tests.fixtures import collection
88

@@ -12,45 +12,58 @@ def test_empty_format(self):
1212
source = collection()
1313
source.save()
1414

15+
collection_file = CollectionFile(collection=source, filename="tests/fixtures/detect-format_empty.json")
16+
1517
with self.assertRaisesRegex(
1618
EmptyFormatError,
1719
r"^Empty format 'empty package' for file tests/fixtures/detect-format_empty\.json \(id: {id}\)\.$",
1820
):
19-
process_file(CollectionFile(collection=source, filename="tests/fixtures/detect-format_empty.json"))
21+
set_data_type(source, collection_file)
2022

21-
self.assertEqual(source.data_type, {"array": False, "format": "empty package", "concatenated": False})
23+
self.assertEqual(source.data_type, {})
2224

2325
def test_unsupported_format(self):
2426
source = collection()
2527
source.save()
2628

29+
collection_file = CollectionFile(collection=source, filename="tests/fixtures/detect-format_versioned.json")
30+
2731
with self.assertRaisesRegex(
2832
UnsupportedFormatError,
2933
r"^Unsupported format 'versioned release' for file tests/fixtures/detect-format_versioned.json "
3034
r"\(id: {id}\). Must be one of: compiled release, record package, release package\.$",
3135
):
32-
process_file(CollectionFile(collection=source, filename="tests/fixtures/detect-format_versioned.json"))
36+
set_data_type(source, collection_file)
3337

34-
self.assertEqual(source.data_type, {"array": False, "format": "versioned release", "concatenated": False})
38+
self.assertEqual(source.data_type, {})
3539

3640
def test_unknown_format(self):
3741
source = collection()
3842
source.save()
3943

44+
collection_file = CollectionFile(collection=source, filename="tests/fixtures/detect-format_object.json")
45+
4046
with self.assertRaisesRegex(UnknownFormatError, r"^top-level JSON value is a non-OCDS object$"):
41-
process_file(CollectionFile(collection=source, filename="tests/fixtures/detect-format_object.json"))
47+
set_data_type(source, collection_file)
4248

4349
self.assertEqual(source.data_type, {})
4450

51+
def test_file_not_found(self):
52+
source = collection()
53+
source.save()
4554

46-
class ProcessFileTests(TransactionTestCase):
47-
fixtures = ["tests/fixtures/complete_db.json"]
55+
collection_file = CollectionFile(collection=source, filename="tests/fixtures/detect-format_nonexistent.json")
4856

49-
def test_file_not_found(self):
5057
with self.assertRaises(FileNotFoundError) as e:
51-
collection_file = CollectionFile.objects.select_related("collection").get(pk=5)
52-
process_file(collection_file)
53-
self.assertEqual(str(e.exception), "[Errno 2] No such file or directory: 'ocds-px0z7d-10094-10001-1'")
58+
set_data_type(collection_file.collection, collection_file)
59+
60+
self.assertEqual(
61+
str(e.exception), "[Errno 2] No such file or directory: 'tests/fixtures/detect-format_nonexistent.json'"
62+
)
63+
64+
65+
class ProcessFileTests(TransactionTestCase):
66+
fixtures = ["tests/fixtures/complete_db.json"]
5467

5568
def test_happy_day(self):
5669
collection_file = CollectionFile.objects.get(pk=1)

0 commit comments

Comments
 (0)