Skip to content

Commit 5547307

Browse files
authored
feat: add delete task and list tasks manager api with request type and service type. (#3378)
Signed-off-by: Asklv <boironic@gmail.com>
1 parent 53ba603 commit 5547307

File tree

17 files changed

+674
-11
lines changed

17 files changed

+674
-11
lines changed

internal/job/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ const (
2929

3030
// SyncPeersJob is the name of syncing peers job.
3131
SyncPeersJob = "sync_peers"
32+
33+
// ListTasksJob is the name of listing tasks job.
34+
ListTasksJob = "list_tasks"
35+
36+
// DeleteTasksJob is the name of deleting tasks job.
37+
DeleteTaskJob = "delete_task"
3238
)
3339

3440
// Machinery server configuration.

internal/job/types.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,48 @@
1616

1717
package job
1818

19+
import "d7y.io/dragonfly/v2/scheduler/resource"
20+
1921
type PreheatRequest struct {
2022
URL string `json:"url" validate:"required,url"`
2123
Tag string `json:"tag" validate:"omitempty"`
2224
Digest string `json:"digest" validate:"omitempty"`
23-
FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"`
25+
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
2426
Headers map[string]string `json:"headers" validate:"omitempty"`
2527
Application string `json:"application" validate:"omitempty"`
2628
Priority int32 `json:"priority" validate:"omitempty"`
2729
PieceLength uint32 `json:"pieceLength" validate:"omitempty"`
2830
}
2931

3032
type PreheatResponse struct {
33+
TaskID string `json:"taskID"`
34+
}
35+
36+
// ListTasksRequest defines the request parameters for listing tasks.
37+
type ListTasksRequest struct {
38+
TaskID string `json:"task_id" validate:"required"`
39+
}
40+
41+
// ListTasksResponse defines the response parameters for listing tasks.
42+
type ListTasksResponse struct {
43+
Peers []*resource.Peer `json:"peers"`
44+
}
45+
46+
// DeleteTaskRequest defines the request parameters for deleting task.
47+
type DeleteTaskRequest struct {
48+
TaskID string `json:"task_id" validate:"required"`
49+
}
50+
51+
// Task includes information about a task along with peer details and a description.
52+
type Task struct {
53+
Task *resource.Task `json:"task"`
54+
Peer *resource.Peer `json:"peer"`
55+
Description string `json:"description"`
56+
}
57+
58+
// DeleteTaskResponse represents the response after attempting to delete tasks,
59+
// categorizing them into successfully and unsuccessfully deleted.
60+
type DeleteTaskResponse struct {
61+
SuccessTasks []*Task `json:"success_tasks"`
62+
FailureTasks []*Task `json:"failure_tasks"`
3163
}

manager/config/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,9 @@ type JobConfig struct {
296296

297297
// Sync peers configuration.
298298
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`
299+
300+
// Manager tasks configuration.
301+
ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
299302
}
300303

301304
type PreheatConfig struct {
@@ -315,6 +318,11 @@ type SyncPeersConfig struct {
315318
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
316319
}
317320

321+
type ManagerTasksConfig struct {
322+
// Timeout is the timeout for manager tasks information for the single scheduler.
323+
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
324+
}
325+
318326
type PreheatTLSClientConfig struct {
319327
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
320328
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
@@ -455,6 +463,9 @@ func New() *Config {
455463
Interval: DefaultJobSyncPeersInterval,
456464
Timeout: DefaultJobSyncPeersTimeout,
457465
},
466+
ManagerTasks: ManagerTasksConfig{
467+
Timeout: DefaultJobManagerTasksTimeout,
468+
},
458469
},
459470
ObjectStorage: ObjectStorageConfig{
460471
Enable: false,

manager/config/constant_otel.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,20 @@ package config
1919
import "go.opentelemetry.io/otel/attribute"
2020

2121
const (
22-
AttributeID = attribute.Key("d7y.manager.id")
23-
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
24-
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
22+
AttributeID = attribute.Key("d7y.manager.id")
23+
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
24+
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
25+
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
26+
AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id")
27+
AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page")
28+
AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
2529
)
2630

2731
const (
2832
SpanPreheat = "preheat"
2933
SpanSyncPeers = "sync-peers"
3034
SpanGetLayers = "get-layers"
3135
SpanAuthWithRegistry = "auth-with-registry"
36+
SpanDeleteTask = "delete-task"
37+
SpanListTasks = "list-tasks"
3238
)

manager/config/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ const (
9898

9999
// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
100100
DefaultJobSyncPeersTimeout = 10 * time.Minute
101+
102+
// DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks.
103+
DefaultJobManagerTasksTimeout = 10 * time.Minute
101104
)
102105

103106
const (

manager/handlers/job.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,34 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
5959
return
6060
}
6161

62+
ctx.JSON(http.StatusOK, job)
63+
case job.DeleteTaskJob:
64+
var json types.CreateDeleteTaskJobRequest
65+
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
66+
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
67+
return
68+
}
69+
70+
job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
71+
if err != nil {
72+
ctx.Error(err) // nolint: errcheck
73+
return
74+
}
75+
76+
ctx.JSON(http.StatusOK, job)
77+
case job.ListTasksJob:
78+
var json types.CreateListTasksJobRequest
79+
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
80+
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
81+
return
82+
}
83+
84+
job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
85+
if err != nil {
86+
ctx.Error(err) // nolint: errcheck
87+
return
88+
}
89+
6290
ctx.JSON(http.StatusOK, job)
6391
default:
6492
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})

manager/handlers/job_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ var (
3939
"user_id": 4,
4040
"bio": "bio"
4141
}`
42+
mockListTasksJobReqBody = `
43+
{
44+
"type": "list_tasks",
45+
"user_id": 4,
46+
"bio": "bio"
47+
}`
48+
mockDeleteTaskJobReqBody = `
49+
{
50+
"type": "delete_task",
51+
"user_id": 4,
52+
"bio": "bio"
53+
}`
4254
mockOtherJobReqBody = `
4355
{
4456
"type": "others",
@@ -50,6 +62,16 @@ var (
5062
Type: "preheat",
5163
BIO: "bio",
5264
}
65+
mockListTasksCreateJobRequest = types.CreateListTasksJobRequest{
66+
UserID: 4,
67+
Type: "list_tasks",
68+
BIO: "bio",
69+
}
70+
mockDeleteTaskCreateJobRequest = types.CreateDeleteTaskJobRequest{
71+
UserID: 4,
72+
Type: "delete_task",
73+
BIO: "bio",
74+
}
5375
mockUpdateJobRequest = types.UpdateJobRequest{
5476
UserID: 4,
5577
BIO: "bio",
@@ -61,6 +83,20 @@ var (
6183
BIO: "bio",
6284
TaskID: "2",
6385
}
86+
mockListTasksJobModel = &models.Job{
87+
BaseModel: mockBaseModel,
88+
UserID: 4,
89+
Type: "list_tasks",
90+
BIO: "bio",
91+
TaskID: "2",
92+
}
93+
mockDeleteTaskJobModel = &models.Job{
94+
BaseModel: mockBaseModel,
95+
UserID: 4,
96+
Type: "delete_task",
97+
BIO: "bio",
98+
TaskID: "2",
99+
}
64100
)
65101

66102
func mockJobRouter(h *Handlers) *gin.Engine {
@@ -115,6 +151,36 @@ func TestHandlers_CreateJob(t *testing.T) {
115151
assert.Equal(mockPreheatJobModel, &job)
116152
},
117153
},
154+
{
155+
name: "success",
156+
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockListTasksJobReqBody)),
157+
mock: func(ms *mocks.MockServiceMockRecorder) {
158+
ms.CreateListTasksJob(gomock.Any(), gomock.Eq(mockListTasksCreateJobRequest)).Return(mockListTasksJobModel, nil).Times(1)
159+
},
160+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
161+
assert := assert.New(t)
162+
assert.Equal(http.StatusOK, w.Code)
163+
job := models.Job{}
164+
err := json.Unmarshal(w.Body.Bytes(), &job)
165+
assert.NoError(err)
166+
assert.Equal(mockListTasksJobModel, &job)
167+
},
168+
},
169+
{
170+
name: "success",
171+
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)),
172+
mock: func(ms *mocks.MockServiceMockRecorder) {
173+
ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockDeleteTaskCreateJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
174+
},
175+
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
176+
assert := assert.New(t)
177+
assert.Equal(http.StatusOK, w.Code)
178+
job := models.Job{}
179+
err := json.Unmarshal(w.Body.Bytes(), &job)
180+
assert.NoError(err)
181+
assert.Equal(mockDeleteTaskJobModel, &job)
182+
},
183+
},
118184
}
119185
for _, tc := range tests {
120186
t.Run(tc.name, func(t *testing.T) {

manager/job/job.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Job struct {
4040
*internaljob.Job
4141
Preheat
4242
SyncPeers
43+
ManagerTasks
4344
}
4445

4546
// New returns a new Job.
@@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
7475
return nil, err
7576
}
7677

78+
managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)
79+
7780
return &Job{
78-
Job: j,
79-
Preheat: preheat,
80-
SyncPeers: syncPeers,
81+
Job: j,
82+
Preheat: preheat,
83+
SyncPeers: syncPeers,
84+
ManagerTasks: managerTasks,
8185
}, nil
8286
}
8387

0 commit comments

Comments
 (0)