Skip to content

Commit c9a2973

Browse files
committed
Change warmup to use buffered error channel and add New method.
1 parent aa20ef5 commit c9a2973

File tree

3 files changed

+87
-40
lines changed

3 files changed

+87
-40
lines changed

pkg/controller/controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
257257
}
258258

259259
// Create controller with dependencies set
260-
return &controller.Controller[request]{
260+
return controller.New[request](controller.ControllerOptions[request]{
261261
Do: options.Reconciler,
262262
RateLimiter: options.RateLimiter,
263263
NewQueue: options.NewQueue,
@@ -268,7 +268,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
268268
RecoverPanic: options.RecoverPanic,
269269
LeaderElected: options.NeedLeaderElection,
270270
EnableWarmup: options.EnableWarmup,
271-
}, nil
271+
}), nil
272272
}
273273

274274
// ReconcileIDFromContext gets the reconcileID from the current context.

pkg/internal/controller/controller.go

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,54 @@ import (
3838
"sigs.k8s.io/controller-runtime/pkg/source"
3939
)
4040

41+
type ControllerOptions[request comparable] struct {
42+
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
43+
// ensures that the state of the system matches the state specified in the object.
44+
// Defaults to the DefaultReconcileFunc.
45+
Do reconcile.TypedReconciler[request]
46+
47+
// RateLimiter is used to limit how frequently requests may be queued into the work queue.
48+
RateLimiter workqueue.TypedRateLimiter[request]
49+
50+
// NewQueue constructs the queue for this controller once the controller is ready to start.
51+
// This is a func because the standard Kubernetes work queues start themselves immediately, which
52+
// leads to goroutine leaks if something calls controller.New repeatedly.
53+
NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]
54+
55+
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
56+
MaxConcurrentReconciles int
57+
58+
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
59+
// Defaults to 2 minutes if not set.
60+
CacheSyncTimeout time.Duration
61+
62+
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
63+
Name string
64+
65+
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
66+
// or for example when a watch is started.
67+
// Note: LogConstructor has to be able to handle nil requests as we are also using it
68+
// outside the context of a reconciliation.
69+
LogConstructor func(request *request) logr.Logger
70+
71+
// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
72+
// Defaults to true.
73+
RecoverPanic *bool
74+
75+
// LeaderElected indicates whether the controller is leader elected or always running.
76+
LeaderElected *bool
77+
78+
// EnableWarmup specifies whether the controller should start its sources
79+
// when the manager is not the leader.
80+
// Defaults to false, which means that the controller will wait for leader election to start
81+
// before starting sources.
82+
EnableWarmup *bool
83+
}
84+
4185
// Controller implements controller.Controller.
86+
// WARNING: If directly instantiating a Controller vs. using the New method, ensure that the
87+
// warmupResultChan is instantiated as a buffered channel of size 1. Otherwise, the controller will
88+
// panic on having Warmup called.
4289
type Controller[request comparable] struct {
4390
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
4491
Name string
@@ -86,18 +133,9 @@ type Controller[request comparable] struct {
86133
// didStartEventSourcesOnce is used to ensure that the event sources are only started once.
87134
didStartEventSourcesOnce sync.Once
88135

89-
// ensureDidWarmupFinishChanInitializedOnce is used to ensure that the didWarmupFinishChan is
90-
// initialized to a non-nil channel.
91-
ensureDidWarmupFinishChanInitializedOnce sync.Once
92-
93-
// didWarmupFinish is closed when startEventSources returns. It is used to
94-
// signal to WaitForWarmupComplete that the event sources have finished syncing.
95-
didWarmupFinishChan chan struct{}
96-
97-
// didWarmupFinishSuccessfully is used to indicate whether the event sources have finished
98-
// successfully. If true, the event sources have finished syncing without error. If false, the
99-
// event sources have finished syncing but with error.
100-
didWarmupFinishSuccessfully atomic.Bool
136+
// warmupResultChan receives the result (nil / non-nil error) of the warmup method. It is
137+
// consumed by the WaitForWarmupComplete method that the warmup has finished.
138+
warmupResultChan chan error
101139

102140
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
103141
// or for example when a watch is started.
@@ -119,6 +157,22 @@ type Controller[request comparable] struct {
119157
EnableWarmup *bool
120158
}
121159

160+
func New[request comparable](options ControllerOptions[request]) *Controller[request] {
161+
return &Controller[request]{
162+
Do: options.Do,
163+
RateLimiter: options.RateLimiter,
164+
NewQueue: options.NewQueue,
165+
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
166+
CacheSyncTimeout: options.CacheSyncTimeout,
167+
Name: options.Name,
168+
LogConstructor: options.LogConstructor,
169+
RecoverPanic: options.RecoverPanic,
170+
LeaderElected: options.LeaderElected,
171+
EnableWarmup: options.EnableWarmup,
172+
warmupResultChan: make(chan error, 1),
173+
}
174+
}
175+
122176
// Reconcile implements reconcile.Reconciler.
123177
func (c *Controller[request]) Reconcile(ctx context.Context, req request) (_ reconcile.Result, err error) {
124178
defer func() {
@@ -177,10 +231,8 @@ func (c *Controller[request]) Warmup(ctx context.Context) error {
177231
c.mu.Lock()
178232
defer c.mu.Unlock()
179233

180-
c.ensureDidWarmupFinishChanInitialized()
181234
err := c.startEventSources(ctx)
182-
c.didWarmupFinishSuccessfully.Store(err == nil)
183-
close(c.didWarmupFinishChan)
235+
c.warmupResultChan <- err
184236

185237
return err
186238
}
@@ -192,9 +244,13 @@ func (c *Controller[request]) WaitForWarmupComplete(ctx context.Context) bool {
192244
return true
193245
}
194246

195-
c.ensureDidWarmupFinishChanInitialized()
196-
<-c.didWarmupFinishChan
197-
return c.didWarmupFinishSuccessfully.Load()
247+
warmupError, ok := <-c.warmupResultChan
248+
if !ok {
249+
// channel closed unexpectedly
250+
return false
251+
}
252+
253+
return warmupError == nil
198254
}
199255

200256
// Start implements controller.Controller.
@@ -441,15 +497,6 @@ func (c *Controller[request]) updateMetrics(reconcileTime time.Duration) {
441497
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
442498
}
443499

444-
// ensureDidWarmupFinishChanInitialized ensures that the didWarmupFinishChan is initialized. This is needed
445-
// because controller can directly be created from other packages like controller.Controller, and
446-
// there is no way for the caller to pass in the chan.
447-
func (c *Controller[request]) ensureDidWarmupFinishChanInitialized() {
448-
c.ensureDidWarmupFinishChanInitializedOnce.Do(func() {
449-
c.didWarmupFinishChan = make(chan struct{})
450-
})
451-
}
452-
453500
// ReconcileIDFromContext gets the reconcileID from the current context.
454501
func ReconcileIDFromContext(ctx context.Context) types.UID {
455502
r, ok := ctx.Value(reconcileIDKey{}).(types.UID)

pkg/internal/controller/controller_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var _ = Describe("controller", func() {
7272
queue = &controllertest.Queue{
7373
TypedInterface: workqueue.NewTyped[reconcile.Request](),
7474
}
75-
ctrl = &Controller[reconcile.Request]{
75+
ctrl = New[reconcile.Request](ControllerOptions[reconcile.Request]{
7676
MaxConcurrentReconciles: 1,
7777
Do: fakeReconcile,
7878
NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
@@ -81,7 +81,7 @@ var _ = Describe("controller", func() {
8181
LogConstructor: func(_ *reconcile.Request) logr.Logger {
8282
return log.RuntimeLog.WithName("controller").WithName("test")
8383
},
84-
}
84+
})
8585
})
8686

8787
Describe("Reconciler", func() {
@@ -353,14 +353,14 @@ var _ = Describe("controller", func() {
353353
TypedRateLimitingInterface: &controllertest.TypedQueue[TestRequest]{
354354
TypedInterface: workqueue.NewTyped[TestRequest](),
355355
}}
356-
ctrl := &Controller[TestRequest]{
356+
ctrl := New[TestRequest](ControllerOptions[TestRequest]{
357357
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
358358
return queue
359359
},
360360
LogConstructor: func(*TestRequest) logr.Logger {
361361
return log.RuntimeLog.WithName("controller").WithName("test")
362362
},
363-
}
363+
})
364364
ctrl.CacheSyncTimeout = time.Second
365365
src := &bisignallingSource[TestRequest]{
366366
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
@@ -1210,7 +1210,7 @@ var _ = Describe("controller", func() {
12101210
}),
12111211
}
12121212

1213-
nonWarmupCtrl := &Controller[reconcile.Request]{
1213+
nonWarmupCtrl := New[reconcile.Request](ControllerOptions[reconcile.Request]{
12141214
MaxConcurrentReconciles: 1,
12151215
Do: fakeReconcile,
12161216
NewQueue: func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
@@ -1222,12 +1222,12 @@ var _ = Describe("controller", func() {
12221222
CacheSyncTimeout: time.Second,
12231223
EnableWarmup: ptr.To(false),
12241224
LeaderElected: ptr.To(true),
1225-
startWatches: []source.TypedSource[reconcile.Request]{
1226-
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1227-
hasNonWarmupCtrlWatchStarted.Store(true)
1228-
return nil
1229-
}),
1230-
},
1225+
})
1226+
nonWarmupCtrl.startWatches = []source.TypedSource[reconcile.Request]{
1227+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1228+
hasNonWarmupCtrlWatchStarted.Store(true)
1229+
return nil
1230+
}),
12311231
}
12321232

12331233
By("Creating a manager")

0 commit comments

Comments
 (0)