Skip to content

Commit 632aa30

Browse files
committed
refactor: migrate the job gc to manager gc server.
Signed-off-by: chlins <chlins.zhang@gmail.com>
1 parent 09d99d3 commit 632aa30

File tree

6 files changed

+235
-117
lines changed

6 files changed

+235
-117
lines changed

manager/gc/job.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2025 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package gc
18+
19+
import (
20+
"encoding/json"
21+
"time"
22+
23+
"gorm.io/gorm"
24+
25+
logger "d7y.io/dragonfly/v2/internal/dflog"
26+
"d7y.io/dragonfly/v2/manager/models"
27+
libgc "d7y.io/dragonfly/v2/pkg/gc"
28+
)
29+
30+
const (
31+
// DefaultJobGCBatchSize is the default batch size for deleting jobs.
32+
DefaultJobGCBatchSize = 5000
33+
34+
// DefaultJobGCInterval is the default interval for running job GC.
35+
DefaultJobGCInterval = time.Hour * 3
36+
37+
// DefaultJobGCTimeout is the default timeout for running job GC.
38+
DefaultJobGCTimeout = time.Hour * 1
39+
40+
// JohGCTaskID is the ID of the job GC task.
41+
JobGCTaskID = "job"
42+
)
43+
44+
func NewJobGCTask(db *gorm.DB) libgc.Task {
45+
return libgc.Task{
46+
ID: JobGCTaskID,
47+
Interval: DefaultJobGCInterval,
48+
Timeout: DefaultJobGCTimeout,
49+
Runner: &job{db: db, recorder: newJobRecorder(db)},
50+
}
51+
}
52+
53+
// job is the struct for cleaning up jobs which implements the gc Runner interface.
54+
type job struct {
55+
db *gorm.DB
56+
recorder *jobRecorder
57+
}
58+
59+
// RunGC implements the gc Runner interface.
60+
func (j *job) RunGC() error {
61+
ttl, err := j.getTTL()
62+
if err != nil {
63+
return err
64+
}
65+
66+
if err = j.recorder.Init(JobGCTaskID, models.JSONMap{
67+
"ttl": ttl,
68+
"batch_size": DefaultJobGCBatchSize,
69+
}); err != nil {
70+
return err
71+
}
72+
73+
var gcResult Result
74+
defer func() {
75+
if err := j.recorder.Record(gcResult); err != nil {
76+
logger.Errorf("failed to record job GC result: %v", err)
77+
}
78+
}()
79+
80+
for {
81+
result := j.db.Where("created_at < ?", time.Now().Add(-ttl)).Limit(DefaultJobGCBatchSize).Unscoped().Delete(&models.Job{})
82+
if result.Error != nil {
83+
gcResult.Error = result.Error
84+
return result.Error
85+
}
86+
87+
if result.RowsAffected == 0 {
88+
break
89+
}
90+
91+
gcResult.Purged += result.RowsAffected
92+
logger.Infof("gc job deleted %d jobs", result.RowsAffected)
93+
}
94+
95+
return nil
96+
}
97+
98+
func (j *job) getTTL() (time.Duration, error) {
99+
var config models.Config
100+
if err := j.db.Model(models.Config{}).First(&config, &models.Config{Name: models.ConfigGC}).Error; err != nil {
101+
return 0, err
102+
}
103+
104+
var gcConfig models.GCConfig
105+
if err := json.Unmarshal([]byte(config.Value), &gcConfig); err != nil {
106+
return 0, err
107+
}
108+
109+
return gcConfig.Job.TTL, nil
110+
}

manager/gc/recorder.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2025 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package gc
18+
19+
import (
20+
"errors"
21+
22+
"gorm.io/gorm"
23+
24+
"d7y.io/dragonfly/v2/manager/models"
25+
)
26+
27+
const (
28+
// GCJobType indicates the gc task is completed successfully.
29+
GCJobType = "gc"
30+
31+
// GCStateSuccess indicates the gc task is completed successfully.
32+
GCStateSuccess = "SUCCESS"
33+
34+
// GCStateFailure indicates the gc task is completed with failure.
35+
GCStateFailure = "FAILURE"
36+
)
37+
38+
type Result struct {
39+
Error error
40+
Purged int64
41+
}
42+
43+
type jobRecorder struct {
44+
db *gorm.DB
45+
job *models.Job
46+
}
47+
48+
func newJobRecorder(db *gorm.DB) *jobRecorder {
49+
return &jobRecorder{
50+
db: db,
51+
}
52+
}
53+
54+
func (jb *jobRecorder) Init(taskID string, args models.JSONMap) error {
55+
job := models.Job{
56+
Type: GCJobType,
57+
TaskID: taskID,
58+
Args: args,
59+
}
60+
61+
if err := jb.db.Create(&job).Error; err != nil {
62+
return err
63+
}
64+
65+
jb.job = &job
66+
return nil
67+
}
68+
69+
func (jb *jobRecorder) Record(result Result) error {
70+
if jb.job == nil {
71+
return errors.New("job not found")
72+
}
73+
74+
if jb.job.Result == nil {
75+
jb.job.Result = make(models.JSONMap)
76+
}
77+
78+
jb.job.State = GCStateSuccess
79+
jb.job.Result["purged"] = result.Purged
80+
81+
if result.Error != nil {
82+
jb.job.State = GCStateFailure
83+
jb.job.Result["error"] = result.Error.Error()
84+
}
85+
86+
return jb.db.Save(jb.job).Error
87+
}

