Skip to content

Commit a541bed

Browse files
committed
feat: add the API for trigger the GC manually
Signed-off-by: chlins <chlins.zhang@gmail.com>
1 parent c63b794 commit a541bed

File tree

11 files changed

+173
-12
lines changed

11 files changed

+173
-12
lines changed

internal/job/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ const (
3535

3636
// DeleteTaskJob is the name of deleting task job.
3737
DeleteTaskJob = "delete_task"
38+
39+
// GCJob is the name of gc job.
40+
GCJob = "gc"
3841
)
3942

4043
// Machinery server configuration.

manager/gc/audit.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,25 @@ func (a *audit) RunGC(ctx context.Context) error {
6565
return err
6666
}
6767

68-
if err := a.recorder.Init(AuditGCTaskID, models.JSONMap{
68+
args := models.JSONMap{
6969
"ttl": ttl,
7070
"batch_size": DefaultAuditGCBatchSize,
71-
}); err != nil {
71+
}
72+
73+
var userID uint
74+
if id, ok := ctx.Value(pkggc.ContextKeyUserID).(uint); ok {
75+
userID = id
76+
}
77+
78+
var taskID string
79+
if id, ok := ctx.Value(pkggc.ContextKeyTaskID).(string); ok {
80+
taskID = id
81+
} else {
82+
// Use the default task ID if taskID is not provided. (applied to background periodic execution scenarios)
83+
taskID = AuditGCTaskID
84+
}
85+
86+
if err := a.recorder.Init(userID, taskID, args); err != nil {
7287
return err
7388
}
7489

manager/gc/job.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,25 @@ func (j *job) RunGC(ctx context.Context) error {
6464
return err
6565
}
6666

67-
if err = j.recorder.Init(JobGCTaskID, models.JSONMap{
67+
args := models.JSONMap{
6868
"ttl": ttl,
6969
"batch_size": DefaultJobGCBatchSize,
70-
}); err != nil {
70+
}
71+
72+
var userID uint
73+
if id, ok := ctx.Value(pkggc.ContextKeyUserID).(uint); ok {
74+
userID = id
75+
}
76+
77+
var taskID string
78+
if id, ok := ctx.Value(pkggc.ContextKeyTaskID).(string); ok {
79+
taskID = id
80+
} else {
81+
// Use the default task ID if taskID is not provided. (applied to background periodic execution scenarios)
82+
taskID = AuditGCTaskID
83+
}
84+
85+
if err = j.recorder.Init(userID, taskID, args); err != nil {
7186
return err
7287
}
7388

manager/gc/recorder.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ func newJobRecorder(db *gorm.DB) *jobRecorder {
5151
}
5252
}
5353

54-
func (jb *jobRecorder) Init(taskID string, args models.JSONMap) error {
54+
func (jb *jobRecorder) Init(userID uint, taskID string, args models.JSONMap) error {
5555
job := models.Job{
5656
Type: GCJobType,
5757
TaskID: taskID,
58+
UserID: userID,
5859
Args: args,
5960
}
6061

manager/handlers/job.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
117117
return
118118
}
119119

120+
ctx.JSON(http.StatusOK, job)
121+
case job.GCJob:
122+
var json types.CreateGCJobRequest
123+
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
124+
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
125+
return
126+
}
127+
128+
job, err := h.service.CreateGCJob(ctx.Request.Context(), json)
129+
if err != nil {
130+
ctx.Error(err) // nolint: errcheck
131+
return
132+
}
133+
120134
ctx.JSON(http.StatusOK, job)
121135
default:
122136
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})

manager/manager.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
"d7y.io/dragonfly/v2/manager/cache"
3535
"d7y.io/dragonfly/v2/manager/config"
3636
"d7y.io/dragonfly/v2/manager/database"
37-
"d7y.io/dragonfly/v2/manager/gc"
37+
managergc "d7y.io/dragonfly/v2/manager/gc"
3838
"d7y.io/dragonfly/v2/manager/job"
3939
"d7y.io/dragonfly/v2/manager/metrics"
4040
"d7y.io/dragonfly/v2/manager/permission/rbac"
@@ -167,20 +167,21 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
167167
}
168168

