-
Notifications
You must be signed in to change notification settings - Fork 2.2k
queue: add new BackpressureQueue[T] variant #9838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Important Review skippedAuto reviews are limited to specific labels. 🏷️ Labels to auto review (1)
Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
Pull reviewers statsStats of the last 30 days for lnd:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a pass of this and made a bunch of comments. There seems to be a lot of LLM fluff (although I could be mistaken) that I don't like.
queue/back_pressure_test.go
Outdated
t, res.IsOk(), "Enqueue of 42 should not error: %v", res.Err(), | ||
) | ||
require.True(t, | ||
res.UnwrapOrFail(t), "Item 42 should have been dropped by "+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. I was confused, because this can never fail, since you checked res.IsOk()
above. But we are checking the success case of the result, which should be true
. Maybe consider res.UnwrapOr(false)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think can remove the above check, and just have it be UnwrapOrFail
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With Enqueue returning an error now this makes so much more sense. 👍
In this commit, we add a new type of queue: the back pressure queue. This is a bounded queue based on a simple channel, that will consult a predicate to decide if we should preemptively drop a message or not. We then provide a sample predicate for this use case, based on random early dropping. Given a min and max threshold, we'll start to drop message randomly once we get past the min threshold, ramping up to the max threshold where we'll start to always drop the message.
2e346d0
to
3511604
Compare
Pushed up a new version. PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a second pass. Mainly nits (line length! 😄) and some clarifications from my side about the use of generics in the tests.
Suffice to say that I really like this. I already used it in #10219 so I would love to see this merged.
@yyforyongyu Could you have a go at this as well?
queue/back_pressure_test.go
Outdated
t, res.IsOk(), "Enqueue of 42 should not error: %v", res.Err(), | ||
) | ||
require.True(t, | ||
res.UnwrapOrFail(t), "Item 42 should have been dropped by "+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With Enqueue returning an error now this makes so much more sense. 👍
} | ||
|
||
// intQueueMachine is a concrete wrapper for queueMachine[int] for rapid. | ||
type intQueueMachine struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I meant
type intQueueMachine struct {
tb rapid.TB
capacity int
queue *BackpressureQueue[int]
modelQueue []int
dropPredicate DropPredicate[int]
itemGenerator *rapid.Generator[int]
}
and take it from there.
} | ||
|
||
// intQueueMachine is a concrete wrapper for queueMachine[int] for rapid. | ||
type intQueueMachine struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what copilot came up with. Github doesn't allow me to upload .diff
files, hence the .txt
extension. no-generics.txt
|
||
// queueMachine is the generic state machine logic for testing | ||
// BackpressureQueue. T must be comparable for use in assertions. | ||
type queueMachine[T comparable] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: line length
Will stop making these comments to not swamp the review with it, but there are several places where the max line length is exceeded.
// | ||
// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy, | ||
// and gives early back-pressure signals to senders. | ||
func RandomEarlyDrop[T any](minThreshold, maxThreshold int, opts ...REDOption) DropPredicate[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: line length
@yyforyongyu: review reminder |
In this commit, we add a new type of queue: the back pressure queue. This is a bounded queue based on a simple channel, that will consult a predicate to decide if we should preemptively drop a message or not.
We then provide a sample predicate for this use case, based on random early dropping. Given a min and max threshold, we'll start to drop message randomly once we get past the min threshold, ramping up to the max threshold where we'll start to always drop the message.