Skip to content

Commit 6242f59

Browse files
committed
feat: support maximum disk usage limit
Signed-off-by: imeoer <yansong.ys@antgroup.com>
1 parent b588994 commit 6242f59

File tree

10 files changed

+355
-53
lines changed

10 files changed

+355
-53
lines changed

cmd/model-csi-driver/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func main() {
3838
},
3939
},
4040
Action: func(c *cli.Context) error {
41-
cfg, err := config.FromFile(c.String("config"))
41+
cfg, err := config.New(c.String("config"))
4242
if err != nil {
4343
return errors.Wrap(err, "load config")
4444
}

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/container-storage-interface/spec v1.2.0
77
github.com/containerd/containerd v1.7.27
88
github.com/dustin/go-humanize v1.0.1
9+
github.com/fsnotify/fsnotify v1.8.0
910
github.com/google/uuid v1.6.0
1011
github.com/labstack/echo/v4 v4.13.3
1112
github.com/moby/sys/mountinfo v0.7.2
@@ -27,6 +28,7 @@ require (
2728
go.opentelemetry.io/otel/trace v1.37.0
2829
golang.org/x/net v0.42.0
2930
golang.org/x/sync v0.16.0
31+
golang.org/x/sys v0.35.0
3032
google.golang.org/grpc v1.75.0
3133
gopkg.in/yaml.v2 v2.4.0
3234
k8s.io/api v0.28.4
@@ -128,7 +130,6 @@ require (
128130
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
129131
golang.org/x/crypto v0.41.0 // indirect
130132
golang.org/x/oauth2 v0.30.0 // indirect
131-
golang.org/x/sys v0.35.0 // indirect
132133
golang.org/x/term v0.34.0 // indirect
133134
golang.org/x/text v0.28.0 // indirect
134135
golang.org/x/time v0.8.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc
7878
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
7979
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
8080
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
81+
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
82+
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
8183
github.com/gliderlabs/ssh v0.3.8 h1:a4YXD1V7xMF9g5nTkdfnja3Sxy1PVDCj1Zg4Wb8vY6c=
8284
github.com/gliderlabs/ssh v0.3.8/go.mod h1:xYoytBv1sV0aL3CavoDuJIQNURXkkfPA/wxQ1pL1fAU=
8385
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI=

pkg/config/config.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,29 @@ import (
55
"os"
66
"path/filepath"
77

8+
"github.com/dustin/go-humanize"
89
"github.com/pkg/errors"
910
"gopkg.in/yaml.v2"
1011
)
1112

13+
type HumanizeSize uint64
14+
15+
func (s *HumanizeSize) UnmarshalYAML(unmarshal func(interface{}) error) error {
16+
var str string
17+
if err := unmarshal(&str); err != nil {
18+
return err
19+
}
20+
21+
size, err := humanize.ParseBytes(str)
22+
if err != nil {
23+
return err
24+
}
25+
26+
*s = HumanizeSize(size)
27+
28+
return nil
29+
}
30+
1231
type Config struct {
1332
// Pattern:
1433
// static: /var/lib/dragonfly/model-csi/volumes/$volumeName/model
@@ -28,8 +47,10 @@ type Config struct {
2847
NodeID string // From env CSI_NODE_ID
2948
Mode string // From env X_CSI_MODE: "controller" or "node"
3049
}
50+
3151
type Features struct {
32-
CheckDiskQuota bool `yaml:"check_disk_quota"`
52+
CheckDiskQuota bool `yaml:"check_disk_quota"`
53+
DiskQuotaMaximumSize HumanizeSize `yaml:"disk_quota_maximum_size"`
3354
}
3455

3556
type PullConfig struct {
@@ -116,7 +137,7 @@ func (cfg *Config) IsNodeMode() bool {
116137
return cfg.Mode == "node"
117138
}
118139

119-
func FromFile(path string) (*Config, error) {
140+
func parse(path string) (*Config, error) {
120141
data, err := os.ReadFile(path)
121142
if err != nil {
122143
return nil, errors.Wrap(err, "read config file")
@@ -184,3 +205,14 @@ func FromFile(path string) (*Config, error) {
184205

185206
return &cfg, nil
186207
}
208+
209+
func New(path string) (*Config, error) {
210+
cfg, err := parse(path)
211+
if err != nil {
212+
return nil, err
213+
}
214+
215+
go cfg.watch(path)
216+
217+
return cfg, nil
218+
}

pkg/config/watcher.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package config
2+
3+
import (
4+
"path/filepath"
5+
"sync"
6+
7+
"github.com/fsnotify/fsnotify"
8+
"github.com/modelpack/model-csi-driver/pkg/logger"
9+
)
10+
11+
var mutex = sync.Mutex{}
12+
13+
func (cfg *Config) watch(path string) {
14+
configDir := filepath.Dir(path)
15+
configFile := filepath.Base(path)
16+
17+
watcher, err := fsnotify.NewWatcher()
18+
if err != nil {
19+
logger.Logger().WithError(err).Error("failed to create fsnotify watcher")
20+
return
21+
}
22+
defer func() { _ = watcher.Close() }()
23+
24+
go func() {
25+
for {
26+
select {
27+
case event, ok := <-watcher.Events:
28+
if !ok {
29+
return
30+
}
31+
relPath := filepath.Base(event.Name)
32+
if relPath == configFile && (event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename)) != 0 {
33+
logger.Logger().Infof("config file changed: %s, event: %s", event.Name, event.Op)
34+
cfg.reload(filepath.Join(configDir, configFile))
35+
}
36+
case err, ok := <-watcher.Errors:
37+
if !ok {
38+
return
39+
}
40+
logger.Logger().WithError(err).Error("watcher error")
41+
}
42+
}
43+
}()
44+
45+
err = watcher.Add(configDir)
46+
if err != nil {
47+
logger.Logger().WithError(err).Error("failed to add config dir to watcher")
48+
}
49+
}
50+
51+
func (cfg *Config) reload(path string) {
52+
newCfg, err := parse(path)
53+
if err != nil {
54+
logger.Logger().WithError(err).Error("failed to parse config file")
55+
return
56+
}
57+
58+
mutex.Lock()
59+
defer mutex.Unlock()
60+
61+
*cfg = *newCfg
62+
}

pkg/server/server_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type mockPuller struct {
4040
hook *service.Hook
4141
}
4242

43-
func (puller *mockPuller) Pull(ctx context.Context, reference, targetDir string, checkDiskQuota bool) error {
43+
func (puller *mockPuller) Pull(ctx context.Context, reference, targetDir string) error {
4444
if err := os.MkdirAll(targetDir, 0755); err != nil {
4545
return err
4646
}
@@ -560,13 +560,13 @@ func TestServer(t *testing.T) {
560560
if configPathFromEnv != "" {
561561
defaultCoofigPath = configPathFromEnv
562562
}
563-
cfg, err := config.FromFile(defaultCoofigPath)
563+
cfg, err := config.New(defaultCoofigPath)
564564
require.NoError(t, err)
565565
cfg.RootDir = rootDir
566566
cfg.PullConfig.ProxyURL = ""
567567
service.CacheSacnInterval = 1 * time.Second
568568

569-
service.NewPuller = func(ctx context.Context, pullCfg *config.PullConfig, hook *service.Hook) service.Puller {
569+
service.NewPuller = func(ctx context.Context, pullCfg *config.PullConfig, hook *service.Hook, diskQuotaChecker *service.DiskQuotaChecker) service.Puller {
570570
return &mockPuller{
571571
pullCfg: pullCfg,
572572
duration: time.Second * 2,

pkg/service/puller.go

Lines changed: 13 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@ import (
55
"fmt"
66
"io"
77
"os"
8-
"path/filepath"
98
"sort"
109
"sync"
1110
"sync/atomic"
12-
"syscall"
1311
"time"
1412

1513
"github.com/dustin/go-humanize"
@@ -52,19 +50,21 @@ func NewHook(ctx context.Context, progressCb func(progress status.Progress)) *Ho
5250
}
5351

5452
type Puller interface {
55-
Pull(ctx context.Context, reference, targetDir string, checkDiskQuota bool) error
53+
Pull(ctx context.Context, reference, targetDir string) error
5654
}
5755

58-
var NewPuller = func(ctx context.Context, pullCfg *config.PullConfig, hook *Hook) Puller {
56+
var NewPuller = func(ctx context.Context, pullCfg *config.PullConfig, hook *Hook, diskQuotaChecker *DiskQuotaChecker) Puller {
5957
return &puller{
60-
pullCfg: pullCfg,
61-
hook: hook,
58+
pullCfg: pullCfg,
59+
hook: hook,
60+
diskQuotaChecker: diskQuotaChecker,
6261
}
6362
}
6463

6564
type puller struct {
66-
pullCfg *config.PullConfig
67-
hook *Hook
65+
pullCfg *config.PullConfig
66+
hook *Hook
67+
diskQuotaChecker *DiskQuotaChecker
6868
}
6969

7070
func (h *Hook) getProgressDesc() string {
@@ -201,7 +201,7 @@ func (h *Hook) GetProgress() status.Progress {
201201
return h.getProgress()
202202
}
203203

204-
func (p *puller) Pull(ctx context.Context, reference, targetDir string, checkDiskQuota bool) error {
204+
func (p *puller) Pull(ctx context.Context, reference, targetDir string) error {
205205
keyChain, err := auth.GetKeyChainByRef(reference)
206206
if err != nil {
207207
return errors.Wrapf(err, "get auth for model: %s", reference)
@@ -212,9 +212,10 @@ func (p *puller) Pull(ctx context.Context, reference, targetDir string, checkDis
212212
return errors.Wrap(err, "create modctl backend")
213213
}
214214

215-
if checkDiskQuota {
216-
if err := p.checkDiskQuota(ctx, reference, filepath.Dir(targetDir), keyChain.ServerScheme == "http", b); err != nil {
217-
return err
215+
if p.diskQuotaChecker != nil {
216+
plainHTTP := keyChain.ServerScheme == "http"
217+
if err := p.diskQuotaChecker.Check(ctx, b, reference, plainHTTP); err != nil {
218+
return errors.Wrap(err, "check disk quota")
218219
}
219220
}
220221

@@ -247,34 +248,3 @@ func (p *puller) Pull(ctx context.Context, reference, targetDir string, checkDis
247248

248249
return nil
249250
}
250-
251-
func (p *puller) checkDiskQuota(ctx context.Context, reference, dir string, plainHTTP bool, b backend.Backend) error {
252-
var st syscall.Statfs_t
253-
if err := syscall.Statfs(dir, &st); err != nil {
254-
logger.WithContext(ctx).WithError(err).Errorf("failed to stat dir %s in mounting %s", dir, reference)
255-
} else {
256-
availSpace := int64(st.Bavail) * int64(st.Bsize)
257-
logger.WithContext(ctx).Infof("cache dir available space: %s", humanize.IBytes(uint64(availSpace)))
258-
// get model image size
259-
result, err := b.Inspect(ctx, reference, &modctlConfig.Inspect{Remote: true, Insecure: true, PlainHTTP: plainHTTP})
260-
if err != nil {
261-
logger.WithContext(ctx).WithError(err).Errorf("failed to inspect model image: %s", reference)
262-
return errors.Wrap(err, "inspect model image")
263-
}
264-
265-
modelArtifact, ok := result.(*backend.InspectedModelArtifact)
266-
if !ok {
267-
logger.WithContext(ctx).Errorf("invalid inspected result: %s", result)
268-
return fmt.Errorf("invalid inspected result")
269-
}
270-
271-
totalSize := int64(0)
272-
for _, layer := range modelArtifact.Layers {
273-
totalSize += layer.Size
274-
}
275-
if totalSize > availSpace {
276-
return errors.Wrapf(syscall.ENOSPC, "model image %s is %s, but only %s of disk quota is available", reference, humanize.IBytes(uint64(totalSize)), humanize.IBytes(uint64(availSpace)))
277-
}
278-
}
279-
return nil
280-
}

0 commit comments

Comments
 (0)