✨ Implement warm replica support for controllers #3192

Open
wants to merge 45 commits into
base: main
Changes from 31 commits
Commits
8239300
[Warm Replicas] Implement warm replica support for controllers.
godwinpang Apr 9, 2025
73fc8fa
Remove irrelevant runnable_group.go code.
godwinpang Apr 14, 2025
be1b1c2
Rename ShouldWarmup.
godwinpang Apr 14, 2025
c9b99eb
fmt
godwinpang Apr 14, 2025
e7a2bbf
Change to atomic.Bool to avoid race in test.
godwinpang Apr 14, 2025
854987c
Address comments.
godwinpang Apr 29, 2025
072ad4b
Add ready check to block controller startup until warmup is complete.
godwinpang May 2, 2025
43118a3
Keep test helper structs private.
godwinpang May 2, 2025
b67bc65
Address comments.
godwinpang May 12, 2025
fc7c8c5
Fix lint.
godwinpang May 12, 2025
6bb4616
Address naming + comments from sbueringer.
godwinpang May 13, 2025
ccc7485
Refactor tests to use HaveValue.
godwinpang May 13, 2025
54f4fe3
Document + add UT for WaitForWarmupComplete behavior on ctx cancellat…
godwinpang May 14, 2025
667bb03
Add unit test that exercises controller warmup integration with manager.
godwinpang May 14, 2025
66e3be4
Add UT that verifies WaitForWarmupComplete blocking / non-blocking be…
godwinpang May 14, 2025
d9cc96b
Verify r.Others.startQueue in runnables test cases.
godwinpang May 14, 2025
65a04d5
Fix UT to verify runnable ordering.
godwinpang May 14, 2025
c201bfa
Fix UT for WaitForWarmupComplete blocking.
godwinpang May 15, 2025
5a13db4
Document !NeedLeaderElection+NeedWarmup behavior
godwinpang May 15, 2025
4879527
Fix test race.
godwinpang May 16, 2025
57acc77
Cleanup test wrapper runnables.
godwinpang May 16, 2025
1987b54
Make didStartEventSources run once with sync.Once + UT.
godwinpang May 16, 2025
a49f3a4
Rewrite Warmup to avoid polling.
godwinpang May 16, 2025
89f5479
Rename NeedWarmup to EnableWarmup.
godwinpang May 16, 2025
9d5ddfb
Clarify comment on Warmup.
godwinpang May 16, 2025
66f64f0
Move reset watches critical section inside of startEventSources.
godwinpang May 16, 2025
0563114
Add test to assert startEventSources blocking behavior.
godwinpang May 16, 2025
aa20ef5
Make Start threadsafe with Warmup + UT.
godwinpang May 16, 2025
c9a2973
Change warmup to use buffered error channel and add New method.
godwinpang May 19, 2025
79a7b95
Fail in warmup directly and rely on sync.Once for warmup thread-safet…
godwinpang May 20, 2025
c1d8ea4
Sync controller EnableWarmup comments.
godwinpang May 20, 2025
5df573f
Rename to startEventSourcesLocked and lock with c.mu
godwinpang May 21, 2025
d8650df
Address edge case for watch added after warmup completes.
godwinpang May 21, 2025
a03f404
Fix test description and set leaderelection==true
godwinpang May 21, 2025
dcf4b8b
Fix lint.
godwinpang May 21, 2025
ba51d28
Change shutdown order to shutdown warmup runnables in parallel with o…
godwinpang May 22, 2025
ea2aa0e
Fix test races by ensuring goroutines do not outlive their It blocks.
godwinpang May 22, 2025
730b30e
Block on source start on context cancel.
godwinpang May 22, 2025
bca3e2a
Guard access to c.Queue explicitly.
godwinpang May 22, 2025
12e938c
Initialize queue in warmup with test.
godwinpang May 22, 2025
de4232d
Fix watch comment.
godwinpang May 22, 2025
a3dc13b
Add warmup to manager and controller integration tests.
godwinpang May 22, 2025
84d2053
fmt + lint.
godwinpang May 23, 2025
cefd22d
Add tests for Warmup for parity with Start.
godwinpang Jun 24, 2025
8c7b12f
golangci-lint
godwinpang Jun 24, 2025
13 changes: 13 additions & 0 deletions pkg/config/controller.go
@@ -60,6 +60,19 @@ type Controller struct {
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// EnableWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its
// sources without waiting to become leader.
// Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without
// leader election do not wait on leader election to start their sources.
// Defaults to false.
EnableWarmup *bool

// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
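The interaction between EnableWarmup and NeedLeaderElection described in the field comment can be read as a small truth table. The following is a stdlib-only sketch, not controller-runtime code: `boolOr` and `warmupHasEffect` are hypothetical helpers, and the defaults (NeedLeaderElection true, EnableWarmup false) are taken from the field comments in this diff.

```go
package main

import "fmt"

// boolOr dereferences p, falling back to def when p is nil.
// Hypothetical helper for this sketch.
func boolOr(p *bool, def bool) bool {
	if p == nil {
		return def
	}
	return *p
}

// warmupHasEffect reports whether enabling warmup changes when the sources
// start. Per the field comment, warmup only matters for controllers that
// would otherwise wait for leader election.
func warmupHasEffect(needLeaderElection, enableWarmup *bool) bool {
	return boolOr(needLeaderElection, true) && boolOr(enableWarmup, false)
}

func main() {
	yes, no := true, false
	fmt.Println(warmupHasEffect(&yes, &yes)) // true: sources start before leadership
	fmt.Println(warmupHasEffect(&no, &yes))  // false: no-op, nothing to wait for
	fmt.Println(warmupHasEffect(nil, &yes))  // true: NeedLeaderElection defaults to true
	fmt.Println(warmupHasEffect(nil, nil))   // false: EnableWarmup defaults to false
}
```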
22 changes: 20 additions & 2 deletions pkg/controller/controller.go
@@ -93,6 +93,19 @@ type TypedOptions[request comparable] struct {
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool

// EnableWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its
// sources without waiting to become leader.
// Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without
// leader election do not wait on leader election to start their sources.
// Defaults to false.
EnableWarmup *bool
}

// DefaultFromConfig defaults the config from a config.Controller
@@ -124,6 +137,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller
if options.NeedLeaderElection == nil {
options.NeedLeaderElection = config.NeedLeaderElection
}

if options.EnableWarmup == nil {
options.EnableWarmup = config.EnableWarmup
}
}

// Controller implements an API. A Controller manages a work queue fed reconcile.Requests
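The nil check added to DefaultFromConfig is what lets a controller inherit the manager-wide setting while still allowing an explicit `false` to win. A minimal stdlib-only sketch of that pattern follows; `managerConfig`, `controllerOptions`, and `describe` are hypothetical stand-ins that model only the EnableWarmup field.

```go
package main

import "fmt"

// managerConfig and controllerOptions are hypothetical stand-ins for
// config.Controller and controller.TypedOptions; only EnableWarmup is modeled.
type managerConfig struct{ EnableWarmup *bool }

type controllerOptions struct{ EnableWarmup *bool }

// defaultFromConfig copies the manager-wide value only when the controller
// left the option unset (nil), so an explicit false is preserved.
func (o *controllerOptions) defaultFromConfig(c managerConfig) {
	if o.EnableWarmup == nil {
		o.EnableWarmup = c.EnableWarmup
	}
}

// describe renders a *bool as "unset", "true", or "false".
func describe(p *bool) string {
	if p == nil {
		return "unset"
	}
	return fmt.Sprint(*p)
}

func main() {
	on, off := true, false
	cfg := managerConfig{EnableWarmup: &on}

	inherited := controllerOptions{}
	inherited.defaultFromConfig(cfg)

	overridden := controllerOptions{EnableWarmup: &off}
	overridden.defaultFromConfig(cfg)

	fmt.Println(describe(inherited.EnableWarmup))  // true
	fmt.Println(describe(overridden.EnableWarmup)) // false
}
```

A plain `bool` could not distinguish "not set" from "set to false", which is why the option is a pointer.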
@@ -243,7 +260,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
}

// Create controller with dependencies set
return &controller.Controller[request]{
return controller.New[request](controller.ControllerOptions[request]{
Do: options.Reconciler,
RateLimiter: options.RateLimiter,
NewQueue: options.NewQueue,
@@ -253,7 +270,8 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
}, nil
EnableWarmup: options.EnableWarmup,
}), nil
}

// ReconcileIDFromContext gets the reconcileID from the current context.
66 changes: 66 additions & 0 deletions pkg/controller/controller_test.go
@@ -474,5 +474,71 @@ var _ = Describe("controller.Controller", func() {
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeFalse())
})

It("should set EnableWarmup correctly", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

// Test with EnableWarmup set to true
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
EnableWarmup: ptr.To(true),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithWarmup.EnableWarmup).To(HaveValue(BeTrue()))

// Test with EnableWarmup set to false
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
EnableWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithoutWarmup.EnableWarmup).To(HaveValue(BeFalse()))

// Test with EnableWarmup not set (should default to nil)
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithDefaultWarmup.EnableWarmup).To(BeNil())
})

It("should inherit EnableWarmup from manager config", func() {
// Test with manager default setting EnableWarmup to true
managerWithWarmup, err := manager.New(cfg, manager.Options{
Controller: config.Controller{
EnableWarmup: ptr.To(true),
},
})
Expect(err).NotTo(HaveOccurred())
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlInheritingWarmup.EnableWarmup).To(HaveValue(BeTrue()))

// Test that explicit controller setting overrides manager setting
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
EnableWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlOverridingWarmup.EnableWarmup).To(HaveValue(BeFalse()))
})
})
})
205 changes: 147 additions & 58 deletions pkg/internal/controller/controller.go
@@ -38,6 +38,50 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

// ControllerOptions are the arguments for creating a new Controller.
type ControllerOptions[request comparable] struct {
// Do is the reconciler: a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile.TypedReconciler[request]

// RateLimiter is used to limit how frequently requests may be queued into the work queue.
RateLimiter workqueue.TypedRateLimiter[request]

// NewQueue constructs the queue for this controller once the controller is ready to start.
// This is a func because the standard Kubernetes work queues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]

// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int

// CacheSyncTimeout is the time limit for waiting for caches to sync.
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration

// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string

// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation.
LogConstructor func(request *request) logr.Logger

// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
// Defaults to true.
RecoverPanic *bool

// LeaderElected indicates whether the controller is leader elected or always running.
LeaderElected *bool

// EnableWarmup specifies whether the controller should start its sources
// when the manager is not the leader.
// Defaults to false, which means that the controller waits until it is elected
// leader before starting its sources.
EnableWarmup *bool
}

// Controller implements controller.Controller.
type Controller[request comparable] struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
@@ -83,6 +127,9 @@ type Controller[request comparable] struct {
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []source.TypedSource[request]

// didStartEventSourcesOnce is used to ensure that the event sources are only started once.
didStartEventSourcesOnce sync.Once

// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
@@ -95,6 +142,34 @@ type Controller[request comparable] struct {

// LeaderElected indicates whether the controller is leader elected or always running.
LeaderElected *bool

// EnableWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its
// sources without waiting to become leader.
// Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without
// leader election do not wait on leader election to start their sources.
// Defaults to false.
EnableWarmup *bool
}

// New returns a new Controller configured with the given options.
func New[request comparable](options ControllerOptions[request]) *Controller[request] {
return &Controller[request]{
Do: options.Do,
RateLimiter: options.RateLimiter,
NewQueue: options.NewQueue,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
Name: options.Name,
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.LeaderElected,
EnableWarmup: options.EnableWarmup,
}
}

// Reconcile implements reconcile.Reconciler.
@@ -144,6 +219,15 @@ func (c *Controller[request]) NeedLeaderElection() bool {
return *c.LeaderElected
}

// Warmup implements the manager.WarmupRunnable interface.
func (c *Controller[request]) Warmup(ctx context.Context) error {
if c.EnableWarmup == nil || !*c.EnableWarmup {
return nil
}

return c.startEventSources(ctx)
}

// Start implements controller.Controller.
func (c *Controller[request]) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
@@ -185,12 +269,6 @@ func (c *Controller[request]) Start(ctx context.Context) error {

c.LogConstructor(nil).Info("Starting Controller")

// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil

// Launch workers to process resources
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
@@ -221,60 +299,71 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// startEventSources launches all the sources registered with this controller and waits
// for them to sync. It returns an error if any of the sources fail to start or sync.
func (c *Controller[request]) startEventSources(ctx context.Context) error {
errGroup := &errgroup.Group{}
for _, watch := range c.startWatches {
log := c.LogConstructor(nil)
_, ok := watch.(interface {
String() string
})

if !ok {
log = log.WithValues("source", fmt.Sprintf("%T", watch))
} else {
log = log.WithValues("source", fmt.Sprintf("%s", watch))
}
didStartSyncingSource := &atomic.Bool{}
errGroup.Go(func() error {
// Use a timeout for starting and syncing the source to avoid silently
// blocking startup indefinitely if it doesn't come up.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
go func() {
defer close(sourceStartErrChan)
log.Info("Starting EventSource")
if err := watch.Start(ctx, c.Queue); err != nil {
sourceStartErrChan <- err
return
}
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
return
}
didStartSyncingSource.Store(true)
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
log.Error(err, "Could not wait for Cache to sync")
sourceStartErrChan <- err
var retErr error

c.didStartEventSourcesOnce.Do(func() {
errGroup := &errgroup.Group{}
for _, watch := range c.startWatches {
log := c.LogConstructor(nil)
_, ok := watch.(interface {
String() string
})
if !ok {
log = log.WithValues("source", fmt.Sprintf("%T", watch))
} else {
log = log.WithValues("source", fmt.Sprintf("%s", watch))
}
didStartSyncingSource := &atomic.Bool{}
errGroup.Go(func() error {
// Use a timeout for starting and syncing the source to avoid silently
// blocking startup indefinitely if it doesn't come up.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
go func() {
defer close(sourceStartErrChan)
log.Info("Starting EventSource")
if err := watch.Start(ctx, c.Queue); err != nil {
sourceStartErrChan <- err
return
}
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
return
}
didStartSyncingSource.Store(true)
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
log.Error(err, "Could not wait for Cache to sync")
sourceStartErrChan <- err
}
}()

select {
case err := <-sourceStartErrChan:
return err
case <-sourceStartCtx.Done():
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
return <-sourceStartErrChan
}
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
return nil
}
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
}
}()
})
}
retErr = errGroup.Wait()

select {
case err := <-sourceStartErrChan:
return err
case <-sourceStartCtx.Done():
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
return <-sourceStartErrChan
}
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
return nil
}
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
}
})
}
return errGroup.Wait()
// All the watches have been started, so we can reset the local slice.
//
// We should never hold watches longer than necessary: each watch source can hold a backing cache,
// which won't be garbage collected while we hold a reference to it.
c.startWatches = nil
})

return retErr
}

// processNextWorkItem will read a single work item off the workqueue and