Commit 3e5f750

fix: Don't try watching items without resourceVersion
Trying to watch items without resourceVersion will fail. Instead of failing and retrying every second, wait until the next resync interval and try again then. This reduces load and log spam.

Signed-off-by: Daniel Vassdal <daniel@vassdal.org>
1 parent d65e9d9 commit 3e5f750
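For context, the failure this change works around comes from client-go's RetryWatcher, which refuses to start from an empty resourceVersion. A paraphrased sketch of that guard (see the retrywatcher.go link in the diff below; not an exact quote of the upstream source):

	// Inside watchutil.NewRetryWatcher (k8s.io/client-go/tools/watch):
	switch initialResourceVersion {
	case "", "0":
		// A watch cannot be resumed from an empty or zero resourceVersion,
		// so the constructor returns an error immediately.
		return nil, fmt.Errorf("initial RV %q is not supported", initialResourceVersion)
	}

Before this change, watchEvents hit that constructor error on every one-second retry; now it parks until the next resync interval instead.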

2 files changed, 226 insertions(+), 0 deletions(-)

pkg/cache/cluster.go

Lines changed: 23 additions & 0 deletions
@@ -666,6 +666,29 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			}
 		}
 
+		// If the resourceVersion is still missing, watchutil.NewRetryWatcher will fail.
+		// https://github.com/kubernetes/client-go/blob/78d2af792babf2dd937ba2e2a8d99c753a5eda89/tools/watch/retrywatcher.go#L68-L71
+		// Instead, let's just check if the resourceVersion exists at the next resync ...
+		if resourceVersion == "" {
+			c.log.V(1).Info(fmt.Sprintf("Ignoring watch for %s on %s due to missing resourceVersion", api.GroupKind, c.config.Host))
+
+			var watchResyncTimeoutCh <-chan time.Time
+			if c.watchResyncTimeout > 0 {
+				shouldResync := time.NewTimer(c.watchResyncTimeout)
+				defer shouldResync.Stop()
+				watchResyncTimeoutCh = shouldResync.C
+			}
+
+			for {
+				select {
+				case <-ctx.Done():
+					return nil
+				case <-watchResyncTimeoutCh:
+					return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
+				}
+			}
+		}
+
 		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 				res, err := resClient.Watch(ctx, options)
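A note on the pattern above: when c.watchResyncTimeout is zero or negative, watchResyncTimeoutCh stays nil, and a receive from a nil channel blocks forever, so the select effectively waits on ctx.Done() alone. A minimal, self-contained sketch of that behavior (hypothetical names, not from this repo):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// A nil channel never becomes ready, so a select case receiving
		// from it is effectively disabled.
		var timeoutCh <-chan time.Time

		withTimeout := true // flip to false to disable the timeout case
		if withTimeout {
			timer := time.NewTimer(100 * time.Millisecond)
			defer timer.Stop()
			timeoutCh = timer.C
		}

		done := make(chan struct{}) // stands in for ctx.Done()
		go func() {
			time.Sleep(200 * time.Millisecond)
			close(done) // stands in for context cancellation
		}()

		select {
		case <-done:
			fmt.Println("cancelled") // wins when withTimeout is false
		case <-timeoutCh:
			fmt.Println("timed out") // wins after 100ms when withTimeout is true
		}
	}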

pkg/cache/cluster_test.go

Lines changed: 203 additions & 0 deletions
@@ -1,9 +1,14 @@
 package cache
 
 import (
+	"bufio"
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"io"
+	"k8s.io/klog/v2/textlogger"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
"sync"
@@ -1393,3 +1398,201 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 // })
 // }
 //}
+func Test_watchEvents_Missing_resourceVersion(t *testing.T) {
+	objExample := &unstructured.Unstructured{Object: map[string]any{
+		"apiVersion": "apiservice.example.com/v1",
+		"kind":       "Example",
+		"metadata": map[string]any{
+			"name": "example",
+		},
+	}}
+
+	testCases := []struct {
+		name               string
+		objs               []runtime.Object
+		funAssert          func(t *testing.T, logLines []string)
+		waitForLogLines    []string
+		waitForLogExtra    time.Duration
+		watchResyncTimeout time.Duration
+	}{
+		{
+			name:            "Should_ignore_resource_without_resourceVersion",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion"},
+			funAssert: func(t *testing.T, logLines []string) {
+				require.NotContains(t, logLines, "Resyncing Example.apiservice.example.com on https://test due to timeout")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    0 * time.Millisecond,
+		},
+		{
+			name:            "Should_not_ignore_resource_with_resourceVersion",
+			objs:            []runtime.Object{testDeploy()},
+			waitForLogLines: []string{"Start watch Deployment.apps on https://test"},
+			funAssert: func(t *testing.T, logLines []string) {
+				require.NotContains(t, logLines, "Ignoring watch for Deployment.apps on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+		{
+			name:            "Should_retry_ignored_resource_on_next_resync",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Failed to watch Example.apiservice.example.com on https://test: Resyncing Example.apiservice.example.com on https://test due to timeout, retrying in 1s"},
+			funAssert: func(t *testing.T, logLines []string) {
+				require.Contains(t, logLines, "Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: 10 * time.Millisecond,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+	}
+
+	// readLinesUntil tails buf until every line in wantedLines has been seen,
+	// then keeps reading for readExtra before returning the collected lines.
+	readLinesUntil := func(ctx context.Context, buf io.Reader, wantedLines []string, readExtra time.Duration) ([]string, error) {
+		wantedStatuses := map[string]bool{}
+		for _, wantedLine := range wantedLines {
+			wantedStatuses[strings.TrimSuffix(wantedLine, "\r\n")] = false
+		}
+
+		var logLines []string
+		readChan := make(chan interface{})
+		go func() {
+			// Matches klog/textlogger-formatted lines and captures the quoted
+			// message, e.g.: cluster.go:678] "Start watch Deployment.apps ..."
+			lineRgx := regexp.MustCompile(`(?ms)^.+?01:00:00\.000000\s+\d+\s+.+?\.go:\d+\]\s+"(?P<msg>.+)"$`)
+
+			for {
+				scanner := bufio.NewScanner(buf)
+				for scanner.Scan() {
+					match := lineRgx.FindStringSubmatch(scanner.Text())
+					readChan <- match[1]
+				}
+
+				if scanner.Err() != nil {
+					readChan <- scanner.Err()
+					return
+				}
+
+				// EOF. Wait for more data.
+				time.Sleep(50 * time.Millisecond)
+			}
+
+			readChan <- nil
+		}()
+
+		var readExtraTimer *time.Timer
+		var readExtraTimeoutChan <-chan time.Time
+
+		for {
+			select {
+			case <-readExtraTimeoutChan:
+				return logLines, ctx.Err()
+			case <-ctx.Done():
+				return logLines, ctx.Err()
+			case read := <-readChan:
+				if err, ok := read.(error); ok {
+					return logLines, err
+				}
+
+				// EOF
+				if read == nil {
+					return logLines, nil
+				}
+
+				logLines = append(logLines, read.(string))
+				if readExtraTimer != nil {
+					continue
+				}
+
+				line := read.(string)
+				if _, ok := wantedStatuses[line]; ok {
+					wantedStatuses[line] = true
+
+					done := true
+					for _, ok := range wantedStatuses {
+						if !ok {
+							done = false
+						}
+					}
+
+					if done {
+						readExtraTimer = time.NewTimer(readExtra)
+						readExtraTimeoutChan = readExtraTimer.C
+					}
+				}
+			}
+		}
+	}
+
+	// createCluster builds a clusterCache backed by a fake dynamic client.
+	createCluster := func(opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache {
+		client := fake.NewSimpleDynamicClientWithCustomListKinds(scheme.Scheme,
+			map[schema.GroupVersionResource]string{
+				{Group: "apiservice.example.com", Version: "v1", Resource: "examples"}: "ExampleList",
+			},
+			objs...)
+		reactor := client.ReactionChain[0]
+		client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
+			handled, ret, err = reactor.React(action)
+			if err != nil || !handled {
+				return
+			}
+
+			// The apiservice.example.com group is for testing missing resourceVersion, so we omit setting it for those responses.
+			retList, ok := ret.(*unstructured.UnstructuredList)
+			if ok && len(retList.Items) > 0 && retList.Items[0].GetObjectKind().GroupVersionKind().Group == "apiservice.example.com" {
+				return
+			}
+
+			// Make sure every other list response has a resourceVersion.
+			ret.(metav1.ListInterface).SetResourceVersion("123")
+			return
+		})
+
+		apiResources := []kube.APIResourceInfo{{
+			GroupKind:            schema.GroupKind{Group: "apps", Kind: "Deployment"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+			Meta:                 metav1.APIResource{Namespaced: true},
+		}, {
+			GroupKind:            schema.GroupKind{Group: "apiservice.example.com", Kind: "Example"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apiservice.example.com", Version: "v1", Resource: "examples"},
+			Meta:                 metav1.APIResource{Namespaced: false},
+		}}
+
+		opts = append([]UpdateSettingsFunc{
+			SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}),
+		}, opts...)
+
+		cache := NewClusterCache(
+			&rest.Config{Host: "https://test"},
+			opts...,
+		)
+		return cache
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+			defer cancel()
+
+			var logBuffer bytes.Buffer
+			logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&logBuffer), textlogger.Verbosity(1), textlogger.FixedTime(time.Unix(0, 0))))
+
+			cluster := createCluster([]UpdateSettingsFunc{
+				SetLogr(logger),
+				SetWatchResyncTimeout(testCase.watchResyncTimeout),
+			}, testCase.objs...)
+
+			defer func() {
+				cluster.Invalidate()
+			}()
+
+			err := cluster.EnsureSynced()
+			require.NoError(t, err)
+
+			logLines, err := readLinesUntil(ctx, &logBuffer, testCase.waitForLogLines, testCase.waitForLogExtra)
+			require.NoError(t, err)
+			testCase.funAssert(t, logLines)
+			for _, wantedLogLine := range testCase.waitForLogLines {
+				require.Contains(t, logLines, wantedLogLine)
+			}
+		})
+	}
+}
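The options exercised here (SetWatchResyncTimeout, SetLogr) are the same public knobs a consumer of the cache would use. A minimal usage sketch, assuming the gitops-engine import path and a placeholder cluster host:

	import (
		"time"

		"k8s.io/client-go/rest"

		"github.com/argoproj/gitops-engine/pkg/cache"
	)

	clusterCache := cache.NewClusterCache(
		&rest.Config{Host: "https://my-cluster"}, // placeholder host
		cache.SetWatchResyncTimeout(10*time.Minute),
	)
	if err := clusterCache.EnsureSynced(); err != nil {
		// handle the initial sync error
	}

With this commit, a resource whose list response lacks a resourceVersion logs "Ignoring watch for ..." once and is retried at the next resync interval, instead of failing the watch every second.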
