Skip to content

Commit 16a746b

Browse files
committed
Add fuzztest and fix bug it found
1 parent 3d9b997 commit 16a746b

File tree

2 files changed

+144
-17
lines changed

2 files changed

+144
-17
lines changed

pkg/controllerworkqueue/workqueue.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,19 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5757
}
5858

5959
cwq := &controllerworkqueue[T]{
60-
items: map[T]*item[T]{},
61-
queue: btree.NewG(32, less[T]),
62-
tryPush: make(chan struct{}, 1),
63-
rateLimiter: opts.RateLimiter,
64-
locked: sets.Set[T]{},
65-
done: make(chan struct{}),
66-
get: make(chan item[T]),
67-
now: time.Now,
68-
tick: time.Tick,
60+
items: map[T]*item[T]{},
61+
queue: btree.NewG(32, less[T]),
62+
// itemOrWaiterAdded indicates that an item or
63+
// waiter was added. It must be buffered, because
64+
// if we currently process items we can't tell
65+
// if that included the new item/waiter.
66+
itemOrWaiterAdded: make(chan struct{}, 1),
67+
rateLimiter: opts.RateLimiter,
68+
locked: sets.Set[T]{},
69+
done: make(chan struct{}),
70+
get: make(chan item[T]),
71+
now: time.Now,
72+
tick: time.Tick,
6973
}
7074

7175
go cwq.spin()
@@ -79,7 +83,7 @@ type controllerworkqueue[T comparable] struct {
7983
items map[T]*item[T]
8084
queue *btree.BTreeG[*item[T]]
8185

82-
tryPush chan struct{}
86+
itemOrWaiterAdded chan struct{}
8387

8488
rateLimiter workqueue.TypedRateLimiter[T]
8589

@@ -119,7 +123,7 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
119123
}
120124

121125
var readyAt *time.Time
122-
if o.After != 0 {
126+
if o.After > 0 {
123127
readyAt = ptr.To(w.now().Add(o.After))
124128
}
125129
if _, ok := w.items[key]; !ok {
@@ -151,9 +155,9 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
151155
}
152156
}
153157

