Commit 1526cfb

improved logging and job progress tracking
1 parent be69807 commit 1526cfb

4 files changed: 43 additions, 52 deletions


ami/jobs/models.py

Lines changed: 0 additions & 9 deletions
@@ -684,17 +684,8 @@ def run(cls, job: "Job"):
         job.started_at = datetime.datetime.now()
         job.finished_at = None
 
-        # Add tracking stage and save job
-        job.progress.add_stage(name="Tracking", key="tracking")
-        job.save()
-
         perform_tracking(job)
 
-        job.progress.update_stage(
-            "tracking",
-            status=JobState.SUCCESS,
-            progress=1,
-        )
         job.update_status(JobState.SUCCESS)
         job.logger.info("Tracking job finished successfully.")
         job.finished_at = datetime.datetime.now()
ami/main/tests/test_tracking.py

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ def test_tracking_exactly_reproduces_occurrences(self):
         Occurrence.objects.filter(event=self.event).delete()
 
         # Run the tracking algorithm to regenerate occurrences
-        assign_occurrences_by_tracking_images(self.source_images, logger)
+        assign_occurrences_by_tracking_images(self.event, logger)
 
         # Capture new tracking-generated occurrence groups
         new_groups = {
ami/ml/models/pipeline.py

Lines changed: 0 additions & 4 deletions
@@ -885,10 +885,6 @@ def save_results(
         detections=detections,
         logger=job_logger,
     )
-    # job_logger.info(f"Creating occurrences for {len(detections)} detections ")
-    # check if every image in the sessions in processed
-
-    # assign_occurrences_by_tracking(detections=detections, logger=job_logger)
 
     # Update precalculated counts on source images and events
     source_images = list(source_images)

ami/ml/tracking.py

Lines changed: 42 additions & 38 deletions
@@ -123,7 +123,7 @@ def assign_occurrences_from_detection_chains(source_images, logger):
     Walk detection chains across source images and assign a new occurrence to each chain.
     """
     visited = set()
-
+    created_occurrences_count = 0
     for image in source_images:
         for det in image.detections.all():
             if det.id in visited or getattr(det, "previous_detection", None) is not None:
@@ -143,34 +143,39 @@ def assign_occurrences_from_detection_chains(source_images, logger):
             for occ_id in old_occurrences:
                 try:
                     Occurrence.objects.filter(id=occ_id).delete()
-                    logger.info(f"Deleted old occurrence {occ_id} before reassignment.")
+                    logger.debug(f"Deleted old occurrence {occ_id} before reassignment.")
                 except Exception as e:
-                    logger.warning(f"Failed to delete occurrence {occ_id}: {e}")
+                    logger.info(f"Failed to delete occurrence {occ_id}: {e}")
 
             occurrence = Occurrence.objects.create(
                 event=chain[0].source_image.event,
                 deployment=chain[0].source_image.deployment,
                 project=chain[0].source_image.project,
             )
+            created_occurrences_count += 1
 
             for d in chain:
                 d.occurrence = occurrence
                 d.save()
 
             occurrence.save()
 
-            logger.info(f"Assigned occurrence {occurrence.pk} to chain of {len(chain)} detections")
+            logger.debug(f"Assigned occurrence {occurrence.pk} to chain of {len(chain)} detections")
+    logger.info(
+        f"Assigned {created_occurrences_count} occurrences from detection chains across {len(source_images)} images."
+    )
 
 
 def assign_occurrences_by_tracking_images(
-    source_images,
-    logger,
-    cost_threshold: float = TRACKING_COST_THRESHOLD,
+    event, logger, cost_threshold: float = TRACKING_COST_THRESHOLD, job=None
 ) -> None:
     """
     Track detections across ordered source images and assign them to occurrences.
     """
-    logger.info(f"Starting occurrence tracking over {len(source_images)} images")
+    from ami.jobs.models import JobState
+
+    source_images = event.captures.order_by("timestamp")
+    logger.info(f"Found {len(source_images)} source images for event {event.pk}")
     if len(source_images) < 2:
         logger.info("Not enough images to perform tracking. At least 2 images are required.")
         return
@@ -181,13 +186,10 @@ def assign_occurrences_by_tracking_images(
         current_detections = list(current_image.detections.all())
         next_detections = list(next_image.detections.all())
 
-        logger.info(
-            f"""Tracking: Processing image {i + 1}/{len(source_images)}:
-            {len(current_detections)} -> {len(next_detections)} detections"""
-        )
+        logger.debug(f"""Tracking: Processing image {i + 1}/{len(source_images)}""")
         # Get the most common algorithm for the current event
         most_common_algorithm = get_most_common_algorithm_for_event(current_image.event)
-        logger.info(
+        logger.debug(
             f"""Using most common algorithm for event {current_image.event.pk}:
             {most_common_algorithm.name if most_common_algorithm else 'None'}"""
         )
@@ -201,8 +203,21 @@ def assign_occurrences_by_tracking_images(
             algorithm=most_common_algorithm,
             logger=logger,
         )
+        if job:
+            job.progress.update_stage(
+                f"event_{event.pk}",
+                status=JobState.STARTED,
+                progress=(i + 1) / (len(source_images) - 1),
+            )
+            job.save()
 
     assign_occurrences_from_detection_chains(source_images, logger)
+    if job:
+        job.progress.update_stage(
+            f"event_{event.pk}",
+            progress=1.0,
+        )
+        job.save()
 
 
 def pair_detections(
@@ -220,7 +235,7 @@ def pair_detections(
 
     Only pairs with cost < threshold are considered.
     """
-    logger.info(f"Pairing {len(current_detections)} - >{len(next_detections)} detections")
+    logger.debug(f"Pairing {len(current_detections)} - >{len(next_detections)} detections")
 
     potential_matches = []
 
@@ -258,16 +273,16 @@ def pair_detections(
             continue
         # check if next detection has a previous detection already assigned
         if getattr(next_det, "previous_detection", None) is not None:
-            logger.info(f"{next_det.id} already has previous detection: {next_det.previous_detection.id}")
+            logger.debug(f"{next_det.id} already has previous detection: {next_det.previous_detection.id}")
             previous_detection = getattr(next_det, "previous_detection", None)
            previous_detection.next_detection = None
            previous_detection.save()
-            logger.info(f"Cleared previous detection {previous_detection.pk} -> {next_det.pk} link")
+            logger.debug(f"Cleared previous detection {previous_detection.pk} -> {next_det.pk} link")
 
-        logger.info(f"Trying to link {det.id} => {next_det.id}")
+        logger.debug(f"Trying to link {det.id} => {next_det.id}")
         det.next_detection = next_det
         det.save()
-        logger.info(f"Linked detection {det.id} => {next_det.id} with cost {cost:.4f}")
+        logger.debug(f"Linked detection {det.id} => {next_det.id} with cost {cost:.4f}")
 
         assigned_current_ids.add(det.id)
         assigned_next_ids.add(next_det.id)
@@ -278,27 +293,25 @@ def perform_tracking(job):
     Perform detection tracking for all events in the job's source image collection.
     Runs tracking only if all images in an event have processed detections with features.
     """
-    from ami.jobs.models import JobState
 
     cost_threshold = job.params.get("cost_threshold", TRACKING_COST_THRESHOLD)
     job.logger.info("Tracking started")
     job.logger.info(f"Using cost threshold: {cost_threshold}")
-    job.progress.update_stage("tracking", status=JobState.STARTED, progress=0)
-    job.save()
-    job.logger.info("Progresss updated and job saved")
     collection = job.source_image_collection
     if not collection:
-        job.logger.warning("Tracking: No source image collection found. Skipping tracking.")
+        job.logger.info("Tracking: No source image collection found. Skipping tracking.")
         return
     job.logger.info("Tracking: Fetching events for collection %s", collection.pk)
-    events_qs = Event.objects.filter(captures__collections=collection).distinct()
+    events_qs = Event.objects.filter(captures__collections=collection).order_by("created_at").distinct()
    total_events = events_qs.count()
    events = events_qs.iterator()
-    processed_events = 0
    job.logger.info("Tracking: Found %d events in collection %s", total_events, collection.pk)
-    for event in events:
-        job.logger.info("Tracking: Processing event %s", event.pk)
-        source_images = event.captures.order_by("timestamp")
+    for event in events_qs:
+        job.progress.add_stage(name=f"Event {event.pk}", key=f"event_{event.pk}")
+        job.save()
+    for idx, event in enumerate(events, start=1):
+        job.logger.info(f"Tracking: Processing event {idx}/{total_events} (Event ID: {event.pk})")
+
         # Check if there are human identifications in the event
         if Occurrence.objects.filter(event=event, identifications__isnull=False).exists():
             job.logger.info(f"Tracking: Skipping tracking for event {event.pk}: human identifications present.")
@@ -311,16 +324,7 @@ def perform_tracking(job):
             continue
 
         job.logger.info(f"Tracking: Running tracking for event {event.pk}")
-        assign_occurrences_by_tracking_images(source_images, job.logger, cost_threshold=cost_threshold)
-        processed_events += 1
-
-        job.progress.update_stage(
-            "tracking",
-            status=JobState.STARTED,
-            progress=processed_events / total_events if total_events else 1,
-        )
-        job.save()
+        assign_occurrences_by_tracking_images(event, job.logger, cost_threshold=cost_threshold, job=job)
 
     job.logger.info("Tracking: Finished tracking.")
-    job.progress.update_stage("tracking", status=JobState.SUCCESS, progress=1)
     job.save()
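Taken together, the tracking.py changes replace the single "tracking" progress stage with one stage per event: perform_tracking registers an event_{pk} stage for every event in the collection, and assign_occurrences_by_tracking_images advances that stage as it walks consecutive image pairs. A minimal sketch of the resulting pattern, assuming the job.progress.add_stage / update_stage API shown in the diff; the wrapper function below is an illustrative stand-in, not the module's actual code.

# Illustrative sketch of the per-event progress pattern introduced by this commit.
# Assumes the job.progress.add_stage()/update_stage() API referenced in the diff;
# track_with_per_event_progress is a simplified stand-in for perform_tracking.
from ami.ml.tracking import assign_occurrences_by_tracking_images

def track_with_per_event_progress(job, events_qs):
    total_events = events_qs.count()

    # Register one progress stage per event up front so the job lists them all.
    for event in events_qs:
        job.progress.add_stage(name=f"Event {event.pk}", key=f"event_{event.pk}")
    job.save()

    for idx, event in enumerate(events_qs, start=1):
        job.logger.info(f"Tracking: Processing event {idx}/{total_events} (Event ID: {event.pk})")
        # The tracker receives the event and the job, and updates the stage
        # keyed f"event_{event.pk}" itself as it pairs detections across images.
        assign_occurrences_by_tracking_images(event, job.logger, job=job)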
