Prefetch Peek #1840
-
@mmcnabb-vms no other RabbitMQ client uses magic values such as "MM1" or "MM0" to communicate something to the consumer implementation. This should be a new consumer implementation that handles batching however you see fit. In the Java client of the past, and possibly this client, there was a consumer along those lines. You can implement something similar, tailored to your needs, in your own codebase. Note that …
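A minimal sketch of that "in your own codebase" approach, written against the Java client's `DefaultConsumer` API purely for illustration (the client under discussion may differ); the class name `BatchingConsumer`, the batch size, the idle-flush interval, and the upstream callback are all hypothetical:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical batching consumer kept entirely in application code: it buffers
// deliveries, flushes a full batch immediately, and a scheduled task flushes any
// partial batch on a timer so no message waits indefinitely for the batch to fill.
public class BatchingConsumer extends DefaultConsumer {
    private final int batchSize;
    private final Consumer<List<byte[]>> batchHandler; // upstream processing callback
    private final List<byte[]> batch = new ArrayList<>();
    private long lastDeliveryTag = -1;
    private final ScheduledExecutorService sweeper =
            Executors.newSingleThreadScheduledExecutor();

    public BatchingConsumer(Channel channel, int batchSize, long idleFlushMillis,
                            Consumer<List<byte[]>> batchHandler) {
        super(channel);
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
        // Periodic "idle sweep": flush whatever is buffered so a partial batch cannot grow stale.
        sweeper.scheduleAtFixedRate(this::flush, idleFlushMillis, idleFlushMillis,
                TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized void handleDelivery(String consumerTag, Envelope envelope,
                                            AMQP.BasicProperties properties, byte[] body) {
        batch.add(body);
        lastDeliveryTag = envelope.getDeliveryTag();
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Process the buffered messages upstream, then ack them all at once
    // (multiple = true acks every delivery up to and including the last tag).
    private synchronized void flush() {
        if (batch.isEmpty()) {
            return;
        }
        batchHandler.accept(new ArrayList<>(batch));
        try {
            getChannel().basicAck(lastDeliveryTag, true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        batch.clear();
    }
}
```

Under those assumptions it would be registered with manual acks, e.g. `channel.basicConsume(queue, false, new BatchingConsumer(channel, 100, 5000, batch -> forwardUpstream(batch)))`, and the timer-based flush takes the place of a delivery-time "more messages waiting" flag.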
-
Yep, this is exactly what you should do. As soon as we add …
-
I'd like some feedback on a feature I implemented locally. I haven't gone ahead with a PR yet because it potentially incurs a performance penalty and overwrites other metadata that I didn't need for my use case, so I'd be interested in some discussion or other suggestions.
The feature is "Prefetch Peek": when a message is delivered via an AsyncConsumer, the metadata also includes a flag reporting whether or not further messages are waiting.
My consumer application processes and forwards messages upstream in batches of up to 100; however, if a full batch of data is not available, I don't want it to sit for hours with unprocessed messages waiting for the batch to fill. It makes the consumer code much simpler to have this information available when handling a message; otherwise I need a second "idle sweep" thread that looks for partial batches and flushes them, and I have to guard against race conditions and so on.
My code change is:
and in my application's handler I will dispatch a partial batch if and only if "MM1" was not received in the ConsumerTag.
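The patch itself is not shown above, so the following is only an illustration of the handler-side check being described, reusing the fields from the `BatchingConsumer` sketch earlier in the thread; the "MM1"/"MM0" markers are the magic values discussed here, not part of any client API:

```java
@Override
public synchronized void handleDelivery(String consumerTag, Envelope envelope,
                                        AMQP.BasicProperties properties, byte[] body) {
    batch.add(body);
    lastDeliveryTag = envelope.getDeliveryTag();
    // Hypothetical "prefetch peek" marker: "MM1" in the consumer tag would mean
    // further messages are already buffered; "MM0" (or no marker) would mean this
    // is the last one for now, so a partial batch is dispatched rather than left to grow stale.
    boolean moreWaiting = consumerTag.contains("MM1");
    if (batch.size() >= batchSize || !moreWaiting) {
        flush(); // forward the (possibly partial) batch upstream, then multi-ack it
    }
}
```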
It seems to work well in practice. Of course there are often "false negatives", i.e. it can report that no messages are waiting when a message has been fetched but is microseconds away from being ready; that's not an issue for my use case. It's no big deal if several small batches are sent, but it is a big deal if messages grow stale.
I am ACKing messages using MultiAck after upstream processing of the batch returns, so that if my application crashes or is killed, then messages will be redelivered later when it is restarted.
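In Java-client terms that ack step is a single `basic.ack` with `multiple` set, issued only after the upstream call returns; `lastDeliveryTagInBatch` is a hypothetical variable holding the delivery tag of the last message in the batch:

```java
// Acks every as-yet-unacked delivery on this channel up to and including
// lastDeliveryTagInBatch. If the process crashes or is killed before this call,
// the broker requeues the unacked messages when the channel closes, so they
// are redelivered once the application restarts.
channel.basicAck(lastDeliveryTagInBatch, true);
```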