Skip to content

Commit d8650df

Browse files
committed
Address edge case for watch added after warmup completes.
1 parent 5df573f commit d8650df

File tree

2 files changed

+38
-4
lines changed

2 files changed

+38
-4
lines changed

pkg/internal/controller/controller.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ type Controller[request comparable] struct {
127127
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
128128
startWatches []source.TypedSource[request]
129129

130+
// startedEventSources is used to track if the event sources have been started.
131+
// It ensures that we append sources to c.startWatches only until we call Start() / Warmup()
132+
// It is true if startEventSourcesLocked has been called at least once.
133+
startedEventSources bool
134+
130135
// didStartEventSourcesOnce is used to ensure that the event sources are only started once.
131136
didStartEventSourcesOnce sync.Once
132137

@@ -199,10 +204,9 @@ func (c *Controller[request]) Watch(src source.TypedSource[request]) error {
199204
c.mu.Lock()
200205
defer c.mu.Unlock()
201206

202-
// Controller hasn't started yet, store the watches locally and return.
203-
//
204-
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
205-
if !c.Started {
207+
// Sources weren't started yet, store the watches locally and return.
208+
// These sources are going to be held until either Warmup() or Start(...) is called.
209+
if !c.startedEventSources {
206210
c.startWatches = append(c.startWatches, src)
207211
return nil
208212
}
@@ -228,6 +232,9 @@ func (c *Controller[request]) Warmup(ctx context.Context) error {
228232
c.mu.Lock()
229233
defer c.mu.Unlock()
230234

235+
// Set the ctx so later calls to watch use this internal context of nil
236+
c.ctx = ctx
237+
231238
return c.startEventSourcesLocked(ctx)
232239
}
233240

@@ -364,6 +371,10 @@ func (c *Controller[request]) startEventSourcesLocked(ctx context.Context) error
364371
// We should never hold watches more than necessary, each watch source can hold a backing cache,
365372
// which won't be garbage collected if we hold a reference to it.
366373
c.startWatches = nil
374+
375+
// Mark event sources as started after resetting the startWatches slice to no-op a Watch()
376+
// call after event sources have been started.
377+
c.startedEventSources = true
367378
})
368379

369380
return retErr

pkg/internal/controller/controller_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,29 @@ var _ = Describe("controller", func() {
12931293
Expect(watchStartedCount.Load()).To(Equal(int32(1)), "source should only be started once")
12941294
Expect(ctrl.startWatches).To(BeNil(), "startWatches should be reset to nil after they are started")
12951295
})
1296+
1297+
It("should start sources added after Warmup is called", func() {
1298+
ctx, cancel := context.WithCancel(context.Background())
1299+
defer cancel()
1300+
1301+
ctrl.CacheSyncTimeout = time.Second
1302+
1303+
Expect(ctrl.Warmup(ctx)).To(Succeed())
1304+
1305+
By("starting a watch after warmup is added")
1306+
var didWatchStart atomic.Bool
1307+
Expect(ctrl.Watch(source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1308+
didWatchStart.Store(true)
1309+
return nil
1310+
}))).To(Succeed())
1311+
1312+
go func() {
1313+
defer GinkgoRecover()
1314+
Expect(ctrl.Start(ctx)).To(Succeed())
1315+
}()
1316+
1317+
Eventually(didWatchStart.Load).Should(BeTrue(), "watch should be started if it is added after Warmup")
1318+
})
12961319
})
12971320

12981321
Describe("Warmup with warmup disabled", func() {

0 commit comments

Comments
 (0)