Skip to content

Commit 01e0c79

Browse files
bart0sh and klueska committed
DRA: refactor checkpointing
Co-authored-by: Kevin Klues <klueska@gmail.com>
1 parent d11b58e commit 01e0c79

12 files changed

+627
-503
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package checkpoint
18+
19+
import (
20+
"encoding/json"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
24+
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
25+
)
26+
27+
const (
28+
CheckpointAPIGroup = "checkpoint.dra.kubelet.k8s.io"
29+
CheckpointKind = "DRACheckpoint"
30+
CheckpointAPIVersion = CheckpointAPIGroup + "/" + state.Version
31+
)
32+
33+
// Checkpoint represents a structure to store DRA checkpoint data
34+
type Checkpoint struct {
35+
// Data is a JSON serialized checkpoint data
36+
// See state.CheckpointData for the details
37+
Data string
38+
// Checksum is a checksum of Data
39+
Checksum checksum.Checksum
40+
}
41+
42+
type CheckpointData struct {
43+
metav1.TypeMeta
44+
Entries state.ClaimInfoStateList
45+
}
46+
47+
// NewCheckpoint creates a new checkpoint from a list of claim info states
48+
func NewCheckpoint(data state.ClaimInfoStateList) (*Checkpoint, error) {
49+
cpData := &CheckpointData{
50+
TypeMeta: metav1.TypeMeta{
51+
Kind: CheckpointKind,
52+
APIVersion: CheckpointAPIVersion,
53+
},
54+
Entries: data,
55+
}
56+
57+
cpDataBytes, err := json.Marshal(cpData)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
return &Checkpoint{
63+
Data: string(cpDataBytes),
64+
Checksum: checksum.New(string(cpDataBytes)),
65+
}, nil
66+
}
67+
68+
// MarshalCheckpoint marshals checkpoint to JSON
69+
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
70+
return json.Marshal(cp)
71+
}
72+
73+
// UnmarshalCheckpoint unmarshals checkpoint from JSON
74+
// and verifies its data checksum
75+
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
76+
if err := json.Unmarshal(blob, cp); err != nil {
77+
return err
78+
}
79+
80+
// verify checksum
81+
if err := cp.VerifyChecksum(); err != nil {
82+
return err
83+
}
84+
85+
return nil
86+
}
87+
88+
// VerifyChecksum verifies that current checksum
89+
// of checkpointed Data is valid
90+
func (cp *Checkpoint) VerifyChecksum() error {
91+
return cp.Checksum.Verify(cp.Data)
92+
}
93+
94+
// GetEntries returns list of claim info states from checkpoint
95+
func (cp *Checkpoint) GetEntries() (state.ClaimInfoStateList, error) {
96+
var cpData CheckpointData
97+
if err := json.Unmarshal([]byte(cp.Data), &cpData); err != nil {
98+
return nil, err
99+
}
100+
101+
return cpData.Entries, nil
102+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package checkpoint
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
24+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
25+
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
26+
)
27+
28+
type Checkpointer interface {
29+
GetOrCreate() (*Checkpoint, error)
30+
Store(*Checkpoint) error
31+
}
32+
33+
type checkpointer struct {
34+
sync.RWMutex
35+
checkpointManager checkpointmanager.CheckpointManager
36+
checkpointName string
37+
}
38+
39+
// NewCheckpointer creates new checkpointer for keeping track of claim info with checkpoint backend
40+
func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) {
41+
if len(checkpointName) == 0 {
42+
return nil, fmt.Errorf("received empty string instead of checkpointName")
43+
}
44+
45+
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
48+
}
49+
checkpointer := &checkpointer{
50+
checkpointManager: checkpointManager,
51+
checkpointName: checkpointName,
52+
}
53+
54+
return checkpointer, nil
55+
}
56+
57+
// GetOrCreate gets list of claim info states from a checkpoint
58+
// or creates empty list it checkpoint doesn't exist yet
59+
func (sc *checkpointer) GetOrCreate() (*Checkpoint, error) {
60+
sc.Lock()
61+
defer sc.Unlock()
62+
63+
checkpoint, err := NewCheckpoint(nil)
64+
if err != nil {
65+
return nil, fmt.Errorf("failed to create new checkpoint: %w", err)
66+
}
67+
68+
err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
69+
if err == errors.ErrCheckpointNotFound {
70+
sc.store(checkpoint)
71+
return NewCheckpoint(state.ClaimInfoStateList{})
72+
}
73+
if err != nil {
74+
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
75+
}
76+
77+
return checkpoint, nil
78+
}
79+
80+
// Store stores checkpoint to the file
81+
func (sc *checkpointer) Store(checkpoint *Checkpoint) error {
82+
sc.Lock()
83+
defer sc.Unlock()
84+
85+
return sc.store(checkpoint)
86+
}
87+
88+
// store saves state to a checkpoint, caller is responsible for locking
89+
func (sc *checkpointer) store(checkpoint *Checkpoint) error {
90+
if err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint); err != nil {
91+
return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
92+
}
93+
return nil
94+
}

0 commit comments

Comments
 (0)