Skip to content

Commit d858cd6

Browse files
committed
fix: deadlock on start missing watches
Signed-off-by: Alexandre Gaudreault <alexandre_gaudreault@intuit.com>
1 parent a22b346 commit d858cd6

File tree

1 file changed: 33 additions and 27 deletions

pkg/cache/cluster.go

Lines changed: 33 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -492,21 +492,24 @@ func (c *clusterCache) startMissingWatches() error {
492492

493493
err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
494494
resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
495-
if err != nil && c.isRestrictedResource(err) {
496-
keep := false
497-
if c.respectRBAC == RespectRbacStrict {
498-
k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
499-
if permErr != nil {
500-
return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
495+
if err != nil {
496+
if c.isRestrictedResource(err) {
497+
keep := false
498+
if c.respectRBAC == RespectRbacStrict {
499+
k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
500+
if permErr != nil {
501+
return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
502+
}
503+
keep = k
504+
}
505+
// if we are not allowed to list the resource, remove it from the watch list
506+
if !keep {
507+
delete(c.apisMeta, api.GroupKind)
508+
delete(namespacedResources, api.GroupKind)
509+
return nil
501510
}
502-
keep = k
503-
}
504-
// if we are not allowed to list the resource, remove it from the watch list
505-
if !keep {
506-
delete(c.apisMeta, api.GroupKind)
507-
delete(namespacedResources, api.GroupKind)
508-
return nil
509511
}
512+
return fmt.Errorf("failed to start watch %s: %w", api.GroupKind.String(), err)
510513
}
511514
go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
512515
return nil
@@ -527,11 +530,13 @@ func runSynced(lock sync.Locker, action func() error) error {
527530
}
528531

529532
// listResources creates list pager and enforces number of concurrent list requests
533+
// The callback should not wait on any locks that may be held by other callers.
530534
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
531535
if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
532536
return "", err
533537
}
534538
defer c.listSemaphore.Release(1)
539+
535540
var retryCount int64 = 0
536541
resourceVersion := ""
537542
listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
@@ -568,30 +573,31 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
568573
}
569574

570575
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
571-
return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
572-
var items []*Resource
573-
err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
576+
var items []*Resource
577+
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
578+
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
574579
if un, ok := obj.(*unstructured.Unstructured); !ok {
575580
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
576581
} else {
577582
items = append(items, c.newResource(un))
578583
}
579584
return nil
580585
})
586+
})
581587

582-
if err != nil {
583-
return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
584-
}
585-
if lock {
586-
return runSynced(&c.lock, func() error {
587-
c.replaceResourceCache(api.GroupKind, items, ns)
588-
return nil
589-
})
590-
} else {
588+
if err != nil {
589+
return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
590+
}
591+
592+
if lock {
593+
return resourceVersion, runSynced(&c.lock, func() error {
591594
c.replaceResourceCache(api.GroupKind, items, ns)
592595
return nil
593-
}
594-
})
596+
})
597+
} else {
598+
c.replaceResourceCache(api.GroupKind, items, ns)
599+
return resourceVersion, nil
600+
}
595601
}
596602

597603
func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {

0 commit comments

Comments
 (0)