169169
// Initialize garbage collector.
170-
s.gc = pkggc.New()
171-
170+
gc := pkggc.New()
172171
// Register job gc task.
173-
if err := s.gc.Add(gc.NewJobGCTask(db.DB)); err != nil {
172+
if err := gc.Add(managergc.NewJobGCTask(db.DB)); err != nil {
174173
return nil, err
175174
}
176175

177176
// Register audit gc task.
178-
if err := s.gc.Add(gc.NewAuditGCTask(db.DB)); err != nil {
177+
if err := gc.Add(managergc.NewAuditGCTask(db.DB)); err != nil {
179178
return nil, err
180179
}
181180

181+
s.gc = gc
182+
182183
// Initialize REST server.
183-
restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
184+
restService := service.New(cfg, db, cache, job, gc, enforcer, objectStorage)
184185
router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, s.jobRateLimiter, EmbedFolder(assets, assetsTargetPath))
185186
if err != nil {
186187
return nil, err

manager/service/job.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,80 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"time"
2324

2425
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
26+
"github.com/google/uuid"
27+
"gorm.io/gorm"
2528

2629
logger "d7y.io/dragonfly/v2/internal/dflog"
2730
internaljob "d7y.io/dragonfly/v2/internal/job"
2831
"d7y.io/dragonfly/v2/manager/metrics"
2932
"d7y.io/dragonfly/v2/manager/models"
3033
"d7y.io/dragonfly/v2/manager/types"
34+
pkggc "d7y.io/dragonfly/v2/pkg/gc"
3135
"d7y.io/dragonfly/v2/pkg/net/http"
3236
"d7y.io/dragonfly/v2/pkg/retry"
3337
"d7y.io/dragonfly/v2/pkg/slices"
3438
"d7y.io/dragonfly/v2/pkg/structure"
3539
)
3640

41+
const (
42+
// DefaultGCJobPollingTimeout is the default timeout for polling GC job.
43+
DefaultGCJobPollingTimeout = 30 * time.Minute
44+
45+
// DefaultGCJobPollingInterval is the default interval for polling GC job.
46+
DefaultGCJobPollingInterval = 5 * time.Second
47+
)
48+
49+
func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest) (*models.Job, error) {
50+
taskID := uuid.NewString()
51+
ctx = context.WithValue(ctx, pkggc.ContextKeyTaskID, taskID)
52+
ctx = context.WithValue(ctx, pkggc.ContextKeyUserID, json.UserID)
53+
54+
// This is a non-block function to run the gc task, which will run the task asynchronously in the backend.
55+
if err := s.gc.Run(ctx, json.Args.Type); err != nil {
56+
return nil, err
57+
}
58+
59+
return s.pollingGCJob(ctx, json.Type, json.UserID, taskID)
60+
}
61+
62+
func (s *service) pollingGCJob(ctx context.Context, jobType string, userID uint, taskID string) (*models.Job, error) {
63+
ctx, cancel := context.WithTimeout(ctx, DefaultGCJobPollingTimeout)
64+
defer cancel()
65+
66+
ticker := time.NewTicker(DefaultGCJobPollingInterval)
67+
defer ticker.Stop()
68+
69+
job := models.Job{}
70+
71+
for {
72+
select {
73+
case <-ctx.Done():
74+
return nil, fmt.Errorf("context done: %w", ctx.Err())
75+
76+
case <-ticker.C:
77+
if err := s.db.WithContext(ctx).First(&job, models.Job{
78+
Type: jobType,
79+
UserID: userID,
80+
TaskID: taskID,
81+
}).Error; err != nil {
82+
if errors.Is(err, gorm.ErrRecordNotFound) {
83+
continue
84+
}
85+
86+
return nil, err
87+
}
88+
89+
// Return the job if the job is in success or failure state, otherwise continue polling.
90+
if job.State == machineryv1tasks.StateSuccess || job.State == machineryv1tasks.StateFailure {
91+
return &job, nil
92+
}
93+
}
94+
}
95+
}
96+
3797
func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
3898
schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs)
3999
if err != nil {

manager/service/mocks/service_mock.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

manager/service/service.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"d7y.io/dragonfly/v2/manager/models"
3434
"d7y.io/dragonfly/v2/manager/permission/rbac"
3535
"d7y.io/dragonfly/v2/manager/types"
36+
pkggc "d7y.io/dragonfly/v2/pkg/gc"
3637
"d7y.io/dragonfly/v2/pkg/objectstorage"
3738
)
3839

@@ -118,6 +119,7 @@ type Service interface {
118119
CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error
119120
CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
120121
CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
122+
CreateGCJob(context.Context, types.CreateGCJobRequest) (*models.Job, error)
121123
DestroyJob(context.Context, uint) error
122124
UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error)
123125
GetJob(context.Context, uint) (*models.Job, error)
@@ -152,18 +154,20 @@ type service struct {
152154
rdb redis.UniversalClient
153155
cache *cache.Cache
154156
job *job.Job
157+
gc pkggc.GC
155158
enforcer *casbin.Enforcer
156159
objectStorage objectstorage.ObjectStorage
157160
}
158161

159162
// NewREST returns a new REST instance
160-
func New(cfg *config.Config, database *database.Database, cache *cache.Cache, job *job.Job, enforcer *casbin.Enforcer, objectStorage objectstorage.ObjectStorage) Service {
163+
func New(cfg *config.Config, database *database.Database, cache *cache.Cache, job *job.Job, gc pkggc.GC, enforcer *casbin.Enforcer, objectStorage objectstorage.ObjectStorage) Service {
161164
return &service{
162165
config: cfg,
163166
db: database.DB,
164167
rdb: database.RDB,
165168
cache: cache,
166169
job: job,
170+
gc: gc,
167171
enforcer: enforcer,
168172
objectStorage: objectStorage,
169173
}

manager/types/job.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,22 @@ type DeleteTaskArgs struct {
276276
// Otherwise, calculate the task ID based on url, piece_length, tag, application, and filtered_query_params.
277277
ContentForCalculatingTaskID *string `json:"content_for_calculating_task_id" binding:"omitempty"`
278278
}
279+
280+
type CreateGCJobRequest struct {
281+
// BIO is the description of the job.
282+
BIO string `json:"bio" binding:"omitempty"`
283+
284+
// Type is the type of the job.
285+
Type string `json:"type" binding:"required"`
286+
287+
// Args is the arguments of the gc.
288+
Args GCArgs `json:"args" binding:"required"`
289+
290+
// UserID is the user id of the job.
291+
UserID uint `json:"user_id" binding:"omitempty"`
292+
}
293+
294+
type GCArgs struct {
295+
// Type is the type of the job.
296+
Type string `json:"type" binding:"required,oneof=audit job"`
297+
}

0 commit comments

Comments
 (0)