154-
func (w *controllerworkqueue[T]) doTryPush() {
158+
func (w *controllerworkqueue[T]) notifyItemOrWaiterAdded() {
155159
select {
156-
case w.tryPush <- struct{}{}:
160+
case w.itemOrWaiterAdded <- struct{}{}:
157161
default:
158162
}
159163
}
@@ -162,11 +166,12 @@ func (w *controllerworkqueue[T]) spin() {
162166
blockForever := make(chan time.Time)
163167
var nextReady <-chan time.Time
164168
nextReady = blockForever
169+
165170
for {
166171
select {
167172
case <-w.done:
168173
return
169-
case <-w.tryPush:
174+
case <-w.itemOrWaiterAdded:
170175
case <-nextReady:
171176
}
172177

@@ -186,7 +191,11 @@ func (w *controllerworkqueue[T]) spin() {
186191

187192
// No next element we can process
188193
if item.readyAt != nil && item.readyAt.After(w.now()) {
189-
nextReady = w.tick(item.readyAt.Sub(w.now()))
194+
readyAt := item.readyAt.Sub(w.now())
195+
if readyAt <= 0 { // Toctou race with the above check
196+
readyAt = 1
197+
}
198+
nextReady = w.tick(readyAt)
190199
return false
191200
}
192201

@@ -222,7 +231,7 @@ func (w *controllerworkqueue[T]) AddRateLimited(item T) {
222231
func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) {
223232
w.waiters.Add(1)
224233

225-
w.doTryPush()
234+
w.notifyItemOrWaiterAdded()
226235
item := <-w.get
227236

228237
return item.key, item.priority, w.shutdown.Load()
@@ -249,7 +258,7 @@ func (w *controllerworkqueue[T]) Done(item T) {
249258
w.lockedLock.Lock()
250259
defer w.lockedLock.Unlock()
251260
w.locked.Delete(item)
252-
w.doTryPush()
261+
w.notifyItemOrWaiterAdded()
253262
}
254263

255264
func (w *controllerworkqueue[T]) ShutDown() {

pkg/controllerworkqueue/workqueue_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"testing"
66
"time"
77

8+
fuzz "github.com/google/gofuzz"
89
. "github.com/onsi/ginkgo/v2"
910
. "github.com/onsi/gomega"
11+
"k8s.io/apimachinery/pkg/util/sets"
1012
)
1113

1214
var _ = Describe("Controllerworkqueue", func() {
@@ -254,6 +256,122 @@ func BenchmarkAddGetDone(b *testing.B) {
254256
}
255257
}
256258

259+
// TestFuzzPrioriorityQueue validates a set of basic
260+
// invariants that should always be true:
261+
//
262+
// - The queue is threadsafe when multiple producers and consumers
263+
// are involved
264+
// - There are no deadlocks
265+
// - An item is never handed out again before it is returned
266+
// - Items in the queue are de-duplicated
267+
// - max(existing priority, new priority) is used
268+
func TestFuzzPrioriorityQueue(t *testing.T) {
269+
t.Parallel()
270+
271+
seed := time.Now().UnixNano()
272+
t.Logf("seed: %d", seed)
273+
f := fuzz.NewWithSeed(seed)
274+
fuzzLock := sync.Mutex{}
275+
fuzz := func(in any) {
276+
fuzzLock.Lock()
277+
defer fuzzLock.Unlock()
278+
279+
f.Fuzz(in)
280+
}
281+
282+
inQueue := map[string]int{}
283+
inQueueLock := sync.Mutex{}
284+
285+
handedOut := sets.Set[string]{}
286+
handedOutLock := sync.Mutex{}
287+
288+
wg := sync.WaitGroup{}
289+
q, _ := newQueue()
290+
291+
for range 10 {
292+
wg.Add(1)
293+
go func() {
294+
defer wg.Done()
295+
296+
for range 1000 {
297+
opts, item := AddOpts{}, ""
298+
299+
fuzz(&opts)
300+
fuzz(&item)
301+
302+
if opts.After > 100*time.Millisecond {
303+
opts.After = 10 * time.Millisecond
304+
}
305+
opts.RateLimited = false
306+
307+
func() {
308+
inQueueLock.Lock()
309+
defer inQueueLock.Unlock()
310+
311+
q.AddWithOpts(opts, item)
312+
if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority {
313+
inQueue[item] = opts.Priority
314+
}
315+
}()
316+
}
317+
}()
318+
319+
}
320+
for range 100 {
321+
wg.Add(1)
322+
323+
go func() {
324+
defer wg.Done()
325+
326+
for {
327+
item, cont := func() (string, bool) {
328+
inQueueLock.Lock()
329+
defer inQueueLock.Unlock()
330+
331+
if len(inQueue) == 0 {
332+
return "", false
333+
}
334+
335+
item, priority, _ := q.GetWithPriority()
336+
if expected := inQueue[item]; expected != priority {
337+
t.Errorf("got priority %d, expected %d", priority, expected)
338+
}
339+
delete(inQueue, item)
340+
return item, true
341+
}()
342+
343+
if !cont {
344+
return
345+
}
346+
347+
func() {
348+
handedOutLock.Lock()
349+
defer handedOutLock.Unlock()
350+
351+
if handedOut.Has(item) {
352+
t.Errorf("item %s got handed out more than once", item)
353+
}
354+
handedOut.Insert(item)
355+
}()
356+
357+
func() {
358+
handedOutLock.Lock()
359+
defer handedOutLock.Unlock()
360+
361+
handedOut.Delete(item)
362+
q.Done(item)
363+
}()
364+
}
365+
}()
366+
}
367+
368+
wg.Wait()
369+
370+
if expected := len(inQueue); expected != q.Len() {
371+
t.Errorf("Expected queue length to be %d, was %d", expected, q.Len())
372+
}
373+
}
374+
257375
func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
258376
metrics := newFakeMetricsProvider()
259377
q := New("test", func(o *Opts[string]) {

0 commit comments

Comments
 (0)