diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 8f14bfdbfc..648d0d75b4 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -113,6 +113,10 @@ type Informer interface { // the handler again and an error if the handler cannot be added. AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) + // AddEventHandlerWithOptions is a variant of AddEventHandlerWithResyncPeriod where + // all optional parameters are passed in as a struct. + AddEventHandlerWithOptions(handler toolscache.ResourceEventHandler, options toolscache.HandlerOptions) (toolscache.ResourceEventHandlerRegistration, error) + // RemoveEventHandler removes a previously added event handler given by // its registration handle. // This function is guaranteed to be idempotent and thread-safe. @@ -207,11 +211,11 @@ type Options struct { // to reduce the caches memory usage. DefaultTransform toolscache.TransformFunc - // DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called + // DefaultWatchErrorHandler will be used to set the WatchErrorHandler which is called // whenever ListAndWatch drops the connection with an error. // // After calling this handler, the informer will backoff and retry. - DefaultWatchErrorHandler toolscache.WatchErrorHandler + DefaultWatchErrorHandler toolscache.WatchErrorHandlerWithContext // DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy // for everything that doesn't specify this. diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index d3cbf2c7fb..b21a9c6345 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -25,21 +25,26 @@ import ( "sync" "time" + "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/internal/syncs" ) +var log = logf.RuntimeLog.WithName("cache") + // InformersOpts configures an InformerMap. type InformersOpts struct { HTTPClient *http.Client @@ -52,7 +57,7 @@ type InformersOpts struct { Transform cache.TransformFunc UnsafeDisableDeepCopy bool EnableWatchBookmarks bool - WatchErrorHandler cache.WatchErrorHandler + WatchErrorHandler cache.WatchErrorHandlerWithContext } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -105,7 +110,8 @@ func (c *Cache) Start(stop <-chan struct{}) { // Stop on either the whole map stopping or just this informer being removed. internalStop, cancel := syncs.MergeChans(stop, c.stop) defer cancel() - c.Informer.Run(internalStop) + // Convert the stop channel to a context and then add the logger. + c.Informer.RunWithContext(logr.NewContext(wait.ContextForChannel(internalStop), log)) } type tracker struct { @@ -181,10 +187,10 @@ type Informers struct { // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer - // WatchErrorHandler allows the shared index informer's + // watchErrorHandler allows the shared index informer's // watchErrorHandler to be set by overriding the options // or to use the default watchErrorHandler - watchErrorHandler cache.WatchErrorHandler + watchErrorHandler cache.WatchErrorHandlerWithContext } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -376,7 +382,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O // Set WatchErrorHandler on SharedIndexInformer if set if ip.watchErrorHandler != nil { - if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { + if err := sharedIndexInformer.SetWatchErrorHandlerWithContext(ip.watchErrorHandler); err != nil { return nil, false, err } } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f1e14a131c..f033f85e77 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -390,6 +390,23 @@ func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolsca return handles, nil } +// AddEventHandlerWithOptions adds the handler with options to each namespaced informer. +func (i *multiNamespaceInformer) AddEventHandlerWithOptions(handler toolscache.ResourceEventHandler, options toolscache.HandlerOptions) (toolscache.ResourceEventHandlerRegistration, error) { + handles := handlerRegistration{ + handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)), + } + + for ns, informer := range i.namespaceToInformer { + registration, err := informer.AddEventHandlerWithOptions(handler, options) + if err != nil { + return nil, err + } + handles.handles[ns] = registration + } + + return handles, nil +} + // RemoveEventHandler removes a previously added event handler given by its registration handle. func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error { handles, ok := h.(handlerRegistration) diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 2cbf12dbab..0b9c43c347 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -99,12 +99,24 @@ func (f *FakeInformer) HasSynced() bool { return f.Synced } -// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. +// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { f.handlers = append(f.handlers, eventHandlerWrapper{handler}) return nil, nil } +// AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration. +func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { + f.handlers = append(f.handlers, eventHandlerWrapper{handler}) + return nil, nil +} + +// AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration. +func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) { + f.handlers = append(f.handlers, eventHandlerWrapper{handler}) + return nil, nil +} + // Run implements the Informer interface. Increments f.RunCount. func (f *FakeInformer) Run(<-chan struct{}) { f.RunCount++ @@ -135,15 +147,6 @@ func (f *FakeInformer) Delete(obj metav1.Object) { } } -// AddEventHandlerWithResyncPeriod does nothing. TODO(community): Implement this. -func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil -} - -func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, options cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil -} - // RemoveEventHandler does nothing. TODO(community): Implement this. func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error { return nil @@ -169,7 +172,8 @@ func (f *FakeInformer) SetWatchErrorHandler(cache.WatchErrorHandler) error { return nil } -func (f *FakeInformer) SetWatchErrorHandlerWithContext(handler cache.WatchErrorHandlerWithContext) error { +// SetWatchErrorHandlerWithContext does nothing. TODO(community): Implement this. +func (f *FakeInformer) SetWatchErrorHandlerWithContext(cache.WatchErrorHandlerWithContext) error { return nil } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index d45476d390..d4047b7a09 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -168,7 +168,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { defer c.mu.Unlock() // TODO(pwittrock): Reconsider HandleCrash - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithLogger(c.LogConstructor(nil)) // NB(directxman12): launch the sources *before* trying to wait for the // caches to sync so that they have a chance to register their intended diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 2fdfbde8e3..6844239180 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -10,7 +10,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -18,6 +20,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) +var logKind = logf.RuntimeLog.WithName("source").WithName("Kind") + // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). type Kind[object client.Object, request comparable] struct { // Type is the type of object to watch. e.g. &v1.Pod{} @@ -68,12 +72,12 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type kindMatchErr := &meta.NoKindMatchError{} switch { case errors.As(lastErr, &kindMatchErr): - log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", + logKind.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", "kind", kindMatchErr.GroupKind) case runtime.IsNotRegisteredError(lastErr): - log.Error(lastErr, "kind must be registered to the Scheme") + logKind.Error(lastErr, "kind must be registered to the Scheme") default: - log.Error(lastErr, "failed to get informer from cache") + logKind.Error(lastErr, "failed to get informer from cache") } return false, nil // Retry. } @@ -87,7 +91,9 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type return } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs()) + _, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs(), toolscache.HandlerOptions{ + Logger: &logKind, + }) if err != nil { ks.startedErr <- err return diff --git a/pkg/log/log_test.go b/pkg/log/log_test.go index b75604b6be..f3bb4d6ec2 100644 --- a/pkg/log/log_test.go +++ b/pkg/log/log_test.go @@ -194,12 +194,12 @@ var _ = Describe("logging", func() { }() go func() { defer GinkgoRecover() - delegLog.WithValues("with-value") + delegLog.WithValues("key", "with-value") close(withValuesDone) }() go func() { defer GinkgoRecover() - child.WithValues("grandchild") + child.WithValues("key", "grandchild") close(grandChildDone) }() go func() { diff --git a/pkg/source/source.go b/pkg/source/source.go index 267a6470b8..ed59925eef 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -22,11 +22,13 @@ import ( "fmt" "sync" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -34,6 +36,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) +var logInformer = logf.RuntimeLog.WithName("source").WithName("Informer") + // Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) // which should be processed by event.EventHandlers to enqueue reconcile.Requests. // @@ -282,7 +286,9 @@ func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimiting return errors.New("must specify Informer.Handler") } - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs()) + _, err := is.Informer.AddEventHandlerWithOptions(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs(), toolscache.HandlerOptions{ + Logger: &logInformer, + }) if err != nil { return err }