manager/job/gc.go

Lines changed: 0 additions & 94 deletions
This file was deleted.

manager/job/job.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ type Job struct {
4141
Preheat
4242
SyncPeers
4343
Task
44-
GC
4544
}
4645

4746
// New returns a new Job.
@@ -78,29 +77,21 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
7877
return nil, err
7978
}
8079

81-
gc, err := newGC(cfg, gdb)
82-
if err != nil {
83-
return nil, err
84-
}
85-
8680
return &Job{
8781
Job: j,
8882
Preheat: preheat,
8983
SyncPeers: syncPeers,
9084
Task: newTask(j),
91-
GC: gc,
9285
}, nil
9386
}
9487

9588
// Serve starts the job server.
9689
func (j *Job) Serve() {
97-
go j.GC.Serve()
9890
go j.SyncPeers.Serve()
9991
}
10092

10193
// Stop stops the job server.
10294
func (j *Job) Stop() {
103-
j.GC.Stop()
10495
j.SyncPeers.Stop()
10596
}
10697

manager/manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"d7y.io/dragonfly/v2/manager/cache"
3535
"d7y.io/dragonfly/v2/manager/config"
3636
"d7y.io/dragonfly/v2/manager/database"
37+
"d7y.io/dragonfly/v2/manager/gc"
3738
"d7y.io/dragonfly/v2/manager/job"
3839
"d7y.io/dragonfly/v2/manager/metrics"
3940
"d7y.io/dragonfly/v2/manager/permission/rbac"
@@ -42,6 +43,7 @@ import (
4243
"d7y.io/dragonfly/v2/manager/searcher"
4344
"d7y.io/dragonfly/v2/manager/service"
4445
"d7y.io/dragonfly/v2/pkg/dfpath"
46+
pkggc "d7y.io/dragonfly/v2/pkg/gc"
4547
"d7y.io/dragonfly/v2/pkg/net/ip"
4648
"d7y.io/dragonfly/v2/pkg/objectstorage"
4749
"d7y.io/dragonfly/v2/pkg/rpc"
@@ -93,6 +95,9 @@ type Server struct {
9395
// Job rate limiter.
9496
jobRateLimiter ratelimiter.JobRateLimiter
9597

98+
// GC server.
99+
gc pkggc.GC
100+
96101
// GRPC server.
97102
grpcServer *grpc.Server
98103

@@ -157,6 +162,14 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
157162
return nil, err
158163
}
159164

165+
// Initialize garbage collector.
166+
s.gc = pkggc.New()
167+
168+
// Register job gc task.
169+
if err := s.gc.Add(gc.NewJobGCTask(db.DB)); err != nil {
170+
return nil, err
171+
}
172+
160173
// Initialize REST server.
161174
restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
162175
router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, s.jobRateLimiter, EmbedFolder(assets, assetsTargetPath))
@@ -253,6 +266,12 @@ func (s *Server) Serve() error {
253266
s.jobRateLimiter.Serve()
254267
}()
255268

269+
// Started gc server.
270+
go func() {
271+
logger.Info("started gc server")
272+
s.gc.Start()
273+
}()
274+
256275
// Generate GRPC listener.
257276
ip, ok := ip.FormatIP(s.config.Server.GRPC.ListenIP.String())
258277
if !ok {
@@ -299,6 +318,9 @@ func (s *Server) Stop() {
299318
// Stop job rate limiter.
300319
s.jobRateLimiter.Stop()
301320

321+
// Stop gc server.
322+
s.gc.Stop()
323+
302324
// Stop GRPC server.
303325
stopped := make(chan struct{})
304326
go func() {

0 commit comments

Comments
 (0)