Skip to content

Commit d5eef60

Browse files
authored
feat: clean up expired jobs to prevent performance problems (#3504)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent bba6b9b commit d5eef60

File tree

7 files changed

+216
-1
lines changed

7 files changed

+216
-1
lines changed

manager/config/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,25 @@ type JobConfig struct {
300300
// RateLimit configuration.
301301
RateLimit RateLimitConfig `yaml:"rateLimit" mapstructure:"rateLimit"`
302302

303+
// GC configuration, used to clean up expired jobs. If the count of the jobs is huge,
304+
// it may cause performance problems.
305+
GC GCConfig `yaml:"gc" mapstructure:"gc"`
306+
303307
// Preheat configuration.
304308
Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"`
305309

306310
// Sync peers configuration.
307311
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`
308312
}
309313

314+
// GCConfig is the configuration for cleaning up expired jobs.
type GCConfig struct {
315+
// Interval is how often the job gc runs.
316+
Interval time.Duration `yaml:"interval" mapstructure:"interval"`
317+
318+
// TTL is how long a job is kept, counted from its creation time,
// before the gc deletes it.
319+
TTL time.Duration `yaml:"ttl" mapstructure:"ttl"`
320+
}
321+
310322
type PreheatConfig struct {
311323
// RegistryTimeout is the timeout for requesting registry to get token and manifest.
312324
RegistryTimeout time.Duration `yaml:"registryTimeout" mapstructure:"registryTimeout"`
@@ -466,6 +478,10 @@ func New() *Config {
466478
Capacity: DefaultJobRateLimitCapacity,
467479
Quantum: DefaultJobRateLimitQuantum,
468480
},
481+
GC: GCConfig{
482+
Interval: DefaultJobGCInterval,
483+
TTL: DefaultJobGCTTL,
484+
},
469485
Preheat: PreheatConfig{
470486
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
471487
},
@@ -647,6 +663,14 @@ func (cfg *Config) Validate() error {
647663
return errors.New("rateLimit requires parameter quantum")
648664
}
649665

666+
if cfg.Job.GC.Interval == 0 {
667+
return errors.New("gc requires parameter interval")
668+
}
669+
670+
if cfg.Job.GC.TTL == 0 {
671+
return errors.New("gc requires parameter ttl")
672+
}
673+
650674
if cfg.Job.Preheat.TLS != nil {
651675
if cfg.Job.Preheat.TLS.CACert == "" {
652676
return errors.New("preheat requires parameter caCert")

manager/config/config_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ func TestConfig_Load(t *testing.T) {
192192
Capacity: 1000,
193193
Quantum: 1000,
194194
},
195+
GC: GCConfig{
196+
Interval: 1 * time.Second,
197+
TTL: 1 * time.Second,
198+
},
195199
Preheat: PreheatConfig{
196200
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
197201
TLS: &PreheatTLSClientConfig{
@@ -747,6 +751,36 @@ func TestConfig_Validate(t *testing.T) {
747751
assert.EqualError(err, "rateLimit requires parameter quantum")
748752
},
749753
},
754+
{
755+
name: "gc requires parameter interval",
756+
config: New(),
757+
mock: func(cfg *Config) {
758+
cfg.Auth.JWT = mockJWTConfig
759+
cfg.Database.Type = DatabaseTypeMysql
760+
cfg.Database.Mysql = mockMysqlConfig
761+
cfg.Database.Redis = mockRedisConfig
762+
cfg.Job.GC.Interval = 0
763+
},
764+
expect: func(t *testing.T, err error) {
765+
assert := assert.New(t)
766+
assert.EqualError(err, "gc requires parameter interval")
767+
},
768+
},
769+
{
770+
name: "gc requires parameter ttl",
771+
config: New(),
772+
mock: func(cfg *Config) {
773+
cfg.Auth.JWT = mockJWTConfig
774+
cfg.Database.Type = DatabaseTypeMysql
775+
cfg.Database.Mysql = mockMysqlConfig
776+
cfg.Database.Redis = mockRedisConfig
777+
cfg.Job.GC.TTL = 0
778+
},
779+
expect: func(t *testing.T, err error) {
780+
assert := assert.New(t)
781+
assert.EqualError(err, "gc requires parameter ttl")
782+
},
783+
},
750784
{
751785
name: "preheat requires parameter caCert",
752786
config: New(),

manager/config/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ const (
9696
// DefaultJobRateLimitQuantum is the default quantum for job rate limit.
9797
DefaultJobRateLimitQuantum = 10
9898

99+
// DefaultJobGCInterval is the default interval for gc job.
100+
DefaultJobGCInterval = 24 * time.Hour
101+
102+
// DefaultJobGCTTL is the default ttl for job.
103+
DefaultJobGCTTL = 7 * 24 * time.Hour
104+
99105
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
100106
DefaultJobPreheatRegistryTimeout = 1 * time.Minute
101107

manager/config/testdata/manager.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ job:
7070
fillInterval: 1s
7171
capacity: 1000
7272
quantum: 1000
73+
gc:
74+
interval: 1s
75+
ttl: 1s
7376
preheat:
7477
registryTimeout: 1m
7578
tls:

manager/job/gc.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2024 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
//go:generate mockgen -destination mocks/gc_mock.go -source gc.go -package mocks
18+
19+
package job
20+
21+
import (
22+
"context"
23+
"time"
24+
25+
"gorm.io/gorm"
26+
27+
logger "d7y.io/dragonfly/v2/internal/dflog"
28+
"d7y.io/dragonfly/v2/manager/config"
29+
"d7y.io/dragonfly/v2/manager/models"
30+
)
31+
32+
// GC is the interface of the job garbage collector, which cleans up
// expired jobs.
33+
type GC interface {
34+
// Serve starts the gc server.
35+
Serve()
36+
37+
// Stop stops the gc server.
38+
Stop()
39+
}
40+
41+
// gc is an implementation of GC.
42+
type gc struct {
43+
// config supplies Job.GC.Interval and Job.GC.TTL to the gc loop.
config *config.Config
44+
// db is the gorm handle the expired jobs are deleted through.
db *gorm.DB
45+
// done is closed by Stop to make the Serve loop return.
done chan struct{}
46+
}
47+
48+
// newGC returns a new GC.
49+
func newGC(cfg *config.Config, gdb *gorm.DB) (GC, error) {
50+
return &gc{
51+
config: cfg,
52+
db: gdb,
53+
done: make(chan struct{}),
54+
}, nil
55+
}
56+
57+
// Serve started gc server.
58+
func (gc *gc) Serve() {
59+
tick := time.NewTicker(gc.config.Job.GC.Interval)
60+
for {
61+
select {
62+
case <-tick.C:
63+
logger.Infof("gc job started")
64+
if err := gc.db.WithContext(context.Background()).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Unscoped().Delete(&models.Job{}).Error; err != nil {
65+
logger.Errorf("gc job failed: %v", err)
66+
}
67+
case <-gc.done:
68+
return
69+
}
70+
}
71+
}
72+
73+
// Stop stops the gc server by closing the done channel, which makes the
// Serve loop return. It must be called at most once: closing an already
// closed channel panics.
74+
func (gc *gc) Stop() {
75+
close(gc.done)
76+
}

manager/job/job.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Job struct {
4141
Preheat
4242
SyncPeers
4343
Task
44+
GC
4445
}
4546

4647
// New returns a new Job.
@@ -77,22 +78,30 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
7778
return nil, err
7879
}
7980

81+
gc, err := newGC(cfg, gdb)
82+
if err != nil {
83+
return nil, err
84+
}
85+
8086
task := newTask(j)
8187
return &Job{
8288
Job: j,
8389
Preheat: preheat,
8490
SyncPeers: syncPeers,
8591
Task: task,
92+
GC: gc,
8693
}, nil
8794
}
8895

8996
// Serve starts the job server.
// It launches GC.Serve and SyncPeers.Serve in separate goroutines (the lines
// added by this change) and returns without waiting for them; the removed
// line called SyncPeers.Serve directly on the caller's goroutine.
9097
func (j *Job) Serve() {
91-
j.SyncPeers.Serve()
98+
go j.GC.Serve()
99+
go j.SyncPeers.Serve()
92100
}
93101

94102
// Stop stops the job server.
// It calls GC.Stop first and then SyncPeers.Stop.
95103
func (j *Job) Stop() {
104+
j.GC.Stop()
96105
j.SyncPeers.Stop()
97106
}
98107

manager/job/mocks/gc_mock.go

Lines changed: 63 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)