diff --git a/examples/kind/main.go b/examples/kind/main.go index 8f0ff58..f1ba4fd 100644 --- a/examples/kind/main.go +++ b/examples/kind/main.go @@ -74,9 +74,9 @@ func main() { For(&corev1.Pod{}). Complete(mcreconcile.Func( func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { - log := log.FromContext(ctx).WithValues("cluster", req.ClusterName) + log := log.FromContext(ctx).WithValues("cluster", req.Cluster) - cl, err := mgr.GetCluster(ctx, req.ClusterName) + cl, err := mgr.GetCluster(ctx, req.Cluster) if err != nil { return reconcile.Result{}, err } diff --git a/examples/namespace/main.go b/examples/namespace/main.go index 080ef25..c5a231e 100644 --- a/examples/namespace/main.go +++ b/examples/namespace/main.go @@ -129,9 +129,9 @@ func main() { For(&corev1.ConfigMap{}). Complete(mcreconcile.Func( func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { - log := log.FromContext(ctx).WithValues("cluster", req.ClusterName) + log := log.FromContext(ctx).WithValues("cluster", req.Cluster) - cl, err := mgr.GetCluster(ctx, req.ClusterName) + cl, err := mgr.GetCluster(ctx, req.Cluster) if err != nil { return reconcile.Result{}, err } @@ -142,7 +142,7 @@ func main() { if err := client.Get(ctx, req.NamespacedName, cm); err != nil { return reconcile.Result{}, err } - log.Info("Reconciling configmap", "cluster", req.ClusterName, "ns", req.Namespace, "name", cm.Name, "uuid", cm.UID) + log.Info("Reconciling configmap", "cluster", req.Cluster, "ns", req.Namespace, "name", cm.Name, "uuid", cm.UID) return ctrl.Result{}, nil }, diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 47b519e..d31aa34 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -33,7 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - "sigs.k8s.io/controller-runtime/pkg/cluster" + ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -58,32 +58,32 @@ const ( ) // Builder builds a Controller. -type Builder = TypedBuilder[mcreconcile.Request] +type Builder = TypedBuilder[string, mcreconcile.TypedRequest[string]] // TypedBuilder builds a Controller. The request is the request type // that is passed to the workqueue and then to the Reconciler. // The workqueue de-duplicates identical requests. -type TypedBuilder[request mcreconcile.ClusterAware[request]] struct { +type TypedBuilder[cluster comparable, request mcreconcile.ClusterAware[cluster, request]] struct { forInput ForInput ownsInput []OwnsInput rawSources []source.TypedSource[request] - watchesInput []WatchesInput[request] - mgr mcmanager.Manager + watchesInput []WatchesInput[cluster, request] + mgr mcmanager.TypedManager[cluster] globalPredicates []predicate.Predicate - ctrl mccontroller.TypedController[request] + ctrl mccontroller.TypedController[cluster, request] ctrlOptions controller.TypedOptions[request] name string - newController func(name string, mgr mcmanager.Manager, options controller.TypedOptions[request]) (mccontroller.TypedController[request], error) + newController func(name string, mgr mcmanager.TypedManager[cluster], options controller.TypedOptions[request]) (mccontroller.TypedController[cluster, request], error) } // ControllerManagedBy returns a new controller builder that will be started by the provided Manager. 
func ControllerManagedBy(m mcmanager.Manager) *Builder { - return TypedControllerManagedBy[mcreconcile.Request](m) + return TypedControllerManagedBy[string, mcreconcile.Request](m) } // TypedControllerManagedBy returns a new typed controller builder that will be started by the provided Manager. -func TypedControllerManagedBy[request mcreconcile.ClusterAware[request]](m mcmanager.Manager) *TypedBuilder[request] { - return &TypedBuilder[request]{mgr: m} +func TypedControllerManagedBy[cluster comparable, request mcreconcile.ClusterAware[cluster, request]](m mcmanager.TypedManager[cluster]) *TypedBuilder[cluster, request] { + return &TypedBuilder[cluster, request]{mgr: m} } // ForInput represents the information set by the For method. @@ -101,7 +101,7 @@ type ForInput struct { // // This is the equivalent of calling // Watches(source.Kind(cache, &Type{}, &handler.EnqueueRequestForObject{})). -func (blder *TypedBuilder[request]) For(object client.Object, opts ...ForOption) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) For(object client.Object, opts ...ForOption) *TypedBuilder[cluster, request] { if blder.forInput.object != nil { blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") return blder @@ -133,7 +133,7 @@ type OwnsInput struct { // // By default, this is the equivalent of calling // Watches(source.Kind(cache, &Type{}, handler.EnqueueRequestForOwner([...], &OwnerType{}, OnlyControllerOwner()))). -func (blder *TypedBuilder[request]) Owns(object client.Object, opts ...OwnsOption) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) Owns(object client.Object, opts ...OwnsOption) *TypedBuilder[cluster, request] { input := OwnsInput{object: object} for _, opt := range opts { opt.ApplyToOwns(&input) @@ -152,23 +152,23 @@ type untypedWatchesInput interface { // NewTypedEventHandlerFunc is a constructor for a TypedEventHandler that uses // a given cluster. -type NewTypedEventHandlerFunc[request mcreconcile.ClusterAware[request]] func(string, cluster.Cluster) handler.TypedEventHandler[client.Object, request] +type NewTypedEventHandlerFunc[cluster comparable, request mcreconcile.ClusterAware[cluster, request]] func(cluster, ctrlcluster.Cluster) handler.TypedEventHandler[client.Object, request] // WatchesInput represents the information set by Watches method. -type WatchesInput[request mcreconcile.ClusterAware[request]] struct { +type WatchesInput[cluster comparable, request mcreconcile.ClusterAware[cluster, request]] struct { obj client.Object - handler NewTypedEventHandlerFunc[request] + handler NewTypedEventHandlerFunc[cluster, request] predicates []predicate.Predicate objectProjection objectProjection EngageOptions } -func (w *WatchesInput[request]) setPredicates(predicates []predicate.Predicate) { +func (w *WatchesInput[cluster, request]) setPredicates(predicates []predicate.Predicate) { w.predicates = predicates } -func (w *WatchesInput[request]) setObjectProjection(objectProjection objectProjection) { +func (w *WatchesInput[cluster, request]) setObjectProjection(objectProjection objectProjection) { w.objectProjection = objectProjection } @@ -177,15 +177,15 @@ func (w *WatchesInput[request]) setObjectProjection(objectProjection objectProje // // This is the equivalent of calling // WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)). 
-func (blder *TypedBuilder[request]) Watches( +func (blder *TypedBuilder[cluster, request]) Watches( object client.Object, - eventHandler NewTypedEventHandlerFunc[request], + eventHandler NewTypedEventHandlerFunc[cluster, request], opts ...WatchesOption, -) *TypedBuilder[request] { - input := WatchesInput[request]{ +) *TypedBuilder[cluster, request] { + input := WatchesInput[cluster, request]{ obj: object, - handler: func(name string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, request] { - hdlr := handlerWithCluster[client.Object, request](name, eventHandler(name, cl)) + handler: func(clRef cluster, cl ctrlcluster.Cluster) handler.TypedEventHandler[client.Object, request] { + hdlr := handlerWithCluster[client.Object, cluster, request](clRef, eventHandler(clRef, cl)) return handler.WithLowPriorityWhenUnchanged(hdlr) }, } @@ -225,11 +225,11 @@ func (blder *TypedBuilder[request]) Watches( // In the first case, controller-runtime will create another cache for the // concrete type on top of the metadata cache; this increases memory // consumption and leads to race conditions as caches are not in sync. -func (blder *TypedBuilder[request]) WatchesMetadata( +func (blder *TypedBuilder[cluster, request]) WatchesMetadata( object client.Object, - eventHandler NewTypedEventHandlerFunc[request], + eventHandler NewTypedEventHandlerFunc[cluster, request], opts ...WatchesOption, -) *TypedBuilder[request] { +) *TypedBuilder[cluster, request] { opts = append(opts, OnlyMetadata) return blder.Watches(object, eventHandler, opts...) } @@ -239,7 +239,7 @@ func (blder *TypedBuilder[request]) WatchesMetadata( // WatchesRawSource does not respect predicates configured through WithEventFilter. // // WatchesRawSource makes it possible to use typed handlers and predicates with `source.Kind` as well as custom source implementations. -func (blder *TypedBuilder[request]) WatchesRawSource(src source.TypedSource[request]) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) WatchesRawSource(src source.TypedSource[request]) *TypedBuilder[cluster, request] { blder.rawSources = append(blder.rawSources, src) return blder @@ -251,19 +251,19 @@ func (blder *TypedBuilder[request]) WatchesRawSource(src source.TypedSource[requ // of all watched objects. // // Defaults to the empty list. -func (blder *TypedBuilder[request]) WithEventFilter(p predicate.Predicate) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) WithEventFilter(p predicate.Predicate) *TypedBuilder[cluster, request] { blder.globalPredicates = append(blder.globalPredicates, p) return blder } // WithOptions overrides the controller options used in doController. Defaults to empty. -func (blder *TypedBuilder[request]) WithOptions(options controller.TypedOptions[request]) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) WithOptions(options controller.TypedOptions[request]) *TypedBuilder[cluster, request] { blder.ctrlOptions = options return blder } // WithLogConstructor overrides the controller options's LogConstructor. 
-func (blder *TypedBuilder[request]) WithLogConstructor(logConstructor func(*request) logr.Logger) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) WithLogConstructor(logConstructor func(*request) logr.Logger) *TypedBuilder[cluster, request] { blder.ctrlOptions.LogConstructor = logConstructor return blder } @@ -275,7 +275,7 @@ func (blder *TypedBuilder[request]) WithLogConstructor(logConstructor func(*requ // By default, controllers are named using the lowercase version of their kind. // // The name must be unique as it is used to identify the controller in metrics and logs. -func (blder *TypedBuilder[request]) Named(name string) *TypedBuilder[request] { +func (blder *TypedBuilder[cluster, request]) Named(name string) *TypedBuilder[cluster, request] { blder.name = name return blder } @@ -284,7 +284,7 @@ func (blder *TypedBuilder[request]) Named(name string) *TypedBuilder[request] { // // Note: use context.ReconcilerWithClusterInContext to inject the cluster name // into the and to use Manager.GetClusterInContext to retrieve the cluster. -func (blder *TypedBuilder[request]) Complete(r reconcile.TypedReconciler[request]) error { +func (blder *TypedBuilder[cluster, request]) Complete(r reconcile.TypedReconciler[request]) error { _, err := blder.Build(r) return err } @@ -293,7 +293,7 @@ func (blder *TypedBuilder[request]) Complete(r reconcile.TypedReconciler[request // // Note: use context.ReconcilerWithClusterInContext to inject the cluster name // into the and to use Manager.GetClusterInContext to retrieve the cluster. -func (blder *TypedBuilder[request]) Build(r reconcile.TypedReconciler[request]) (mccontroller.TypedController[request], error) { +func (blder *TypedBuilder[cluster, request]) Build(r reconcile.TypedReconciler[request]) (mccontroller.TypedController[cluster, request], error) { if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } @@ -317,8 +317,8 @@ func (blder *TypedBuilder[request]) Build(r reconcile.TypedReconciler[request]) return blder.ctrl, nil } -func (blder *TypedBuilder[request]) project(proj objectProjection) func(cluster.Cluster, client.Object) (client.Object, error) { - return func(cl cluster.Cluster, obj client.Object) (client.Object, error) { +func (blder *TypedBuilder[cluster, request]) project(proj objectProjection) func(ctrlcluster.Cluster, client.Object) (client.Object, error) { + return func(cl ctrlcluster.Cluster, obj client.Object) (client.Object, error) { switch proj { case projectAsNormal: return obj, nil @@ -336,31 +336,32 @@ func (blder *TypedBuilder[request]) project(proj objectProjection) func(cluster. 
} } -func (blder *TypedBuilder[request]) doWatch() error { +func (blder *TypedBuilder[cluster, request]) doWatch() error { // Reconcile type if blder.forInput.object != nil { var enqueueRequestForObject any if reflect.TypeFor[request]() == reflect.TypeOf(reconcile.Request{}) { enqueueRequestForObject = handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) } else if reflect.TypeFor[request]() == reflect.TypeOf(mcreconcile.Request{}) { - enqueueRequestForObject = handler.WithLowPriorityWhenUnchanged(&mchandler.EnqueueRequestForObject{}) + enqueueRequestForObject = handler.WithLowPriorityWhenUnchanged[client.Object, mcreconcile.TypedRequest[cluster]](&mchandler.TypedEnqueueRequestForObject[client.Object, cluster]{}) } else { return fmt.Errorf("For() can only be used with (mc)reconcile.Request, got %T", *new(request)) } - newHdler := func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[client.Object, request] { + newHdler := func(cl cluster, _ ctrlcluster.Cluster) handler.TypedEventHandler[client.Object, request] { var hdler handler.TypedEventHandler[client.Object, request] reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(enqueueRequestForObject)) - return handlerWithCluster[client.Object, request](clusterName, hdler) + return handlerWithCluster[client.Object, cluster, request](cl, hdler) } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, blder.forInput.predicates...) - src := mcsource.TypedKind[client.Object, request](blder.forInput.object, newHdler, allPredicates...). + src := mcsource.TypedKind[client.Object, cluster, request](blder.forInput.object, newHdler, allPredicates...). WithProjection(blder.project(blder.forInput.objectProjection)) if ptr.Deref(blder.forInput.engageWithLocalCluster, blder.mgr.GetProvider() == nil) { - src, err := src.ForCluster("", blder.mgr.GetLocalManager()) + var zero cluster + src, err := src.ForCluster(zero, blder.mgr.GetLocalManager()) if err != nil { return err } @@ -385,21 +386,22 @@ func (blder *TypedBuilder[request]) doWatch() error { opts = append(opts, handler.OnlyControllerOwner()) } - newHdler := func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[client.Object, request] { + newHdler := func(clRef cluster, cl ctrlcluster.Cluster) handler.TypedEventHandler[client.Object, request] { var hdler handler.TypedEventHandler[client.Object, request] reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner( blder.mgr.GetLocalManager().GetScheme(), cl.GetRESTMapper(), blder.forInput.object, opts..., )))) - return handlerWithCluster[client.Object, request](clusterName, hdler) + return handlerWithCluster[client.Object, cluster, request](clRef, hdler) } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) - src := mcsource.TypedKind[client.Object, request](own.object, newHdler, allPredicates...). + src := mcsource.TypedKind[client.Object, cluster, request](own.object, newHdler, allPredicates...). 
WithProjection(blder.project(own.objectProjection)) if ptr.Deref(own.engageWithLocalCluster, blder.mgr.GetProvider() == nil) { - src, err := src.ForCluster("", blder.mgr.GetLocalManager()) + var zero cluster + src, err := src.ForCluster(zero, blder.mgr.GetLocalManager()) if err != nil { return err } @@ -423,7 +425,8 @@ func (blder *TypedBuilder[request]) doWatch() error { allPredicates = append(allPredicates, w.predicates...) src := mcsource.TypedKind(w.obj, w.handler, allPredicates...).WithProjection(blder.project(w.objectProjection)) if ptr.Deref(w.engageWithLocalCluster, blder.mgr.GetProvider() == nil) { - src, err := src.ForCluster("", blder.mgr.GetLocalManager()) + var zero cluster + src, err := src.ForCluster(zero, blder.mgr.GetLocalManager()) if err != nil { return err } @@ -445,7 +448,7 @@ func (blder *TypedBuilder[request]) doWatch() error { return nil } -func (blder *TypedBuilder[request]) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) { +func (blder *TypedBuilder[cluster, request]) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) { if blder.name != "" { return blder.name, nil } @@ -455,7 +458,7 @@ func (blder *TypedBuilder[request]) getControllerName(gvk schema.GroupVersionKin return strings.ToLower(gvk.Kind), nil } -func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[request]) error { +func (blder *TypedBuilder[cluster, request]) doController(r reconcile.TypedReconciler[request]) error { globalOpts := blder.mgr.GetControllerOptions() ctrlOptions := blder.ctrlOptions @@ -525,7 +528,7 @@ func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[req } if blder.newController == nil { - blder.newController = mccontroller.NewTyped[request] + blder.newController = mccontroller.NewTyped[cluster, request] } // Build the controller and return. diff --git a/pkg/builder/multicluster_handler.go b/pkg/builder/multicluster_handler.go index 16c823e..be5f68f 100644 --- a/pkg/builder/multicluster_handler.go +++ b/pkg/builder/multicluster_handler.go @@ -23,81 +23,81 @@ import ( mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" + ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" ) // StaticHandler returns a handler constructor with a static value. -func StaticHandler[object client.Object, request comparable](h handler.TypedEventHandler[object, request]) func(cluster.Cluster) handler.TypedEventHandler[object, request] { - return func(cl cluster.Cluster) handler.TypedEventHandler[object, request] { +func StaticHandler[object client.Object, request comparable](h handler.TypedEventHandler[object, request]) func(ctrlcluster.Cluster) handler.TypedEventHandler[object, request] { + return func(cl ctrlcluster.Cluster) handler.TypedEventHandler[object, request] { return h } } // handlerWithCluster wraps a handler and injects the cluster name into the // reuqests that are enqueued. 
-func handlerWithCluster[object any, request mcreconcile.ClusterAware[request]](name string, h handler.TypedEventHandler[object, request]) handler.TypedEventHandler[object, request] { +func handlerWithCluster[object any, cluster comparable, request mcreconcile.ClusterAware[cluster, request]](cl cluster, h handler.TypedEventHandler[object, request]) handler.TypedEventHandler[object, request] { return handler.TypedFuncs[object, request]{ CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { - h.Create(ctx, e, clusterAwareWorkqueue[request]{cluster: name, q: q}) + h.Create(ctx, e, clusterAwareWorkqueue[cluster, request]{cluster: cl, q: q}) }, UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { - h.Update(ctx, e, clusterAwareWorkqueue[request]{cluster: name, q: q}) + h.Update(ctx, e, clusterAwareWorkqueue[cluster, request]{cluster: cl, q: q}) }, DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request]) { - h.Delete(ctx, e, clusterAwareWorkqueue[request]{cluster: name, q: q}) + h.Delete(ctx, e, clusterAwareWorkqueue[cluster, request]{cluster: cl, q: q}) }, } } -type clusterAwareWorkqueue[request mcreconcile.ClusterAware[request]] struct { - cluster string +type clusterAwareWorkqueue[cluster comparable, request mcreconcile.ClusterAware[cluster, request]] struct { + cluster cluster q workqueue.TypedRateLimitingInterface[request] } -var _ workqueue.TypedInterface[mcreconcile.Request] = &clusterAwareWorkqueue[mcreconcile.Request]{} +var _ workqueue.TypedInterface[mcreconcile.Request] = &clusterAwareWorkqueue[string, mcreconcile.Request]{} -func (q clusterAwareWorkqueue[request]) Add(item request) { +func (q clusterAwareWorkqueue[cluster, request]) Add(item request) { q.q.Add(item.WithCluster(q.cluster)) } -func (q clusterAwareWorkqueue[request]) AddAfter(item request, duration time.Duration) { +func (q clusterAwareWorkqueue[cluster, request]) AddAfter(item request, duration time.Duration) { q.q.AddAfter(item.WithCluster(q.cluster), duration) } -func (q clusterAwareWorkqueue[request]) AddRateLimited(item request) { +func (q clusterAwareWorkqueue[cluster, request]) AddRateLimited(item request) { q.q.AddRateLimited(item.WithCluster(q.cluster)) } -func (q clusterAwareWorkqueue[request]) Forget(item request) { +func (q clusterAwareWorkqueue[cluster, request]) Forget(item request) { q.q.Forget(item.WithCluster(q.cluster)) } -func (q clusterAwareWorkqueue[request]) NumRequeues(item request) int { +func (q clusterAwareWorkqueue[cluster, request]) NumRequeues(item request) int { return q.q.NumRequeues(item.WithCluster(q.cluster)) } -func (q clusterAwareWorkqueue[request]) Len() int { +func (q clusterAwareWorkqueue[cluster, request]) Len() int { return q.q.Len() } -func (q clusterAwareWorkqueue[request]) Get() (item request, shutdown bool) { +func (q clusterAwareWorkqueue[cluster, request]) Get() (item request, shutdown bool) { return q.q.Get() } -func (q clusterAwareWorkqueue[request]) Done(item request) { +func (q clusterAwareWorkqueue[cluster, request]) Done(item request) { q.q.Done(item.WithCluster(q.cluster)) } -func (q clusterAwareWorkqueue[request]) ShutDown() { +func (q clusterAwareWorkqueue[cluster, request]) ShutDown() { q.q.ShutDown() } -func (q clusterAwareWorkqueue[request]) ShutDownWithDrain() { +func (q clusterAwareWorkqueue[cluster, request]) ShutDownWithDrain() { q.q.ShutDownWithDrain() } 
-func (q clusterAwareWorkqueue[request]) ShuttingDown() bool { +func (q clusterAwareWorkqueue[cluster, request]) ShuttingDown() bool { return q.q.ShuttingDown() } diff --git a/pkg/builder/multicluster_options.go b/pkg/builder/multicluster_options.go index b328e4e..f785e7c 100644 --- a/pkg/builder/multicluster_options.go +++ b/pkg/builder/multicluster_options.go @@ -59,10 +59,10 @@ func (w EngageOptions) ApplyToWatches(opts untypedWatchesInput) { } } -func (w *WatchesInput[request]) setEngageWithLocalCluster(engage bool) { +func (w *WatchesInput[cluster, request]) setEngageWithLocalCluster(engage bool) { w.engageWithLocalCluster = &engage } -func (w *WatchesInput[request]) setEngageWithProviderClusters(engage bool) { +func (w *WatchesInput[cluster, request]) setEngageWithProviderClusters(engage bool) { w.engageWithProviderClusters = &engage } diff --git a/pkg/context/cluster.go b/pkg/context/cluster.go index 06139d4..090edef 100644 --- a/pkg/context/cluster.go +++ b/pkg/context/cluster.go @@ -28,21 +28,21 @@ type clusterKeyType string const clusterKey clusterKeyType = "cluster" // WithCluster returns a new context with the given cluster. -func WithCluster(ctx context.Context, cluster string) context.Context { - return context.WithValue(ctx, clusterKey, cluster) +func WithCluster[cluster comparable](ctx context.Context, cl cluster) context.Context { + return context.WithValue(ctx, clusterKey, cl) } // ClusterFrom returns the cluster from the context. -func ClusterFrom(ctx context.Context) (string, bool) { - cluster, ok := ctx.Value(clusterKey).(string) - return cluster, ok +func ClusterFrom[cluster comparable](ctx context.Context) (cluster, bool) { + cl, ok := ctx.Value(clusterKey).(cluster) + return cl, ok } // ReconcilerWithClusterInContext returns a reconciler that sets the cluster name in the // context. func ReconcilerWithClusterInContext(r reconcile.Reconciler) mcreconcile.Reconciler { return reconcile.TypedFunc[mcreconcile.Request](func(ctx context.Context, req mcreconcile.Request) (reconcile.Result, error) { - ctx = WithCluster(ctx, req.ClusterName) + ctx = WithCluster(ctx, req.Cluster) return r.Reconcile(ctx, req.Request) }) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a1c0237..cdea472 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -23,7 +23,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" + ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/source" @@ -37,18 +37,18 @@ import ( // from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item. // Work typically is reads and writes Kubernetes objects to make the system state match the state specified // in the object Spec. -type Controller = TypedController[mcreconcile.Request] +type Controller = TypedController[string, mcreconcile.Request] // Options are the arguments for creating a new Controller. type Options = controller.TypedOptions[mcreconcile.Request] // TypedController implements an API. -type TypedController[request comparable] interface { +type TypedController[cluster, request comparable] interface { controller.TypedController[request] - multicluster.Aware + multicluster.Aware[cluster] // MultiClusterWatch watches the provided Source. 
- MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error + MultiClusterWatch(src mcsource.TypedSource[client.Object, cluster, request]) error } // New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have @@ -56,14 +56,14 @@ type TypedController[request comparable] interface { // // The name must be unique as it is used to identify the controller in metrics and logs. func New(name string, mgr mcmanager.Manager, options Options) (Controller, error) { - return NewTyped(name, mgr, options) + return NewTyped[string, mcreconcile.Request](name, mgr, options) } // NewTyped returns a new typed controller registered with the Manager, // // The name must be unique as it is used to identify the controller in metrics and logs. -func NewTyped[request comparable](name string, mgr mcmanager.Manager, options controller.TypedOptions[request]) (TypedController[request], error) { - c, err := NewTypedUnmanaged(name, mgr, options) +func NewTyped[cluster, request comparable](name string, mgr mcmanager.TypedManager[cluster], options controller.TypedOptions[request]) (TypedController[cluster, request], error) { + c, err := NewTypedUnmanaged[cluster, request](name, mgr, options) if err != nil { return nil, err } @@ -77,85 +77,85 @@ func NewTyped[request comparable](name string, mgr mcmanager.Manager, options co // // The name must be unique as it is used to identify the controller in metrics and logs. func NewUnmanaged(name string, mgr mcmanager.Manager, options Options) (Controller, error) { - return NewTypedUnmanaged[mcreconcile.Request](name, mgr, options) + return NewTypedUnmanaged[string, mcreconcile.Request](name, mgr, options) } // NewTypedUnmanaged returns a new typed controller without adding it to the manager. // // The name must be unique as it is used to identify the controller in metrics and logs. 
-func NewTypedUnmanaged[request comparable](name string, mgr mcmanager.Manager, options controller.TypedOptions[request]) (TypedController[request], error) { +func NewTypedUnmanaged[cluster, request comparable](name string, mgr mcmanager.TypedManager[cluster], options controller.TypedOptions[request]) (TypedController[cluster, request], error) { c, err := controller.NewTypedUnmanaged[request](name, mgr.GetLocalManager(), options) if err != nil { return nil, err } - return &mcController[request]{ + return &mcController[cluster, request]{ TypedController: c, - clusters: make(map[string]engagedCluster), + clusters: make(map[cluster]engagedCluster[cluster]), }, nil } -var _ TypedController[mcreconcile.Request] = &mcController[mcreconcile.Request]{} +var _ TypedController[string, mcreconcile.Request] = &mcController[string, mcreconcile.Request]{} -type mcController[request comparable] struct { +type mcController[cluster, request comparable] struct { controller.TypedController[request] lock sync.Mutex - clusters map[string]engagedCluster - sources []mcsource.TypedSource[client.Object, request] + clusters map[cluster]engagedCluster[cluster] + sources []mcsource.TypedSource[client.Object, cluster, request] } -type engagedCluster struct { - name string - cluster cluster.Cluster +type engagedCluster[cluster comparable] struct { + clRef cluster + cluster ctrlcluster.Cluster } -func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error { +func (c *mcController[cluster, request]) Engage(ctx context.Context, clRef cluster, cl ctrlcluster.Cluster) error { c.lock.Lock() defer c.lock.Unlock() - if old, ok := c.clusters[name]; ok && old.cluster == cl { + if old, ok := c.clusters[clRef]; ok && old.cluster == cl { return nil } ctx, cancel := context.WithCancel(ctx) // pass through in case the controller itself is cluster aware - if ctrl, ok := c.TypedController.(multicluster.Aware); ok { - if err := ctrl.Engage(ctx, name, cl); err != nil { + if ctrl, ok := c.TypedController.(multicluster.Aware[cluster]); ok { + if err := ctrl.Engage(ctx, clRef, cl); err != nil { return err } } // engage cluster aware instances for _, aware := range c.sources { - src, err := aware.ForCluster(name, cl) + src, err := aware.ForCluster(clRef, cl) if err != nil { cancel() - return fmt.Errorf("failed to engage for cluster %q: %w", name, err) + return fmt.Errorf("failed to engage for cluster %q: %w", clRef, err) } if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil { cancel() - return fmt.Errorf("failed to watch for cluster %q: %w", name, err) + return fmt.Errorf("failed to watch for cluster %q: %w", clRef, err) } } - ec := engagedCluster{ - name: name, + ec := engagedCluster[cluster]{ + clRef: clRef, cluster: cl, } - c.clusters[name] = ec + c.clusters[clRef] = ec go func() { c.lock.Lock() defer c.lock.Unlock() - if c.clusters[name] == ec { - delete(c.clusters, name) + if c.clusters[clRef] == ec { + delete(c.clusters, clRef) } }() return nil } -func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error { +func (c *mcController[cluster, request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, cluster, request]) error { c.lock.Lock() defer c.lock.Unlock() diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index 2db49fe..98a038f 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -32,52 +32,52 @@ var _ handler.TypedEventHandler[client.Object, mcreconcile.Request] = &EnqueueRe // 
EnqueueRequestForObject enqueues a reconcile.Request containing the Name, Namespace and ClusterName of the object that is the source of the Event. // (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject should be used by multi-cluster // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. -type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object] +type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object, string] -// TypedEnqueueRequestForObject enqueues a reconcile.Request containing the Name, Namespace and ClusterName of the object that is the source of the Event. +// TypedEnqueueRequestForObject enqueues a reconcile.Request containing the Name, Namespace and Cluster of the object that is the source of the Event. // (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueRequestForObject should be used by multi-cluster // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. // // TypedEnqueueRequestForObject is experimental and subject to future change. -type TypedEnqueueRequestForObject[object client.Object] struct { - ClusterName string +type TypedEnqueueRequestForObject[object client.Object, cluster comparable] struct { + Cluster cluster } // Create implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +func (e *TypedEnqueueRequestForObject[T, cluster]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.TypedRequest[cluster]]) { if isNil(evt.Object) { log.FromContext(ctx).WithName("eventhandler").WithName("EnqueueRequestForObject"). Error(nil, "CreateEvent received with no metadata", "event", evt) return } - q.Add(mcreconcile.Request{ + q.Add(mcreconcile.TypedRequest[cluster]{ Request: reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}, - ClusterName: e.ClusterName, + Cluster: e.Cluster, }) } // Update implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +func (e *TypedEnqueueRequestForObject[T, cluster]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.TypedRequest[cluster]]) { switch { case !isNil(evt.ObjectNew): - q.Add(mcreconcile.Request{ + q.Add(mcreconcile.TypedRequest[cluster]{ Request: reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), }}, - ClusterName: e.ClusterName, + Cluster: e.Cluster, }) case !isNil(evt.ObjectOld): q.Add( - mcreconcile.Request{ + mcreconcile.TypedRequest[cluster]{ Request: reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), }}, - ClusterName: e.ClusterName, + Cluster: e.Cluster, }) default: log.FromContext(ctx).WithName("eventhandler").WithName("EnqueueRequestForObject"). @@ -86,34 +86,34 @@ func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event. } // Delete implements EventHandler. 
-func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +func (e *TypedEnqueueRequestForObject[T, cluster]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.TypedRequest[cluster]]) { if isNil(evt.Object) { log.FromContext(ctx).WithName("eventhandler").WithName("EnqueueRequestForObject"). Error(nil, "DeleteEvent received with no metadata", "event", evt) return } - q.Add(mcreconcile.Request{ + q.Add(mcreconcile.TypedRequest[cluster]{ Request: reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}, - ClusterName: e.ClusterName, + Cluster: e.Cluster, }) } // Generic implements EventHandler. -func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { +func (e *TypedEnqueueRequestForObject[T, cluster]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[mcreconcile.TypedRequest[cluster]]) { if isNil(evt.Object) { log.FromContext(ctx).WithName("eventhandler").WithName("EnqueueRequestForObject"). Error(nil, "GenericEvent received with no metadata", "event", evt) return } - q.Add(mcreconcile.Request{ + q.Add(mcreconcile.TypedRequest[cluster]{ Request: reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}, - ClusterName: e.ClusterName, + Cluster: e.Cluster, }) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5e633bb..7b20502 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -27,7 +27,7 @@ import ( "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cluster" + ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -41,7 +41,7 @@ var _ manager.Manager = &probe{} type probe struct { Manager - cluster.Cluster + ctrlcluster.Cluster } // Add adds a runnable. @@ -54,14 +54,17 @@ func (p *probe) Start(_ context.Context) error { return nil } -// Manager is a multi-cluster-aware manager, like the controller-runtime Cluster, +// Manager is a TypedManager with a string as the cluster type. +type Manager = TypedManager[string] + +// TypedManager is a multi-cluster-aware manager, like the controller-runtime Cluster, // but without the direct cluster.Cluster methods. -type Manager interface { +type TypedManager[cluster comparable] interface { // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). - Add(Runnable) error + Add(Runnable[cluster]) error // Elected is closed when this manager is elected leader of a group of // managers, either because it won a leader election or because no leader @@ -104,49 +107,59 @@ type Manager interface { // returns an existing cluster if it has been created before. // If no cluster is known to the provider under the given cluster name, // an error should be returned. 
- GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error) + GetCluster(ctx context.Context, cl cluster) (ctrlcluster.Cluster, error) // ClusterFromContext returns the default cluster set in the context. - ClusterFromContext(ctx context.Context) (cluster.Cluster, error) + ClusterFromContext(ctx context.Context) (ctrlcluster.Cluster, error) // GetLocalManager returns the underlying controller-runtime manager of the host. GetLocalManager() manager.Manager // GetProvider returns the multicluster provider, or nil if it is not set. - GetProvider() multicluster.Provider + GetProvider() multicluster.TypedProvider[cluster] - multicluster.Aware + multicluster.Aware[cluster] } // Runnable allows a component to be started. // It's very important that Start blocks until // it's done running. -type Runnable interface { +type Runnable[cluster comparable] interface { manager.Runnable - multicluster.Aware + multicluster.Aware[cluster] } -var _ Manager = &mcManager{} +var _ Manager = &mcManager[string]{} -type mcManager struct { +type mcManager[cluster comparable] struct { manager.Manager - provider multicluster.Provider + provider multicluster.TypedProvider[cluster] - mcRunnables []multicluster.Aware + mcRunnables []multicluster.Aware[cluster] } // New returns a new Manager for creating Controllers. func New(config *rest.Config, provider multicluster.Provider, opts manager.Options) (Manager, error) { + return NewTyped[string](config, provider, opts) +} + +// WithMultiCluster wraps a host manager to run multi-cluster controllers. +func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) { + return TypedWithMultiCluster[string](mgr, provider) +} + +// NewTyped returns a new Manager for creating Controllers. +func NewTyped[cluster comparable](config *rest.Config, provider multicluster.TypedProvider[cluster], opts manager.Options) (TypedManager[cluster], error) { mgr, err := manager.New(config, opts) if err != nil { return nil, err } - return WithMultiCluster(mgr, provider) + return TypedWithMultiCluster[cluster](mgr, provider) } -// WithMultiCluster wraps a host manager to run multi-cluster controllers. -func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Manager, error) { - return &mcManager{ +// TypedWithMultiCluster wraps a host manager to run multi-cluster controllers. +func TypedWithMultiCluster[cluster comparable](mgr manager.Manager, provider multicluster.TypedProvider[cluster]) (TypedManager[cluster], error) { + return &mcManager[cluster]{ Manager: mgr, provider: provider, }, nil @@ -156,38 +169,39 @@ func WithMultiCluster(mgr manager.Manager, provider multicluster.Provider) (Mana // returns an existing cluster if it has been created before. // If no cluster is known to the provider under the given cluster name, // an error should be returned. -func (m *mcManager) GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error) { - if clusterName == "" { +func (m *mcManager[cluster]) GetCluster(ctx context.Context, cl cluster) (ctrlcluster.Cluster, error) { + var zero cluster + if cl == zero { return m.Manager, nil } if m.provider == nil { - return nil, fmt.Errorf("no multicluster provider set, but cluster %q passed", clusterName) + return nil, fmt.Errorf("no multicluster provider set, but cluster %q passed", cl) } - return m.provider.Get(ctx, clusterName) + return m.provider.Get(ctx, cl) } // ClusterFromContext returns the default cluster set in the context. 
-func (m *mcManager) ClusterFromContext(ctx context.Context) (cluster.Cluster, error) { - clusterName, ok := mccontext.ClusterFrom(ctx) +func (m *mcManager[cluster]) ClusterFromContext(ctx context.Context) (ctrlcluster.Cluster, error) { + cl, ok := mccontext.ClusterFrom[cluster](ctx) if !ok { return nil, fmt.Errorf("no cluster set in context, use ReconcilerWithCluster helper when building the controller") } - return m.GetCluster(ctx, clusterName) + return m.GetCluster(ctx, cl) } // GetLocalManager returns the underlying controller-runtime manager of the host. -func (m *mcManager) GetLocalManager() manager.Manager { +func (m *mcManager[cluster]) GetLocalManager() manager.Manager { return m.Manager } // GetProvider returns the multicluster provider, or nil if it is not set. -func (m *mcManager) GetProvider() multicluster.Provider { +func (m *mcManager[cluster]) GetProvider() multicluster.TypedProvider[cluster] { return m.provider } // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. -func (m *mcManager) Add(r Runnable) (err error) { +func (m *mcManager[cluster]) Add(r Runnable[cluster]) (err error) { m.mcRunnables = append(m.mcRunnables, r) defer func() { if err != nil { @@ -200,12 +214,12 @@ func (m *mcManager) Add(r Runnable) (err error) { // Engage gets called when the component should start operations for the given // Cluster. ctx is cancelled when the cluster is disengaged. -func (m *mcManager) Engage(ctx context.Context, name string, cl cluster.Cluster) error { +func (m *mcManager[cluster]) Engage(ctx context.Context, clRef cluster, cl ctrlcluster.Cluster) error { ctx, cancel := context.WithCancel(ctx) for _, r := range m.mcRunnables { - if err := r.Engage(ctx, name, cl); err != nil { + if err := r.Engage(ctx, clRef, cl); err != nil { cancel() - return fmt.Errorf("failed to engage cluster %q: %w", name, err) + return fmt.Errorf("failed to engage cluster %q: %w", clRef, err) } } return nil diff --git a/pkg/multicluster/multicluster.go b/pkg/multicluster/multicluster.go index e616b71..64641e9 100644 --- a/pkg/multicluster/multicluster.go +++ b/pkg/multicluster/multicluster.go @@ -19,12 +19,12 @@ package multicluster import ( "context" - "sigs.k8s.io/controller-runtime/pkg/cluster" + ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster" ) // Aware is an interface that can be implemented by components that // can engage and disengage when clusters are added or removed at runtime. -type Aware interface { +type Aware[cluster comparable] interface { // Engage gets called when the component should start operations for the given Cluster. // The given context is tied to the Cluster's lifecycle and will be cancelled when the // Cluster is removed or an error occurs. @@ -38,10 +38,13 @@ type Aware interface { // __||____/ /_ // |___ \ // `--------' - Engage(context.Context, string, cluster.Cluster) error + Engage(context.Context, cluster, ctrlcluster.Cluster) error } -// Provider allows to retrieve clusters by name. The provider is responsible for discovering +// Provider is the TypedProvider with string as the cluster type. +type Provider = TypedProvider[string] + +// TypedProvider allows to retrieve clusters by name. The provider is responsible for discovering // and managing the lifecycle of each cluster, calling `Engage` on the manager // it is run against whenever a new cluster is discovered and cancelling the // context used on engage when a cluster is unregistered. 
@@ -49,10 +52,10 @@ type Aware interface { // Example: A Cluster API provider would be responsible for discovering and // managing clusters that are backed by Cluster API resources, which can live // in multiple namespaces in a single management cluster. -type Provider interface { +type TypedProvider[cluster comparable] interface { // Get returns a cluster for the given identifying cluster name. Get // returns an existing cluster if it has been created before. // If no cluster is known to the provider under the given cluster name, // an error should be returned. - Get(ctx context.Context, clusterName string) (cluster.Cluster, error) + Get(ctx context.Context, cl cluster) (ctrlcluster.Cluster, error) } diff --git a/pkg/reconcile/request.go b/pkg/reconcile/request.go index b6ca33d..1dccbd5 100644 --- a/pkg/reconcile/request.go +++ b/pkg/reconcile/request.go @@ -17,7 +17,8 @@ limitations under the License. package reconcile import ( - "k8s.io/apimachinery/pkg/types" + "fmt" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -27,41 +28,49 @@ type Reconciler = reconcile.TypedReconciler[Request] // Func is a function that implements the reconcile interface. type Func = reconcile.TypedFunc[Request] -// Request contains the information necessary to reconcile a Kubernetes object. This includes the -// information to uniquely identify the object - its Name and Namespace. It does NOT contain information about -// any specific Event or the object contents itself. -type Request struct { +// Request contains the information necessary to reconcile a Kubernetes object. +// This includes the information to uniquely identify the object - its Name and +// Namespace. It does NOT contain information about any specific Event or the +// object contents itself. +type Request = TypedRequest[string] + +// TypedRequest contains the information necessary to reconcile a Kubernetes object. +// This includes the information to uniquely identify the object - its Name and +// Namespace. It does NOT contain information about any specific Event or the +// object contents itself. +type TypedRequest[cluster comparable] struct { reconcile.Request - // ClusterName is the name of the cluster that the request belongs to. - ClusterName string + // Cluster is the name of the cluster that the request belongs to. + Cluster cluster } // ClusterAware is an interface for cluster-aware requests. -type ClusterAware[T any] interface { +type ClusterAware[cluster comparable, T any] interface { comparable - // Cluster returns the name of the cluster that the request belongs to. - Cluster() string + // GetCluster returns the name of the cluster that the request belongs to. + GetCluster() cluster // WithCluster sets the name of the cluster that the request belongs to. - WithCluster(string) T + WithCluster(cluster) T } // String returns the general purpose string representation. -func (r Request) String() string { - if r.ClusterName == "" { +func (r TypedRequest[cluster]) String() string { + var zero cluster + if r.Cluster == zero { return r.Request.String() } - return "cluster://" + r.ClusterName + string(types.Separator) + r.Request.String() + return fmt.Sprintf("cluster://%s/%s", r.Cluster, r.Request) } -// Cluster returns the name of the cluster that the request belongs to. -func (r Request) Cluster() string { - return r.ClusterName +// GetCluster returns the name of the cluster that the request belongs to. 
+func (r TypedRequest[cluster]) GetCluster() cluster { + return r.Cluster } // WithCluster sets the name of the cluster that the request belongs to. -func (r Request) WithCluster(name string) Request { - r.ClusterName = name +func (r TypedRequest[cluster]) WithCluster(cl cluster) TypedRequest[cluster] { + r.Cluster = cl return r } diff --git a/pkg/source/source.go b/pkg/source/source.go index 1cd1d17..22eb1b0 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -18,7 +18,7 @@ package source import ( "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/cluster" + ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" @@ -34,7 +34,7 @@ import ( // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). // // Users may build their own Source implementations. -type Source = TypedSource[client.Object, mcreconcile.Request] +type Source = TypedSource[client.Object, string, mcreconcile.Request] // TypedSource is a generic source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) // which should be processed by event.EventHandlers to enqueue a request. @@ -44,50 +44,50 @@ type Source = TypedSource[client.Object, mcreconcile.Request] // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). // // Users may build their own Source implementations. -type TypedSource[object client.Object, request comparable] interface { - ForCluster(string, cluster.Cluster) (source.TypedSource[request], error) +type TypedSource[object client.Object, cluster, request comparable] interface { + ForCluster(cluster, ctrlcluster.Cluster) (source.TypedSource[request], error) } // SyncingSource is a source that needs syncing prior to being usable. The controller // will call its WaitForSync prior to starting workers. -type SyncingSource[object client.Object] TypedSyncingSource[object, mcreconcile.Request] +type SyncingSource[object client.Object] TypedSyncingSource[object, string, mcreconcile.Request] // TypedSyncingSource is a source that needs syncing prior to being usable. The controller // will call its WaitForSync prior to starting workers. -type TypedSyncingSource[object client.Object, request comparable] interface { - TypedSource[object, request] - SyncingForCluster(string, cluster.Cluster) (source.TypedSyncingSource[request], error) - WithProjection(func(cluster.Cluster, object) (object, error)) TypedSyncingSource[object, request] +type TypedSyncingSource[object client.Object, cluster, request comparable] interface { + TypedSource[object, cluster, request] + SyncingForCluster(cluster, ctrlcluster.Cluster) (source.TypedSyncingSource[request], error) + WithProjection(func(ctrlcluster.Cluster, object) (object, error)) TypedSyncingSource[object, cluster, request] } // Kind creates a KindSource with the given cache provider. func Kind[object client.Object]( obj object, - handler func(string, cluster.Cluster) handler.TypedEventHandler[object, mcreconcile.Request], + handler func(string, ctrlcluster.Cluster) handler.TypedEventHandler[object, mcreconcile.Request], predicates ...predicate.TypedPredicate[object], ) SyncingSource[object] { - return TypedKind[object, mcreconcile.Request](obj, handler, predicates...) + return TypedKind[object, string, mcreconcile.Request](obj, handler, predicates...) 
} // TypedKind creates a KindSource with the given cache provider. -func TypedKind[object client.Object, request mcreconcile.ClusterAware[request]]( +func TypedKind[object client.Object, cluster comparable, request mcreconcile.ClusterAware[cluster, request]]( obj object, - handler func(string, cluster.Cluster) handler.TypedEventHandler[object, request], + handler func(cluster, ctrlcluster.Cluster) handler.TypedEventHandler[object, request], predicates ...predicate.TypedPredicate[object], -) TypedSyncingSource[object, request] { - return &kind[object, request]{ +) TypedSyncingSource[object, cluster, request] { + return &kind[object, cluster, request]{ obj: obj, handler: handler, predicates: predicates, - project: func(_ cluster.Cluster, obj object) (object, error) { return obj, nil }, + project: func(_ ctrlcluster.Cluster, obj object) (object, error) { return obj, nil }, } } -type kind[object client.Object, request comparable] struct { +type kind[object client.Object, cluster comparable, request comparable] struct { obj object - handler func(name string, cl cluster.Cluster) handler.TypedEventHandler[object, request] + handler func(name cluster, cl ctrlcluster.Cluster) handler.TypedEventHandler[object, request] predicates []predicate.TypedPredicate[object] - project func(cluster.Cluster, object) (object, error) + project func(ctrlcluster.Cluster, object) (object, error) } type clusterKind[object client.Object, request comparable] struct { @@ -95,27 +95,27 @@ type clusterKind[object client.Object, request comparable] struct { } // WithProjection sets the projection function for the KindSource. -func (k *kind[object, request]) WithProjection(project func(cluster.Cluster, object) (object, error)) TypedSyncingSource[object, request] { +func (k *kind[object, cluster, request]) WithProjection(project func(ctrlcluster.Cluster, object) (object, error)) TypedSyncingSource[object, cluster, request] { k.project = project return k } -func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (source.TypedSource[request], error) { +func (k *kind[object, cluster, request]) ForCluster(clRef cluster, cl ctrlcluster.Cluster) (source.TypedSource[request], error) { obj, err := k.project(cl, k.obj) if err != nil { return nil, err } return &clusterKind[object, request]{ - TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(name, cl), k.predicates...), + TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(clRef, cl), k.predicates...), }, nil } -func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (source.TypedSyncingSource[request], error) { +func (k *kind[object, cluster, request]) SyncingForCluster(clRef cluster, cl ctrlcluster.Cluster) (source.TypedSyncingSource[request], error) { obj, err := k.project(cl, k.obj) if err != nil { return nil, err } return &clusterKind[object, request]{ - TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(name, cl), k.predicates...), + TypedSyncingSource: source.TypedKind(cl.GetCache(), obj, k.handler(clRef, cl), k.predicates...), }, nil } diff --git a/providers/kind/provider.go b/providers/kind/provider.go index 5ef34df..6ccc365 100644 --- a/providers/kind/provider.go +++ b/providers/kind/provider.go @@ -24,6 +24,7 @@ import ( "time" "github.com/go-logr/logr" + mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager" "github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster" "k8s.io/apimachinery/pkg/util/sets" diff --git a/providers/namespace/provider.go 
b/providers/namespace/provider.go
index b3ef306..dde94a3 100644
--- a/providers/namespace/provider.go
+++ b/providers/namespace/provider.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 
 	"github.com/go-logr/logr"
+	mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
 	corev1 "k8s.io/api/core/v1"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
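
The patch above replaces the hard-coded string cluster name with a comparable type parameter across the manager, provider, builder, and reconcile packages. Below is a minimal sketch (not part of the patch) of what that enables, assuming only the signatures introduced in this diff: a struct key resolved through a typed provider and manager. ClusterRef, refProvider, and the fleet/member-1 values are hypothetical names used for illustration.

package main

import (
	"context"
	"fmt"

	mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
	"github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster"
	ctrl "sigs.k8s.io/controller-runtime"
	ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster"
)

// ClusterRef is a hypothetical comparable key that identifies a cluster by
// namespace/name instead of a flat string.
type ClusterRef struct {
	Namespace string
	Name      string
}

// refProvider is a hypothetical in-memory provider keyed by ClusterRef.
type refProvider struct {
	clusters map[ClusterRef]ctrlcluster.Cluster
}

// Get implements multicluster.TypedProvider[ClusterRef] from this patch.
func (p *refProvider) Get(_ context.Context, ref ClusterRef) (ctrlcluster.Cluster, error) {
	if cl, ok := p.clusters[ref]; ok {
		return cl, nil
	}
	return nil, fmt.Errorf("cluster %s/%s not found", ref.Namespace, ref.Name)
}

var _ multicluster.TypedProvider[ClusterRef] = &refProvider{}

func main() {
	provider := &refProvider{clusters: map[ClusterRef]ctrlcluster.Cluster{}}

	// NewTyped wires the provider into a TypedManager[ClusterRef]; GetCluster
	// then resolves ClusterRef keys, and (per this patch) the zero ClusterRef
	// falls back to the local host manager.
	mgr, err := mcmanager.NewTyped[ClusterRef](ctrl.GetConfigOrDie(), provider, ctrl.Options{})
	if err != nil {
		panic(err)
	}

	if _, err := mgr.GetCluster(context.Background(), ClusterRef{Namespace: "fleet", Name: "member-1"}); err != nil {
		fmt.Println("lookup failed:", err)
	}
}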
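The request type and the context helpers are generified the same way. A second sketch, again assuming only the signatures in this diff, shows the renamed Cluster field, the GetCluster/WithCluster accessors that replace the old Cluster()/ClusterName pair, and the now-generic context helpers. ClusterRef and the example object names are hypothetical.

package main

import (
	"context"
	"fmt"

	mccontext "github.com/multicluster-runtime/multicluster-runtime/pkg/context"
	mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ClusterRef mirrors the hypothetical key from the previous sketch.
type ClusterRef struct {
	Namespace string
	Name      string
}

func main() {
	ref := ClusterRef{Namespace: "fleet", Name: "member-1"}

	// TypedRequest carries the key in the renamed Cluster field; GetCluster
	// and WithCluster are the accessors introduced by this patch.
	req := mcreconcile.TypedRequest[ClusterRef]{
		Request: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "cm"}},
	}.WithCluster(ref)
	fmt.Println(req.GetCluster(), req.String())

	// The context helpers take a type parameter as well; ClusterFrom needs the
	// concrete key type and reports false when the stored value differs.
	ctx := mccontext.WithCluster(context.Background(), ref)
	if got, ok := mccontext.ClusterFrom[ClusterRef](ctx); ok {
		fmt.Println("cluster from context:", got)
	}
}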