diff --git a/ATTRIBUTION.md b/ATTRIBUTION.md index 8587585a8..dad4b5c05 100644 --- a/ATTRIBUTION.md +++ b/ATTRIBUTION.md @@ -32,6 +32,7 @@ License version 2.0, we include the full text of the package's License below. * `github.com/stretchr/testify` * `go.uber.org/zap` * `golang.org/x/exp` +* `golang.org/x/sync` * `golang.org/x/time` * `google.golang.org/genproto/googleapis/api` * `k8s.io/api` @@ -1453,6 +1454,38 @@ THE SOFTWARE. +### golang.org/x/sync + +License Identifier: BSD-3-Clause + +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ### golang.org/x/time License Identifier: BSD-3-Clause @@ -4027,4 +4060,56 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +### sigs.k8s.io/yaml + +License Identifier: Apache-2.0 + +Subdependencies: +* `github.com/google/go-cmp` +* `go.yaml.in/yaml/v2` +* `go.yaml.in/yaml/v3` +* `sigs.k8s.io/randfill` + +#### github.com/google/go-cmp + +License Identifier: BSD-3-Clause + +Copyright (c) 2017 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#### go.yaml.in/yaml/v2 + +License Identifier: Apache-2.0 + +#### go.yaml.in/yaml/v3 +License Identifier: Apache-2.0 + +#### sigs.k8s.io/randfill + +License Identifier: Apache-2.0 diff --git a/go.mod b/go.mod index 952593ca9..82413af80 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/sync v0.12.0 golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/api v0.0.0-20240826202546-f6391c0de4c7 k8s.io/api v0.31.0 @@ -26,7 +27,7 @@ require ( k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-runtime v0.19.0 sigs.k8s.io/release-utils v0.11.0 - sigs.k8s.io/yaml v1.4.0 + sigs.k8s.io/yaml v1.6.0 ) require ( @@ -86,6 +87,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/oauth2 v0.28.0 // indirect diff --git a/go.sum b/go.sum index ec6aaafc2..f7a3b0cd9 100644 --- a/go.sum +++ b/go.sum @@ -299,6 +299,10 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE= +go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -398,6 +402,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -659,5 +665,5 @@ sigs.k8s.io/release-utils v0.11.0 h1:FUVSw2dO67M7mfcQx9AITEGnTHoBOdJNbbQ3FT3o8mA sigs.k8s.io/release-utils v0.11.0/go.mod h1:wAlXz8xruzvqZUsorI64dZ3lbkiDnYSlI4IYC6l2yEA= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= -sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= -sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= +sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/pkg/applyset/applyset.go b/pkg/applyset/applyset.go new file mode 100644 index 000000000..16bb5b351 --- /dev/null +++ b/pkg/applyset/applyset.go @@ -0,0 +1,616 @@ +// Copyright 2025 The Kube Resource Orchestrator Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Applylib is inspired from: +// * kubectl pkg/cmd/apply/applyset.go +// * kubebuilder-declarative-pattern/applylib +// * Creating a simpler, self-contained version of the library that is purpose built for controllers. +// * KEP describing applyset: +// https://git.k8s.io/enhancements/keps/sig-cli/3659-kubectl-apply-prune#design-details-applyset-specification + +package applyset + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "fmt" + "reflect" + "sort" + "strings" + + "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/dynamic" +) + +type ToolingID struct { + Name string + Version string +} + +func (t ToolingID) String() string { + return fmt.Sprintf("%s/%s", t.Name, t.Version) +} + +/* +The Set interface provides methods for: + - Add - Add an object to the set + - Apply - Apply objects in the set to the cluster along with optional pruning + - DryRun - Dry run calls the kubernetes API with dryrun flag set to true. + No actual resources are created or pruned. + +Add() is used to add object to the apply Set. +It takes unstructured object. +It does a get from the cluster to note the resource-version before apply. + +Apply() method applies the objects in the set to a Kubernetes cluster. If +the prune parameter is true, any objects that were previously applied but are +no longer in the set will be deleted from the cluster. + +DryRun() method can be used to see what changes would be made without actually making them. + +Example Usage: + + // Create an ApplySet + // aset, err := applyset.New(parent, restMapper, dynamicClient, applySetConfig) + + // Add a ConfigMap to the ApplySet + // configMap := &unstructured.Unstructured{ ... 
}
+	err = aset.Add(context.TODO(), applyset.ApplyableObject{
+		Unstructured: configMap,
+		ID:           "my-config-map", // optional
+	})
+	if err != nil {
+		log.Fatalf("Failed to add object to ApplySet: %v", err)
+	}
+
+	// Apply the changes to the cluster (or dry-run)
+	// To apply:
+	result, err := aset.Apply(context.TODO(), true) // true to enable pruning
+	// or dry-run:
+	// result, err := aset.DryRun(context.TODO(), true)
+	if err != nil {
+		log.Fatalf("Failed to apply/dry-run ApplySet: %v", err)
+	}
+
+	if result.Errors() != nil {
+		fmt.Printf("ApplySet completed with errors: %v\n", result.Errors())
+	} else {
+		fmt.Println("ApplySet completed successfully (or dry-run successful).")
+	}
+*/
+type Set interface {
+	Add(ctx context.Context, obj ApplyableObject) (*unstructured.Unstructured, error)
+	Apply(ctx context.Context, prune bool) (*ApplyResult, error)
+	DryRun(ctx context.Context, prune bool) (*ApplyResult, error)
+}
+
+type Config struct {
+	// ToolLabels can be used to inject labels into all resources managed by applyset
+	ToolLabels map[string]string
+
+	// Provide an identifier which is used as the field manager for server-side apply
+	// https://kubernetes.io/docs/reference/using-api/server-side-apply/#managers
+	FieldManager string
+
+	// Concatenates the name and version and adds the result as the tooling annotation
+	// https://git.k8s.io/enhancements/keps/sig-cli/3659-kubectl-apply-prune#design-details-applyset-specification
+	ToolingID ToolingID
+
+	// Log is used to inject the calling reconciler's logger
+	Log logr.Logger
+}
+
+/*
+New creates a new ApplySet.
+The parent object is expected to be the current one existing in the cluster.
+Use New() to create an apply Set. This function takes the parent object,
+a RESTMapper, a dynamic client, and a configuration object. The parent
+object is the object that "owns" the set of resources being applied.
+
+Example usage:
+
+	// Set up Kubernetes client and RESTMapper
+	// dynamicClient = ...
+	// restMapper = ...
+
+	// Define a parent. The parent should exist in the cluster.
+	// It can be any Kubernetes resource, even a custom resource instance.
+	parent := &unstructured.Unstructured{Object: map[string]interface{}{}}
+	parent.SetGroupVersionKind(schema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "MyCustomResource"})
+	parent.SetName("somename")
+
+	// ApplySet configuration
+	applySetConfig := applyset.Config{
+		ToolingID:    applyset.ToolingID{Name: "my-controller", Version: "v1.0.0"},
+		FieldManager: "my-controller",
+		Log:          logr.Discard(), // Use a real logger in production
+	}
+
+	// Create an ApplySet
+	aset, err := applyset.New(parent, restMapper, dynamicClient, applySetConfig)
+	// if err != nil { ...
} +*/ +func New( + parent *unstructured.Unstructured, + restMapper meta.RESTMapper, + dynamicClient dynamic.Interface, + config Config, +) (Set, error) { + if config.ToolingID == (ToolingID{}) { + return nil, fmt.Errorf("toolingID is required") + } + if config.FieldManager == "" { + return nil, fmt.Errorf("fieldManager is required") + } + parentMetaObject, err := meta.Accessor(parent) + if err != nil { + return nil, fmt.Errorf("error getting parent metadata: %w", err) + } + parentPartialMetaObject := meta.AsPartialObjectMetadata(parentMetaObject) + parentPartialMetaObject.SetGroupVersionKind(parent.GroupVersionKind()) + + aset := &applySet{ + parent: parentPartialMetaObject, + toolingID: config.ToolingID, + toolLabels: config.ToolLabels, + fieldManager: config.FieldManager, + desired: NewTracker(), + desiredRESTMappings: make(map[schema.GroupKind]*meta.RESTMapping), + desiredNamespaces: sets.Set[string]{}, + supersetNamespaces: sets.Set[string]{}, + supersetGKs: sets.Set[string]{}, + clientSet: clientSet{ + restMapper: restMapper, + dynamicClient: dynamicClient, + }, + serverOptions: serverOptions{ + applyOptions: metav1.ApplyOptions{ + FieldManager: config.FieldManager, + Force: true, + }, + //deleteOptions: metav1.DeleteOptions{}, + }, + log: config.Log, + } + + gvk := parent.GroupVersionKind() + gk := gvk.GroupKind() + restMapping, err := aset.restMapper.RESTMapping(gk, gvk.Version) + if err != nil { + return nil, fmt.Errorf("error getting rest mapping for parent kind %v: %w", gvk, err) + } + if restMapping.Scope.Name() == meta.RESTScopeNameNamespace { + aset.parentClient = aset.dynamicClient.Resource(restMapping.Resource).Namespace(parent.GetNamespace()) + } else { + aset.parentClient = aset.dynamicClient.Resource(restMapping.Resource) + } + + return aset, nil +} + +type serverOptions struct { + // applyOptions holds the options used when applying, in particular the fieldManager + applyOptions metav1.ApplyOptions + + // deleteOptions holds the options used when pruning. + deleteOptions metav1.DeleteOptions +} + +type clientSet struct { + // dynamicClient is the dynamic kubernetes client used to apply objects to the k8s cluster. + dynamicClient dynamic.Interface + + // parentClient is the controller runtime client used to apply parent. + parentClient dynamic.ResourceInterface + // restMapper is used to map object kind to resources, and to know if objects are cluster-scoped. + restMapper meta.RESTMapper +} + +// ApplySet tracks the information about an applyset apply/prune +type applySet struct { + // Parent provides the necessary methods to determine a + // ApplySet parent object, which can be used to find out all the on-track + // deployment manifests. + parent *metav1.PartialObjectMetadata + + // toolingID is the value to be used and validated in the applyset.kubernetes.io/tooling annotation. + toolingID ToolingID + + // fieldManager is the name of the field manager that will be used to apply the resources. 
+ fieldManager string + + // toolLabels is a map of tool provided labels to be applied to the resources + toolLabels map[string]string + + // current labels and annotations of the parent before the apply operation + currentLabels map[string]string + currentAnnotations map[string]string + + // set of applyset object rest mappings + desiredRESTMappings map[schema.GroupKind]*meta.RESTMapping + // set of applyset object namespaces + desiredNamespaces sets.Set[string] + + // superset of desired and old namespaces + supersetNamespaces sets.Set[string] + // superset of desired and old GKs + supersetGKs sets.Set[string] + + desired *tracker + clientSet + serverOptions + + log logr.Logger +} + +func (a *applySet) getAndRecordNamespace(obj ApplyableObject, restMapping *meta.RESTMapping) error { + // Ensure object namespace is correct for the scope + gvk := obj.GroupVersionKind() + switch restMapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + // If empty use the parent's namespace for the object. + namespace := obj.GetNamespace() + if namespace == "" { + namespace = a.parent.GetNamespace() + } + a.desiredNamespaces.Insert(namespace) + case meta.RESTScopeNameRoot: + if obj.GetNamespace() != "" { + return fmt.Errorf("namespace was provided for cluster-scoped object %v %v", gvk, obj.GetName()) + } + + default: + // Internal error + return fmt.Errorf("unknown scope for gvk %s: %q", gvk, restMapping.Scope.Name()) + } + return nil +} + +// getRESTMapping Fetch RESTMapping for the given object. +// It caches the mapping on the first get and returns it the next time. +func (a *applySet) getRestMapping(obj ApplyableObject) (*meta.RESTMapping, error) { + gvk := obj.GroupVersionKind() + gk := gvk.GroupKind() + // Ensure a rest mapping exists for the object + _, found := a.desiredRESTMappings[gk] + if !found { + restMapping, err := a.restMapper.RESTMapping(gk, gvk.Version) + if err != nil { + return nil, fmt.Errorf("error getting rest mapping for %v: %w", gvk, err) + } + if restMapping == nil { + return nil, fmt.Errorf("rest mapping not found for %v", gvk) + } + a.desiredRESTMappings[gk] = restMapping + } + + return a.desiredRESTMappings[gk], nil +} + +func (a *applySet) resourceClient(obj Applyable) (dynamic.ResourceInterface, error) { + restMapping, ok := a.desiredRESTMappings[obj.GroupVersionKind().GroupKind()] + if !ok { + // This should never happen, but if it does, we want to know about it. 
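+		// (desiredRESTMappings is populated by getRestMapping during Add, so a miss here
+		// means the object was never added to this set.)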
+ return nil, fmt.Errorf("FATAL: rest mapping not found for %v", obj.GroupVersionKind()) + } + dynResource := a.dynamicClient.Resource(restMapping.Resource) + if restMapping.Scope.Name() == meta.RESTScopeNameNamespace { + ns := obj.GetNamespace() + if ns == "" { + ns = a.parent.GetNamespace() + } + if ns == "" { + ns = metav1.NamespaceDefault + } + return dynResource.Namespace(ns), nil + } + return dynResource, nil +} + +func (a *applySet) Add(ctx context.Context, obj ApplyableObject) (*unstructured.Unstructured, error) { + restMapping, err := a.getRestMapping(obj) + if err != nil { + return nil, err + } + if err := a.getAndRecordNamespace(obj, restMapping); err != nil { + return nil, err + } + obj.SetLabels(a.InjectApplysetLabels(a.injectToolLabels(obj.GetLabels()))) + + dynResource, err := a.resourceClient(obj) + if err != nil { + return nil, err + } + observed, err := dynResource.Get(ctx, + obj.GetName(), + metav1.GetOptions{}, + ) + if err != nil { + if apierrors.IsNotFound(err) { + observed = nil + } else { + return nil, fmt.Errorf("error getting object from cluster: %w", err) + } + } + if observed != nil { + // Record the last read revision of the object. + obj.lastReadRevision = observed.GetResourceVersion() + } + a.log.V(2).Info("adding object to applyset", "object", obj.String(), "cluster-revision", obj.lastReadRevision) + + if err := a.desired.Add(obj); err != nil { + return nil, err + } + return observed, nil +} + +// ID is the label value that we are using to identify this applyset. +// Format: base64(sha256(...)), using the URL safe encoding of RFC4648. +func (a *applySet) ID() string { + unencoded := strings.Join([]string{ + a.parent.GetName(), + a.parent.GetNamespace(), + a.parent.GroupVersionKind().Kind, + a.parent.GroupVersionKind().Group, + }, ApplySetIDPartDelimiter) + hashed := sha256.Sum256([]byte(unencoded)) + b64 := base64.RawURLEncoding.EncodeToString(hashed[:]) + // Label values must start and end with alphanumeric values, so add a known-safe prefix and suffix. + return fmt.Sprintf(V1ApplySetIdFormat, b64) +} + +func (a *applySet) injectToolLabels(labels map[string]string) map[string]string { + if labels == nil { + labels = make(map[string]string) + } + if a.toolLabels != nil { + for k, v := range a.toolLabels { + labels[k] = v + } + } + return labels +} + +func (a *applySet) InjectApplysetLabels(labels map[string]string) map[string]string { + if labels == nil { + labels = make(map[string]string) + } + labels[ApplysetPartOfLabel] = a.ID() + return labels +} + +type applySetUpdateMode string + +var updateToLatestSet applySetUpdateMode = "latest" +var updateToSuperset applySetUpdateMode = "superset" + +// updateParentLabelsAndAnnotations updates the parent labels and annotations. 
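+// The "superset" mode merges in the namespaces and group-kinds previously recorded on the
+// parent (used before applying), while the "latest" mode keeps only those of the current
+// desired set (used after pruning). As a rough illustration (values are hypothetical), a
+// parent owning a ConfigMap in "ns1" and a Foo.foo.bar in "ns2" would carry:
+//
+//	labels:
+//	  applyset.kubernetes.io/id: applyset-<base64url(sha256 of parent GKNN)>-v1   (see ID())
+//	annotations:
+//	  applyset.kubernetes.io/tooling: my-controller/v1.0.0
+//	  applyset.kubernetes.io/contains-group-kinds: ConfigMap,Foo.foo.bar
+//	  applyset.kubernetes.io/additional-namespaces: ns1,ns2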
+func (a *applySet) updateParentLabelsAndAnnotations( + ctx context.Context, + mode applySetUpdateMode, +) (sets.Set[string], sets.Set[string], error) { + original, err := meta.Accessor(a.parent) + if err != nil { + return nil, nil, err + } + + // Generate and append the desired labels to the parent labels + desiredLabels := a.desiredParentLabels() + labels := a.parent.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + for k, v := range desiredLabels { + labels[k] = v + } + + // Get the desired annotations and append them to the parent + desiredAnnotations, returnNamespaces, returnGKs := a.desiredParentAnnotations(mode == updateToSuperset) + annotations := a.parent.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + for k, v := range desiredAnnotations { + annotations[k] = v + } + + options := metav1.ApplyOptions{ + FieldManager: a.fieldManager + "-parent-labeller", + Force: false, + } + + // Convert labels to map[string]interface{} for the unstructured object + labelsMap := make(map[string]interface{}) + for k, v := range labels { + labelsMap[k] = v + } + + // Convert annotations to map[string]interface{} for the unstructured object + annotationsMap := make(map[string]interface{}) + for k, v := range annotations { + annotationsMap[k] = v + } + + parentPatch := &unstructured.Unstructured{} + parentPatch.SetUnstructuredContent(map[string]interface{}{ + "apiVersion": a.parent.APIVersion, + "kind": a.parent.Kind, + "metadata": map[string]interface{}{ + "name": a.parent.GetName(), + "namespace": a.parent.GetNamespace(), + "labels": labelsMap, + "annotations": annotationsMap, + }, + }) + // update parent in the cluster. + if !reflect.DeepEqual(original.GetLabels(), parentPatch.GetLabels()) || + !reflect.DeepEqual(original.GetAnnotations(), parentPatch.GetAnnotations()) { + if _, err := a.parentClient.Apply(ctx, a.parent.GetName(), parentPatch, options); err != nil { + return nil, nil, fmt.Errorf("error updating parent %w", err) + } + a.log.V(2).Info("updated parent labels and annotations", "parent-name", a.parent.GetName(), + "parent-namespace", a.parent.GetNamespace(), + "parent-gvk", a.parent.GroupVersionKind(), + "parent-labels", desiredLabels, "parent-annotations", desiredAnnotations) + } + return returnNamespaces, returnGKs, nil +} + +func (a *applySet) desiredParentLabels() map[string]string { + labels := make(map[string]string) + labels[ApplySetParentIDLabel] = a.ID() + return labels +} + +// Return the annotations as well as the set of namespaces and GKs +func (a *applySet) desiredParentAnnotations( + includeCurrent bool, +) (map[string]string, sets.Set[string], sets.Set[string]) { + annotations := make(map[string]string) + annotations[ApplySetToolingAnnotation] = a.toolingID.String() + + // Generate sorted comma-separated list of GKs + gks := sets.Set[string]{} + for gk := range a.desiredRESTMappings { + gks.Insert(gk.String()) + } + if includeCurrent { + for _, gk := range strings.Split(a.currentAnnotations[ApplySetGKsAnnotation], ",") { + if gk == "" { + continue + } + gks.Insert(gk) + } + } + gksList := gks.UnsortedList() + sort.Strings(gksList) + annotations[ApplySetGKsAnnotation] = strings.Join(gksList, ",") + + // Generate sorted comma-separated list of namespaces + nss := a.desiredNamespaces.Clone() + if includeCurrent { + for _, ns := range strings.Split(a.currentAnnotations[ApplySetAdditionalNamespacesAnnotation], ",") { + if ns == "" { + continue + } + nss.Insert(ns) + } + } + nsList := nss.UnsortedList() + 
sort.Strings(nsList)
+	annotations[ApplySetAdditionalNamespacesAnnotation] = strings.Join(nsList, ",")
+	return annotations, nss, gks
+}
+
+func (a *applySet) apply(ctx context.Context, dryRun bool) (*ApplyResult, error) {
+	results := &ApplyResult{DesiredCount: a.desired.Len()}
+
+	// If dryRun is true, we will not update the parent labels and annotations.
+	if !dryRun {
+		parent, err := meta.Accessor(a.parent)
+		if err != nil {
+			return results, fmt.Errorf("unable to get parent: %w", err)
+		}
+		// Record the current labels and annotations
+		a.currentLabels = parent.GetLabels()
+		a.currentAnnotations = parent.GetAnnotations()
+
+		// We will ensure the parent is updated with the latest applyset before applying the resources.
+		a.supersetNamespaces, a.supersetGKs, err = a.updateParentLabelsAndAnnotations(ctx, updateToSuperset)
+		if err != nil {
+			return results, fmt.Errorf("unable to update Parent: %w", err)
+		}
+	}
+
+	options := a.applyOptions
+	if dryRun {
+		options.DryRun = []string{"All"}
+	}
+	for _, obj := range a.desired.objects {
+		dynResource, err := a.resourceClient(obj)
+		if err != nil {
+			return results, err
+		}
+		lastApplied, err := dynResource.Apply(ctx, obj.GetName(), obj.Unstructured, options)
+		results.recordApplied(obj, lastApplied, err)
+		if err != nil {
+			// lastApplied is nil when the apply fails, so do not read its resource version.
+			a.log.V(2).Info("failed to apply object", "object", obj.String(), "error", err)
+		} else {
+			a.log.V(2).Info("applied object", "object", obj.String(), "applied-revision", lastApplied.GetResourceVersion())
+		}
+	}
+
+	return results, nil
+}
+
+func (a *applySet) prune(ctx context.Context, results *ApplyResult, dryRun bool) (*ApplyResult, error) {
+	pruneObjects, err := a.findAllObjectsToPrune(ctx, a.dynamicClient, results.AppliedUIDs())
+	if err != nil {
+		return results, err
+	}
+	options := a.deleteOptions
+	if dryRun {
+		options.DryRun = []string{"All"}
+	}
+	for i := range pruneObjects {
+		name := pruneObjects[i].GetName()
+		namespace := pruneObjects[i].GetNamespace()
+		mapping := pruneObjects[i].Mapping
+
+		var err error
+		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+			err = a.dynamicClient.Resource(mapping.Resource).Namespace(namespace).Delete(ctx, name, options)
+		} else {
+			err = a.dynamicClient.Resource(mapping.Resource).Delete(ctx, name, options)
+		}
+		results.recordPruned(pruneObjects[i], err)
+		a.log.V(2).Info("pruned object", "object", pruneObjects[i].String(), "error", err)
+	}
+
+	if !dryRun {
+		// "latest" mode updates the parent "applyset.kubernetes.io/contains-group-kinds" annotation
+		// to only contain the group-kinds of the current manifests.
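+		// For example, if Foo.foo.bar objects had been applied earlier but none remain in
+		// the desired set, the group-kinds annotation shrinks from "ConfigMap,Foo.foo.bar"
+		// back to "ConfigMap" (illustrative values).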
+ if _, _, err := a.updateParentLabelsAndAnnotations(ctx, updateToLatestSet); err != nil { + return results, fmt.Errorf("unable to update Parent: %w", err) + } + } + + return results, nil +} + +func (a *applySet) applyAndPrune(ctx context.Context, prune bool, dryRun bool) (*ApplyResult, error) { + results, err := a.apply(ctx, dryRun) + if err != nil { + return results, err + } + + if !prune { + return results, nil + } + + return a.prune(ctx, results, dryRun) +} + +func (a *applySet) Apply(ctx context.Context, prune bool) (*ApplyResult, error) { + return a.applyAndPrune(ctx, prune, false) +} + +func (a *applySet) DryRun(ctx context.Context, prune bool) (*ApplyResult, error) { + return a.applyAndPrune(ctx, prune, true) +} diff --git a/pkg/applyset/applyset_test.go b/pkg/applyset/applyset_test.go new file mode 100644 index 000000000..96f4f456b --- /dev/null +++ b/pkg/applyset/applyset_test.go @@ -0,0 +1,608 @@ +// Copyright 2025 The Kube Resource Orchestrator Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package applyset + +import ( + "context" + "encoding/json" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic/fake" + k8stesting "k8s.io/client-go/testing" +) + +type gvkrl struct { + gvk schema.GroupVersionKind + resource string + listKind string +} + +var ( + testScheme = runtime.NewScheme() + + configMapGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + secretGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"} + fooGVK = schema.GroupVersionKind{Group: "foo.bar", Version: "v1", Kind: "Foo"} + + gvkrlList = []gvkrl{ + {gvk: configMapGVK, resource: "configmaps", listKind: "ConfigMapList"}, + {gvk: secretGVK, resource: "secrets", listKind: "SecretList"}, + {gvk: fooGVK, resource: "foos", listKind: "FooList"}, + } +) + +func init() { + testScheme.AddKnownTypes(configMapGVK.GroupVersion(), &unstructured.Unstructured{}) + testScheme.AddKnownTypes(secretGVK.GroupVersion(), &unstructured.Unstructured{}) +} + +func newTestApplySet( + t *testing.T, + parent *unstructured.Unstructured, + objs ...runtime.Object, +) (Set, *fake.FakeDynamicClient) { + allObjs := append([]runtime.Object{parent}, objs...) + gvrToListKind := map[schema.GroupVersionResource]string{} + defaultGroupVersions := []schema.GroupVersion{} + for _, gvkrl := range gvkrlList { + gvrToListKind[gvkrl.gvk.GroupVersion().WithResource(gvkrl.resource)] = gvkrl.listKind + defaultGroupVersions = append(defaultGroupVersions, gvkrl.gvk.GroupVersion()) + } + + restMapper := meta.NewDefaultRESTMapper(defaultGroupVersions) + for _, gvkrl := range gvkrlList { + restMapper.Add(gvkrl.gvk, meta.RESTScopeNamespace) + } + dynamicClient := fake.NewSimpleDynamicClientWithCustomListKinds(testScheme, gvrToListKind, allObjs...) 
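+	// NewSimpleDynamicClientWithCustomListKinds is used here (rather than NewSimpleDynamicClient)
+	// because the fake dynamic client needs an explicit GVR -> list kind mapping for types,
+	// such as Foo, whose list kinds cannot be derived from the test scheme.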
+ + config := Config{ + ToolingID: ToolingID{Name: "test", Version: "v1"}, + FieldManager: "test-manager", + Log: logr.Discard(), + } + + aset, err := New(parent, restMapper, dynamicClient, config) + assert.NoError(t, err) + return aset, dynamicClient +} + +func parentObj(gvk schema.GroupVersionKind, name string) *unstructured.Unstructured { + apiVersion, kind := gvk.ToAPIVersionAndKind() + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": kind, + "metadata": map[string]interface{}{ + "name": name, + "namespace": "default", + "uid": "parent-uid", + }, + }, + } +} + +func configMap(name, ns string) ApplyableObject { + return ApplyableObject{ + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": name, + "namespace": ns, + "uid": name + ns + "-uid", + }, + "data": map[string]interface{}{ + "key": "value", + }, + }, + }, + ID: name + ns, + } +} + +func foo(name, ns string) ApplyableObject { + return ApplyableObject{ + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "foo.bar/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": name, + "namespace": ns, + "uid": name + ns + "-uid", + }, + }, + }, + ID: name + ns, + } +} + +func TestNew(t *testing.T) { + parent := parentObj(configMapGVK, "parent-cm") + + t.Run("valid config", func(t *testing.T) { + _, _ = newTestApplySet(t, parent) + }) + + t.Run("missing toolingID", func(t *testing.T) { + _, err := New(parent, nil, nil, Config{ + FieldManager: "test-manager", + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "toolingID is required") + }) + + t.Run("missing fieldManager", func(t *testing.T) { + _, err := New(parent, nil, nil, Config{ + ToolingID: ToolingID{Name: "test", Version: "v1"}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "fieldManager is required") + }) +} + +func TestApplySet_Add(t *testing.T) { + parent := parentObj(configMapGVK, "parent-cm") + + aset, _ := newTestApplySet(t, parent) + + _, err := aset.Add(context.Background(), configMap("test-cm", "default")) + assert.NoError(t, err) + + as := aset.(*applySet) + assert.Equal(t, 1, as.desired.Len()) + assert.Contains(t, as.desired.objects[0].GetLabels(), ApplysetPartOfLabel) + + assert.True(t, as.desiredNamespaces.Has("default")) + _, exists := as.desiredRESTMappings[configMapGVK.GroupKind()] + assert.True(t, exists) +} + +func TestApplySet_ID(t *testing.T) { + parent := parentObj(fooGVK, "test-foo") + parent.SetNamespace("test-ns") + + aset, _ := newTestApplySet(t, parent) + + // from: base64(sha256(...)) + // test-foo.test-ns.Foo.foo.bar + // sha256: 2dca1c8242de82132464575841648a37c74545785854614378933d2d408ddd2b + // base64: LcocgkLeghMkZFdYQWSKN8dFRXhYVGFDeJM9LUCd3Ss + // format: applyset-%s-v1 + expectedID := "applyset-f9Rk5tKHoB72oV1tU2iFKxDwL8MBZTjMHFQ8V9WNNlA-v1" + assert.Equal(t, expectedID, aset.(*applySet).ID()) +} + +func TestApplySet_Apply(t *testing.T) { + parent := parentObj(secretGVK, "parent-secret") + + aset, dynamicClient := newTestApplySet(t, parent) + _, err := aset.Add(context.Background(), configMap("test-cm", "default")) + assert.NoError(t, err) + + dynamicClient.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedCM unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, 
types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedCM) + assert.NoError(t, err) + assert.NotNil(t, appliedCM) + assert.Equal(t, "test-cm", appliedCM.GetName()) + assert.Contains(t, appliedCM.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. + return true, &appliedCM, nil + }) + + // Reactor for parent update + dynamicClient.PrependReactor("patch", "secrets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var parentPatch unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &parentPatch) + assert.NoError(t, err) + assert.Equal(t, "parent-secret", parentPatch.GetName()) + assert.Contains(t, parentPatch.GetLabels(), ApplySetParentIDLabel) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetAdditionalNamespacesAnnotation) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetGKsAnnotation) + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetAdditionalNamespacesAnnotation], "default") + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetGKsAnnotation], "ConfigMap") + + return true, &parentPatch, nil + }) + + result, err := aset.Apply(context.Background(), false) + assert.NoError(t, err) + assert.NoError(t, result.Errors()) + assert.Equal(t, 1, result.DesiredCount) + assert.Len(t, result.AppliedObjects, 1) +} + +func TestApplySet_Prune(t *testing.T) { + parent := parentObj(secretGVK, "parent-secret") + + // This CM will be pruned + pruneCM := configMap("prune-cm", "default") + pruneCM.Unstructured.SetLabels(map[string]string{ + ApplysetPartOfLabel: "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", + }) + aset, dynamicClient := newTestApplySet(t, parent, pruneCM) + // Set the ApplysetPartOfLabel on the pruneCM to match the ID of the applyset + // This is needed because the applyset ID is dynamically generated based on the parent object. + // We need to ensure that the pruneCM has the correct label so it is discovered for pruning. + as := aset.(*applySet) + assert.Equal(t, "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", as.ID()) + + _, err := aset.Add(context.Background(), configMap("test-cm", "default")) + assert.NoError(t, err) + + dynamicClient.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var appliedCM *unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &appliedCM) + assert.Equal(t, "test-cm", appliedCM.GetName()) + assert.Contains(t, appliedCM.GetLabels(), ApplysetPartOfLabel) + assert.NoError(t, err) + + // The fake client needs to return the object that was applied. 
+ return true, appliedCM, nil + }) + // Reactor for parent update + dynamicClient.PrependReactor("patch", "secrets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var parentPatch unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &parentPatch) + assert.NoError(t, err) + assert.Equal(t, "parent-secret", parentPatch.GetName()) + assert.Contains(t, parentPatch.GetLabels(), ApplySetParentIDLabel) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetAdditionalNamespacesAnnotation) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetGKsAnnotation) + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetAdditionalNamespacesAnnotation], "default") + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetGKsAnnotation], "ConfigMap") + + return true, &parentPatch, nil + }) + + var pruned bool + dynamicClient.PrependReactor("delete", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + deleteAction := action.(k8stesting.DeleteAction) + assert.Equal(t, "prune-cm", deleteAction.GetName()) + pruned = true + return true, nil, nil + }) + + result, err := aset.Apply(context.Background(), true) + assert.NoError(t, err) + assert.NoError(t, result.Errors()) + assert.Equal(t, 1, result.DesiredCount) + assert.Len(t, result.AppliedObjects, 1) + assert.Len(t, result.PrunedObjects, 1) + assert.True(t, pruned) +} + +func TestApplySet_ApplyMultiNamespace(t *testing.T) { + parent := parentObj(secretGVK, "parent-secret") + + aset, dynamicClient := newTestApplySet(t, parent) + _, err := aset.Add(context.Background(), configMap("test-cm", "ns1")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), configMap("test-cm", "ns2")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), foo("test-foo", "ns3")) + assert.NoError(t, err) + + dynamicClient.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedCM unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedCM) + assert.NoError(t, err) + assert.NotNil(t, appliedCM) + assert.Contains(t, "test-cm", appliedCM.GetName()) + assert.Contains(t, appliedCM.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. + return true, &appliedCM, nil + }) + + dynamicClient.PrependReactor("patch", "foos", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedFoo unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedFoo) + assert.NoError(t, err) + assert.NotNil(t, appliedFoo) + assert.Contains(t, "test-foo", appliedFoo.GetName()) + assert.Contains(t, appliedFoo.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. 
+ return true, &appliedFoo, nil + }) + // Reactor for parent update + dynamicClient.PrependReactor("patch", "secrets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var parentPatch unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &parentPatch) + assert.NoError(t, err) + assert.Equal(t, "parent-secret", parentPatch.GetName()) + assert.Contains(t, parentPatch.GetLabels(), ApplySetParentIDLabel) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetAdditionalNamespacesAnnotation) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetGKsAnnotation) + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetAdditionalNamespacesAnnotation], "ns1,ns2,ns3") + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetGKsAnnotation], "ConfigMap,Foo.foo.bar") + + return true, &parentPatch, nil + }) + + result, err := aset.Apply(context.Background(), false) + assert.NoError(t, err) + assert.NoError(t, result.Errors()) + assert.Equal(t, 3, result.DesiredCount) + assert.Len(t, result.AppliedObjects, 3) +} + +func TestApplySet_PruneMultiNamespace(t *testing.T) { + parent := parentObj(secretGVK, "parent-secret") + + // These CMs will be pruned + pruneCM1 := configMap("prune-cm", "ns1") + pruneCM1.Unstructured.SetLabels(map[string]string{ + ApplysetPartOfLabel: "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", + }) + pruneCM2 := configMap("prune-cm", "ns2") + pruneCM2.Unstructured.SetLabels(map[string]string{ + ApplysetPartOfLabel: "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", + }) + aset, dynamicClient := newTestApplySet(t, parent, pruneCM1, pruneCM2) + // Set the ApplysetPartOfLabel on the pruneCM to match the ID of the applyset + // This is needed because the applyset ID is dynamically generated based on the parent object. + // We need to ensure that the pruneCM has the correct label so it is discovered for pruning. + as := aset.(*applySet) + assert.Equal(t, "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", as.ID()) + + _, err := aset.Add(context.Background(), configMap("test-cm", "ns1")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), configMap("test-cm", "ns2")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), foo("test-foo", "ns3")) + assert.NoError(t, err) + + dynamicClient.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedCM unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedCM) + assert.NoError(t, err) + assert.NotNil(t, appliedCM) + assert.Contains(t, "test-cm", appliedCM.GetName()) + assert.Contains(t, appliedCM.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. 
+ return true, &appliedCM, nil + }) + + dynamicClient.PrependReactor("patch", "foos", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedFoo unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedFoo) + assert.NoError(t, err) + assert.NotNil(t, appliedFoo) + assert.Contains(t, "test-foo", appliedFoo.GetName()) + assert.Contains(t, appliedFoo.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. + return true, &appliedFoo, nil + }) + // Reactor for parent update + dynamicClient.PrependReactor("patch", "secrets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var parentPatch unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &parentPatch) + assert.NoError(t, err) + assert.Equal(t, "parent-secret", parentPatch.GetName()) + assert.Contains(t, parentPatch.GetLabels(), ApplySetParentIDLabel) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetAdditionalNamespacesAnnotation) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetGKsAnnotation) + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetAdditionalNamespacesAnnotation], "ns1,ns2,ns3") + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetGKsAnnotation], "ConfigMap,Foo.foo.bar") + + return true, &parentPatch, nil + }) + + var pruned int = 0 + dynamicClient.PrependReactor("delete", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + deleteAction := action.(k8stesting.DeleteAction) + assert.Equal(t, "prune-cm", deleteAction.GetName()) + pruned++ + return true, nil, nil + }) + result, err := aset.Apply(context.Background(), true) + assert.NoError(t, err) + assert.NoError(t, result.Errors()) + assert.Equal(t, 3, result.DesiredCount) + assert.Equal(t, pruned, 2) + assert.Len(t, result.PrunedObjects, 2) +} + +func TestApplySet_PruneOldNamespace(t *testing.T) { + parent := parentObj(secretGVK, "parent-secret") + // Add annotation for old namespace + parent.SetAnnotations(map[string]string{ + "applyset.kubernetes.io/additional-namespaces": "oldns1", + }) + + // CM in old namespace should be pruned + pruneCM1 := configMap("prune-cm", "oldns1") + pruneCM1.Unstructured.SetLabels(map[string]string{ + ApplysetPartOfLabel: "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", + }) + pruneCM2 := configMap("prune-cm", "ns2") + pruneCM2.Unstructured.SetLabels(map[string]string{ + ApplysetPartOfLabel: "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", + }) + aset, dynamicClient := newTestApplySet(t, parent, pruneCM1, pruneCM2) + // Set the ApplysetPartOfLabel on the pruneCM to match the ID of the applyset + // This is needed because the applyset ID is dynamically generated based on the parent object. + // We need to ensure that the pruneCM has the correct label so it is discovered for pruning. 
+ as := aset.(*applySet) + assert.Equal(t, "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", as.ID()) + + _, err := aset.Add(context.Background(), configMap("test-cm", "ns1")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), configMap("test-cm", "ns2")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), foo("test-foo", "ns3")) + assert.NoError(t, err) + + dynamicClient.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedCM unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedCM) + assert.NoError(t, err) + assert.NotNil(t, appliedCM) + assert.Contains(t, "test-cm", appliedCM.GetName()) + assert.Contains(t, appliedCM.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. + return true, &appliedCM, nil + }) + + dynamicClient.PrependReactor("patch", "foos", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedFoo unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedFoo) + assert.NoError(t, err) + assert.NotNil(t, appliedFoo) + assert.Contains(t, "test-foo", appliedFoo.GetName()) + assert.Contains(t, appliedFoo.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. + return true, &appliedFoo, nil + }) + // Reactor for parent update + dynamicClient.PrependReactor("patch", "secrets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var parentPatch unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &parentPatch) + assert.NoError(t, err) + assert.Equal(t, "parent-secret", parentPatch.GetName()) + assert.Contains(t, parentPatch.GetLabels(), ApplySetParentIDLabel) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetAdditionalNamespacesAnnotation) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetGKsAnnotation) + //assert.Equal(t, parentPatch.GetAnnotations()[ApplySetAdditionalNamespacesAnnotation], "ns1,ns2,ns3,oldns1") + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetGKsAnnotation], "ConfigMap,Foo.foo.bar") + + return true, &parentPatch, nil + }) + + var pruned int = 0 + dynamicClient.PrependReactor("delete", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + deleteAction := action.(k8stesting.DeleteAction) + assert.Equal(t, "prune-cm", deleteAction.GetName()) + pruned++ + return true, nil, nil + }) + result, err := aset.Apply(context.Background(), true) + assert.NoError(t, err) + assert.NoError(t, result.Errors()) + assert.Equal(t, 3, result.DesiredCount) + assert.Equal(t, 2, pruned) + assert.Len(t, result.PrunedObjects, 2) +} + +func TestApplySet_PruneOldGVKs(t *testing.T) { + parent := parentObj(secretGVK, "parent-secret") + // Add annotation for old GK + parent.SetAnnotations(map[string]string{ + "applyset.kubernetes.io/contains-group-kinds": "Foo.foo.bar", + }) + + // Foo type should be pruned + pruneFoo := foo("prune-foo", "ns1") + pruneFoo.Unstructured.SetLabels(map[string]string{ + ApplysetPartOfLabel: 
"applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", + }) + aset, dynamicClient := newTestApplySet(t, parent, pruneFoo) + // Set the ApplysetPartOfLabel on the pruneCM to match the ID of the applyset + // This is needed because the applyset ID is dynamically generated based on the parent object. + // We need to ensure that the pruneCM has the correct label so it is discovered for pruning. + as := aset.(*applySet) + assert.Equal(t, "applyset-wHf5Gity0G0nPN34KuNBIBBEOu2H9ED2KqsblMPFygM-v1", as.ID()) + + _, err := aset.Add(context.Background(), configMap("test-cm", "ns1")) + assert.NoError(t, err) + _, err = aset.Add(context.Background(), configMap("test-cm", "ns2")) + assert.NoError(t, err) + + dynamicClient.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + var appliedCM unstructured.Unstructured + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + err = json.Unmarshal(patchAction.GetPatch(), &appliedCM) + assert.NoError(t, err) + assert.NotNil(t, appliedCM) + assert.Contains(t, "test-cm", appliedCM.GetName()) + assert.Contains(t, appliedCM.GetLabels(), ApplysetPartOfLabel) + // The fake client needs to return the object that was applied. + return true, &appliedCM, nil + }) + + // Reactor for parent update + dynamicClient.PrependReactor("patch", "secrets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + patchAction := action.(k8stesting.PatchAction) + assert.Equal(t, types.ApplyPatchType, patchAction.GetPatchType()) + + var parentPatch unstructured.Unstructured + err = json.Unmarshal(patchAction.GetPatch(), &parentPatch) + assert.NoError(t, err) + assert.Equal(t, "parent-secret", parentPatch.GetName()) + assert.Contains(t, parentPatch.GetLabels(), ApplySetParentIDLabel) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetAdditionalNamespacesAnnotation) + assert.Contains(t, parentPatch.GetAnnotations(), ApplySetGKsAnnotation) + assert.Equal(t, parentPatch.GetAnnotations()[ApplySetAdditionalNamespacesAnnotation], "ns1,ns2") + //assert.Equal(t, parentPatch.GetAnnotations()[ApplySetGKsAnnotation], "ConfigMap,Foo.foo.bar") + + return true, &parentPatch, nil + }) + + var pruned int = 0 + dynamicClient.PrependReactor("delete", "foos", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + deleteAction := action.(k8stesting.DeleteAction) + assert.Equal(t, "prune-foo", deleteAction.GetName()) + pruned++ + return true, nil, nil + }) + result, err := aset.Apply(context.Background(), true) + assert.NoError(t, err) + assert.NoError(t, result.Errors()) + assert.Equal(t, 2, result.DesiredCount) + assert.Equal(t, 1, pruned) + assert.Len(t, result.PrunedObjects, 1) +} diff --git a/pkg/applyset/const.go b/pkg/applyset/const.go new file mode 100644 index 000000000..dde792f65 --- /dev/null +++ b/pkg/applyset/const.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Kubernetes Authors. +// Copyright 2025 The Kube Resource Orchestrator Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package applyset
+
+// Label and annotation keys from the ApplySet specification.
+// https://git.k8s.io/enhancements/keps/sig-cli/3659-kubectl-apply-prune#design-details-applyset-specification
+const (
+	// ApplySetToolingAnnotation is the key of the annotation that indicates which tool is used to manage this ApplySet.
+	// Tooling should refuse to mutate ApplySets belonging to other tools.
+	// The value must be in the format <toolname>/<semver>.
+	// Example value: "kubectl/v1.27" or "helm/v3" or "kpt/v1.0.0"
+	ApplySetToolingAnnotation = "applyset.kubernetes.io/tooling"
+
+	// ApplySetAdditionalNamespacesAnnotation extends the scope of the ApplySet beyond the parent
+	// object's own namespace (if any) to include the listed namespaces. The value is a comma-separated
+	// list of the names of namespaces other than the parent's namespace in which objects are found.
+	// Example value: "kube-system,ns1,ns2".
+	ApplySetAdditionalNamespacesAnnotation = "applyset.kubernetes.io/additional-namespaces"
+
+	// ApplySetGKsAnnotation is a list of group-kinds used to optimize listing of ApplySet member objects.
+	// It is optional in the ApplySet specification, as tools can perform discovery or use a different optimization.
+	// However, it is currently required in kubectl.
+	// When present, the value of this annotation must be a comma-separated list of the group-kinds,
+	// in the fully-qualified name format, i.e. <kind>.<group>.
+	// Example value: "Certificate.cert-manager.io,ConfigMap,deployments.apps,Secret,Service"
+	ApplySetGKsAnnotation = "applyset.kubernetes.io/contains-group-kinds"
+
+	// ApplySetParentIDLabel is the key of the label that makes an object an ApplySet parent object.
+	// Its value MUST use the format specified in V1ApplySetIdFormat below.
+	ApplySetParentIDLabel = "applyset.kubernetes.io/id"
+
+	// V1ApplySetIdFormat is the format required for the value of ApplySetParentIDLabel (and ApplysetPartOfLabel).
+	// The %s segment is the unique ID of the object itself, which MUST be the base64 encoding
+	// (using the URL safe encoding of RFC4648) of the hash of the GKNN of the object it is on, in the form:
+	// base64(sha256(<name>.<namespace>.<kind>.<group>)).
+	V1ApplySetIdFormat = "applyset-%s-v1"
+
+	// ApplysetPartOfLabel is the key of the label which indicates that the object is a member of an ApplySet.
+	// The value of the label MUST match the value of ApplySetParentIDLabel on the parent object.
+	ApplysetPartOfLabel = "applyset.kubernetes.io/part-of"
+
+	// ApplysetParentCRDLabel is the key of the label that can be set on a CRD to identify
+	// the custom resource type it defines (not the CRD itself) as an allowed parent for an ApplySet.
+	ApplysetParentCRDLabel = "applyset.kubernetes.io/is-parent-type"
+
+	// ApplySetIDPartDelimiter is the delimiter used to separate the parts of the ApplySet ID.
+	ApplySetIDPartDelimiter = "."
+)
diff --git a/pkg/applyset/doc.go b/pkg/applyset/doc.go
new file mode 100644
index 000000000..3b0e6109d
--- /dev/null
+++ b/pkg/applyset/doc.go
@@ -0,0 +1,51 @@
+// Copyright 2023 The Kubernetes Authors.
+// Copyright 2025 The Kube Resource Orchestrator Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Package applyset provides a library for managing sets of Kubernetes objects.
+The core of this package is the Set interface, which defines the operations
+for an apply set. An apply set is a collection of Kubernetes objects that are
+managed together as a group. This is useful for managing the resources of a
+composite object, like a CRD that creates other resources.
+
+This is implemented as per the KEP:
+https://github.com/kubernetes/enhancements/blob/master/keps/sig-cli/3659-kubectl-apply-prune/README.md
+
+Existing implementations of the KEP:
+
+ 1. kubectl implements the KEP in its codebase:
+    https://github.com/kubernetes/kubectl/blob/master/pkg/cmd/apply/applyset.go
+    https://github.com/kubernetes/kubectl/blob/master/pkg/cmd/apply/applyset_pruner.go
+    CLI-focused; pulls in a lot of dependencies that are not required for a controller.
+
+ 2. kubernetes-declarative-pattern (kdp)
+    https://pkg.go.dev/sigs.k8s.io/kubebuilder-declarative-pattern/applylib/applyset
+    Controller-focused, but missing callbacks, lifecycle support, etc.
+
+Why Fork:
+* The kubectl implementation is built for the CLI and has many dependencies that we don't need in a controller.
+* The kdp implementation is a good fit for controllers but lacks some enhancements required for it to work with KRO.
+* We need the ability to do partial applies of an applyset (without pruning), which the kdp library does not support.
+* We also need the ability to read back the result of an apply for use by the KRO engine. This requires some callback support.
+* We added callbacks (to populate the KRO engine).
+* We also enhanced the applylib code to support ExternalRef (read from the cluster, never mutated); this is KRO-specific for now.
+* We plan to add lifecycle modifiers to support additional behaviours such as abandon-on-delete and no-updates-after-creation.
+
+Upstreaming:
+The goal is to upstream the changes back to either kubernetes-declarative-pattern or controller-runtime.
+Once the enhancements are in place, we can decide whether all or some of them can be upstreamed.
+We will need a few iterations in the KRO repo first before we get an idea of where and how to upstream.
+*/
+package applyset
diff --git a/pkg/applyset/prune.go b/pkg/applyset/prune.go
new file mode 100644
index 000000000..ab0910555
--- /dev/null
+++ b/pkg/applyset/prune.go
@@ -0,0 +1,204 @@
+// Copyright 2023 The Kubernetes Authors.
+// Copyright 2025 The Kube Resource Orchestrator Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
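To make the package comment above concrete, here is a rough sketch of how a controller is expected to drive the library, modeled on the way `controller_reconcile.go` further down in this patch wires it up. Names and error handling are simplified; the parent object, REST mapper, dynamic client, logger, and the version string are assumptions supplied by the caller, not values defined by the patch.

```go
package example

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/dynamic"

	"github.com/kro-run/kro/pkg/applyset"
)

// reconcileWithApplySet is a sketch, not part of the patch: it mirrors the flow
// used by the instance reconciler below.
func reconcileWithApplySet(
	ctx context.Context,
	log logr.Logger,
	parent *unstructured.Unstructured,
	restMapper meta.RESTMapper,
	dynClient dynamic.Interface,
	desired []*unstructured.Unstructured,
) error {
	cfg := applyset.Config{
		// The real controller also passes tool-specific labels via ToolLabels.
		FieldManager: "kro.run/applyset",
		ToolingID:    applyset.ToolingID{Name: "kro", Version: "v0.0.0-dev"}, // version illustrative
		Log:          log,
	}
	aset, err := applyset.New(parent, restMapper, dynClient, cfg)
	if err != nil {
		return err
	}

	for i, obj := range desired {
		// Add registers a desired member and returns the object as currently
		// known in the cluster (nil if it does not exist yet), which the caller
		// can feed back into its own runtime state.
		if _, err := aset.Add(ctx, applyset.ApplyableObject{
			Unstructured: obj,
			ID:           fmt.Sprintf("resource-%d", i), // opaque caller-side ID
		}); err != nil {
			return err
		}
	}

	// Apply everything that was added. prune=true also deletes objects labeled
	// as members of this ApplySet that were not re-added in this pass.
	result, err := aset.Apply(ctx, true)
	if err != nil {
		return err
	}
	return result.Errors()
}
```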
+ +// Applylib has code copied over from: +// - kubectl pkg/cmd/apply/applyset.go +// - kubebuilder-declarative-pattern/applylib +// - Creating a simpler, self-contained version of the library that is purpose built for controllers. +// - KEP describing applyset: +// https://git.k8s.io/enhancements/keps/sig-cli/3659-kubectl-apply-prune#design-details-applyset-specification + +package applyset + +import ( + "context" + "fmt" + + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/dynamic" +) + +const ( + // This is set to an arbitrary number here for now. + // This ensures we are no unbounded when pruning many GVKs. + // Could be parameterized later on + // TODO (barney-s): Possible parameterization target + PruneGVKParallelizationLimit = 1 +) + +// PruneObject is an apiserver object that should be deleted as part of prune. +type PruneObject struct { + *unstructured.Unstructured + Mapping *meta.RESTMapping +} + +// String returns a human-readable name of the object, for use in debug messages. +func (p *PruneObject) String() string { + s := p.Mapping.GroupVersionKind.GroupKind().String() + + if p.GetNamespace() != "" { + s += " " + p.GetNamespace() + "/" + p.GetName() + } else { + s += " " + p.GetName() + } + return s +} + +// findAllObjectsToPrune returns the list of objects that will be pruned. +// Calling this instead of Prune can be useful for dry-run / diff behaviour. +func (a *applySet) findAllObjectsToPrune( + ctx context.Context, + dynamicClient dynamic.Interface, + visitedUIDs sets.Set[types.UID], +) ([]PruneObject, error) { + type task struct { + namespace string + restMapping *meta.RESTMapping + results []PruneObject + } + + tasks := []*task{} + + restMappings := map[schema.GroupKind]*meta.RESTMapping{} + for gk, restMapping := range a.desiredRESTMappings { + restMappings[gk] = restMapping + } + + // add restmapping for older GKs + for _, entry := range a.supersetGKs.UnsortedList() { + if entry == "" { + continue + } + gk := schema.ParseGroupKind(entry) + if _, ok := restMappings[gk]; ok { + continue + } + restMapping, err := a.restMapper.RESTMapping(gk) + if err != nil { + a.log.V(2).Info("no rest mapping for gk", "gk", gk) + continue + } + restMappings[gk] = restMapping + + } + // We run discovery in parallel, in as many goroutines as priority and fairness will allow + // (We don't expect many requests in real-world scenarios - maybe tens, unlikely to be hundreds) + for _, restMapping := range restMappings { + switch restMapping.Scope.Name() { + case meta.RESTScopeNameNamespace: + for _, namespace := range a.supersetNamespaces.UnsortedList() { + if namespace == "" { + namespace = metav1.NamespaceDefault + } + tasks = append(tasks, &task{ + namespace: namespace, + restMapping: restMapping, + }) + } + + case meta.RESTScopeNameRoot: + tasks = append(tasks, &task{ + restMapping: restMapping, + }) + + default: + return nil, fmt.Errorf("unhandled scope %q", restMapping.Scope.Name()) + } + } + + if PruneGVKParallelizationLimit <= 1 { + for i := range tasks { + task := tasks[i] + results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUIDs, task.namespace, task.restMapping) + if err != nil { + return nil, fmt.Errorf("listing %v objects for pruning: %w", task.restMapping.GroupVersionKind.String(), err) + } + task.results = results + } 
+ } else { + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(PruneGVKParallelizationLimit) + for i := range tasks { + task := tasks[i] + group.Go(func() error { + results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUIDs, task.namespace, task.restMapping) + if err != nil { + return fmt.Errorf("listing %v objects for pruning: %w", task.restMapping.GroupVersionKind.String(), err) + } + task.results = results + return nil + }) + } + // Wait for all the goroutines to finish + if err := group.Wait(); err != nil { + return nil, err + } + } + + var allObjects []PruneObject + for _, task := range tasks { + allObjects = append(allObjects, task.results...) + } + return allObjects, nil +} + +func (a *applySet) LabelSelectorForMembers() string { + return metav1.FormatLabelSelector(&metav1.LabelSelector{ + MatchLabels: a.InjectApplysetLabels(nil), + }) +} + +func (a *applySet) findObjectsToPrune( + ctx context.Context, + dynamicClient dynamic.Interface, + visitedUIDs sets.Set[types.UID], + namespace string, + mapping *meta.RESTMapping, +) ([]PruneObject, error) { + applysetLabelSelector := a.LabelSelectorForMembers() + + opt := metav1.ListOptions{ + LabelSelector: applysetLabelSelector, + } + + a.log.V(2).Info("listing objects for pruning", "namespace", namespace, "resource", mapping.Resource) + objects, err := dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opt) + if err != nil { + return nil, err + } + + pruneObjects := []PruneObject{} + for i := range objects.Items { + obj := &objects.Items[i] + + uid := obj.GetUID() + if visitedUIDs.Has(uid) { + continue + } + + pruneObjects = append(pruneObjects, PruneObject{ + Unstructured: obj, + Mapping: mapping, + }) + + } + return pruneObjects, nil +} diff --git a/pkg/applyset/result.go b/pkg/applyset/result.go new file mode 100644 index 000000000..c01b5eb3d --- /dev/null +++ b/pkg/applyset/result.go @@ -0,0 +1,137 @@ +// Copyright 2025 The Kube Resource Orchestrator Authors +// Copyright 2022 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Derived from: https://github.com/kubernetes-sigs/kubebuilder-declarative-pattern/blob/master/applylib/applyset/results.go + +package applyset + +import ( + "errors" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" +) + +// AppliedObject is a wrapper around an ApplyableObject that contains the last applied object +// It is used to track the applied object and any errors that occurred while applying it. +// It is also used to check if the object has been mutated in the cluster as part of the apply operation. +type AppliedObject struct { + ApplyableObject + LastApplied *unstructured.Unstructured + Error error + Message string +} + +func (ao *AppliedObject) HasClusterMutation() bool { + // If there was an error applying, we consider the object to have not changed. 
+ if ao.Error != nil { + return false + } + + // If the object was not applied, we consider it to have not changed. + if ao.LastApplied == nil { + return false + } + + return ao.lastReadRevision != ao.LastApplied.GetResourceVersion() +} + +type PrunedObject struct { + PruneObject + Error error +} + +// ApplyResult summarizes the results of an apply operation +// It returns a list of applied and pruned objects +// AppliedObject has both the input object and the result of the apply operation +// Any errors returned by the apply call is also recorded in it. +// PrunedObject records any error returned by the delete call. +type ApplyResult struct { + DesiredCount int + AppliedObjects []AppliedObject + PrunedObjects []PrunedObject +} + +func (a *ApplyResult) Errors() error { + return errors.Join(a.applyErrors(), a.pruneErrors()) +} + +func (a *ApplyResult) pruneErrors() error { + var err error + for _, pruned := range a.PrunedObjects { + err = errors.Join(err, pruned.Error) + } + return err + +} + +func (a *ApplyResult) applyErrors() error { + var err error + if len(a.AppliedObjects) != a.DesiredCount { + err = errors.Join(err, fmt.Errorf("expected %d applied objects, got %d", + a.DesiredCount, len(a.AppliedObjects))) + } + for _, applied := range a.AppliedObjects { + err = errors.Join(err, applied.Error) + } + return err +} + +func (a *ApplyResult) AppliedUIDs() sets.Set[types.UID] { + uids := sets.New[types.UID]() + for _, applied := range a.AppliedObjects { + if applied.Error != nil { + continue + } + uids.Insert(applied.LastApplied.GetUID()) + } + return uids +} + +func (a *ApplyResult) HasClusterMutation() bool { + for _, applied := range a.AppliedObjects { + if applied.HasClusterMutation() { + return true + } + } + return false +} + +func (a *ApplyResult) recordApplied( + obj ApplyableObject, + lastApplied *unstructured.Unstructured, + err error, +) { + ao := AppliedObject{ + ApplyableObject: obj, + LastApplied: lastApplied, + Error: err, + } + a.AppliedObjects = append(a.AppliedObjects, ao) +} + +func (a *ApplyResult) recordPruned( + obj PruneObject, + err error, +) PrunedObject { + po := PrunedObject{ + PruneObject: obj, + Error: err, + } + a.PrunedObjects = append(a.PrunedObjects, po) + return po +} diff --git a/pkg/applyset/tracker.go b/pkg/applyset/tracker.go new file mode 100644 index 000000000..06d9eabda --- /dev/null +++ b/pkg/applyset/tracker.go @@ -0,0 +1,128 @@ +// Copyright 2025 The Kube Resource Orchestrator Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package applyset + +import ( + "encoding/json" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" +) + +// ApplyableObject is implemented by objects that can be applied to the cluster. +// We don't need much, so this might allow for more efficient implementations in future. 
+type Applyable interface { + // GroupVersionKind returns the GroupVersionKind structure describing the type of the object + GroupVersionKind() schema.GroupVersionKind + // GetNamespace returns the namespace of the object + GetNamespace() string + // GetName returns the name of the object + GetName() string + + // GetLabels returns the labels of the object + GetLabels() map[string]string + // SetLabels sets the labels of the object + SetLabels(labels map[string]string) + + // The object should implement json marshalling + json.Marshaler +} +type ApplyableObject struct { + *unstructured.Unstructured + + // Optional + // User provided unique identifier for the object. + // If present a uniqeness check is done when adding + // It is opaque and is passed in the callbacks as is + ID string + + // Lifecycle hints + // TODO(barney-s): need to exapnd on these: https://github.com/kro-run/kro/issues/542 + ExternalRef bool + + // lastReadRevision is the revision of the object that was last read from the cluster. + lastReadRevision string +} + +func (a *ApplyableObject) String() string { + return fmt.Sprintf("[%s:%s/%s]", a.GroupVersionKind(), a.GetNamespace(), a.GetName()) +} + +type k8sObjectKey struct { + schema.GroupVersionKind + types.NamespacedName +} + +type tracker struct { + // objects is a list of objects we are applying. + objects []ApplyableObject + + // serverIDs is a map of object key to object. + serverIDs map[k8sObjectKey]bool + + // clientIDs is a map of object key to object. + clientIDs map[string]bool +} + +func NewTracker() *tracker { + return &tracker{ + serverIDs: make(map[k8sObjectKey]bool), + clientIDs: make(map[string]bool), + } +} + +func (t *tracker) Add(obj ApplyableObject) error { + gvk := obj.GroupVersionKind() + + // Server side uniqueness check + objectKey := k8sObjectKey{ + GroupVersionKind: gvk, + NamespacedName: types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, + } + + // detect duplicates in the objects list + if _, found := t.serverIDs[objectKey]; found { + return fmt.Errorf("duplicate object %v", objectKey) + } + t.serverIDs[objectKey] = true + + // TODO(barney-s): Do we need to care about client side uniqueness? + // We could just not take the ID (opaque string) and let user deal with mapping + // GVKNN to their ID. Adding a todo here to revisit this. + if obj.ID != "" { + if _, found := t.clientIDs[obj.ID]; found { + return fmt.Errorf("duplicate object ID %v", obj.ID) + } + t.clientIDs[obj.ID] = true + } + + // Ensure the object is marshallable + if _, err := json.Marshal(obj); err != nil { + return fmt.Errorf("object %v is not json marshallable: %w", objectKey, err) + } + + // Add the object to the tracker + t.objects = append(t.objects, obj) + return nil +} + +func (t *tracker) Len() int { + return len(t.objects) +} diff --git a/pkg/applyset/tracker_test.go b/pkg/applyset/tracker_test.go new file mode 100644 index 000000000..43956aff6 --- /dev/null +++ b/pkg/applyset/tracker_test.go @@ -0,0 +1,175 @@ +// Copyright 2025 The Kube Resource Orchestrator Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package applyset + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestTracker_Add(t *testing.T) { + tests := []struct { + name string + objects []ApplyableObject + expectError bool + expectedLen int + }{ + { + name: "add single object", + objects: []ApplyableObject{ + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm", + "namespace": "default", + }, + }, + }, + ID: "client-id-1", + }, + }, + expectError: false, + expectedLen: 1, + }, + { + name: "add two different objects", + objects: []ApplyableObject{ + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm-1", + "namespace": "default", + }, + }, + }, + ID: "client-id-1", + }, + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm-2", + "namespace": "default", + }, + }, + }, + ID: "client-id-2", + }, + }, + expectError: false, + expectedLen: 2, + }, + { + name: "add duplicate object by GVKNN", + objects: []ApplyableObject{ + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm", + "namespace": "default", + }, + }, + }, + ID: "client-id-1", + }, + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm", + "namespace": "default", + }, + }, + }, + ID: "client-id-2", + }, + }, + expectError: true, + expectedLen: 1, + }, + { + name: "add duplicate object by client ID", + objects: []ApplyableObject{ + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm-1", + "namespace": "default", + }, + }, + }, + ID: "client-id-1", + }, + { + Unstructured: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm-2", + "namespace": "default", + }, + }, + }, + ID: "client-id-1", + }, + }, + expectError: true, + expectedLen: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracker := NewTracker() + var err error + for _, obj := range tt.objects { + // GroupVersionKind is not set automatically on unstructured.Unstructured + obj.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}) + err = tracker.Add(obj) + if err != nil { + break + } + } + + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + assert.Len(t, tracker.objects, tt.expectedLen) + }) + } +} diff --git a/pkg/client/fake/fake.go b/pkg/client/fake/fake.go index 3b4a922e5..78a4c1731 100644 --- a/pkg/client/fake/fake.go +++ b/pkg/client/fake/fake.go @@ -20,6 +20,7 @@ import ( "github.com/kro-run/kro/pkg/client" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -31,6 +32,7 @@ type FakeSet struct { KubernetesClient kubernetes.Interface ApiExtensionsClient apiextensionsv1.ApiextensionsV1Interface Config *rest.Config + restMapper meta.RESTMapper } var _ client.SetInterface = (*FakeSet)(nil) @@ -75,6 +77,14 @@ func (f *FakeSet) WithImpersonation(user string) (client.SetInterface, error) { return f, nil } +func (f *FakeSet) RESTMapper() meta.RESTMapper { + return f.restMapper +} + +func (f *FakeSet) SetRESTMapper(restMapper meta.RESTMapper) { + f.restMapper = restMapper +} + // FakeCRD is a fake implementation of CRDInterface for testing type FakeCRD struct{} diff --git a/pkg/client/set.go b/pkg/client/set.go index 5b36795fc..319ee2eae 100644 --- a/pkg/client/set.go +++ b/pkg/client/set.go @@ -18,6 +18,7 @@ import ( "fmt" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -45,6 +46,9 @@ type SetInterface interface { // WithImpersonation returns a new client that impersonates the given user WithImpersonation(user string) (SetInterface, error) + + RESTMapper() meta.RESTMapper + SetRESTMapper(restMapper meta.RESTMapper) } // Set provides a unified interface for different Kubernetes clients @@ -53,6 +57,8 @@ type Set struct { kubernetes *kubernetes.Clientset dynamic *dynamic.DynamicClient apiExtensionsV1 *apiextensionsv1.ApiextensionsV1Client + // restMapper is a REST mapper for the Kubernetes API server + restMapper meta.RESTMapper } var _ SetInterface = (*Set)(nil) @@ -148,6 +154,11 @@ func (c *Set) RESTConfig() *rest.Config { return rest.CopyConfig(c.config) } +// RESTMapper returns the REST mapper +func (c *Set) RESTMapper() meta.RESTMapper { + return c.restMapper +} + // CRD returns a new CRDInterface instance func (c *Set) CRD(cfg CRDWrapperConfig) CRDInterface { if cfg.Client == nil { @@ -164,3 +175,8 @@ func (c *Set) WithImpersonation(user string) (SetInterface, error) { ImpersonateUser: user, }) } + +// SetRESTMapper sets the REST mapper for the client +func (c *Set) SetRESTMapper(restMapper meta.RESTMapper) { + c.restMapper = restMapper +} diff --git a/pkg/controller/instance/controller.go b/pkg/controller/instance/controller.go index cb0be219e..3645d3340 100644 --- a/pkg/controller/instance/controller.go +++ b/pkg/controller/instance/controller.go @@ -23,6 +23,7 @@ import ( "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -98,6 +99,7 @@ func NewController( gvr schema.GroupVersionResource, rgd *graph.Graph, clientSet kroclient.SetInterface, + restMapper meta.RESTMapper, defaultServiceAccounts map[string]string, instanceLabeler metadata.Labeler, ) *Controller { @@ -154,6 +156,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) error { log: log, gvr: c.gvr, client: executionClient, + restMapper: c.clientSet.RESTMapper(), runtime: rgRuntime, instanceLabeler: c.instanceLabeler, instanceSubResourcesLabeler: instanceSubResourcesLabeler, diff --git a/pkg/controller/instance/controller_reconcile.go 
b/pkg/controller/instance/controller_reconcile.go index 537a2a3cc..6a8e72b99 100644 --- a/pkg/controller/instance/controller_reconcile.go +++ b/pkg/controller/instance/controller_reconcile.go @@ -20,13 +20,16 @@ import ( "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "github.com/kro-run/kro/pkg/controller/instance/delta" + "sigs.k8s.io/release-utils/version" + + "github.com/kro-run/kro/pkg/applyset" "github.com/kro-run/kro/pkg/metadata" "github.com/kro-run/kro/pkg/requeue" "github.com/kro-run/kro/pkg/runtime" @@ -44,6 +47,16 @@ const ( ResourceStatePendingDeletion = "PENDING_DELETION" ResourceStateWaitingForReadiness = "WAITING_FOR_READINESS" ResourceStateUpdating = "UPDATING" + + FieldManagerForApplyset = "kro.run/applyset" + FieldManagerForLabeler = "kro.run/labeller" +) + +var ( + KROTooling = applyset.ToolingID{ + Name: "kro", + Version: version.GetVersionInfo().GitVersion, + } ) // instanceGraphReconciler is responsible for reconciling a single instance and @@ -56,6 +69,10 @@ type instanceGraphReconciler struct { gvr schema.GroupVersionResource // client is a dynamic client for interacting with the Kubernetes API server client dynamic.Interface + + // restMapper is a REST mapper for the Kubernetes API server + restMapper meta.RESTMapper + // runtime is the runtime representation of the ResourceGraphDefinition. It holds the // information about the instance and its sub-resources, the CEL expressions // their dependencies, and the resolved values... etc @@ -90,7 +107,10 @@ func (igr *instanceGraphReconciler) reconcile(ctx context.Context) error { // handleReconciliation provides a common wrapper for reconciliation operations, // handling status updates and error management. 
-func (igr *instanceGraphReconciler) handleReconciliation(ctx context.Context, reconcileFunc func(context.Context) error) error { +func (igr *instanceGraphReconciler) handleReconciliation( + ctx context.Context, + reconcileFunc func(context.Context) error, +) error { defer func() { // Update instance state based on reconciliation result igr.updateInstanceState() @@ -109,6 +129,18 @@ func (igr *instanceGraphReconciler) handleReconciliation(ctx context.Context, re return igr.state.ReconcileErr } +func (igr *instanceGraphReconciler) updateResourceReadiness(resourceID string) { + log := igr.log.WithValues("resourceID", resourceID) + resourceState := igr.state.ResourceStates[resourceID] + if ready, reason, err := igr.runtime.IsResourceReady(resourceID); err != nil || !ready { + log.V(1).Info("Resource not ready", "reason", reason, "error", err) + resourceState.State = ResourceStateWaitingForReadiness + resourceState.Err = fmt.Errorf("resource not ready: %s: %w", reason, err) + } else { + resourceState.State = ResourceStateSynced + } +} + // reconcileInstance handles the reconciliation of an active instance func (igr *instanceGraphReconciler) reconcileInstance(ctx context.Context) error { instance := igr.runtime.GetInstance() @@ -123,193 +155,113 @@ func (igr *instanceGraphReconciler) reconcileInstance(ctx context.Context) error igr.state.ResourceStates[resourceID] = &ResourceState{State: ResourceStatePending} } - // Reconcile resources in topological order - for _, resourceID := range igr.runtime.TopologicalOrder() { - if err := igr.reconcileResource(ctx, resourceID); err != nil { - return err - } - - // Synchronize runtime state after each resource - if _, err := igr.runtime.Synchronize(); err != nil { - return fmt.Errorf("failed to synchronize reconciling resource %s: %w", resourceID, err) - } + config := applyset.Config{ + ToolLabels: igr.instanceSubResourcesLabeler.Labels(), + FieldManager: FieldManagerForApplyset, + ToolingID: KROTooling, + Log: igr.log, } - return nil -} - -// setupInstance prepares an instance for reconciliation by setting up necessary -// labels and managed state. 
-func (igr *instanceGraphReconciler) setupInstance(ctx context.Context, instance *unstructured.Unstructured) error { - patched, err := igr.setManaged(ctx, instance, instance.GetUID()) + aset, err := applyset.New(instance, igr.restMapper, igr.client, config) if err != nil { - return err - } - if patched != nil { - instance.Object = patched.Object + return igr.delayedRequeue(fmt.Errorf("failed creating an applyset: %w", err)) } - return nil -} -// reconcileResource handles the reconciliation of a single resource within the instance -func (igr *instanceGraphReconciler) reconcileResource(ctx context.Context, resourceID string) error { - log := igr.log.WithValues("resourceID", resourceID) - resourceState := &ResourceState{State: ResourceStateInProgress} - igr.state.ResourceStates[resourceID] = resourceState - - // Check if resource should be processed (create or get) - if want, err := igr.runtime.ReadyToProcessResource(resourceID); err != nil || !want { - log.V(1).Info("Skipping resource processing", "reason", err) - resourceState.State = ResourceStateSkipped - igr.runtime.IgnoreResource(resourceID) - return nil - } - - // Get and validate resource state - resource, state := igr.runtime.GetResource(resourceID) - if state != runtime.ResourceStateResolved { - return igr.delayedRequeue(fmt.Errorf("resource %s not resolved: state=%v", resourceID, state)) - } + unresolvedResourceID := "" + prune := true + // Reconcile resources in topological order + for _, resourceID := range igr.runtime.TopologicalOrder() { + log := igr.log.WithValues("resourceID", resourceID) + + // Initialize resource state in instance state + resourceState := &ResourceState{State: ResourceStateInProgress} + igr.state.ResourceStates[resourceID] = resourceState + + // Check if resource should be processed (create or get) + // TODO(barney-s): skipping on error seems un-intuitive, should we skip on CEL evaluation error? + if want, err := igr.runtime.ReadyToProcessResource(resourceID); err != nil || !want { + log.V(1).Info("Skipping resource processing", "reason", err) + resourceState.State = ResourceStateSkipped + igr.runtime.IgnoreResource(resourceID) + continue + } - // Handle resource reconciliation - return igr.handleResourceReconciliation(ctx, resourceID, resource, resourceState) -} + // Check if the resource dependencies are resolved and can be reconciled + resource, state := igr.runtime.GetResource(resourceID) -// handleResourceReconciliation manages the reconciliation of a specific resource, -// including creation, updates, and readiness checks. 
-func (igr *instanceGraphReconciler) handleResourceReconciliation( - ctx context.Context, - resourceID string, - resource *unstructured.Unstructured, - resourceState *ResourceState, -) error { - log := igr.log.WithValues("resourceID", resourceID) + if state != runtime.ResourceStateResolved { + unresolvedResourceID = resourceID + prune = false + break + } - // Get resource client and namespace - rc := igr.getResourceClient(resourceID) + applyable := applyset.ApplyableObject{ + Unstructured: resource, + ID: resourceID, + ExternalRef: igr.runtime.ResourceDescriptor(resourceID).IsExternalRef(), + } + clusterObj, err := aset.Add(ctx, applyable) + if err != nil { + return fmt.Errorf("failed to add resource to applyset: %w", err) + } - // Check if resource exists - observed, err := rc.Get(ctx, resource.GetName(), metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - // For read-only resources, we don't create - if igr.runtime.ResourceDescriptor(resourceID).IsExternalRef() { - resourceState.State = "WAITING_FOR_EXTERNAL_RESOURCE" - resourceState.Err = fmt.Errorf("external resource not found: %w", err) - return igr.delayedRequeue(resourceState.Err) + if clusterObj != nil { + igr.runtime.SetResource(resourceID, clusterObj) + igr.updateResourceReadiness(resourceID) + // Synchronize runtime state after each resource + if _, err := igr.runtime.Synchronize(); err != nil { + return fmt.Errorf("failed to synchronize after apply/prune: %w", err) } - return igr.handleResourceCreation(ctx, rc, resource, resourceID, resourceState) } - resourceState.State = ResourceStateError - resourceState.Err = fmt.Errorf("failed to get resource: %w", err) - return resourceState.Err } - // Update runtime with observed state - igr.runtime.SetResource(resourceID, observed) - - // Check resource readiness - if ready, reason, err := igr.runtime.IsResourceReady(resourceID); err != nil || !ready { - log.V(1).Info("Resource not ready", "reason", reason, "error", err) - resourceState.State = ResourceStateWaitingForReadiness - resourceState.Err = fmt.Errorf("resource not ready: %s: %w", reason, err) - return igr.delayedRequeue(resourceState.Err) + result, err := aset.Apply(ctx, prune) + for _, applied := range result.AppliedObjects { + resourceState := igr.state.ResourceStates[applied.ID] + if applied.Error != nil { + resourceState.State = ResourceStateError + resourceState.Err = applied.Error + } else { + igr.updateResourceReadiness(applied.ID) + } } - resourceState.State = ResourceStateSynced - - // For read-only resources, don't perform updates - if igr.runtime.ResourceDescriptor(resourceID).IsExternalRef() { - return nil + if err != nil { + return igr.delayedRequeue(fmt.Errorf("failed to apply/prune resources: %w", err)) } - return igr.updateResource(ctx, rc, resource, observed, resourceID, resourceState) -} - -// getResourceClient returns the appropriate dynamic client and namespace for a resource -func (igr *instanceGraphReconciler) getResourceClient(resourceID string) dynamic.ResourceInterface { - descriptor := igr.runtime.ResourceDescriptor(resourceID) - gvr := descriptor.GetGroupVersionResource() - namespace := igr.getResourceNamespace(resourceID) + // Inspect resource states and return error if any resource is in error state + if err := igr.state.ResourceErrors(); err != nil { + return igr.delayedRequeue(err) + } - if descriptor.IsNamespaced() { - return igr.client.Resource(gvr).Namespace(namespace) + if err := result.Errors(); err != nil { + return fmt.Errorf("failed to apply/prune resources: %w", err) 
} - return igr.client.Resource(gvr) -} -// handleResourceCreation manages the creation of a new resource -func (igr *instanceGraphReconciler) handleResourceCreation( - ctx context.Context, - rc dynamic.ResourceInterface, - resource *unstructured.Unstructured, - resourceID string, - resourceState *ResourceState, -) error { - igr.log.V(1).Info("Creating new resource", "resourceID", resourceID) + if unresolvedResourceID != "" { + return igr.delayedRequeue(fmt.Errorf("unresolved resource: %s", unresolvedResourceID)) + } - // Apply labels and create resource - igr.instanceSubResourcesLabeler.ApplyLabels(resource) - if _, err := rc.Create(ctx, resource, metav1.CreateOptions{}); err != nil { - resourceState.State = ResourceStateError - resourceState.Err = fmt.Errorf("failed to create resource: %w", err) - return resourceState.Err + // If there are any cluster mutations, we need to requeue. + if result.HasClusterMutation() { + return igr.delayedRequeue(fmt.Errorf("changes applied to cluster")) } - resourceState.State = ResourceStateCreated - return igr.delayedRequeue(fmt.Errorf("awaiting resource creation completion")) + return nil } -// updateResource handles updates to an existing resource, comparing the desired -// and observed states and applying the necessary changes. -func (igr *instanceGraphReconciler) updateResource( - ctx context.Context, - rc dynamic.ResourceInterface, - desired, observed *unstructured.Unstructured, - resourceID string, - resourceState *ResourceState, -) error { - igr.log.V(1).Info("Processing resource update", "resourceID", resourceID) - - // Compare desired and observed states - differences, err := delta.Compare(desired, observed) +// setupInstance prepares an instance for reconciliation by setting up necessary +// labels and managed state. +func (igr *instanceGraphReconciler) setupInstance(ctx context.Context, instance *unstructured.Unstructured) error { + patched, err := igr.setManaged(ctx, instance, instance.GetUID()) if err != nil { - resourceState.State = ResourceStateError - resourceState.Err = fmt.Errorf("failed to compare desired and observed states: %w", err) - return resourceState.Err - } - - // If no differences are found, the resource is in sync. - if len(differences) == 0 { - resourceState.State = ResourceStateSynced - igr.log.V(1).Info("No deltas found for resource", "resourceID", resourceID) - return nil + return err } - - // Proceed with the update, note that we don't need to handle each difference - // individually. We can apply all changes at once. - // - // NOTE(a-hilaly): are there any cases where we need to handle each difference individually? 
- igr.log.V(1).Info("Found deltas for resource", - "resourceID", resourceID, - "delta", differences, - ) - igr.instanceSubResourcesLabeler.ApplyLabels(desired) - - // Apply changes to the resource - // TODO: Handle annotations - desired.SetResourceVersion(observed.GetResourceVersion()) - desired.SetFinalizers(observed.GetFinalizers()) - _, err = rc.Update(ctx, desired, metav1.UpdateOptions{}) - if err != nil { - resourceState.State = ResourceStateError - resourceState.Err = fmt.Errorf("failed to update resource: %w", err) - return resourceState.Err + if patched != nil { + instance.Object = patched.Object } - - // Set state to UPDATING and requeue to check the update - resourceState.State = ResourceStateUpdating - return igr.delayedRequeue(fmt.Errorf("resource update in progress")) + return nil } // handleInstanceDeletion manages the deletion of an instance and its resources @@ -417,6 +369,18 @@ func (igr *instanceGraphReconciler) deleteResource(ctx context.Context, resource return igr.delayedRequeue(fmt.Errorf("resource deletion in progress")) } +// getResourceClient returns the appropriate dynamic client and namespace for a resource +func (igr *instanceGraphReconciler) getResourceClient(resourceID string) dynamic.ResourceInterface { + descriptor := igr.runtime.ResourceDescriptor(resourceID) + gvr := descriptor.GetGroupVersionResource() + namespace := igr.getResourceNamespace(resourceID) + + if descriptor.IsNamespaced() { + return igr.client.Resource(gvr).Namespace(namespace) + } + return igr.client.Resource(gvr) +} + // finalizeDeletion checks if all resources are deleted and removes the instance finalizer // if appropriate. func (igr *instanceGraphReconciler) finalizeDeletion(ctx context.Context) error { @@ -439,23 +403,43 @@ func (igr *instanceGraphReconciler) finalizeDeletion(ctx context.Context) error } // setManaged ensures the instance has the necessary finalizer and labels. -func (igr *instanceGraphReconciler) setManaged(ctx context.Context, obj *unstructured.Unstructured, _ types.UID) (*unstructured.Unstructured, error) { +func (igr *instanceGraphReconciler) setManaged( + ctx context.Context, + obj *unstructured.Unstructured, + _ types.UID, +) (*unstructured.Unstructured, error) { if exist, _ := metadata.HasInstanceFinalizerUnstructured(obj); exist { return obj, nil } igr.log.V(1).Info("Setting managed state", "name", obj.GetName(), "namespace", obj.GetNamespace()) - copy := obj.DeepCopy() - if err := metadata.SetInstanceFinalizerUnstructured(copy); err != nil { + instancePatch := &unstructured.Unstructured{} + instancePatch.SetUnstructuredContent(map[string]interface{}{ + "apiVersion": obj.GetAPIVersion(), + "kind": obj.GetKind(), + "metadata": map[string]interface{}{ + "name": obj.GetName(), + "namespace": obj.GetNamespace(), + "labels": obj.GetLabels(), + }, + }) + + err := unstructured.SetNestedStringSlice(instancePatch.Object, obj.GetFinalizers(), "metadata", "finalizers") + if err != nil { + return nil, fmt.Errorf("failed to copy existing finalizers to patch: %w", err) + } + + if err := metadata.SetInstanceFinalizerUnstructured(instancePatch); err != nil { return nil, fmt.Errorf("failed to set finalizer: %w", err) } - igr.instanceLabeler.ApplyLabels(copy) + igr.instanceLabeler.ApplyLabels(instancePatch) updated, err := igr.client.Resource(igr.gvr). Namespace(obj.GetNamespace()). 
- Update(ctx, copy, metav1.UpdateOptions{}) + Apply(ctx, instancePatch.GetName(), instancePatch, + metav1.ApplyOptions{FieldManager: FieldManagerForLabeler, Force: true}) if err != nil { return nil, fmt.Errorf("failed to update managed state: %w", err) } @@ -464,21 +448,34 @@ func (igr *instanceGraphReconciler) setManaged(ctx context.Context, obj *unstruc } // setUnmanaged removes the finalizer from the instance. -func (igr *instanceGraphReconciler) setUnmanaged(ctx context.Context, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { +func (igr *instanceGraphReconciler) setUnmanaged( + ctx context.Context, + obj *unstructured.Unstructured, +) (*unstructured.Unstructured, error) { if exist, _ := metadata.HasInstanceFinalizerUnstructured(obj); !exist { return obj, nil } igr.log.V(1).Info("Removing managed state", "name", obj.GetName(), "namespace", obj.GetNamespace()) - copy := obj.DeepCopy() - if err := metadata.RemoveInstanceFinalizerUnstructured(copy); err != nil { + instancePatch := &unstructured.Unstructured{} + instancePatch.SetUnstructuredContent(map[string]interface{}{ + "apiVersion": obj.GetAPIVersion(), + "kind": obj.GetKind(), + "metadata": map[string]interface{}{ + "name": obj.GetName(), + "namespace": obj.GetNamespace(), + }, + }) + instancePatch.SetFinalizers(obj.GetFinalizers()) + if err := metadata.RemoveInstanceFinalizerUnstructured(instancePatch); err != nil { return nil, fmt.Errorf("failed to remove finalizer: %w", err) } updated, err := igr.client.Resource(igr.gvr). Namespace(obj.GetNamespace()). - Update(ctx, copy, metav1.UpdateOptions{}) + Apply(ctx, instancePatch.GetName(), instancePatch, + metav1.ApplyOptions{FieldManager: FieldManagerForLabeler, Force: true}) if err != nil { return nil, fmt.Errorf("failed to update unmanaged state: %w", err) } diff --git a/pkg/controller/instance/controller_status.go b/pkg/controller/instance/controller_status.go index bfbed40d7..89ff42750 100644 --- a/pkg/controller/instance/controller_status.go +++ b/pkg/controller/instance/controller_status.go @@ -21,6 +21,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" "github.com/kro-run/kro/api/v1alpha1" "github.com/kro-run/kro/pkg/requeue" @@ -100,9 +101,22 @@ func (igr *instanceGraphReconciler) patchInstanceStatus(ctx context.Context, sta instance := igr.runtime.GetInstance().DeepCopy() instance.Object["status"] = status - _, err := igr.client.Resource(igr.gvr). - Namespace(instance.GetNamespace()). - UpdateStatus(ctx, instance, metav1.UpdateOptions{}) + // We are using retry.RetryOnConflict to handle conflicts. + // This is because this method is called in a defer path and there is no way to return an error. + // TODO(barney-s): We should explore removing the defer path and returning an error. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + instance, err := igr.client.Resource(igr.gvr). + Namespace(instance.GetNamespace()). + Get(ctx, instance.GetName(), metav1.GetOptions{}) + if err != nil { + return err + } + instance.Object["status"] = status + _, err = igr.client.Resource(igr.gvr). + Namespace(instance.GetNamespace()). 
+ UpdateStatus(ctx, instance, metav1.UpdateOptions{}) + return err + }) if err != nil { return fmt.Errorf("failed to update instance status: %w", err) diff --git a/pkg/controller/instance/delta/delta.go b/pkg/controller/instance/delta/delta.go deleted file mode 100644 index 8ad4d01c0..000000000 --- a/pkg/controller/instance/delta/delta.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2025 The Kubernetes Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package delta - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" -) - -// Difference represents a single field-level difference between two objects. -// Path is the full path to the differing field (e.g. "spec.containers[0].image") -// Desired and Observed contain the actual values that differ at that path. -type Difference struct { - // Path is the full path to the differing field (e.g. "spec.x.y.z" - Path string `json:"path"` - // Desired is the desired value at the path - Desired interface{} `json:"desired"` - // Observed is the observed value at the path - Observed interface{} `json:"observed"` -} - -// Compare takes desired and observed unstructured objects and returns a list of -// their differences. It performs a deep comparison while being aware of Kubernetes -// metadata specifics. The comparison: -// -// - Cleans metadata from both objects to avoid spurious differences -// - Walks object trees in parallel to find actual value differences -// - Builds path strings to precisely identify where differences occurs -// - Handles type mismatches, nil values, and empty vs nil collections -func Compare(desired, observed *unstructured.Unstructured) ([]Difference, error) { - desiredCopy := desired.DeepCopy() - observedCopy := observed.DeepCopy() - - cleanMetadata(desiredCopy) - cleanMetadata(observedCopy) - - var differences []Difference - walkCompare(desiredCopy.Object, observedCopy.Object, "", &differences) - return differences, nil -} - -// ignoredMetadataFields are Kubernetes metadata fields that should not trigger updates. -var ignoredMetadataFields = []string{ - "creationTimestamp", - "deletionTimestamp", - "generation", - "resourceVersion", - "selfLink", - "uid", - "managedFields", - "ownerReferences", - "finalizers", -} - -// cleanMetadata removes Kubernetes metadata fields that should not trigger updates -// like resourceVersion, creationTimestamp, etc. Also handles empty maps in -// annotations and labels. This ensures we don't detect spurious changes based on -// Kubernetes-managed fields. 
-func cleanMetadata(obj *unstructured.Unstructured) { - metadata, ok := obj.Object["metadata"].(map[string]interface{}) - if !ok { - // Maybe we should panic here, but for now just return - return - } - - if annotations, exists := metadata["annotations"].(map[string]interface{}); exists { - if len(annotations) == 0 { - delete(metadata, "annotations") - } - } - - if labels, exists := metadata["labels"].(map[string]interface{}); exists { - if len(labels) == 0 { - delete(metadata, "labels") - } - } - - for _, field := range ignoredMetadataFields { - delete(metadata, field) - } -} - -// walkCompare recursively compares desired and observed values, recording any -// differences found. It handles different types appropriately: -// - For maps: recursively compares all keys/values -// - For slices: checks length and recursively compares elements -// - For primitives: directly compares values -// -// Records a Difference if values don't match or are of different types. -func walkCompare(desired, observed interface{}, path string, differences *[]Difference) { - switch d := desired.(type) { - case map[string]interface{}: - e, ok := observed.(map[string]interface{}) - if !ok { - *differences = append(*differences, Difference{ - Path: path, - Observed: observed, - Desired: desired, - }) - return - } - walkMap(d, e, path, differences) - - case []interface{}: - e, ok := observed.([]interface{}) - if !ok { - *differences = append(*differences, Difference{ - Path: path, - Observed: observed, - Desired: desired, - }) - return - } - walkSlice(d, e, path, differences) - - default: - if desired != observed { - *differences = append(*differences, Difference{ - Path: path, - Observed: observed, - Desired: desired, - }) - } - } -} - -// walkMap compares two maps recursively. For each key in desired: -// -// - If key missing in observed: records a difference -// - If key exists: recursively compares values -func walkMap(desired, observed map[string]interface{}, path string, differences *[]Difference) { - for k, desiredVal := range desired { - newPath := k - if path != "" { - newPath = fmt.Sprintf("%s.%s", path, k) - } - - observedVal, exists := observed[k] - if !exists && desiredVal != nil { - *differences = append(*differences, Difference{ - Path: newPath, - Observed: nil, - Desired: desiredVal, - }) - continue - } - - walkCompare(desiredVal, observedVal, newPath, differences) - } -} - -// walkSlice compares two slices recursively: -// - If lengths differ: records entire slice as different -// - If lengths match: recursively compares elements -func walkSlice(desired, observed []interface{}, path string, differences *[]Difference) { - if len(desired) != len(observed) { - *differences = append(*differences, Difference{ - Path: path, - Observed: observed, - Desired: desired, - }) - return - } - - for i := range desired { - newPath := fmt.Sprintf("%s[%d]", path, i) - walkCompare(desired[i], observed[i], newPath, differences) - } -} diff --git a/pkg/controller/instance/delta/delta_test.go b/pkg/controller/instance/delta/delta_test.go deleted file mode 100644 index 2c5122a3f..000000000 --- a/pkg/controller/instance/delta/delta_test.go +++ /dev/null @@ -1,461 +0,0 @@ -// Copyright 2025 The Kubernetes Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package delta - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" -) - -func TestCompare_Simple(t *testing.T) { - desired := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "replicas": int64(3), - "template": map[string]interface{}{ - "spec": map[string]interface{}{ - "containers": []interface{}{ - map[string]interface{}{ - "name": "app", - "image": "nginx:1.19", - }, - }, - }, - }, - }, - }, - } - - observed := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "replicas": int64(2), - "template": map[string]interface{}{ - "spec": map[string]interface{}{ - "containers": []interface{}{ - map[string]interface{}{ - "name": "app", - "image": "nginx:1.18", - }, - }, - }, - }, - }, - }, - } - - differences, err := Compare(desired, observed) - assert.NoError(t, err) - - // Find differences by path - replicasDiff := findDiffByPath(differences, "spec.replicas") - assert.NotNil(t, replicasDiff) - assert.Equal(t, int64(2), replicasDiff.Observed) - assert.Equal(t, int64(3), replicasDiff.Desired) - - imageDiff := findDiffByPath(differences, "spec.template.spec.containers[0].image") - assert.NotNil(t, imageDiff) - assert.Equal(t, "nginx:1.18", imageDiff.Observed) - assert.Equal(t, "nginx:1.19", imageDiff.Desired) -} - -func TestCompare_Arrays(t *testing.T) { - tests := []struct { - name string - desired []interface{} - observed []interface{} - expectDiff bool - expectedPath string - expectedOld interface{} - expectedNew interface{} - }{ - { - name: "different lengths", - desired: []interface{}{int64(1), int64(2), int64(3)}, - observed: []interface{}{int64(1), int64(2)}, - expectDiff: true, - expectedPath: "spec.numbers", - expectedOld: []interface{}{int64(1), int64(2)}, - expectedNew: []interface{}{int64(1), int64(2), int64(3)}, - }, - { - name: "same content", - desired: []interface{}{int64(1), int64(2), int64(3)}, - observed: []interface{}{int64(1), int64(2), int64(3)}, - expectDiff: false, - }, - { - name: "different content same length", - desired: []interface{}{int64(1), int64(2), int64(3)}, - observed: []interface{}{int64(1), int64(2), int64(4)}, - expectDiff: true, - expectedPath: "spec.numbers[2]", - expectedOld: int64(4), - expectedNew: int64(3), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - desired := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "numbers": tt.desired, - }, - }, - } - observed := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "numbers": tt.observed, - }, - }, - } - - differences, err := Compare(desired, observed) - assert.NoError(t, err) - - if tt.expectDiff { - diff := findDiffByPath(differences, tt.expectedPath) - assert.NotNil(t, diff) - assert.Equal(t, tt.expectedOld, diff.Observed) - assert.Equal(t, tt.expectedNew, diff.Desired) - } else { - assert.Empty(t, differences) - } - }) - } -} - -func TestCompare_NewField(t *testing.T) { - desired := 
&unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "newField": "value", - }, - }, - } - - observed := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{}, - }, - } - - differences, err := Compare(desired, observed) - assert.NoError(t, err) - - diff := findDiffByPath(differences, "spec.newField") - assert.NotNil(t, diff) - assert.Nil(t, diff.Observed) - assert.Equal(t, "value", diff.Desired) -} - -func findDiffByPath(diffs []Difference, path string) *Difference { - for _, diff := range diffs { - if diff.Path == path { - return &diff - } - } - return nil -} - -func TestCompare_Comprehensive(t *testing.T) { - desired := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "name": "test", - "labels": map[string]interface{}{ - "env": "prod", - "new": "label", - }, - }, - "spec": map[string]interface{}{ - "replicas": int64(3), - "selector": map[string]interface{}{ - "app": "test", - }, - "template": map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "app": "test", - }, - }, - "spec": map[string]interface{}{ - "containers": []interface{}{ - map[string]interface{}{ - "name": "app", - "image": "nginx:1.19", - "env": []interface{}{ - map[string]interface{}{ - "name": "DEBUG", - "value": "true", - }, - map[string]interface{}{ - "name": "PORT", - "value": "8080", - }, - }, - }, - }, - "volumes": []interface{}{ - map[string]interface{}{ - "name": "config", - "configMap": map[string]interface{}{ - "name": "app-config", - }, - }, - }, - }, - }, - }, - }, - } - - observed := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "metadata": map[string]interface{}{ - "name": "test", - "labels": map[string]interface{}{ - "env": "dev", // changed value - "old": "obsolete", // removed label - }, - "resourceVersion": "12345", // should be ignored - "creationTimestamp": "NOTNIL", - }, - "spec": map[string]interface{}{ - "replicas": int64(2), // changed value - "selector": map[string]interface{}{ - "app": "test", // unchanged - }, - "template": map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "app": "test", - }, - }, - "spec": map[string]interface{}{ - "containers": []interface{}{ - map[string]interface{}{ - "name": "app", - "image": "nginx:1.18", // changed value - "env": []interface{}{ - map[string]interface{}{ - "name": "DEBUG", - "value": "false", // changed value - }, - // removed env var PORT - }, - "resources": map[string]interface{}{ - "limits": map[string]interface{}{ - "cpu": "100m", - }, - }, - }, - }, - }, - }, - }, - }, - } - - differences, err := Compare(desired, observed) - assert.NoError(t, err) - - expectedChanges := map[string]struct { - old interface{} - new interface{} - }{ - "metadata.labels.env": {old: "dev", new: "prod"}, - "metadata.labels.new": {old: nil, new: "label"}, - "spec.replicas": {old: int64(2), new: int64(3)}, - "spec.template.spec.containers[0].image": {old: "nginx:1.18", new: "nginx:1.19"}, - "spec.template.spec.containers[0].env": { - old: []interface{}{ - map[string]interface{}{ - "name": "DEBUG", - "value": "false", - }, - }, - new: []interface{}{ - map[string]interface{}{ - "name": "DEBUG", - "value": "true", - }, - map[string]interface{}{ - "name": "PORT", - "value": "8080", - }, - }, - }, - "spec.template.spec.volumes": { - old: nil, - new: []interface{}{ - map[string]interface{}{ - "name": "config", - "configMap": 
-						"name": "app-config",
-					},
-				},
-			},
-		},
-	}
-
-	assert.Equal(t, len(expectedChanges), len(differences), "number of differences should match")
-
-	for _, diff := range differences {
-		expected, ok := expectedChanges[diff.Path]
-		assert.True(t, ok, "unexpected difference at path: %s", diff.Path)
-		assert.Equal(t, expected.old, diff.Observed, "old value mismatch at path: %s", diff.Path)
-		assert.Equal(t, expected.new, diff.Desired, "new value mismatch at path: %s", diff.Path)
-	}
-}
-
-func TestCompare_EmptyMaps(t *testing.T) {
-	tests := []struct {
-		name     string
-		desired  *unstructured.Unstructured
-		observed *unstructured.Unstructured
-		wantDiff bool
-	}{
-		{
-			name: "empty maps in spec should not diff",
-			desired: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": map[string]interface{}{},
-					},
-				},
-			},
-			observed: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": map[string]interface{}{},
-					},
-				},
-			},
-			wantDiff: false,
-		},
-		{
-			name: "empty map in desired, no field in observed should diff",
-			desired: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": map[string]interface{}{},
-					},
-				},
-			},
-			observed: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{},
-				},
-			},
-			wantDiff: true,
-		},
-		{
-			name: "nil map in desired, no field in observed should not diff",
-			desired: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": nil,
-					},
-				},
-			},
-			observed: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{},
-				},
-			},
-			wantDiff: false,
-		},
-		{
-			name: "non-empty map should diff when values differ",
-			desired: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": map[string]interface{}{
-							"environment": "prod",
-						},
-					},
-				},
-			},
-			observed: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": map[string]interface{}{
-							"environment": "dev",
-						},
-					},
-				},
-			},
-			wantDiff: true,
-		},
-		{
-			name: "nested empty maps should diff",
-			desired: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"config": map[string]interface{}{
-							"settings": map[string]interface{}{},
-						},
-					},
-				},
-			},
-			observed: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"config": map[string]interface{}{},
-					},
-				},
-			},
-			wantDiff: true,
-		},
-		{
-			name: "non-empty map vs nil map should diff",
-			desired: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": map[string]interface{}{
-							"environment": "prod",
-						},
-					},
-				},
-			},
-			observed: &unstructured.Unstructured{
-				Object: map[string]interface{}{
-					"spec": map[string]interface{}{
-						"tags": nil,
-					},
-				},
-			},
-			wantDiff: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			differences, err := Compare(tt.desired, tt.observed)
-			assert.NoError(t, err)
-			if tt.wantDiff {
-				assert.NotEmpty(t, differences)
-			} else {
-				assert.Empty(t, differences,
-					"expected no differences but got: %+v", differences)
-			}
-		})
-	}
-}
diff --git a/pkg/controller/instance/instance_state.go b/pkg/controller/instance/instance_state.go
index 5b42573b6..5c2e58a11 100644
--- a/pkg/controller/instance/instance_state.go
+++ b/pkg/controller/instance/instance_state.go
@@ -14,6 +14,8 @@
 
 package instance
 
+import "errors"
+
 const (
 	InstanceStateInProgress = "IN_PROGRESS"
 	InstanceStateFailed     = "FAILED"
@@ -48,3 +50,13 @@ type InstanceState struct {
 	// Any error encountered during reconciliation
 	ReconcileErr error
 }
+
+func (s *InstanceState) ResourceErrors() error {
+	errorsSeen := []error{}
+	for _, resourceState := range s.ResourceStates {
+		if resourceState.State == ResourceStateError {
+			errorsSeen = append(errorsSeen, resourceState.Err)
+		}
+	}
+	return errors.Join(errorsSeen...)
+}
diff --git a/pkg/controller/resourcegraphdefinition/controller.go b/pkg/controller/resourcegraphdefinition/controller.go
index 4e067128a..a8ab2c75b 100644
--- a/pkg/controller/resourcegraphdefinition/controller.go
+++ b/pkg/controller/resourcegraphdefinition/controller.go
@@ -48,6 +48,7 @@ type ResourceGraphDefinitionReconciler struct {
 	// Client and instanceLogger are set with SetupWithManager
 	client.Client
+
 	instanceLogger logr.Logger
 
 	clientSet kroclient.SetInterface
 
@@ -82,6 +83,7 @@ func NewResourceGraphDefinitionReconciler(
 // SetupWithManager sets up the controller with the Manager.
 func (r *ResourceGraphDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	r.Client = mgr.GetClient()
+	r.clientSet.SetRESTMapper(mgr.GetRESTMapper())
 	r.instanceLogger = mgr.GetLogger()
 
 	logConstructor := func(req *reconcile.Request) logr.Logger {
diff --git a/pkg/controller/resourcegraphdefinition/controller_reconcile.go b/pkg/controller/resourcegraphdefinition/controller_reconcile.go
index cc8663370..16b05fd87 100644
--- a/pkg/controller/resourcegraphdefinition/controller_reconcile.go
+++ b/pkg/controller/resourcegraphdefinition/controller_reconcile.go
@@ -34,7 +34,10 @@ import (
 // 1. Processing the resource graph
 // 2. Ensuring CRDs are present
 // 3. Setting up and starting the microcontroller
-func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(ctx context.Context, rgd *v1alpha1.ResourceGraphDefinition) ([]string, []v1alpha1.ResourceInformation, error) {
+func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(
+	ctx context.Context,
+	rgd *v1alpha1.ResourceGraphDefinition,
+) ([]string, []v1alpha1.ResourceInformation, error) {
 	log := ctrl.LoggerFrom(ctx)
 	mark := NewConditionsMarkerFor(rgd)
 
@@ -71,7 +74,8 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(ctx
 
 	// Setup and start microcontroller
 	gvr := processedRGD.Instance.GetGroupVersionResource()
-	controller := r.setupMicroController(gvr, processedRGD, rgd.Spec.DefaultServiceAccounts, graphExecLabeler)
+	controller := r.setupMicroController(gvr, processedRGD,
+		rgd.Spec.DefaultServiceAccounts, graphExecLabeler)
 
 	log.V(1).Info("reconciling resource graph definition micro controller")
 	// TODO: the context that is passed here is tied to the reconciliation of the rgd, we might need to make
@@ -115,6 +119,7 @@ func (r *ResourceGraphDefinitionReconciler) setupMicroController(
 		gvr,
 		processedRGD,
 		r.clientSet,
+		r.clientSet.RESTMapper(),
 		defaultSVCs,
 		labeler,
 	)
diff --git a/test/e2e/chainsaw/check-rgd-deletion/chainsaw-test.yaml b/test/e2e/chainsaw/check-rgd-deletion/chainsaw-test.yaml
index 3e3621353..c0e474721 100644
--- a/test/e2e/chainsaw/check-rgd-deletion/chainsaw-test.yaml
+++ b/test/e2e/chainsaw/check-rgd-deletion/chainsaw-test.yaml
@@ -42,7 +42,16 @@ spec:
       podLogs:
         selector: app.kubernetes.io/instance=kro
        namespace: kro-system
+    finally:
+    - description: sleep operation
+      sleep:
+        duration: 20s # wait for the RGD to be deleted. Running the test with 5s is flaky (passes locally but fails in CI)
   - name: check-that-crd-is-deleted
     try:
     - error:
        file: instancecrd-assert.yaml
+    catch:
+    - description: kro controller pod logs collector
+      podLogs:
+        selector: app.kubernetes.io/instance=kro
+        namespace: kro-system