nodeTopology: de-dup enqueue when there is no label change #856

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions pkg/controller/nodeipam/ipam/BUILD
@@ -90,6 +90,7 @@ go_test(
"//vendor/k8s.io/api/core/v1:core",
"//vendor/k8s.io/apimachinery/pkg/api/resource",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:meta",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema",
"//vendor/k8s.io/apimachinery/pkg/util/wait",
"//vendor/k8s.io/client-go/informers",
"//vendor/k8s.io/client-go/informers/core/v1:core",
8 changes: 7 additions & 1 deletion pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
@@ -189,7 +189,13 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nodetopologyQueue.Enqueue(newNode)
_, oldNodeLabel := getNodeSubnetLabel(oldNode)
_, newNodeLabel := getNodeSubnetLabel(newNode)
if oldNodeLabel != newNodeLabel {
nodetopologyQueue.Enqueue(newNode)
} else {
klog.InfoS("Node subnet label did not change, skipping enqueue", "labelKey", "cloud.google.com/gke-node-pool-subnet", "node", newNode.GetName(), "oldNodeLabel", oldNodeLabel, "newNodeLabel", newNodeLabel)
}
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
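The new UpdateFunc enqueues the node only when the subnet label value returned by getNodeSubnetLabel differs between the old and new objects. getNodeSubnetLabel itself is not part of this diff; the following is a minimal, hypothetical sketch of such a helper, assuming the label key cloud.google.com/gke-node-pool-subnet from the log message above and a (found, value) return shape.

```go
// Hypothetical sketch only: the real getNodeSubnetLabel is defined elsewhere in
// the ipam package and is not shown in this diff. The label key and the return
// shape are assumptions inferred from the UpdateFunc above.
package ipam

import v1 "k8s.io/api/core/v1"

// nodePoolSubnetLabelKey is the assumed node-pool subnet label key.
const nodePoolSubnetLabelKey = "cloud.google.com/gke-node-pool-subnet"

// getNodeSubnetLabel reports whether the node carries the node-pool subnet
// label and returns its value. An absent label yields found == false and an
// empty value, so adding or removing the label also counts as a change.
func getNodeSubnetLabel(node *v1.Node) (bool, string) {
	value, found := node.Labels[nodePoolSubnetLabelKey]
	return found, value
}
```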
82 changes: 70 additions & 12 deletions pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
@@ -38,6 +38,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtimeSchema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@@ -187,7 +188,7 @@ func TestNodeTopologyQueuePeriodicSync(t *testing.T) {
}
}

func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
func TestNodeTopologyCR_AddOrUpdateNode(t *testing.T) {
testClusterValues := gce.DefaultTestClusterValues()
testClusterValues.SubnetworkURL = exampleSubnetURL
fakeGCE := gce.NewFakeGCECloud(testClusterValues)
@@ -210,7 +211,7 @@ func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
},
},
}
fakeClient := fake.NewSimpleClientset(defaultnode, mscnode)
fakeClient := fake.NewSimpleClientset(defaultnode)
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, time.Second)
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()

@@ -230,7 +231,8 @@ func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
go cloudAllocator.Run(wait.NeverStop)

// TODO: Fix node_topology_syncer so that addOrUpdate adds the default subnet regardless of node ordering in the informer
fakeNodeInformer.Informer().GetStore().Add(mscnode)
time.Sleep(time.Millisecond * 500)
fakeClient.Tracker().Add(mscnode)
expectedSubnets := []string{"subnet-def", "subnet1"}
i := 0
for i < 5 {
@@ -267,9 +269,40 @@ func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
if i >= 5 {
t.Fatalf("AddOrUpdate node topology CRD not working as expected")
}
// The node subnet label should be immutable in practice; update it here only to exercise the update-node path
mscnode2.ObjectMeta.Labels[testNodePoolSubnetLabelPrefix] = "subnet3"
// TODO: derive the GVR automatically instead of hardcoding it
gvr := runtimeSchema.GroupVersionResource{
Version: "v1",
Resource: "nodes",
}
fakeClient.Tracker().Update(gvr, mscnode2, mscnode2.GetNamespace(), metav1.UpdateOptions{})
expectedSubnets = []string{"subnet-def", "subnet1", "subnet2", "subnet3"}
i = 0
for i < 5 {
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); ok {
break
} else {
time.Sleep(time.Millisecond * 500)
i++
}
}
if i >= 5 {
t.Fatalf("UpdateNode with different subnet lable should not dedup when enqueueing")
}
// Reset the node topology CR so we can verify that a node update without a label change is de-duplicated
nodeTopologyClient.NetworkingV1().NodeTopologies().UpdateStatus(context.TODO(), ensuredNodeTopologyCR, metav1.UpdateOptions{})
// Updating the node without changing the node pool subnet label should be de-duplicated, not enqueued
mscnode2.ObjectMeta.Labels[testNodePoolSubnetLabelPrefix] = "subnet3"
fakeClient.Tracker().Update(gvr, mscnode2, mscnode2.GetNamespace(), metav1.UpdateOptions{})
time.Sleep(time.Millisecond * 500)
expectedSubnets = []string{}
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); !ok {
t.Fatalf("UpdateNode with the same subnet lable should dedup when enqueueing")
}
}
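A note on the test changes: nodes are now created, updated, and deleted through fakeClient.Tracker() with a hard-coded GroupVersionResource rather than written into fakeNodeInformer.Informer().GetStore(). Writing to the store only mutates the informer's local cache, so the allocator's Add/Update/Delete handlers never observe the change; going through the tracker emits watch events that the shared informer delivers to those handlers. The sketch below illustrates that pattern; it is not taken from the PR and assumes standard client-go fake clientset and shared-informer behavior.

```go
// Standalone sketch (not from the PR): drive node updates through the fake
// clientset's ObjectTracker so that informer event handlers actually fire.
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	const subnetLabelKey = "cloud.google.com/gke-node-pool-subnet"

	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "testNode",
		Labels: map[string]string{subnetLabelKey: "subnet1"},
	}}
	client := fake.NewSimpleClientset(node)
	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldLabel := oldObj.(*v1.Node).Labels[subnetLabelKey]
			newLabel := newObj.(*v1.Node).Labels[subnetLabelKey]
			fmt.Printf("update observed: %q -> %q\n", oldLabel, newLabel)
		},
	})
	factory.Start(wait.NeverStop)
	cache.WaitForCacheSync(wait.NeverStop, nodeInformer.HasSynced)
	time.Sleep(100 * time.Millisecond) // let the reflector establish its watch on the fake client

	// Updating via the tracker produces a watch event that reaches UpdateFunc.
	// Writing to nodeInformer.GetStore() directly would only change the local
	// cache and no handler would run.
	gvr := schema.GroupVersionResource{Version: "v1", Resource: "nodes"}
	updated := node.DeepCopy()
	updated.Labels[subnetLabelKey] = "subnet2"
	_ = client.Tracker().Update(gvr, updated, updated.GetNamespace())

	time.Sleep(500 * time.Millisecond) // give the watch event time to propagate
}
```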

func TestNodeTopologyCR_DELETION(t *testing.T) {
func TestNodeTopologyCR_DeleteNode(t *testing.T) {
testClusterValues := gce.DefaultTestClusterValues()
testClusterValues.SubnetworkURL = exampleSubnetURL
fakeGCE := gce.NewFakeGCECloud(testClusterValues)
@@ -279,15 +312,12 @@ func TestNodeTopologyCR_DELETION(t *testing.T) {
nwInformer := nwInfFactory.V1().Networks()
gnpInformer := nwInfFactory.V1().GKENetworkParamSets()

mscnode := &v1.Node{
defaultnode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "testNode",
Labels: map[string]string{
testNodePoolSubnetLabelPrefix: "subnet1",
},
Name: "nodeTopologyDefautNode",
},
}
fakeClient := fake.NewSimpleClientset()
fakeClient := fake.NewSimpleClientset(defaultnode)
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, time.Second)
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()

@@ -306,9 +336,17 @@ func TestNodeTopologyCR_DELETION(t *testing.T) {
fakeInformerFactory.Start(wait.NeverStop)
go cloudAllocator.Run(wait.NeverStop)

fakeNodeInformer.Informer().GetStore().Add(mscnode)
mscnode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "testNode",
Labels: map[string]string{
testNodePoolSubnetLabelPrefix: "subnet1",
},
},
}
fakeClient.Tracker().Add(mscnode)

expectedSubnets := []string{"subnet-def"}
expectedSubnets := []string{"subnet-def", "subnet1"}
i := 0
for i < 5 {
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); ok {
@@ -318,6 +356,26 @@ func TestNodeTopologyCR_DELETION(t *testing.T) {
i++
}
}
if i >= 5 {
t.Fatalf("Add node topology CR not working as expected")
}
// TODO: derive the GVR automatically instead of using a hardcoded value
gvr := runtimeSchema.GroupVersionResource{
Version: "v1",
Resource: "nodes",
}
fakeClient.Tracker().Delete(gvr, mscnode.GetNamespace(), mscnode.GetName(), metav1.DeleteOptions{})

expectedSubnets = []string{"subnet-def"}
i = 0
for i < 5 {
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); ok {
break
} else {
time.Sleep(time.Millisecond * 500)
i++
}
}
if i >= 5 {
t.Fatalf("Delete node topology CR not working as expected")
}