Skip to content

Commit e74ebf1

Browse files
committed
feat: delete jobs in batches
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 6695100 commit e74ebf1

File tree

5 files changed

+61
-5
lines changed

5 files changed

+61
-5
lines changed

manager/config/config.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ type GCConfig struct {
321321

322322
// TTL is the ttl for job.
323323
TTL time.Duration `yaml:"ttl" mapstructure:"ttl"`
324+
325+
// BatchSize is the maximum number of jobs handled per database operation during gc.
326+
BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
324327
}
325328

326329
type PreheatConfig struct {
@@ -442,8 +445,9 @@ func New() *Config {
442445
},
443446
Job: JobConfig{
444447
GC: GCConfig{
445-
Interval: DefaultJobGCInterval,
446-
TTL: DefaultJobGCTTL,
448+
Interval: DefaultJobGCInterval,
449+
TTL: DefaultJobGCTTL,
450+
BatchSize: DefaultJobGCBatchSize,
447451
},
448452
Preheat: PreheatConfig{
449453
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
@@ -629,6 +633,10 @@ func (cfg *Config) Validate() error {
629633
return errors.New("gc requires parameter ttl")
630634
}
631635

636+
if cfg.Job.GC.BatchSize == 0 {
637+
return errors.New("gc requires parameter batchSize")
638+
}
639+
632640
if cfg.Job.Preheat.RegistryTimeout == 0 {
633641
return errors.New("preheat requires parameter registryTimeout")
634642
}

manager/config/config_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,9 @@ func TestConfig_Load(t *testing.T) {
178178
},
179179
Job: JobConfig{
180180
GC: GCConfig{
181-
Interval: 1 * time.Second,
182-
TTL: 1 * time.Second,
181+
Interval: 1 * time.Second,
182+
TTL: 1 * time.Second,
183+
BatchSize: 100,
183184
},
184185
Preheat: PreheatConfig{
185186
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
@@ -765,6 +766,21 @@ func TestConfig_Validate(t *testing.T) {
765766
assert.EqualError(err, "gc requires parameter ttl")
766767
},
767768
},
769+
{
770+
name: "gc requires parameter batchSize",
771+
config: New(),
772+
mock: func(cfg *Config) {
773+
cfg.Auth.JWT = mockJWTConfig
774+
cfg.Database.Type = DatabaseTypeMysql
775+
cfg.Database.Mysql = mockMysqlConfig
776+
cfg.Database.Redis = mockRedisConfig
777+
cfg.Job.GC.BatchSize = 0
778+
},
779+
expect: func(t *testing.T, err error) {
780+
assert := assert.New(t)
781+
assert.EqualError(err, "gc requires parameter batchSize")
782+
},
783+
},
768784
{
769785
name: "preheat requires parameter registryTimeout",
770786
config: New(),

manager/config/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ const (
9393
// DefaultJobGCTTL is the default ttl for job.
9494
DefaultJobGCTTL = 12 * time.Hour
9595

96+
// DefaultJobGCBatchSize is the default batch size for operating on the database in gc job.
97+
DefaultJobGCBatchSize = 5000
98+
9699
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
97100
DefaultJobPreheatRegistryTimeout = 1 * time.Minute
98101

manager/config/testdata/manager.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ job:
7272
gc:
7373
interval: 1s
7474
ttl: 1s
75+
batchSize: 100
7576
preheat:
7677
registryTimeout: 1m
7778
tls:

manager/job/gc.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (gc *gc) Serve() {
6161
select {
6262
case <-tick.C:
6363
logger.Infof("gc job started")
64-
if err := gc.db.WithContext(context.Background()).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Unscoped().Delete(&models.Job{}).Error; err != nil {
64+
if err := gc.deleteInBatches(context.Background()); err != nil {
6565
logger.Errorf("gc job failed: %v", err)
6666
}
6767
case <-gc.done:
@@ -74,3 +74,31 @@ func (gc *gc) Serve() {
7474
func (gc *gc) Stop() {
7575
// Closing done makes the `case <-gc.done` branch of the select in Serve ready.
close(gc.done)
7676
}
77+
78+
// deleteInBatches deletes jobs in batches.
79+
func (gc *gc) deleteInBatches(ctx context.Context) error {
80+
for {
81+
jobs := make([]models.Job, 0, gc.config.Job.GC.BatchSize)
82+
if err := gc.db.WithContext(ctx).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Limit(gc.config.Job.GC.BatchSize).Find(&jobs).Error; err != nil {
83+
return err
84+
}
85+
86+
if len(jobs) == 0 {
87+
logger.Infof("gc job finished")
88+
break
89+
}
90+
91+
var ids []uint
92+
for _, job := range jobs {
93+
ids = append(ids, job.ID)
94+
}
95+
96+
if err := gc.db.WithContext(ctx).Where("id IN ?", ids).Unscoped().Delete(&models.Job{}).Error; err != nil {
97+
return err
98+
}
99+
100+
logger.Infof("deleted %d jobs", len(jobs))
101+
}
102+
103+
return nil
104+
}

0 commit comments

Comments
 (0)