Skip to content

Commit 74af2e5

Browse files
bart0shklueska
andcommitted
DRA: refactor checkpointing
Co-authored-by: Kevin Klues <klueska@gmail.com>
1 parent d11b58e commit 74af2e5

12 files changed

+638
-499
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package checkpoint
18+
19+
import (
20+
"encoding/json"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
24+
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
25+
)
26+
27+
type Checkpoint struct {
28+
Data string
29+
Checksum checksum.Checksum
30+
}
31+
32+
func NewCheckpoint(data state.ClaimInfoStateList) (*Checkpoint, error) {
33+
cpData := &state.CheckpointData{
34+
TypeMeta: metav1.TypeMeta{
35+
Kind: state.CheckpointKind,
36+
APIVersion: state.CheckpointAPIVersion,
37+
},
38+
Entries: data,
39+
}
40+
41+
cpDataBytes, err := json.Marshal(cpData)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
return &Checkpoint{
47+
Data: string(cpDataBytes),
48+
Checksum: checksum.New(string(cpDataBytes)),
49+
}, nil
50+
}
51+
52+
// MarshalCheckpoint marshals checkpoint to JSON
53+
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
54+
return json.Marshal(cp)
55+
}
56+
57+
// UnmarshalCheckpoint unmarshals checkpoint from JSON
58+
// and verifies its checksum
59+
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
60+
if err := json.Unmarshal(blob, cp); err != nil {
61+
return err
62+
}
63+
64+
// verify checksum
65+
if err := cp.VerifyChecksum(); err != nil {
66+
return err
67+
}
68+
69+
return nil
70+
}
71+
72+
// VerifyChecksum verifies that current checksum
73+
// of checkpointed Data is valid
74+
func (cp *Checkpoint) VerifyChecksum() error {
75+
return cp.Checksum.Verify(cp.Data)
76+
}
77+
78+
// GetEntries returns list of claim info states from checkpoint
79+
func (cp *Checkpoint) GetEntries() (state.ClaimInfoStateList, error) {
80+
var cpData state.CheckpointData
81+
if err := json.Unmarshal([]byte(cp.Data), &cpData); err != nil {
82+
return nil, err
83+
}
84+
85+
return cpData.Entries, nil
86+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package checkpoint
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
24+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
25+
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
26+
)
27+
28+
type Checkpointer interface {
29+
GetOrCreate() (state.ClaimInfoStateList, error)
30+
Store(state.ClaimInfoStateList) error
31+
}
32+
33+
type checkpointer struct {
34+
sync.RWMutex
35+
checkpointManager checkpointmanager.CheckpointManager
36+
checkpointName string
37+
}
38+
39+
// NewCheckpointer creates new checkpointer for keeping track of claim info with checkpoint backend
40+
func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) {
41+
if len(checkpointName) == 0 {
42+
return nil, fmt.Errorf("received empty string instead of checkpointName")
43+
}
44+
45+
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
48+
}
49+
checkpointer := &checkpointer{
50+
checkpointManager: checkpointManager,
51+
checkpointName: checkpointName,
52+
}
53+
54+
return checkpointer, nil
55+
}
56+
57+
// GetOrCreate gets list of claim info states from a checkpoint
58+
// or creates empty list it checkpoint doesn't exist yet
59+
func (sc *checkpointer) GetOrCreate() (state.ClaimInfoStateList, error) {
60+
sc.Lock()
61+
defer sc.Unlock()
62+
63+
checkpoint, err := NewCheckpoint(nil)
64+
err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
65+
if err == errors.ErrCheckpointNotFound {
66+
sc.store(state.ClaimInfoStateList{})
67+
return state.ClaimInfoStateList{}, nil
68+
}
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
71+
}
72+
73+
return checkpoint.GetEntries()
74+
}
75+
76+
// saves state to a checkpoint
77+
func (sc *checkpointer) Store(claimInfoStateList state.ClaimInfoStateList) error {
78+
sc.Lock()
79+
defer sc.Unlock()
80+
81+
return sc.store(claimInfoStateList)
82+
}
83+
84+
// saves state to a checkpoint, caller is responsible for locking
85+
func (sc *checkpointer) store(claimInfoStateList state.ClaimInfoStateList) error {
86+
checkpoint, err := NewCheckpoint(claimInfoStateList)
87+
88+
if err != nil {
89+
return err
90+
}
91+
92+
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
93+
if err != nil {
94+
return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
95+
}
96+
return nil
97+
}

0 commit comments

Comments
 (0)