Commit 9a0e533

Fix Celery task result Redis connection errors (v2.5.53)
Add safe_task_ready() and safe_task_get() helper functions with retry logic to handle Redis connection errors in Celery's internal backend. The previous fix (v2.5.51) addressed application-level Redis connections, but Celery's task.ready() and task.get() methods use their own internal Redis connection, which can still fail with connection-reset errors. This fix wraps those Celery methods with retry logic and graceful error handling to prevent integrity-scan crashes.
1 parent dd048ec commit 9a0e533
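
For context, a minimal sketch of why the two connection paths are distinct (the broker/backend URLs, database numbers, and task id below are illustrative assumptions, not taken from this repo's config):

    # Celery's result-state calls go through app.backend's own connection pool,
    # separate from any redis.Redis client the application constructs itself.
    import redis
    from celery import Celery

    app = Celery('pixelprobe',
                 broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/1')

    # Application-level client (the connection the v2.5.51 fix covered)
    app_client = redis.Redis(host='localhost', port=6379, db=2)

    result = app.AsyncResult('some-task-id')
    # Talks to app.backend's connection, not app_client, so it can still
    # raise ConnectionResetError even when app_client is healthy.
    result.ready()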

3 files changed (+86, -5 lines)


CHANGELOG.MD

Lines changed: 17 additions & 0 deletions
@@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0).
 
+## [2.5.53] - 2026-01-01
+
+### Bug Fixes
+
+- **Fix Celery task result Redis connection errors**: Integrity scan crashed when checking task results
+  - Root cause: Celery's `task.ready()` and `task.get()` use their own internal Redis connection, separate from our fixed Redis client
+  - When Celery's Redis connection gets reset, task result checking fails with `ConnectionResetError`
+  - Error message: `'Connection' object has no attribute 'register_connect_callback'`
+- **Solution**: Added wrapper functions with retry logic for Celery task result operations
+  - `safe_task_ready()`: Safely check task status with 3 retries and a 0.5s delay
+  - `safe_task_get()`: Safely get a task result with 3 retries and a 0.5s delay
+  - Both functions catch `redis.ConnectionError`, `redis.TimeoutError`, `ConnectionResetError`, and `AttributeError`
+  - On transient failure, they retry before giving up
+- Files affected: `pixelprobe/services/maintenance_service.py`
+
+---
+
 ## [2.5.52] - 2026-01-01
 
 ### Bug Fixes

pixelprobe/services/maintenance_service.py

Lines changed: 68 additions & 4 deletions
@@ -17,6 +17,70 @@
 
 logger = logging.getLogger(__name__)
 
+
+def safe_task_ready(task, max_retries=3, retry_delay=0.5):
+    """
+    Safely check if a Celery task is ready with retry logic for Redis connection errors.
+
+    Celery's task.ready() uses its internal Redis connection which can get reset.
+    This wrapper adds retry logic to handle transient connection failures.
+
+    Args:
+        task: Celery AsyncResult object
+        max_retries: Number of retry attempts
+        retry_delay: Delay between retries in seconds
+
+    Returns:
+        bool: True if task is ready, False if not ready or on persistent error
+    """
+    import redis
+
+    for attempt in range(max_retries):
+        try:
+            return task.ready()
+        except (redis.ConnectionError, redis.TimeoutError, ConnectionResetError, AttributeError) as e:
+            if attempt < max_retries - 1:
+                logger.warning(f"Redis connection error checking task status (attempt {attempt + 1}/{max_retries}): {e}")
+                time.sleep(retry_delay)
+            else:
+                logger.error(f"Failed to check task status after {max_retries} attempts: {e}")
+                # Return False to keep task in active list and retry later
+                return False
+        except Exception as e:
+            logger.error(f"Unexpected error checking task status: {e}")
+            return False
+    return False
+
+
+def safe_task_get(task, timeout=1, max_retries=3, retry_delay=0.5):
+    """
+    Safely get a Celery task result with retry logic for Redis connection errors.
+
+    Args:
+        task: Celery AsyncResult object
+        timeout: Timeout for getting the result
+        max_retries: Number of retry attempts
+        retry_delay: Delay between retries in seconds
+
+    Returns:
+        Result dict or None on error
+    """
+    import redis
+
+    for attempt in range(max_retries):
+        try:
+            return task.get(timeout=timeout)
+        except (redis.ConnectionError, redis.TimeoutError, ConnectionResetError, AttributeError) as e:
+            if attempt < max_retries - 1:
+                logger.warning(f"Redis connection error getting task result (attempt {attempt + 1}/{max_retries}): {e}")
+                time.sleep(retry_delay)
+            else:
+                logger.error(f"Failed to get task result after {max_retries} attempts: {e}")
+                raise
+        except Exception:
+            raise
+    return None
+
 class MaintenanceService:
     """Service for maintenance operations like cleanup and file monitoring"""
 

@@ -326,9 +390,9 @@ def _run_cleanup(self, cleanup_id, file_paths=None, schedule_id=None):
             # Collect completed tasks and free up slots
             still_active = []
             for task in active_tasks:
-                if task.ready():
+                if safe_task_ready(task):
                     try:
-                        check_result = task.get(timeout=1)
+                        check_result = safe_task_get(task, timeout=1)
                         total_files_processed += 1
 
                         if not check_result.get('exists'):
@@ -717,9 +781,9 @@ def _run_file_changes_check(self, check_id: str, file_paths=None, schedule_id=None):
             still_active = []
             for task_info in active_tasks:
                 task, file_size = task_info['task'], task_info['size']
-                if task.ready():
+                if safe_task_ready(task):
                     try:
-                        result = task.get(timeout=1)
+                        result = safe_task_get(task, timeout=1)
                         total_files_processed += 1
 
                         # Update last integrity check timestamp for this file
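
A hypothetical unit test (pytest-style; the test is not part of this commit, and its name is an assumption) that exercises the retry path by simulating a transient connection reset:

    # Simulates one transient ConnectionResetError, then success, to verify
    # that safe_task_ready() retries instead of crashing the scan loop.
    from unittest.mock import MagicMock

    from pixelprobe.services.maintenance_service import safe_task_ready

    def test_safe_task_ready_retries_then_succeeds():
        task = MagicMock()
        # First call raises; second call reports the task as ready.
        task.ready.side_effect = [ConnectionResetError("connection reset"), True]
        assert safe_task_ready(task, max_retries=3, retry_delay=0) is True
        assert task.ready.call_count == 2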

version.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 # Default version - this is the single source of truth
 
 
-_DEFAULT_VERSION = '2.5.52'
+_DEFAULT_VERSION = '2.5.53'
 
 
 # Allow override via environment variable for CI/CD, but default to the hardcoded version
