Skip to content

Commit 38ed8c2

Browse files
bart0sh and klueska committed
DRA: refactor checkpointing
Co-authored-by: Kevin Klues <klueska@gmail.com>
1 parent d11b58e commit 38ed8c2

12 files changed

+643
-499
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package checkpoint
18+
19+
import (
20+
"encoding/json"
21+
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
24+
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
25+
)
26+
27+
// Checkpoint represents a structure to store DRA checkpoint data
28+
type Checkpoint struct {
29+
// Data is a JSON serialized checkpoint data
30+
// See state.CheckpointData for the details
31+
Data string
32+
// Checksum is a checksum of Data
33+
Checksum checksum.Checksum
34+
}
35+
36+
// NewCheckpoint creates a new checkpoint from a list of claim info states
37+
func NewCheckpoint(data state.ClaimInfoStateList) (*Checkpoint, error) {
38+
cpData := &state.CheckpointData{
39+
TypeMeta: metav1.TypeMeta{
40+
Kind: state.CheckpointKind,
41+
APIVersion: state.CheckpointAPIVersion,
42+
},
43+
Entries: data,
44+
}
45+
46+
cpDataBytes, err := json.Marshal(cpData)
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
return &Checkpoint{
52+
Data: string(cpDataBytes),
53+
Checksum: checksum.New(string(cpDataBytes)),
54+
}, nil
55+
}
56+
57+
// MarshalCheckpoint marshals checkpoint to JSON
58+
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
59+
return json.Marshal(cp)
60+
}
61+
62+
// UnmarshalCheckpoint unmarshals checkpoint from JSON
63+
// and verifies its data checksum
64+
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
65+
if err := json.Unmarshal(blob, cp); err != nil {
66+
return err
67+
}
68+
69+
// verify checksum
70+
if err := cp.VerifyChecksum(); err != nil {
71+
return err
72+
}
73+
74+
return nil
75+
}
76+
77+
// VerifyChecksum verifies that current checksum
78+
// of checkpointed Data is valid
79+
func (cp *Checkpoint) VerifyChecksum() error {
80+
return cp.Checksum.Verify(cp.Data)
81+
}
82+
83+
// GetEntries returns list of claim info states from checkpoint
84+
func (cp *Checkpoint) GetEntries() (state.ClaimInfoStateList, error) {
85+
var cpData state.CheckpointData
86+
if err := json.Unmarshal([]byte(cp.Data), &cpData); err != nil {
87+
return nil, err
88+
}
89+
90+
return cpData.Entries, nil
91+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package checkpoint
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
24+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
25+
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
26+
)
27+
28+
type Checkpointer interface {
29+
GetOrCreate() (state.ClaimInfoStateList, error)
30+
Store(state.ClaimInfoStateList) error
31+
}
32+
33+
type checkpointer struct {
34+
sync.RWMutex
35+
checkpointManager checkpointmanager.CheckpointManager
36+
checkpointName string
37+
}
38+
39+
// NewCheckpointer creates new checkpointer for keeping track of claim info with checkpoint backend
40+
func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) {
41+
if len(checkpointName) == 0 {
42+
return nil, fmt.Errorf("received empty string instead of checkpointName")
43+
}
44+
45+
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
48+
}
49+
checkpointer := &checkpointer{
50+
checkpointManager: checkpointManager,
51+
checkpointName: checkpointName,
52+
}
53+
54+
return checkpointer, nil
55+
}
56+
57+
// GetOrCreate gets list of claim info states from a checkpoint
58+
// or creates empty list it checkpoint doesn't exist yet
59+
func (sc *checkpointer) GetOrCreate() (state.ClaimInfoStateList, error) {
60+
sc.Lock()
61+
defer sc.Unlock()
62+
63+
checkpoint, err := NewCheckpoint(nil)
64+
err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
65+
if err == errors.ErrCheckpointNotFound {
66+
sc.store(state.ClaimInfoStateList{})
67+
return state.ClaimInfoStateList{}, nil
68+
}
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
71+
}
72+
73+
return checkpoint.GetEntries()
74+
}
75+
76+
// Store stores checkpoint to the file
77+
func (sc *checkpointer) Store(claimInfoStateList state.ClaimInfoStateList) error {
78+
sc.Lock()
79+
defer sc.Unlock()
80+
81+
return sc.store(claimInfoStateList)
82+
}
83+
84+
// store saves state to a checkpoint, caller is responsible for locking
85+
func (sc *checkpointer) store(claimInfoStateList state.ClaimInfoStateList) error {
86+
checkpoint, err := NewCheckpoint(claimInfoStateList)
87+
88+
if err != nil {
89+
return err
90+
}
91+
92+
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
93+
if err != nil {
94+
return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
95+
}
96+
return nil
97+
}

0 commit comments

Comments
 (0)