Skip to content

Commit aa20ef5

Browse files
committed
Make Start threadsafe with Warmup + UT.
1 parent 0563114 commit aa20ef5

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

pkg/internal/controller/controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ func (c *Controller[request]) Warmup(ctx context.Context) error {
172172
return nil
173173
}
174174

175+
// Hold the lock to avoid concurrent access to c.startWatches with Start() when calling
176+
// startEventSources
177+
c.mu.Lock()
178+
defer c.mu.Unlock()
179+
175180
c.ensureDidWarmupFinishChanInitialized()
176181
err := c.startEventSources(ctx)
177182
c.didWarmupFinishSuccessfully.Store(err == nil)

pkg/internal/controller/controller_test.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ var _ = Describe("controller", func() {
533533
Expect(startCount.Load()).To(Equal(int32(1)), "Source should only be started once even when called multiple times")
534534
})
535535

536-
It("should block subsequent calls from returning until the first call to startEventSources has returned", func() {
536+
It("should block subsequent calls from returning until the first call to startEventSources has returned", func() {
537537
ctx, cancel := context.WithCancel(context.Background())
538538
defer cancel()
539539
ctrl.CacheSyncTimeout = 5 * time.Second
@@ -1257,6 +1257,39 @@ var _ = Describe("controller", func() {
12571257
close(ctrlWatchBlockingChan)
12581258
Eventually(hasNonWarmupCtrlWatchStarted.Load).Should(BeTrue())
12591259
})
1260+
1261+
It("should not race with Start and only start sources once", func() {
1262+
ctx, cancel := context.WithCancel(context.Background())
1263+
defer cancel()
1264+
1265+
ctrl.CacheSyncTimeout = time.Second
1266+
1267+
var watchStartedCount atomic.Int32
1268+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
1269+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1270+
watchStartedCount.Add(1)
1271+
return nil
1272+
}),
1273+
}
1274+
1275+
By("calling Warmup and Start concurrently")
1276+
go func() {
1277+
defer GinkgoRecover()
1278+
Expect(ctrl.Start(ctx)).To(Succeed())
1279+
}()
1280+
1281+
blockOnWarmupChan := make(chan struct{})
1282+
go func() {
1283+
defer GinkgoRecover()
1284+
Expect(ctrl.Warmup(ctx)).To(Succeed())
1285+
close(blockOnWarmupChan)
1286+
}()
1287+
1288+
<-blockOnWarmupChan
1289+
1290+
Expect(watchStartedCount.Load()).To(Equal(int32(1)), "source should only be started once")
1291+
Expect(ctrl.startWatches).To(BeNil(), "startWatches should be reset to nil after they are started")
1292+
})
12601293
})
12611294

12621295
Describe("Warmup with warmup disabled", func() {

0 commit comments

Comments
 (0)