Skip to content

Commit 2ed6574

Browse files
committed
:sparkles: POC of a priority queue
This change contains the POC of a priority workqueue that allows prioritizing some events over others. It is opt-in and will by default de-prioritize events originating from the initial listwatch and from periodic resyncs.
1 parent c1331a5 commit 2ed6574

File tree

13 files changed

+1184
-109
lines changed

13 files changed

+1184
-109
lines changed

.golangci.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ issues:
165165
- linters:
166166
- dupl
167167
path: _test\.go
168+
- linters:
169+
- revive
170+
path: .*/internal/.*
171+
- linters:
172+
- unused
173+
# Seems to incorrectly trigger on the two implementations that are only
174+
# used through an interface and not directly?
175+
path: pkg/controllerworkqueue/metrics\.go
168176

169177
run:
170178
go: "1.23"

examples/priorityqueue/main.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
"time"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
"sigs.k8s.io/controller-runtime/pkg/builder"
27+
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
28+
"sigs.k8s.io/controller-runtime/pkg/config"
29+
"sigs.k8s.io/controller-runtime/pkg/log"
30+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
31+
"sigs.k8s.io/controller-runtime/pkg/manager"
32+
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
33+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
34+
)
35+
36+
func init() {
37+
}
38+
39+
func main() {
40+
if err := run(); err != nil {
41+
fmt.Fprintf(os.Stderr, "%v\n", err)
42+
os.Exit(1)
43+
}
44+
}
45+
46+
func run() error {
47+
log.SetLogger(zap.New())
48+
49+
// Setup a Manager
50+
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
51+
Controller: config.Controller{UsePriorityQueue: true},
52+
})
53+
if err != nil {
54+
return fmt.Errorf("failed to set up controller-manager: %w", err)
55+
}
56+
57+
if err := builder.ControllerManagedBy(mgr).
58+
For(&corev1.ConfigMap{}).
59+
Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
60+
log.FromContext(ctx).Info("Reconciling")
61+
time.Sleep(10 * time.Second)
62+
63+
return reconcile.Result{}, nil
64+
})); err != nil {
65+
return fmt.Errorf("failed to set up controller: %w", err)
66+
}
67+
68+
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
69+
return fmt.Errorf("failed to start manager: %w", err)
70+
}
71+
72+
return nil
73+
}

pkg/builder/controller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches(
163163
) *TypedBuilder[request] {
164164
input := WatchesInput[request]{
165165
obj: object,
166-
handler: eventHandler,
166+
handler: handler.WithLowPriorityWhenUnchanged(eventHandler),
167167
}
168168
for _, opt := range opts {
169169
opt.ApplyToWatches(&input)
@@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error {
317317
}
318318

319319
var hdler handler.TypedEventHandler[client.Object, request]
320-
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
320+
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})))
321321
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
322322
allPredicates = append(allPredicates, blder.forInput.predicates...)
323323
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
@@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error {
341341
}
342342

343343
var hdler handler.TypedEventHandler[client.Object, request]
344-
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
344+
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner(
345345
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
346346
blder.forInput.object,
347347
opts...,
348-
)))
348+
))))
349349
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
350350
allPredicates = append(allPredicates, own.predicates...)
351351
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)

pkg/config/controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,8 @@ type Controller struct {
5353
// NeedLeaderElection indicates whether the controller needs to use leader election.
5454
// Defaults to true, which means the controller will use leader election.
5555
NeedLeaderElection *bool
56+
57+
// UsePriorityQueue is experimental and configures if controllers that do not have a
58+
// NewQueue() configured should default to the priority queue.
59+
UsePriorityQueue bool
5660
}

pkg/controller/controller.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28+
"sigs.k8s.io/controller-runtime/pkg/controllerworkqueue"
2829
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
2930
"sigs.k8s.io/controller-runtime/pkg/manager"
3031
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -193,10 +194,16 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
193194
}
194195

195196
if options.NewQueue == nil {
196-
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
197-
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
198-
Name: controllerName,
199-
})
197+
if mgr.GetControllerOptions().UsePriorityQueue {
198+
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
199+
return controllerworkqueue.New[request](controllerName)
200+
}
201+
} else {
202+
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
203+
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
204+
Name: controllerName,
205+
})
206+
}
200207
}
201208
}
202209

