1
1
package controllerworkqueue
2
2
3
3
import (
4
- "sort"
5
4
"sync"
6
5
"sync/atomic"
7
6
"time"
8
7
8
+ "github.com/google/btree"
9
9
"k8s.io/apimachinery/pkg/util/sets"
10
10
"k8s.io/client-go/util/workqueue"
11
11
"k8s.io/utils/clock"
@@ -58,7 +58,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
58
58
59
59
cwq := & controllerworkqueue [T ]{
60
60
items : map [T ]* item [T ]{},
61
- queue : queue [T ]{} ,
61
+ queue : btree . NewG ( 32 , less [T ]) ,
62
62
tryPush : make (chan struct {}, 1 ),
63
63
rateLimiter : opts .RateLimiter ,
64
64
locked : sets.Set [T ]{},
@@ -77,12 +77,16 @@ type controllerworkqueue[T comparable] struct {
77
77
// lock has to be acquired for any access to either items or queue
78
78
lock sync.Mutex
79
79
items map [T ]* item [T ]
80
- queue queue [ T ]
80
+ queue * btree. BTreeG [ * item [ T ] ]
81
81
82
82
tryPush chan struct {}
83
83
84
84
rateLimiter workqueue.TypedRateLimiter [T ]
85
85
86
+ // addedCounter is a counter of elements added, we need it
87
+ // because unixNano is not guaranteed to be unique.
88
+ addedCounter uint64
89
+
86
90
// locked contains the keys we handed out through Get() and that haven't
87
91
// yet been returned through Done().
88
92
locked sets.Set [T ]
@@ -106,7 +110,6 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
106
110
w .lock .Lock ()
107
111
defer w .lock .Unlock ()
108
112
109
- var hadChanges bool
110
113
for _ , key := range items {
111
114
if o .RateLimited {
112
115
after := w .rateLimiter .When (key )
@@ -121,30 +124,30 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
121
124
}
122
125
if _ , ok := w .items [key ]; ! ok {
123
126
item := & item [T ]{
124
- key : key ,
125
- priority : o .Priority ,
126
- readyAt : readyAt ,
127
+ key : key ,
128
+ addedAtUnixNano : w .now ().UnixNano (),
129
+ addedCounter : w .addedCounter ,
130
+ priority : o .Priority ,
131
+ readyAt : readyAt ,
127
132
}
128
133
w .items [key ] = item
129
- w .queue = append ( w . queue , item )
130
- hadChanges = true
134
+ w .queue . ReplaceOrInsert ( item )
135
+ w . addedCounter ++
131
136
continue
132
137
}
133
138
134
- if o .Priority > w .items [key ].priority {
135
- w .items [key ].priority = o .Priority
136
- hadChanges = true
139
+ // The b-tree de-duplicates based on ordering and any change here
140
+ // will affect the order - Just delete and re-add.
141
+ item , _ := w .queue .Delete (w .items [key ])
142
+ if o .Priority > item .priority {
143
+ item .priority = o .Priority
137
144
}
138
145
139
- if w .items [key ].readyAt != nil && (readyAt == nil || readyAt .Before (* w .items [key ].readyAt )) {
140
- w .items [key ].readyAt = readyAt
141
- hadChanges = true
146
+ if item .readyAt != nil && (readyAt == nil || readyAt .Before (* item .readyAt )) {
147
+ item .readyAt = readyAt
142
148
}
143
- }
144
149
145
- if hadChanges {
146
- sort .Stable (w .queue )
147
- w .doTryPush ()
150
+ w .queue .ReplaceOrInsert (item )
148
151
}
149
152
}
150
153
@@ -176,42 +179,30 @@ func (w *controllerworkqueue[T]) spin() {
176
179
w .lockedLock .Lock ()
177
180
defer w .lockedLock .Unlock ()
178
181
179
- // toRemove is a list of indexes to remove from the queue.
180
- // We can not do it in-place as we would be manipulating the
181
- // slice we are iterating over. We have to do it backwards, as
182
- // otherwise the indexes become invalid.
183
- var toRemove []int
184
- defer func () {
185
- for i := len (toRemove ) - 1 ; i >= 0 ; i -- {
186
- idxToRemove := toRemove [i ]
187
- if idxToRemove == len (w .queue )- 1 {
188
- w .queue = w .queue [:idxToRemove ]
189
- } else {
190
- w .queue = append (w .queue [:idxToRemove ], w .queue [idxToRemove + 1 :]... )
191
- }
192
- }
193
- }()
194
- for idx , item := range w .queue {
182
+ w .queue .Ascend (func (item * item [T ]) bool {
195
183
if w .waiters .Load () == 0 { // no waiters, return as we can not hand anything out anyways
196
- return
184
+ return false
197
185
}
186
+
198
187
// No next element we can process
199
- if w . queue [ 0 ]. readyAt != nil && w . queue [ 0 ] .readyAt .After (w .now ()) {
200
- nextReady = w .tick (w . queue [ 0 ] .readyAt .Sub (w .now ()))
201
- return
188
+ if item . readyAt != nil && item .readyAt .After (w .now ()) {
189
+ nextReady = w .tick (item .readyAt .Sub (w .now ()))
190
+ return false
202
191
}
203
192
204
193
// Item is locked, we can not hand it out
205
194
if w .locked .Has (item .key ) {
206
- continue
195
+ return true
207
196
}
208
197
209
198
w .get <- * item
210
199
w .locked .Insert (item .key )
211
- delete (w .items , item .key )
212
200
w .waiters .Add (- 1 )
213
- toRemove = append (toRemove , idx )
214
- }
201
+ delete (w .items , item .key )
202
+ w .queue .Delete (item )
203
+
204
+ return true
205
+ })
215
206
}()
216
207
}
217
208
}
@@ -274,37 +265,36 @@ func (w *controllerworkqueue[T]) Len() int {
274
265
w .lock .Lock ()
275
266
defer w .lock .Unlock ()
276
267
277
- return len ( w .queue )
268
+ return w .queue . Len ( )
278
269
}
279
270
280
- // queue is the actual queue. It implements heap.Interface.
281
- type queue [T comparable ] []* item [T ]
282
-
283
- func (q queue [T ]) Len () int {
284
- return len (q )
285
- }
286
-
287
- func (q queue [T ]) Less (i , j int ) bool {
288
- switch {
289
- case q [i ].readyAt == nil && q [j ].readyAt != nil :
271
+ func less [T comparable ](a , b * item [T ]) bool {
272
+ if a .readyAt == nil && b .readyAt != nil {
290
273
return true
291
- case q [i ].readyAt != nil && q [j ].readyAt == nil :
274
+ }
275
+ if a .readyAt != nil && b .readyAt == nil {
292
276
return false
293
- case q [i ].readyAt != nil && q [j ].readyAt != nil :
294
- return q [i ].readyAt .Before (* q [j ].readyAt )
277
+ }
278
+ if a .readyAt != nil && b .readyAt != nil && ! a .readyAt .Equal (* b .readyAt ) {
279
+ return a .readyAt .Before (* b .readyAt )
280
+ }
281
+ if a .priority != b .priority {
282
+ return a .priority > b .priority
295
283
}
296
284
297
- return q [i ].priority > q [j ].priority
298
- }
285
+ if a .addedAtUnixNano != b .addedAtUnixNano {
286
+ return a .addedAtUnixNano < b .addedAtUnixNano
287
+ }
299
288
300
- func (q queue [T ]) Swap (i , j int ) {
301
- q [i ], q [j ] = q [j ], q [i ]
289
+ return a .addedCounter < b .addedCounter
302
290
}
303
291
304
292
type item [T comparable ] struct {
305
- key T
306
- priority int
307
- readyAt * time.Time
293
+ key T
294
+ addedAtUnixNano int64
295
+ addedCounter uint64
296
+ priority int
297
+ readyAt * time.Time
308
298
}
309
299
310
300
func wrapWithMetrics [T comparable ](q * controllerworkqueue [T ], name string , provider workqueue.MetricsProvider ) PriorityQueue [T ] {
0 commit comments