From 982ee7a36d345deeafd785489f321b59f3bd6f1b Mon Sep 17 00:00:00 2001
From: Daniel Vassdal
Date: Fri, 14 Mar 2025 13:23:05 +0100
Subject: [PATCH] fix: Don't try watching items without resourceVersion

Trying to watch items without resourceVersion will fail. Instead of
failing and retrying every second, wait until the next resync interval
and try again then. This reduces load and log spam.

Signed-off-by: Daniel Vassdal
---
 pkg/cache/cluster.go      |  23 ++++
 pkg/cache/cluster_test.go | 221 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 244 insertions(+)

diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 626c70e81..38ebcdbff 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -666,6 +666,29 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 		}
 	}
 
+	// If the resourceVersion is still missing, watchutil.NewRetryWatcher will fail.
+	// https://github.com/kubernetes/client-go/blob/78d2af792babf2dd937ba2e2a8d99c753a5eda89/tools/watch/retrywatcher.go#L68-L71
+	// Instead, let's just check if the resourceVersion exists at the next resync ...
+	if resourceVersion == "" {
+		c.log.V(1).Info(fmt.Sprintf("Ignoring watch for %s on %s due to missing resourceVersion", api.GroupKind, c.config.Host))
+
+		var watchResyncTimeoutCh <-chan time.Time
+		if c.watchResyncTimeout > 0 {
+			shouldResync := time.NewTimer(c.watchResyncTimeout)
+			defer shouldResync.Stop()
+			watchResyncTimeoutCh = shouldResync.C
+		}
+
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-watchResyncTimeoutCh:
+				return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
+			}
+		}
+	}
+
 	w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 			res, err := resClient.Watch(ctx, options)
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 893c6b877..73f1b92dd 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -1,9 +1,13 @@
 package cache
 
 import (
+	"bufio"
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"io"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
@@ -29,6 +33,7 @@ import (
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	testcore "k8s.io/client-go/testing"
+	"k8s.io/klog/v2/textlogger"
 	"sigs.k8s.io/yaml"
 
 	"github.com/argoproj/gitops-engine/pkg/utils/kube"
@@ -1393,3 +1398,219 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 //	})
 //	}
 //}
+
+type syncedBuffer struct {
+	mutex sync.Mutex
+	buf   bytes.Buffer
+}
+
+func (lb *syncedBuffer) Read(p []byte) (n int, err error) {
+	lb.mutex.Lock()
+	defer lb.mutex.Unlock()
+	return lb.buf.Read(p)
+}
+
+func (lb *syncedBuffer) Write(p []byte) (n int, err error) {
+	lb.mutex.Lock()
+	defer lb.mutex.Unlock()
+	return lb.buf.Write(p)
+}
+
+func Test_watchEvents_Missing_resourceVersion(t *testing.T) {
+	objExample := &unstructured.Unstructured{Object: map[string]any{
+		"apiVersion": "apiservice.example.com/v1",
+		"kind":       "Example",
+		"metadata": map[string]any{
+			"name": "example",
+		},
+	}}
+
+	testCases := []struct {
+		name               string
+		objs               []runtime.Object
+		funAssert          func(t *testing.T, logLines []string)
+		waitForLogLines    []string
+		waitForLogExtra    time.Duration
+		watchResyncTimeout time.Duration
+	}{
+		{
+			name:            "Should_ignore_resource_without_resourceVersion",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.NotContains(t, logLines, "Resyncing Example.apiservice.example.com on https://test due to timeout")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    0 * time.Millisecond,
+		},
+		{
+			name:            "Should_not_ignore_resource_with_resourceVersion",
+			objs:            []runtime.Object{testDeploy()},
+			waitForLogLines: []string{"Start watch Deployment.apps on https://test"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.NotContains(t, logLines, "Ignoring watch for Deployment.apps on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+		{
+			name:            "Should_retry_ignored_resource_on_next_resync",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Failed to watch Example.apiservice.example.com on https://test: Resyncing Example.apiservice.example.com on https://test due to timeout, retrying in 1s"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.Contains(t, logLines, "Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: 10 * time.Millisecond,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+	}
+
+	readLinesUntil := func(ctx context.Context, buf io.Reader, wantedLines []string, readExtra time.Duration) ([]string, error) {
+		wantedStatuses := map[string]bool{}
+		for _, wantedLine := range wantedLines {
+			wantedStatuses[strings.TrimSuffix(wantedLine, "\r\n")] = false
+		}
+
+		var logLines []string
+		readChan := make(chan any)
+		go func() {
+			lineRgx := regexp.MustCompile(`^.+?\s+\d+\s+.+?\.go:(?:\d+?|\d+?)\]\s+"(?P<message>.+)"$`)
+
+			for {
+				scanner := bufio.NewScanner(buf)
+				for scanner.Scan() {
+					match := lineRgx.FindStringSubmatch(scanner.Text())
+					readChan <- match[1]
+				}
+
+				if scanner.Err() != nil {
+					readChan <- scanner.Err()
+					return
+				}
+
+				// EOF. Waiting for data.
+				time.Sleep(50 * time.Millisecond)
+			}
+		}()
+
+		var readExtraTimer *time.Timer
+		var readExtraTimeoutChan <-chan time.Time
+
+		for {
+			select {
+			case <-readExtraTimeoutChan:
+				return logLines, ctx.Err()
+			case <-ctx.Done():
+				return logLines, ctx.Err()
+			case read := <-readChan:
+				if err, ok := read.(error); ok {
+					return logLines, err
+				}
+
+				// EOF
+				if read == nil {
+					return logLines, nil
+				}
+
+				logLines = append(logLines, read.(string))
+				if readExtraTimer != nil {
+					continue
+				}
+
+				line := read.(string)
+				if _, ok := wantedStatuses[line]; ok {
+					wantedStatuses[line] = true
+
+					done := true
+					for _, ok := range wantedStatuses {
+						if !ok {
+							done = false
+						}
+					}
+
+					if done {
+						readExtraTimer = time.NewTimer(readExtra)
+						readExtraTimeoutChan = readExtraTimer.C
+					}
+				}
+			}
+		}
+	}
+
+	createCluster := func(opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache {
+		client := fake.NewSimpleDynamicClientWithCustomListKinds(scheme.Scheme,
+			map[schema.GroupVersionResource]string{
+				{Group: "apiservice.example.com", Version: "v1", Resource: "examples"}: "ExampleList",
+			},
+			objs...)
+		reactor := client.ReactionChain[0]
+		client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
+			handled, ret, err = reactor.React(action)
+			if err != nil || !handled {
+				return
+			}
+
+			// The apiservice.example.com group is for testing missing resourceVersion, so we omit setting it for those responses.
+			retList, ok := ret.(*unstructured.UnstructuredList)
+			if ok && len(retList.Items) > 0 && retList.Items[0].GetObjectKind().GroupVersionKind().Group == "apiservice.example.com" {
+				return
+			}
+
+			// make sure the retList response has a resourceVersion
+			ret.(metav1.ListInterface).SetResourceVersion("123")
+			return
+		})
+
+		apiResources := []kube.APIResourceInfo{{
+			GroupKind:            schema.GroupKind{Group: "apps", Kind: "Deployment"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+			Meta:                 metav1.APIResource{Namespaced: true},
+		}, {
+			GroupKind:            schema.GroupKind{Group: "apiservice.example.com", Kind: "Example"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apiservice.example.com", Version: "v1", Resource: "examples"},
+			Meta:                 metav1.APIResource{Namespaced: false},
+		}}
+
+		opts = append([]UpdateSettingsFunc{
+			SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}),
+		}, opts...)
+
+		cache := NewClusterCache(
+			&rest.Config{Host: "https://test"},
+			opts...,
+		)
+		return cache
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Second)
+			defer ctxCancel()
+
+			var logBuffer syncedBuffer
+			logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&logBuffer), textlogger.Verbosity(1), textlogger.FixedTime(time.Unix(0, 0))))
+
+			cluster := createCluster([]UpdateSettingsFunc{
+				SetLogr(logger),
+				SetWatchResyncTimeout(testCase.watchResyncTimeout),
+			}, testCase.objs...)
+
+			defer func() {
+				cluster.Invalidate()
+			}()
+
+			err := cluster.EnsureSynced()
+			require.NoError(t, err)
+
+			logLines, err := readLinesUntil(ctx, &logBuffer, testCase.waitForLogLines, testCase.waitForLogExtra)
+			require.NoError(t, err)
+			testCase.funAssert(t, logLines)
+			for _, wantedLogLine := range testCase.waitForLogLines {
+				require.Contains(t, logLines, wantedLogLine)
+			}
+		})
+	}
+}
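
For context on the failure mode the commit message describes: client-go's RetryWatcher rejects an empty (or "0") initial resourceVersion up front, per the retrywatcher.go lines linked in the new comment, so before this change the watch failed immediately and was retried every second. Below is a minimal, hypothetical sketch, not taken from this patch, that only illustrates that rejection; the stub ListWatch and the literal "123" are made up for the example.

	package main

	import (
		"fmt"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/watch"
		"k8s.io/client-go/tools/cache"
		watchutil "k8s.io/client-go/tools/watch"
	)

	func main() {
		// Stub watcher: RetryWatcher only needs something implementing cache.Watcher.
		lw := &cache.ListWatch{
			WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
				return watch.NewFake(), nil
			},
		}

		// An empty resourceVersion is rejected before any request is made,
		// so retrying right away cannot succeed until a resync provides one.
		if _, err := watchutil.NewRetryWatcher("", lw); err != nil {
			fmt.Println("watch not started:", err)
		}

		// With a concrete resourceVersion (hypothetical "123") the watcher starts normally.
		if w, err := watchutil.NewRetryWatcher("123", lw); err == nil {
			fmt.Println("watch started")
			w.Stop()
		}
	}

With the patch applied, the cache instead logs once at V(1) and waits for the configured watchResyncTimeout (settable via SetWatchResyncTimeout, as the new test does) before attempting the watch again, rather than hitting that error in a tight retry loop.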