Once in a while the container running the Taskiq workers will show no activity, even though tasks are definitely being posted (I have everything instrumented with New Relic). I am forced to restart the workers container whenever I get the New Relic alerts.
With help from Perplexity AI, I ended up adding a lot of socket timeout parameters:
```python
from redis.exceptions import ConnectionError, TimeoutError  # redis-py exceptions (subclasses of the builtins)
from taskiq import SmartRetryMiddleware  # import path may vary by taskiq version
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

# Create the result backend
redis_async_result = RedisAsyncResultBackend(
    redis_url=REDIS_URL_FOR_TASKIQ_RESULTS,
    # DEV-3738 Perplexity/Claude recommended adding the next chunk, after seeing ChatGPT's suggestion:
    # NEW: Connection resilience parameters (passed to BlockingConnectionPool)
    socket_connect_timeout=5,  # Fast failure on connection attempts, forcing quick reconnects instead of hanging
    retry_on_error=[ConnectionError, TimeoutError],  # Automatically retries operations that fail due to stale connections
    timeout=None,  # Makes the pool wait for available connections instead of creating new ones during spikes (prevents thundering herd)
    # DEV-3738. ChatGPT recommended:
    socket_keepalive=True,  # keep TCP alive
    socket_timeout=5,  # fail faster if socket drops
    health_check_interval=23,  # Regular validation of idle connections to catch stale ones before they cause hangs
    retry_on_timeout=True,  # auto-retry
    # We decided to put a "garbage collector" on results:
    result_ex_time=60 * 10,  # Max time in seconds for the result to be consumed, before auto-expiry.
    max_connection_pool_size=_REDIS_POOL_SIZE,  # TRICKY: if you do not pass it, it assumes 2**32 and reset() loops/creates this many instances in the pool. Bad impl.
)

broker = ListQueueBroker(
    url=REDIS_URL_FOR_TASKIQ,
    max_connection_pool_size=_REDIS_POOL_SIZE,  # TRICKY: if you do not pass it, it assumes 2**32 and reset() loops/creates this many instances in the pool. Bad impl.
    # DEV-3738 Perplexity/Claude recommended adding the next chunk,
    socket_keepalive=True,
    socket_timeout=TASKIQ_SOCKET_TIMEOUT_GETTING_TASK,  # WARNING: if you use this with a low value, the main thread listening for tasks will raise a Timeout exception and print to the console.
    socket_connect_timeout=5,  # Fast failure on connection attempts, forcing quick reconnects instead of hanging
    health_check_interval=23,  # Regular validation of idle connections to catch stale ones before they cause hangs
    retry_on_timeout=True,
    retry_on_error=[ConnectionError, TimeoutError],  # Automatically retries operations that fail due to stale connections
    timeout=None,  # Makes the pool wait for available connections instead of creating new ones during spikes (prevents thundering herd)
).with_result_backend(redis_async_result)

# DEV-3759 - allows the usage of @broker.task(retry_on_error=True, max_retries=10, delay=5)
broker = broker.with_middlewares(
    SmartRetryMiddleware(
        default_retry_count=5,
        default_delay=5,
        use_jitter=True,
        use_delay_exponent=True,
    )
)
```
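For completeness, this is how a task then opts into the retry behavior. The task name and body below are made up; the decorator arguments are the ones from the DEV-3759 comment above:

```python
# Hypothetical task: exceptions raised in the body are retried by
# SmartRetryMiddleware, using max_retries and delay as the retry labels,
# with jitter and exponential backoff per the middleware settings above.
@broker.task(retry_on_error=True, max_retries=10, delay=5)
async def sync_account(account_id: int) -> None:
    ...
```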
The trick seems to be this parameter in particular: `socket_timeout=TASKIQ_SOCKET_TIMEOUT_GETTING_TASK`.
It ensures that when this "artificial idleness" occurs, a timeout exception is raised and the worker process recovers on its own, with no need to restart the container.
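To illustrate the mechanism I am relying on, here is a minimal sketch in plain redis-py (outside of taskiq, with a placeholder queue key): with `socket_timeout` set on the client, a `BRPOP` that would otherwise block forever raises a `TimeoutError` once the timeout elapses with no data, so the listening loop can recover instead of hanging.

```python
import asyncio

from redis.asyncio import Redis
from redis.exceptions import TimeoutError as RedisTimeoutError


async def listen_once(redis_url: str, queue_key: str = "my_taskiq_queue") -> None:
    # socket_timeout gives every socket read a 5-second deadline.
    client = Redis.from_url(redis_url, socket_timeout=5)
    try:
        # timeout=0 asks the server to block until an item arrives,
        # but the client-side socket_timeout still fires after 5s of silence.
        item = await client.brpop([queue_key], timeout=0)
        print("got task payload:", item)
    except RedisTimeoutError:
        # This is the exception that breaks the "artificial idleness":
        # the caller can loop and issue BRPOP again on a healthy connection.
        print("no data within socket_timeout, retrying")
    finally:
        await client.aclose()  # use close() on redis-py < 5


# asyncio.run(listen_once("redis://localhost:6379/0"))
```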
I was wondering if there is a "more correct" approach to working with `brpop` while still being resilient to these "blips" in Redis/Azure.
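For example, redis-py ships a `Retry` helper with `ExponentialBackoff` that can be attached to the connection itself. Assuming `ListQueueBroker` forwards extra kwargs to the underlying connection pool the same way it forwards the socket options above, a sketch (not tested) might look like this:

```python
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError
from taskiq_redis import ListQueueBroker

# Assumption: ListQueueBroker passes unknown kwargs through to the redis-py
# connection pool, as it appears to do for the socket_* options used above.
broker = ListQueueBroker(
    url=REDIS_URL_FOR_TASKIQ,
    max_connection_pool_size=_REDIS_POOL_SIZE,
    socket_keepalive=True,
    socket_connect_timeout=5,
    health_check_interval=23,
    # Let redis-py retry transient failures with exponential backoff
    # instead of relying only on the retry_on_timeout flag.
    retry=Retry(ExponentialBackoff(cap=10, base=1), retries=3),
    retry_on_error=[ConnectionError, TimeoutError],
)
```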