From 447e183712db0b8d5d068f79f7093335c8721aec Mon Sep 17 00:00:00 2001 From: Soroushsrd Date: Fri, 9 May 2025 03:36:34 +0330 Subject: [PATCH] Fix Redis fallback ack_deadline_ms handling by updating message IDs at receive time --- omniqueue/src/backends/redis/fallback.rs | 41 +++++++++++++----------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index 2239ada..57dc6bb 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -51,29 +51,32 @@ async fn receive_with_timeout( consumer: &RedisConsumer, timeout: Duration, ) -> Result> { - let payload: Option> = consumer - .redis - .get() - .await - .map_err(QueueError::generic)? - .brpoplpush( - &consumer.queue_key, - &consumer.processing_queue_key, - // The documentation at https://redis.io/docs/latest/commands/brpoplpush/ does not - // state what unit the timeout is, but `BLPOP` and `BLMPOP` have similar timeout - // parameters that are documented as being seconds. - timeout.as_secs_f64(), - ) + let mut conn = consumer.redis.get().await.map_err(QueueError::generic)?; + + let payload: Option> = conn + .brpop(&consumer.queue_key, timeout.as_secs_f64()) .await .map_err(QueueError::generic)?; match payload { - Some(old_payload) => Some(internal_to_delivery( - internal_from_list(&old_payload)?.into(), - consumer, - old_payload, - )) - .transpose(), + Some(old_payload) => { + // Creating a new payload with a new timestamp + // this is done to avoid the message being re-enqueued + // too early! + let new_payload = internal_to_list_payload(internal_from_list(&old_payload)?); + + let _: () = conn + .lpush(&consumer.processing_queue_key, &new_payload) + .await + .map_err(QueueError::generic)?; + + Some(internal_to_delivery( + internal_from_list(&new_payload)?.into(), + consumer, + new_payload, + )) + .transpose() + } None => Ok(None), } }