42 changes: 26 additions & 16 deletions phase/upgrade_workers.go
@@ -77,22 +77,32 @@ func (p *UpgradeWorkers) CleanUp() {

// Run the phase
func (p *UpgradeWorkers) Run(ctx context.Context) error {
// Upgrade worker hosts in parallel, in chunks sized by the configured worker disruption percentage
concurrentUpgrades := int(math.Floor(float64(len(p.hosts)) * float64(p.Config.Spec.Options.Concurrency.WorkerDisruptionPercent) / 100.0))
if concurrentUpgrades == 0 {
concurrentUpgrades = 1
}
concurrentUpgrades = min(concurrentUpgrades, p.Config.Spec.Options.Concurrency.Limit)

log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
p.start,
p.cordonWorker,
p.drainWorker,
p.upgradeWorker,
p.uncordonWorker,
p.finish,
)
// Upgrade worker hosts in parallel, in chunks sized by the configured worker disruption percentage
concurrentUpgrades := int(math.Floor(float64(len(p.hosts)) * float64(p.Config.Spec.Options.Concurrency.WorkerDisruptionPercent) / 100.0))
if concurrentUpgrades == 0 {
concurrentUpgrades = 1
}
concurrentUpgrades = min(concurrentUpgrades, p.Config.Spec.Options.Concurrency.Limit)

// Wait once for kube-proxy to be at desired version across the cluster.
if !p.IsWet() {
p.DryMsg(p.leader, "wait for kube-proxy to be at the desired version (cluster-wide)")
} else if !NoWait { // honor --no-wait
log.Infof("waiting for kube-proxy cluster-wide roll-out")
if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.KubeProxyRolledOutFunc(p.leader)); err != nil {
return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
}
}

log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
p.start,
p.cordonWorker,
p.drainWorker,
p.upgradeWorker,
p.uncordonWorker,
p.finish,
)
}

func (p *UpgradeWorkers) cordonWorker(_ context.Context, h *cluster.Host) error {
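The batch-size arithmetic in UpgradeWorkers.Run above is easy to get wrong: an integer division inside the float64 conversion would silently truncate any percentage below 100 to zero. A minimal runnable sketch of the corrected computation (the standalone batchSize helper and the sample values are illustrative, not part of the PR; the min builtin needs Go 1.21+):

package main

import (
	"fmt"
	"math"
)

// batchSize mirrors the sizing logic in UpgradeWorkers.Run: a percentage of
// the host count, floored, clamped to at least 1 and at most limit.
func batchSize(hosts, disruptionPercent, limit int) int {
	n := int(math.Floor(float64(hosts) * float64(disruptionPercent) / 100.0))
	if n == 0 {
		n = 1 // always upgrade at least one host per batch
	}
	return min(n, limit) // respect the configured hard cap
}

func main() {
	fmt.Println(batchSize(7, 10, 10))   // floor(0.7) = 0, clamped to 1
	fmt.Println(batchSize(50, 10, 10))  // 5
	fmt.Println(batchSize(200, 10, 10)) // floor(20), capped at 10
}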
48 changes: 48 additions & 0 deletions phase/validate_facts.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/version"
log "github.com/sirupsen/logrus"
)

@@ -28,6 +30,10 @@ func (p *ValidateFacts) Run(_ context.Context) error {
return err
}

if err := p.validateVersionSkew(); err != nil {
return err
}

return nil
}

@@ -70,3 +76,45 @@ func (p *ValidateFacts) validateDefaultVersion() error {

return nil
}

func (p *ValidateFacts) validateVersionSkew() error {
return p.Config.Spec.Hosts.Filter(func(h *cluster.Host) bool {
return h.Metadata.NeedsUpgrade
}).Each(context.Background(), func(_ context.Context, h *cluster.Host) error {
log.Debugf("%s: validating k0s version skew", h)
delta := version.NewDelta(h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
log.Debugf("%s: version delta: %s", h, delta)

var unacceptable bool
switch {
case delta.MinorUpgrade:
if h.IsController() {
if p.Config.Spec.K0s.Version.Segments()[1]-h.Metadata.K0sRunningVersion.Segments()[1] > 1 {
log.Debugf("%s: controller upgrade not within version skew policy", h)
unacceptable = true
}
} else if p.Config.Spec.K0s.Version.Segments()[1]-h.Metadata.K0sRunningVersion.Segments()[1] > 3 {
log.Debugf("%s: worker upgrade not within version skew policy", h)
unacceptable = true
}

if !unacceptable {
log.Debugf("%s: minor upgrade within acceptable skew", h)
}
case delta.MajorUpgrade:
unacceptable = true
log.Warnf("%s: major upgrades are not supported, the operation will highly likely fail", h)
}

if unacceptable {
if Force {
log.Warnf("upgrade from %s directly to %s is not within the version skew policy, allowing because --force given", h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
return nil
}
return fmt.Errorf("upgrade from %s directly to %s is not within the version skew policy, you can use --force to skip this check", h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
}

log.Debugf("%s: version skew check passed", h)
return nil
})
}
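To make the skew arithmetic concrete, here is a hedged sketch of how the classification above behaves; version.NewDelta, Segments and the delta fields are used exactly as in the diff, while version.NewVersion is assumed to be the parsing entry point of github.com/k0sproject/version:

package main

import (
	"fmt"

	"github.com/k0sproject/version"
)

func main() {
	from, _ := version.NewVersion("v1.27.5+k0s.0") // running version (example)
	to, _ := version.NewVersion("v1.31.1+k0s.0")   // target version (example)
	delta := version.NewDelta(from, to)
	// Segments()[1] is the minor component: 31 - 27 = 4, which exceeds both
	// the controller limit (1) and the worker limit (3) enforced above, so
	// this upgrade would be rejected unless --force is given.
	fmt.Println(delta.MinorUpgrade, to.Segments()[1]-from.Segments()[1])
}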
247 changes: 247 additions & 0 deletions pkg/node/statusfunc.go
@@ -3,6 +3,7 @@ package node
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
@@ -136,3 +137,249 @@ func ServiceStoppedFunc(h *cluster.Host, service string) retryFunc {
return nil
}
}

type daemonSetInfo struct {
Metadata struct {
Name string `json:"name"`
Generation int64 `json:"generation"`
} `json:"metadata"`
Spec struct {
Selector struct {
MatchLabels map[string]string `json:"matchLabels"`
} `json:"selector"`
Template struct {
Spec struct {
Containers []struct {
Name string `json:"name"`
Image string `json:"image"`
} `json:"containers"`
} `json:"spec"`
} `json:"template"`
} `json:"spec"`
Status struct {
ObservedGeneration int64 `json:"observedGeneration"`
DesiredNumberScheduled int32 `json:"desiredNumberScheduled"`
UpdatedNumberScheduled int32 `json:"updatedNumberScheduled"`
NumberAvailable int32 `json:"numberAvailable"`
} `json:"status"`
}

type podList struct {
Items []struct {
Metadata struct {
Name string `json:"name"`
} `json:"metadata"`
Spec struct {
NodeName string `json:"nodeName"`
Containers []struct {
Name string `json:"name"`
Image string `json:"image"`
} `json:"containers"`
} `json:"spec"`
Status struct {
Phase string `json:"phase"`
ContainerStatuses []struct {
Name string `json:"name"`
Ready bool `json:"ready"`
Image string `json:"image"`
ImageID string `json:"imageID"`
} `json:"containerStatuses"`
} `json:"status"`
} `json:"items"`
}

// KubeProxyRolledOutFunc returns a retryFunc that waits for the kube-proxy
// DaemonSet to reach its desired state across all scheduled nodes in the
// cluster. The kubectl queries are executed on `q`.
func KubeProxyRolledOutFunc(q *cluster.Host) retryFunc {
return DaemonSetRolledOutFunc(q, "kube-system", "kube-proxy", "kube-proxy", true)
}
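// Editor's note (sketch): this retryFunc is meant to be driven by the retry
// helper, exactly as the upgrade_workers.go hunk above does; `leader` stands
// for whichever controller host should run the kubectl queries:
//
//	if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.KubeProxyRolledOutFunc(leader)); err != nil {
//		return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
//	}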

// fetchDaemonSet reads the DaemonSet as JSON via kubectl on h and decodes
// only the fields needed for the rollout checks.
func fetchDaemonSet(h *cluster.Host, ns, name string) (*daemonSetInfo, error) {
out, err := h.ExecOutput(
h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n %s get ds %s -o json", ns, name),
exec.HideOutput(), exec.Sudo(h),
)
if err != nil {
return nil, wrapKubectlNotFound(err)
}
var ds daemonSetInfo
if uerr := json.Unmarshal([]byte(out), &ds); uerr != nil {
return nil, fmt.Errorf("failed to decode DaemonSet %s/%s: %w", ns, name, uerr)
}
return &ds, nil
}

// desiredContainerImage returns the image of the named container in the
// DaemonSet pod template, or of the first container when containerName is empty.
func desiredContainerImage(ds *daemonSetInfo, containerName string) (string, error) {
containers := ds.Spec.Template.Spec.Containers
if len(containers) == 0 {
return "", fmt.Errorf("DaemonSet has no containers in pod template")
}
if containerName == "" {
return containers[0].Image, nil
}
for _, c := range containers {
if c.Name == containerName {
return c.Image, nil
}
}
return "", fmt.Errorf("container %q not found in DaemonSet template", containerName)
}

// listPodsForDaemonSet lists the pods matching the DaemonSet's matchLabels
// selector in the given namespace.
func listPodsForDaemonSet(h *cluster.Host, ns string, ds *daemonSetInfo) (*podList, error) {
selector := buildLabelSelector(ds.Spec.Selector.MatchLabels)
out, err := h.ExecOutput(
h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n %s get pods -l %s -o json", ns, selector),
exec.HideOutput(), exec.Sudo(h),
)
if err != nil {
return nil, fmt.Errorf("failed to list pods for selector %q in %s: %w", selector, ns, err)
}
var pods podList
if uerr := json.Unmarshal([]byte(out), &pods); uerr != nil {
return nil, fmt.Errorf("failed to decode pods for selector %q: %w", selector, uerr)
}
return &pods, nil
}

// DaemonSetRolledOutFunc returns a retryFunc that waits for the DaemonSet to be
// fully rolled out across the cluster, according to controller status and pod
// readiness/image checks. If skipIfMissing is true and the DaemonSet is not
// found, the check succeeds immediately.
func DaemonSetRolledOutFunc(h *cluster.Host, namespace, dsName, containerName string, skipIfMissing bool) retryFunc {
return func(_ context.Context) error {
ds, err := fetchDaemonSet(h, namespace, dsName)
if err != nil {
if skipIfMissing && isNotFoundErr(err) {
log.Infof("%s: DaemonSet %s/%s not found; skipping as requested", h, namespace, dsName)
return nil
}
return err
}

// Controller must have observed current generation and report full rollout.
if ds.Status.ObservedGeneration != ds.Metadata.Generation {
return fmt.Errorf("DaemonSet not yet observed: gen=%d obs=%d", ds.Metadata.Generation, ds.Status.ObservedGeneration)
}
if ds.Status.DesiredNumberScheduled == 0 {
log.Infof("%s: %s/%s desiredNumberScheduled=0; nothing to roll out", h, namespace, dsName)
return nil
}

desiredImg, err := desiredContainerImage(ds, containerName)
if err != nil {
return err
}

pods, err := listPodsForDaemonSet(h, namespace, ds)
if err != nil {
return err
}
if int32(len(pods.Items)) != ds.Status.DesiredNumberScheduled {
return fmt.Errorf("pod count mismatch for DS %s/%s: have=%d desired=%d", namespace, dsName, len(pods.Items), ds.Status.DesiredNumberScheduled)
}

notReady, mismatches := verifyPodsReadyAndImage(pods, containerName, desiredImg)
if notReady > 0 {
return fmt.Errorf("%d containers NotReady for DaemonSet %s/%s", notReady, namespace, dsName)
}
if mismatches > 0 {
return fmt.Errorf("%d pods running unexpected image for DaemonSet %s/%s", mismatches, namespace, dsName)
}

if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled || ds.Status.NumberAvailable != ds.Status.DesiredNumberScheduled {
return fmt.Errorf("DaemonSet not fully rolled out: updated=%d available=%d desired=%d",
ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, ds.Status.DesiredNumberScheduled)
}

log.Debugf("%s: %s/%s rolled out cluster-wide: desired=%d updated=%d available=%d image=%s", h, namespace, dsName,
ds.Status.DesiredNumberScheduled, ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, desiredImg)
return nil
}
}

// verifyPodsReadyAndImage counts pods that are not Running and Ready (notReady)
// and pods whose container image does not match the template image (mismatches).
func verifyPodsReadyAndImage(pods *podList, containerName, desiredImg string) (notReady, mismatches int) {
for _, p := range pods.Items {
if p.Status.Phase != "Running" {
notReady++
continue
}
var podImg, imageID string
var hasContainer, ready bool

for _, c := range p.Spec.Containers {
if containerName == "" || c.Name == containerName {
podImg = c.Image
break
}
}
for _, cs := range p.Status.ContainerStatuses {
if containerName == "" || cs.Name == containerName {
hasContainer = true
ready = cs.Ready
imageID = cs.ImageID
break
}
}
if !hasContainer || !ready {
notReady++
continue
}
if !matchImage(desiredImg, podImg, imageID) {
mismatches++
}
}
return
}

func buildLabelSelector(labels map[string]string) string {
// Simple AND of matchLabels (k=v,k2=v2,...)
if len(labels) == 0 {
return ""
}
parts := make([]string, 0, len(labels))
for k, v := range labels {
parts = append(parts, fmt.Sprintf("%s=%s", k, v))
}
// kubectl does not require a deterministic order, so random map iteration is harmless here.
return strings.Join(parts, ",")
}

func matchImage(dsImage, podImage, podImageID string) bool {
// Exact tag match
if dsImage != "" && dsImage == podImage {
return true
}
// Digest pin match: DS template uses @sha256:..., ensure pod's ImageID has same digest.
if at := strings.Index(dsImage, "@sha256:"); at != -1 {
digest := dsImage[at+1:] // "sha256:..."
return strings.Contains(podImageID, digest)
}
return false
}
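// Editor's note (sketch, with hypothetical image references): a digest-pinned
// template matches on the digest no matter how the runtime names the image,
// while a tagged template requires an exact string match:
//
//	matchImage("registry.k8s.io/kube-proxy@sha256:abc123", "", "quay.io/mirror/kube-proxy@sha256:abc123") // true: digests agree
//	matchImage("registry.k8s.io/kube-proxy:v1.31.1", "registry.k8s.io/kube-proxy:v1.31.1", "")            // true: exact tag match
//	matchImage("registry.k8s.io/kube-proxy:v1.31.1", "registry.k8s.io/kube-proxy:v1.30.0", "")            // false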

func wrapKubectlNotFound(err error) error {
if err == nil {
return nil
}
// Typical stderr: 'Error from server (NotFound): daemonsets.apps "kube-proxy" not found'
low := strings.ToLower(err.Error())
if strings.Contains(low, "notfound") || strings.Contains(low, "not found") {
return &notFoundError{err}
}
return err
}

type notFoundError struct{ error }

func (e *notFoundError) Unwrap() error { return e.error }

func isNotFoundErr(err error) bool {
if err == nil {
return false
}
var nf *notFoundError
if errors.As(err, &nf) {
return true
}
low := strings.ToLower(err.Error())
return strings.Contains(low, "notfound") || strings.Contains(low, "not found")
}