Skip to content

Commit 4987aa5

Browse files
committed
Fix Redis fallback ack_deadline_ms handling by updating message IDs at receive time
1 parent 0f603e8 commit 4987aa5

File tree

1 file changed

+22
-19
lines changed

1 file changed

+22
-19
lines changed

omniqueue/src/backends/redis/fallback.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,29 +51,32 @@ async fn receive_with_timeout<R: RedisConnection>(
5151
consumer: &RedisConsumer<R>,
5252
timeout: Duration,
5353
) -> Result<Option<Delivery>> {
54-
let payload: Option<Vec<u8>> = consumer
55-
.redis
56-
.get()
57-
.await
58-
.map_err(QueueError::generic)?
59-
.brpoplpush(
60-
&consumer.queue_key,
61-
&consumer.processing_queue_key,
62-
// The documentation at https://redis.io/docs/latest/commands/brpoplpush/ does not
63-
// state what unit the timeout is, but `BLPOP` and `BLMPOP` have similar timeout
64-
// parameters that are documented as being seconds.
65-
timeout.as_secs_f64(),
66-
)
54+
let mut conn = consumer.redis.get().await.map_err(QueueError::generic)?;
55+
56+
let payload: Option<Vec<u8>> = conn
57+
.brpop(&consumer.queue_key, timeout.as_secs_f64())
6758
.await
6859
.map_err(QueueError::generic)?;
6960

7061
match payload {
71-
Some(old_payload) => Some(internal_to_delivery(
72-
internal_from_list(&old_payload)?.into(),
73-
consumer,
74-
old_payload,
75-
))
76-
.transpose(),
62+
Some(old_payload) => {
63+
// Creating a new payload with a new timestamp
64+
// this is done to avoid the message being re-enqueued
65+
// too early!
66+
let new_payload = internal_to_list_payload(internal_from_list(&old_payload)?.into());
67+
68+
let _: () = conn
69+
.lpush(&consumer.processing_queue_key, &new_payload)
70+
.await
71+
.map_err(QueueError::generic)?;
72+
73+
Some(internal_to_delivery(
74+
internal_from_list(&new_payload)?.into(),
75+
consumer,
76+
new_payload,
77+
))
78+
.transpose()
79+
}
7780
None => Ok(None),
7881
}
7982
}

0 commit comments

Comments
 (0)