From 61cdb000e9cb497b981f197c79162964313eb16f Mon Sep 17 00:00:00 2001
From: Kimmo Lehto
Date: Tue, 12 Aug 2025 14:24:11 +0300
Subject: [PATCH 1/2] Validate version consecutiveness on upgrade

Signed-off-by: Kimmo Lehto
---
 phase/upgrade_workers.go    |  13 ++
 phase/validate_facts.go     |  48 +++++++
 pkg/node/statusfunc.go      | 258 ++++++++++++++++++++++++++++++++++++
 smoke-test/smoke-dryrun.sh  |  14 +-
 smoke-test/smoke-upgrade.sh |   9 +-
 5 files changed, 334 insertions(+), 8 deletions(-)

diff --git a/phase/upgrade_workers.go b/phase/upgrade_workers.go
index c59df939..710527d2 100644
--- a/phase/upgrade_workers.go
+++ b/phase/upgrade_workers.go
@@ -87,6 +87,7 @@ func (p *UpgradeWorkers) Run(ctx context.Context) error {
 	log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
 	return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
 		p.start,
+		p.waitForKubeProxy,
 		p.cordonWorker,
 		p.drainWorker,
 		p.upgradeWorker,
@@ -244,3 +245,15 @@ func (p *UpgradeWorkers) upgradeWorker(ctx context.Context, h *cluster.Host) err
 	h.Metadata.Ready = true
 	return nil
 }
+
+func (p *UpgradeWorkers) waitForKubeProxy(ctx context.Context, h *cluster.Host) error {
+	if !p.IsWet() {
+		p.DryMsg(h, "wait for kube-proxy to be at the desired version")
+		return nil
+	}
+	log.Infof("%s: waiting for kube-proxy roll-out", h)
+	if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.KubeProxyRolledOutFunc(h)); err != nil {
+		return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
+	}
+	return nil
+}
diff --git a/phase/validate_facts.go b/phase/validate_facts.go
index 6151ebf5..c919081f 100644
--- a/phase/validate_facts.go
+++ b/phase/validate_facts.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
+	"github.com/k0sproject/version"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -28,6 +30,10 @@ func (p *ValidateFacts) Run(_ context.Context) error {
 		return err
 	}
 
+	if err := p.validateVersionSkew(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -70,3 +76,45 @@ func (p *ValidateFacts) validateDefaultVersion() error {
 
 	return nil
 }
+
+func (p *ValidateFacts) validateVersionSkew() error {
+	return p.Config.Spec.Hosts.Filter(func(h *cluster.Host) bool {
+		return h.Metadata.NeedsUpgrade
+	}).Each(context.Background(), func(_ context.Context, h *cluster.Host) error {
+		log.Debugf("%s: validating k0s version skew", h)
+		delta := version.NewDelta(h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
+		log.Debugf("%s: version delta: %s", h, delta)
+
+		var unacceptable bool
+		switch {
+		case delta.MinorUpgrade:
+			if h.IsController() {
+				if p.Config.Spec.K0s.Version.Segments()[1]-h.Metadata.K0sRunningVersion.Segments()[1] > 1 {
+					log.Debugf("%s: controller upgrade not within version skew policy", h)
+					unacceptable = true
+				}
+			} else if p.Config.Spec.K0s.Version.Segments()[1]-h.Metadata.K0sRunningVersion.Segments()[1] > 3 {
+				log.Debugf("%s: worker upgrade not within version skew policy", h)
+				unacceptable = true
+			}
+
+			if !unacceptable {
+				log.Debugf("%s: minor upgrade within acceptable skew", h)
+			}
+		case delta.MajorUpgrade:
+			unacceptable = true
+			log.Warnf("%s: major upgrades are not supported, the operation will very likely fail", h)
+		}
+
+		if unacceptable {
+			if Force {
+				log.Warnf("upgrade from %s directly to %s is not within the version skew policy, allowing because --force was given", h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
+				return nil
+			}
+			return fmt.Errorf("upgrade from %s directly to %s is not within the version skew policy, you can use --force to skip this check", h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
+		}
+
+		log.Debugf("%s: version skew check passed", h)
+		return nil
+	})
+}
diff --git a/pkg/node/statusfunc.go b/pkg/node/statusfunc.go
index 8db17102..6791395b 100644
--- a/pkg/node/statusfunc.go
+++ b/pkg/node/statusfunc.go
@@ -3,6 +3,7 @@ package node
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -136,3 +137,260 @@ func ServiceStoppedFunc(h *cluster.Host, service string) retryFunc {
 		return nil
 	}
 }
+
+type daemonSetInfo struct {
+	Metadata struct {
+		Name       string `json:"name"`
+		Generation int64  `json:"generation"`
+	} `json:"metadata"`
+	Spec struct {
+		Selector struct {
+			MatchLabels map[string]string `json:"matchLabels"`
+		} `json:"selector"`
+		Template struct {
+			Spec struct {
+				Containers []struct {
+					Name  string `json:"name"`
+					Image string `json:"image"`
+				} `json:"containers"`
+			} `json:"spec"`
+		} `json:"template"`
+	} `json:"spec"`
+	Status struct {
+		ObservedGeneration     int64 `json:"observedGeneration"`
+		DesiredNumberScheduled int32 `json:"desiredNumberScheduled"`
+		UpdatedNumberScheduled int32 `json:"updatedNumberScheduled"`
+		NumberAvailable        int32 `json:"numberAvailable"`
+	} `json:"status"`
+}
+
+type podList struct {
+	Items []struct {
+		Metadata struct {
+			Name string `json:"name"`
+		} `json:"metadata"`
+		Spec struct {
+			NodeName   string `json:"nodeName"`
+			Containers []struct {
+				Name  string `json:"name"`
+				Image string `json:"image"`
+			} `json:"containers"`
+		} `json:"spec"`
+		Status struct {
+			Phase             string `json:"phase"`
+			ContainerStatuses []struct {
+				Name    string `json:"name"`
+				Ready   bool   `json:"ready"`
+				Image   string `json:"image"`
+				ImageID string `json:"imageID"`
+			} `json:"containerStatuses"`
+		} `json:"status"`
+	} `json:"items"`
+}
+
+// DaemonSetRolledOutFunc returns a retryFunc that waits until the given DaemonSet has:
+// 1. been observed by the controller (observedGeneration == generation)
+// 2. updatedNumberScheduled == desiredNumberScheduled
+// 3. numberAvailable == desiredNumberScheduled
+// 4. all matched pods have the specified container Ready and matching the template image
+//
+// If skipIfMissing is true and the DaemonSet is NotFound, it returns nil
+// (useful for proxyless setups where kube-proxy DS is intentionally absent).
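+//
+// A minimal usage sketch (illustrative only; it mirrors how waitForKubeProxy
+// in phase/upgrade_workers.go wires the returned retryFunc into the retry
+// helper, using only names introduced by this patch):
+//
+//	err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout,
+//		node.DaemonSetRolledOutFunc(h, "kube-system", "kube-proxy", "kube-proxy", true))
+//	if err != nil {
+//		return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
+//	}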
+func DaemonSetRolledOutFunc(h *cluster.Host, namespace, dsName, containerName string, skipIfMissing bool) retryFunc {
+	return func(_ context.Context) error {
+		ds, err := fetchDaemonSet(h, namespace, dsName)
+		if err != nil {
+			if skipIfMissing && isNotFoundErr(err) {
+				log.Infof("%s: DaemonSet %s/%s not found; skipping as requested", h, namespace, dsName)
+				return nil
+			}
+			return err
+		}
+
+		if err := assertDaemonSetObservedAndComplete(ds); err != nil {
+			return err
+		}
+		if ds.Status.DesiredNumberScheduled == 0 {
+			log.Infof("%s: %s/%s desiredNumberScheduled=0; nothing to roll out", h, namespace, dsName)
+			return nil
+		}
+
+		desiredImg, err := desiredContainerImage(ds, containerName)
+		if err != nil {
+			return err
+		}
+
+		pods, err := listPodsForDaemonSet(h, namespace, ds)
+		if err != nil {
+			return err
+		}
+		if len(pods.Items) == 0 {
+			return fmt.Errorf("no pods found for DaemonSet %s/%s despite desired=%d",
+				namespace, dsName, ds.Status.DesiredNumberScheduled)
+		}
+
+		notReady, mismatches := verifyPodsReadyAndImage(pods, containerName, desiredImg)
+		if notReady > 0 {
+			return fmt.Errorf("%d containers NotReady for DaemonSet %s/%s", notReady, namespace, dsName)
+		}
+		if mismatches > 0 {
+			return fmt.Errorf("%d pods running unexpected image for DaemonSet %s/%s", mismatches, namespace, dsName)
+		}
+
+		log.Debugf("%s: %s/%s rolled out: desired=%d updated=%d available=%d image=%s",
+			h, namespace, dsName, ds.Status.DesiredNumberScheduled, ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, desiredImg)
+		return nil
+	}
+}
+
+// Optional convenience: kube-proxy waiter (skip if DS missing, e.g., proxyless CNI)
+func KubeProxyRolledOutFunc(h *cluster.Host) retryFunc {
+	return DaemonSetRolledOutFunc(h, "kube-system", "kube-proxy", "kube-proxy", true)
+}
+
+func fetchDaemonSet(h *cluster.Host, ns, name string) (*daemonSetInfo, error) {
+	out, err := h.ExecOutput(
+		h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n %s get ds %s -o json", ns, name),
+		exec.HideOutput(), exec.Sudo(h),
+	)
+	if err != nil {
+		return nil, wrapKubectlNotFound(err)
+	}
+	var ds daemonSetInfo
+	if uerr := json.Unmarshal([]byte(out), &ds); uerr != nil {
+		return nil, fmt.Errorf("failed to decode DaemonSet %s/%s: %w", ns, name, uerr)
+	}
+	return &ds, nil
+}
+
+func assertDaemonSetObservedAndComplete(ds *daemonSetInfo) error {
+	if ds.Status.ObservedGeneration != ds.Metadata.Generation {
+		return fmt.Errorf("DaemonSet not yet observed: gen=%d obs=%d", ds.Metadata.Generation, ds.Status.ObservedGeneration)
+	}
+	if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled ||
+		ds.Status.NumberAvailable != ds.Status.DesiredNumberScheduled {
+		return fmt.Errorf("DaemonSet not fully rolled out: updated=%d available=%d desired=%d",
+			ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, ds.Status.DesiredNumberScheduled)
+	}
+	return nil
+}
+
+func desiredContainerImage(ds *daemonSetInfo, containerName string) (string, error) {
+	containers := ds.Spec.Template.Spec.Containers
+	if len(containers) == 0 {
+		return "", fmt.Errorf("DaemonSet has no containers in pod template")
+	}
+	if containerName == "" {
+		return containers[0].Image, nil
+	}
+	for _, c := range containers {
+		if c.Name == containerName {
+			return c.Image, nil
+		}
+	}
+	return "", fmt.Errorf("container %q not found in DaemonSet template", containerName)
+}
+
+func listPodsForDaemonSet(h *cluster.Host, ns string, ds *daemonSetInfo) (*podList, error) {
+	selector := buildLabelSelector(ds.Spec.Selector.MatchLabels)
+	out, err := h.ExecOutput(
+		h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n %s get pods -l %s -o json", ns, selector),
+		exec.HideOutput(), exec.Sudo(h),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to list pods for selector %q in %s: %w", selector, ns, err)
+	}
+	var pods podList
+	if uerr := json.Unmarshal([]byte(out), &pods); uerr != nil {
+		return nil, fmt.Errorf("failed to decode pods for selector %q: %w", selector, uerr)
+	}
+	return &pods, nil
+}
+
+func verifyPodsReadyAndImage(pods *podList, containerName, desiredImg string) (notReady, mismatches int) {
+	for _, p := range pods.Items {
+		if p.Status.Phase != "Running" {
+			notReady++
+			continue
+		}
+		var podImg, imageID string
+		var hasContainer, ready bool
+
+		for _, c := range p.Spec.Containers {
+			if containerName == "" || c.Name == containerName {
+				podImg = c.Image
+				break
+			}
+		}
+		for _, cs := range p.Status.ContainerStatuses {
+			if containerName == "" || cs.Name == containerName {
+				hasContainer = true
+				ready = cs.Ready
+				imageID = cs.ImageID
+				break
+			}
+		}
+		if !hasContainer || !ready {
+			notReady++
+			continue
+		}
+		if !matchImage(desiredImg, podImg, imageID) {
+			mismatches++
+		}
+	}
+	return
+}
+
+func buildLabelSelector(labels map[string]string) string {
+	// Simple AND of matchLabels (k=v,k2=v2,...)
+	if len(labels) == 0 {
+		return ""
+	}
+	parts := make([]string, 0, len(labels))
+	for k, v := range labels {
+		parts = append(parts, fmt.Sprintf("%s=%s", k, v))
+	}
+	// Map iteration order is random, but kubectl treats the selector as an
+	// unordered set of requirements, so no sorting is needed.
+	return strings.Join(parts, ",")
+}
+
+func matchImage(dsImage, podImage, podImageID string) bool {
+	// Exact tag match
+	if dsImage != "" && dsImage == podImage {
+		return true
+	}
+	// Digest pin match: DS template uses @sha256:..., ensure pod's ImageID has same digest.
+	if at := strings.Index(dsImage, "@sha256:"); at != -1 {
+		digest := dsImage[at+1:] // "sha256:..."
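+		// e.g. a template image "registry.example.com/kube-proxy@sha256:abcd..."
+		// (hypothetical reference) yields digest "sha256:abcd...", which is then
+		// searched for in the pod's imageID, so a digest-pinned DaemonSet matches
+		// its pods regardless of the repository or tag they were pulled by.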
+		return strings.Contains(podImageID, digest)
+	}
+	return false
+}
+
+func wrapKubectlNotFound(err error) error {
+	if err == nil {
+		return nil
+	}
+	// Typical stderr: 'Error from server (NotFound): daemonsets.apps "kube-proxy" not found'
+	low := strings.ToLower(err.Error())
+	if strings.Contains(low, "notfound") || strings.Contains(low, "not found") {
+		return &notFoundError{err}
+	}
+	return err
+}
+
+type notFoundError struct{ error }
+
+func (e *notFoundError) Unwrap() error { return e.error }
+
+func isNotFoundErr(err error) bool {
+	if err == nil {
+		return false
+	}
+	var nf *notFoundError
+	if errors.As(err, &nf) {
+		return true
+	}
+	low := strings.ToLower(err.Error())
+	return strings.Contains(low, "notfound") || strings.Contains(low, "not found")
+}
diff --git a/smoke-test/smoke-dryrun.sh b/smoke-test/smoke-dryrun.sh
index 701afcba..26910bf4 100755
--- a/smoke-test/smoke-dryrun.sh
+++ b/smoke-test/smoke-dryrun.sh
@@ -90,8 +90,8 @@ expectNoK0s() {
 }
 
 applyConfig() {
-  local extra_flag=$1
-  ../k0sctl apply --config "${K0SCTL_CONFIG}" --debug "${extra_flag}" | tee "${log}"
+  local extra_flags=("$@")
+  ../k0sctl apply --config "${K0SCTL_CONFIG}" --debug "${extra_flags[@]}" | tee "${log}"
 }
 
 deleteCluster
@@ -100,7 +100,7 @@ createCluster
 K0S_VERSION="${K0S_FROM}"
 
 colorEcho 3 "Installing ${K0S_VERSION} with --dry-run"
-applyConfig "--dry-run"
+applyConfig --dry-run
 expectNoK0s
 checkDryRunLines min 3
 dumpDryRunLines
@@ -111,7 +111,7 @@ expectK0sVersion "${K0S_FROM}"
 checkDryRunLines none
 
 colorEcho 3 "Installing ${K0S_VERSION} with --dry-run again"
-applyConfig "--dry-run"
+applyConfig --dry-run
 expectK0sVersion "${K0S_FROM}"
 dryRunNoChanges
 
@@ -119,17 +119,17 @@ colorEcho 4 "Succesfully installed ${K0S_FROM}, moving on to upgrade to ${K0S_TO
 K0S_VERSION="${K0S_TO}"
 
 colorEcho 3 "Upgrading to ${K0S_VERSION} with --dry-run"
-applyConfig "--dry-run"
+applyConfig --dry-run --force
 expectK0sVersion "${K0S_FROM}"
 checkDryRunLines min 3
 dumpDryRunLines
 
 colorEcho 3 "Upgrading to ${K0S_VERSION}"
-applyConfig
+applyConfig --force
 expectK0sVersion "${K0S_TO}"
 checkDryRunLines none
 
 colorEcho 3 "Upgrading to ${K0S_VERSION} with --dry-run again"
-applyConfig "--dry-run"
+applyConfig --dry-run --force
 expectK0sVersion "${K0S_TO}"
 dryRunNoChanges
diff --git a/smoke-test/smoke-upgrade.sh b/smoke-test/smoke-upgrade.sh
index 71f7b7bf..257dfbb6 100755
--- a/smoke-test/smoke-upgrade.sh
+++ b/smoke-test/smoke-upgrade.sh
@@ -28,5 +28,12 @@ K0S_VERSION=$(curl -s "https://docs.k0sproject.io/stable.txt")
 
 # Create config with latest version and apply as upgrade
 echo "Upgrading to k0s ${K0S_VERSION}"
-../k0sctl apply --config "${K0SCTL_CONFIG}" --debug
+# First attempt should fail without --force because of version skew
+if ../k0sctl apply --config "${K0SCTL_CONFIG}" --debug; then
+  echo "Expected apply without --force to fail due to version skew"
+  exit 1
+fi
+
+# Second attempt should succeed with --force
+../k0sctl apply --config "${K0SCTL_CONFIG}" --debug --force
 remoteCommand "root@manager0" "k0s version | grep -q ${K0S_VERSION}"

From 5c47051e85508263a65d178be2f5de29e0287aad Mon Sep 17 00:00:00 2001
From: Kimmo Lehto
Date: Wed, 3 Sep 2025 13:07:37 +0300
Subject: [PATCH 2/2] Smarter roll-out check

Signed-off-by: Kimmo Lehto
---
 phase/upgrade_workers.go |  53 ++++++--------
 pkg/node/statusfunc.go   | 129 ++++++++++++++++---------------------
 2 files changed, 84 insertions(+), 98 deletions(-)

diff --git a/phase/upgrade_workers.go b/phase/upgrade_workers.go
index 710527d2..bebf5a08 100644
--- a/phase/upgrade_workers.go
+++ b/phase/upgrade_workers.go
@@ -77,23 +77,32 @@ func (p *UpgradeWorkers) CleanUp() {
 
 // Run the phase
 func (p *UpgradeWorkers) Run(ctx context.Context) error {
-	// Upgrade worker hosts parallelly in 10% chunks
-	concurrentUpgrades := int(math.Floor(float64(len(p.hosts)) * float64(p.Config.Spec.Options.Concurrency.WorkerDisruptionPercent/100)))
-	if concurrentUpgrades == 0 {
-		concurrentUpgrades = 1
-	}
-	concurrentUpgrades = min(concurrentUpgrades, p.Config.Spec.Options.Concurrency.Limit)
+	// Upgrade worker hosts in parallel in 10% chunks
+	concurrentUpgrades := int(math.Floor(float64(len(p.hosts)) * float64(p.Config.Spec.Options.Concurrency.WorkerDisruptionPercent) / 100))
+	if concurrentUpgrades == 0 {
+		concurrentUpgrades = 1
+	}
+	concurrentUpgrades = min(concurrentUpgrades, p.Config.Spec.Options.Concurrency.Limit)
+
+	// Wait once for kube-proxy to be at the desired version across the cluster.
+	if !p.IsWet() {
+		p.DryMsg(p.leader, "wait for kube-proxy to be at the desired version (cluster-wide)")
+	} else if !NoWait { // honor --no-wait
+		log.Infof("waiting for kube-proxy cluster-wide roll-out")
+		if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.KubeProxyRolledOutFunc(p.leader)); err != nil {
+			return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
+		}
+	}
 
-	log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
-	return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
-		p.start,
-		p.waitForKubeProxy,
-		p.cordonWorker,
-		p.drainWorker,
-		p.upgradeWorker,
-		p.uncordonWorker,
-		p.finish,
-	)
+	log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
+	return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
+		p.start,
+		p.cordonWorker,
+		p.drainWorker,
+		p.upgradeWorker,
+		p.uncordonWorker,
+		p.finish,
+	)
 }
 
 func (p *UpgradeWorkers) cordonWorker(_ context.Context, h *cluster.Host) error {
@@ -245,15 +254,3 @@ func (p *UpgradeWorkers) upgradeWorker(ctx context.Context, h *cluster.Host) err
 	h.Metadata.Ready = true
 	return nil
 }
-
-func (p *UpgradeWorkers) waitForKubeProxy(ctx context.Context, h *cluster.Host) error {
-	if !p.IsWet() {
-		p.DryMsg(h, "wait for kube-proxy to be at the desired version")
-		return nil
-	}
-	log.Infof("%s: waiting for kube-proxy roll-out", h)
-	if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.KubeProxyRolledOutFunc(h)); err != nil {
-		return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
-	}
-	return nil
-}
diff --git a/pkg/node/statusfunc.go b/pkg/node/statusfunc.go
index 6791395b..9f2cc8c9 100644
--- a/pkg/node/statusfunc.go
+++ b/pkg/node/statusfunc.go
@@ -188,64 +188,10 @@ type podList struct {
 	} `json:"items"`
 }
 
-// DaemonSetRolledOutFunc returns a retryFunc that waits until the given DaemonSet has:
-// 1. been observed by the controller (observedGeneration == generation)
-// 2. updatedNumberScheduled == desiredNumberScheduled
-// 3. numberAvailable == desiredNumberScheduled
-// 4. all matched pods have the specified container Ready and matching the template image
-//
-// If skipIfMissing is true and the DaemonSet is NotFound, it returns nil
-// (useful for proxyless setups where kube-proxy DS is intentionally absent).
-func DaemonSetRolledOutFunc(h *cluster.Host, namespace, dsName, containerName string, skipIfMissing bool) retryFunc { - return func(_ context.Context) error { - ds, err := fetchDaemonSet(h, namespace, dsName) - if err != nil { - if skipIfMissing && isNotFoundErr(err) { - log.Infof("%s: DaemonSet %s/%s not found; skipping as requested", h, namespace, dsName) - return nil - } - return err - } - - if err := assertDaemonSetObservedAndComplete(ds); err != nil { - return err - } - if ds.Status.DesiredNumberScheduled == 0 { - log.Infof("%s: %s/%s desiredNumberScheduled=0; nothing to roll out", h, namespace, dsName) - return nil - } - - desiredImg, err := desiredContainerImage(ds, containerName) - if err != nil { - return err - } - - pods, err := listPodsForDaemonSet(h, namespace, ds) - if err != nil { - return err - } - if len(pods.Items) == 0 { - return fmt.Errorf("no pods found for DaemonSet %s/%s despite desired=%d", - namespace, dsName, ds.Status.DesiredNumberScheduled) - } - - notReady, mismatches := verifyPodsReadyAndImage(pods, containerName, desiredImg) - if notReady > 0 { - return fmt.Errorf("%d containers NotReady for DaemonSet %s/%s", notReady, namespace, dsName) - } - if mismatches > 0 { - return fmt.Errorf("%d pods running unexpected image for DaemonSet %s/%s", mismatches, namespace, dsName) - } - - log.Debugf("%s: %s/%s rolled out: desired=%d updated=%d available=%d image=%s", - h, namespace, dsName, ds.Status.DesiredNumberScheduled, ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, desiredImg) - return nil - } -} - -// Optional convenience: kube-proxy waiter (skip if DS missing, e.g., proxyless CNI) -func KubeProxyRolledOutFunc(h *cluster.Host) retryFunc { - return DaemonSetRolledOutFunc(h, "kube-system", "kube-proxy", "kube-proxy", true) +// KubeProxyRolledOutFunc waits for kube-proxy DS to match the desired +// state across all scheduled nodes in the cluster. The query is executed on `q`. +func KubeProxyRolledOutFunc(q *cluster.Host) retryFunc { + return DaemonSetRolledOutFunc(q, "kube-system", "kube-proxy", "kube-proxy", true) } func fetchDaemonSet(h *cluster.Host, ns, name string) (*daemonSetInfo, error) { @@ -263,18 +209,6 @@ func fetchDaemonSet(h *cluster.Host, ns, name string) (*daemonSetInfo, error) { return &ds, nil } -func assertDaemonSetObservedAndComplete(ds *daemonSetInfo) error { - if ds.Status.ObservedGeneration != ds.Metadata.Generation { - return fmt.Errorf("DaemonSet not yet observed: gen=%d obs=%d", ds.Metadata.Generation, ds.Status.ObservedGeneration) - } - if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled || - ds.Status.NumberAvailable != ds.Status.DesiredNumberScheduled { - return fmt.Errorf("DaemonSet not fully rolled out: updated=%d available=%d desired=%d", - ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, ds.Status.DesiredNumberScheduled) - } - return nil -} - func desiredContainerImage(ds *daemonSetInfo, containerName string) (string, error) { containers := ds.Spec.Template.Spec.Containers if len(containers) == 0 { @@ -307,6 +241,61 @@ func listPodsForDaemonSet(h *cluster.Host, ns string, ds *daemonSetInfo) (*podLi return &pods, nil } +// DaemonSetRolledOutFunc waits for the DS to be fully rolled out across +// the cluster according to controller status and pod readiness/image checks. +// If skipIfMissing is true and DS is NotFound, it returns nil. 
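+//
+// Check order, summarizing the body below: the controller must have observed
+// the current generation; desiredNumberScheduled == 0 short-circuits as done;
+// the pod count must equal desiredNumberScheduled; every pod must be Running
+// with the named container Ready and its image matching the template; and
+// finally updated == available == desired must hold.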
+func DaemonSetRolledOutFunc(h *cluster.Host, namespace, dsName, containerName string, skipIfMissing bool) retryFunc { + return func(_ context.Context) error { + ds, err := fetchDaemonSet(h, namespace, dsName) + if err != nil { + if skipIfMissing && isNotFoundErr(err) { + log.Infof("%s: DaemonSet %s/%s not found; skipping as requested", h, namespace, dsName) + return nil + } + return err + } + + // Controller must have observed current generation and report full rollout. + if ds.Status.ObservedGeneration != ds.Metadata.Generation { + return fmt.Errorf("DaemonSet not yet observed: gen=%d obs=%d", ds.Metadata.Generation, ds.Status.ObservedGeneration) + } + if ds.Status.DesiredNumberScheduled == 0 { + log.Infof("%s: %s/%s desiredNumberScheduled=0; nothing to roll out", h, namespace, dsName) + return nil + } + + desiredImg, err := desiredContainerImage(ds, containerName) + if err != nil { + return err + } + + pods, err := listPodsForDaemonSet(h, namespace, ds) + if err != nil { + return err + } + if int32(len(pods.Items)) != ds.Status.DesiredNumberScheduled { + return fmt.Errorf("pod count mismatch for DS %s/%s: have=%d desired=%d", namespace, dsName, len(pods.Items), ds.Status.DesiredNumberScheduled) + } + + notReady, mismatches := verifyPodsReadyAndImage(pods, containerName, desiredImg) + if notReady > 0 { + return fmt.Errorf("%d containers NotReady for DaemonSet %s/%s", notReady, namespace, dsName) + } + if mismatches > 0 { + return fmt.Errorf("%d pods running unexpected image for DaemonSet %s/%s", mismatches, namespace, dsName) + } + + if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled || ds.Status.NumberAvailable != ds.Status.DesiredNumberScheduled { + return fmt.Errorf("DaemonSet not fully rolled out: updated=%d available=%d desired=%d", + ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, ds.Status.DesiredNumberScheduled) + } + + log.Debugf("%s: %s/%s rolled out cluster-wide: desired=%d updated=%d available=%d image=%s", h, namespace, dsName, + ds.Status.DesiredNumberScheduled, ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, desiredImg) + return nil + } +} + func verifyPodsReadyAndImage(pods *podList, containerName, desiredImg string) (notReady, mismatches int) { for _, p := range pods.Items { if p.Status.Phase != "Running" {