pkg/controllerworkqueue/metrics.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package controllerworkqueue
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"k8s.io/apimachinery/pkg/util/sets"
8+
"k8s.io/client-go/util/workqueue"
9+
"k8s.io/utils/clock"
10+
)
11+
12+
// This file is mostly a copy of unexported code from
13+
// https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go
14+
15+
type queueMetrics[T comparable] interface {
16+
add(item T)
17+
get(item T)
18+
done(item T)
19+
updateUnfinishedWork()
20+
}
21+
22+
func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, clock clock.Clock) queueMetrics[T] {
23+
if len(name) == 0 {
24+
return noMetrics[T]{}
25+
}
26+
return &defaultQueueMetrics[T]{
27+
clock: clock,
28+
depth: mp.NewDepthMetric(name),
29+
adds: mp.NewAddsMetric(name),
30+
latency: mp.NewLatencyMetric(name),
31+
workDuration: mp.NewWorkDurationMetric(name),
32+
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
33+
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
34+
added: sets.Set[T]{},
35+
addTimes: map[T]time.Time{},
36+
processingStartTimes: map[T]time.Time{},
37+
}
38+
}
39+
40+
// defaultQueueMetrics expects the caller to lock before setting any metrics.
41+
type defaultQueueMetrics[T comparable] struct {
42+
clock clock.Clock
43+
44+
// current depth of a workqueue
45+
depth workqueue.GaugeMetric
46+
// total number of adds handled by a workqueue
47+
adds workqueue.CounterMetric
48+
// how long an item stays in a workqueue
49+
latency workqueue.HistogramMetric
50+
// how long processing an item from a workqueue takes
51+
workDuration workqueue.HistogramMetric
52+
53+
mapLock sync.RWMutex
54+
added sets.Set[T]
55+
addTimes map[T]time.Time
56+
processingStartTimes map[T]time.Time
57+
58+
// how long have current threads been working?
59+
unfinishedWorkSeconds workqueue.SettableGaugeMetric
60+
longestRunningProcessor workqueue.SettableGaugeMetric
61+
}
62+
63+
func (m *defaultQueueMetrics[T]) add(item T) {
64+
if m == nil {
65+
return
66+
}
67+
68+
m.adds.Inc()
69+
70+
m.mapLock.Lock()
71+
defer m.mapLock.Unlock()
72+
if !m.added.Has(item) {
73+
m.added.Insert(item)
74+
m.depth.Inc()
75+
}
76+
if _, exists := m.addTimes[item]; !exists {
77+
m.addTimes[item] = m.clock.Now()
78+
}
79+
}
80+
81+
func (m *defaultQueueMetrics[T]) get(item T) {
82+
if m == nil {
83+
return
84+
}
85+
86+
m.mapLock.Lock()
87+
defer m.mapLock.Unlock()
88+
89+
m.depth.Dec()
90+
m.added.Delete(item)
91+
92+
m.processingStartTimes[item] = m.clock.Now()
93+
if startTime, exists := m.addTimes[item]; exists {
94+
m.latency.Observe(m.sinceInSeconds(startTime))
95+
delete(m.addTimes, item)
96+
}
97+
}
98+
99+
func (m *defaultQueueMetrics[T]) done(item T) {
100+
if m == nil {
101+
return
102+
}
103+
104+
m.mapLock.Lock()
105+
defer m.mapLock.Unlock()
106+
if startTime, exists := m.processingStartTimes[item]; exists {
107+
m.workDuration.Observe(m.sinceInSeconds(startTime))
108+
delete(m.processingStartTimes, item)
109+
}
110+
}
111+
112+
func (m *defaultQueueMetrics[T]) updateUnfinishedWork() {
113+
m.mapLock.RLock()
114+
defer m.mapLock.RUnlock()
115+
// Note that a summary metric would be better for this, but prometheus
116+
// doesn't seem to have non-hacky ways to reset the summary metrics.
117+
var total float64
118+
var oldest float64
119+
for _, t := range m.processingStartTimes {
120+
age := m.sinceInSeconds(t)
121+
total += age
122+
if age > oldest {
123+
oldest = age
124+
}
125+
}
126+
m.unfinishedWorkSeconds.Set(total)
127+
m.longestRunningProcessor.Set(oldest)
128+
}
129+
130+
// Gets the time since the specified start in seconds.
131+
func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 {
132+
return m.clock.Since(start).Seconds()
133+
}
134+
135+
type noMetrics[T any] struct{}
136+
137+
func (noMetrics[T]) add(item T) {}
138+
func (noMetrics[T]) get(item T) {}
139+
func (noMetrics[T]) done(item T) {}
140+
func (noMetrics[T]) updateUnfinishedWork() {}

0 commit comments

Comments
 (0)