From 64420a924fc962bc03ff8aad935c093ee68385e3 Mon Sep 17 00:00:00 2001
From: Asklv
Date: Mon, 15 Jul 2024 19:59:15 +0800
Subject: [PATCH 1/9] feat: add delete task and list tasks manager API definitions.

Signed-off-by: Asklv
---
 internal/job/constants.go  |  6 ++++++
 manager/handlers/job.go    | 28 ++++++++++++++++++++++++++++
 manager/service/service.go |  2 ++
 manager/types/job.go       | 28 ++++++++++++++++++++++++++++
 4 files changed, 64 insertions(+)

diff --git a/internal/job/constants.go b/internal/job/constants.go
index d977f443638..c6303b375a6 100644
--- a/internal/job/constants.go
+++ b/internal/job/constants.go
@@ -29,6 +29,12 @@ const (
 
 	// SyncPeersJob is the name of syncing peers job.
 	SyncPeersJob = "sync_peers"
+
+	// ListTasksJob is the name of listing tasks job.
+	ListTasksJob = "list_tasks"
+
+	// DeleteTaskJob is the name of deleting task job.
+	DeleteTaskJob = "delete_task"
 )
 
 // Machinery server configuration.
diff --git a/manager/handlers/job.go b/manager/handlers/job.go
index f96ae39aaa7..e7937136173 100644
--- a/manager/handlers/job.go
+++ b/manager/handlers/job.go
@@ -59,6 +59,34 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
 			return
 		}
 
+		ctx.JSON(http.StatusOK, job)
+	case job.DeleteTaskJob:
+		var json types.CreateDeleteTaskJobRequest
+		if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
+			ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
+			return
+		}
+
+		job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
+		if err != nil {
+			ctx.Error(err) // nolint: errcheck
+			return
+		}
+
+		ctx.JSON(http.StatusOK, job)
+	case job.ListTasksJob:
+		var json types.CreateListTasksJobRequest
+		if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
+			ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
+			return
+		}
+
+		job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
+		if err != nil {
+			ctx.Error(err) // nolint: errcheck
+			return
+		}
+
 		ctx.JSON(http.StatusOK, job)
 	default:
 		ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
diff --git a/manager/service/service.go b/manager/service/service.go
index bde2c0c860c..dc413d0020d 100644
--- a/manager/service/service.go
+++ b/manager/service/service.go
@@ -114,6 +114,8 @@ type Service interface {
 	GetConfigs(context.Context, types.GetConfigsQuery) ([]models.Config, int64, error)
 
 	CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*models.Job, error)
+	CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
+	CreateListTasksJob(context.Context, types.CreateListTasksJobRequest) (*models.Job, error)
 	DestroyJob(context.Context, uint) error
 	UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error)
 	GetJob(context.Context, uint) (*models.Job, error)
diff --git a/manager/types/job.go b/manager/types/job.go
index 4d3a286ce33..450ac2a1ccf 100644
--- a/manager/types/job.go
+++ b/manager/types/job.go
@@ -57,6 +57,34 @@ type CreatePreheatJobRequest struct {
 	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }
 
+type CreateDeleteTaskJobRequest struct {
+	BIO string `json:"bio" binding:"omitempty"`
+	Type string `json:"type" binding:"required"`
+	Args DeleteTasksJobArgs `json:"args" binding:"omitempty"`
+	Result map[string]any `json:"result" binding:"omitempty"`
+	UserID uint `json:"user_id" binding:"omitempty"`
+	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
+}
+
+type DeleteTasksJobArgs struct {
+	TaskID string 
`json:"taskID" binding:"required"` +} + +type CreateListTasksJobRequest struct { + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args ListTasksJobArgs `json:"args" binding:"omitempty"` + Result map[string]any `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` +} + +type ListTasksJobArgs struct { + TaskID string `json:"taskID" binding:"required"` + Page int `json:"page" binding:"omitempty,gte=1"` + PerPage int `json:"count" binding:"omitempty,gte=1,lte=10000000"` +} + type PreheatArgs struct { // Type is the preheating type, support image and file. Type string `json:"type" binding:"required,oneof=image file"` From b3d4e1737a1d55ced89568c5661fe6a529d766fb Mon Sep 17 00:00:00 2001 From: Asklv Date: Fri, 19 Jul 2024 13:30:30 +0800 Subject: [PATCH 2/9] feat: add manager job service and service mock for delete task and list tasks. Signed-off-by: Asklv --- manager/config/config.go | 11 +++ manager/config/constant_otel.go | 12 ++- manager/config/constants.go | 3 + manager/job/job.go | 10 +- manager/job/manager_tasks.go | 127 ++++++++++++++++++++++++++ manager/service/job.go | 81 ++++++++++++++++ manager/service/mocks/service_mock.go | 30 ++++++ manager/types/job.go | 38 ++++---- 8 files changed, 287 insertions(+), 25 deletions(-) create mode 100644 manager/job/manager_tasks.go diff --git a/manager/config/config.go b/manager/config/config.go index 5c5e300d48b..29914a6b94c 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -296,6 +296,9 @@ type JobConfig struct { // Sync peers configuration. SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"` + + // Manager tasks configuration. + ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"` } type PreheatConfig struct { @@ -315,6 +318,11 @@ type SyncPeersConfig struct { Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"` } +type ManagerTasksConfig struct { + // Timeout is the timeout for manager tasks information for the single scheduler. + Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"` +} + type PreheatTLSClientConfig struct { // CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string. 
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"` @@ -455,6 +463,9 @@ func New() *Config { Interval: DefaultJobSyncPeersInterval, Timeout: DefaultJobSyncPeersTimeout, }, + ManagerTasks: ManagerTasksConfig{ + Timeout: DefaultJobManagerTasksTimeout, + }, }, ObjectStorage: ObjectStorageConfig{ Enable: false, diff --git a/manager/config/constant_otel.go b/manager/config/constant_otel.go index 071c4f9bf92..07c34bbd4b2 100644 --- a/manager/config/constant_otel.go +++ b/manager/config/constant_otel.go @@ -19,9 +19,13 @@ package config import "go.opentelemetry.io/otel/attribute" const ( - AttributeID = attribute.Key("d7y.manager.id") - AttributePreheatType = attribute.Key("d7y.manager.preheat.type") - AttributePreheatURL = attribute.Key("d7y.manager.preheat.url") + AttributeID = attribute.Key("d7y.manager.id") + AttributePreheatType = attribute.Key("d7y.manager.preheat.type") + AttributePreheatURL = attribute.Key("d7y.manager.preheat.url") + AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id") + AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id") + AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page") + AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page") ) const ( @@ -29,4 +33,6 @@ const ( SpanSyncPeers = "sync-peers" SpanGetLayers = "get-layers" SpanAuthWithRegistry = "auth-with-registry" + SpanDeleteTask = "delete-task" + SpanListTasks = "list-tasks" ) diff --git a/manager/config/constants.go b/manager/config/constants.go index 75e6ad8e0a7..b73a6f337da 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -98,6 +98,9 @@ const ( // DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler. DefaultJobSyncPeersTimeout = 10 * time.Minute + + // DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks. + DefaultJobManagerTasksTimeout = 10 * time.Minute ) const ( diff --git a/manager/job/job.go b/manager/job/job.go index bac8c7a6e86..e939e50f0a9 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -40,6 +40,7 @@ type Job struct { *internaljob.Job Preheat SyncPeers + ManagerTasks } // New returns a new Job. @@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { return nil, err } + managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout) + return &Job{ - Job: j, - Preheat: preheat, - SyncPeers: syncPeers, + Job: j, + Preheat: preheat, + SyncPeers: syncPeers, + ManagerTasks: managerTasks, }, nil } diff --git a/manager/job/manager_tasks.go b/manager/job/manager_tasks.go new file mode 100644 index 00000000000..44e45f5bc02 --- /dev/null +++ b/manager/job/manager_tasks.go @@ -0,0 +1,127 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+//go:generate mockgen -destination mocks/delete_task_mock.go -source delete_task.go -package mocks
+package job
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	logger "d7y.io/dragonfly/v2/internal/dflog"
+	internaljob "d7y.io/dragonfly/v2/internal/job"
+	"d7y.io/dragonfly/v2/manager/config"
+	"d7y.io/dragonfly/v2/manager/models"
+	"d7y.io/dragonfly/v2/manager/types"
+	machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
+	"github.com/google/uuid"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// ManagerTasks is an interface for deleting and listing tasks.
+type ManagerTasks interface {
+	// CreateDeleteTask creates a delete task job.
+	CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTasksArgs) (*internaljob.GroupJobState, error)
+	// CreateListTasks creates a list tasks job.
+	CreateListTasks(context.Context, []models.Scheduler, types.ListTasksArgs) (*internaljob.GroupJobState, error)
+}
+
+// managerTasks is an implementation of ManagerTasks.
+type managerTasks struct {
+	job             *internaljob.Job
+	registryTimeout time.Duration
+}
+
+// newManagerTasks creates a new ManagerTasks.
+func newManagerTasks(job *internaljob.Job, registryTimeout time.Duration) ManagerTasks {
+	return &managerTasks{
+		job:             job,
+		registryTimeout: registryTimeout,
+	}
+}
+
+// Create a delete task job.
+func (m *managerTasks) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTasksArgs) (*internaljob.GroupJobState, error) {
+	var span trace.Span
+	ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
+	span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
+	defer span.End()
+
+	args, err := internaljob.MarshalRequest(json)
+	if err != nil {
+		logger.Errorf("delete task marshal request: %v, error: %v", args, err)
+		return nil, err
+	}
+
+	// Initialize queues.
+	queues := getSchedulerQueues(schedulers)
+	return m.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues)
+}
+
+// Create a list tasks job.
+func (m *managerTasks) CreateListTasks(ctx context.Context, schedulers []models.Scheduler, json types.ListTasksArgs) (*internaljob.GroupJobState, error) {
+	var span trace.Span
+	ctx, span = tracer.Start(ctx, config.SpanListTasks, trace.WithSpanKind(trace.SpanKindProducer))
+	span.SetAttributes(config.AttributeListTasksID.String(json.TaskID))
+	span.SetAttributes(config.AttributeListTasksPage.Int(json.Page))
+	span.SetAttributes(config.AttributeListTasksPerPage.Int(json.PerPage))
+	defer span.End()
+
+	args, err := internaljob.MarshalRequest(json)
+	if err != nil {
+		logger.Errorf("list tasks marshal request: %v, error: %v", args, err)
+		return nil, err
+	}
+
+	// Initialize queues.
+	queues := getSchedulerQueues(schedulers)
+	return m.createGroupJob(ctx, internaljob.ListTasksJob, args, queues)
+}
+
+// createGroupJob creates a group job.
+func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
+	var signatures []*machineryv1tasks.Signature
+	for _, queue := range queues {
+		signatures = append(signatures, &machineryv1tasks.Signature{
+			UUID:       fmt.Sprintf("task_%s", uuid.New().String()),
+			Name:       name,
+			RoutingKey: queue.String(),
+			Args:       args,
+		})
+	}
+
+	group, err := machineryv1tasks.NewGroup(signatures...)
+ if err != nil { + return nil, err + } + + var tasks []machineryv1tasks.Signature + for _, signature := range signatures { + tasks = append(tasks, *signature) + } + + logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) + if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { + logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err) + return nil, err + } + + return &internaljob.GroupJobState{ + GroupUUID: group.GroupUUID, + State: machineryv1tasks.StatePending, + CreatedAt: time.Now(), + }, nil +} diff --git a/manager/service/job.go b/manager/service/job.go index f2836a05fc9..3b4319babec 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -75,6 +75,87 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return &job, nil } +func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) { + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + if err != nil { + return nil, err + } + + groupJobState, err := s.job.CreateDeleteTask(ctx, candidateSchedulers, json.Args) + if err != nil { + return nil, err + } + + var candidateSchedulerClusters []models.SchedulerCluster + for _, candidateScheduler := range candidateSchedulers { + candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + } + + args, err := structure.StructToMap(json.Args) + if err != nil { + return nil, err + } + + job := models.Job{ + TaskID: groupJobState.GroupUUID, + BIO: json.BIO, + Type: json.Type, + State: groupJobState.State, + Args: args, + UserID: json.UserID, + SchedulerClusters: candidateSchedulerClusters, + } + + if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + return nil, err + } + + go s.pollingJob(context.Background(), job.ID, job.TaskID) + + return &job, nil +} + +func (s *service) CreateListTasksJob(ctx context.Context, json types.CreateListTasksJobRequest) (*models.Job, error) { + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + if err != nil { + return nil, err + } + + groupJobState, err := s.job.CreateListTasks(ctx, candidateSchedulers, json.Args) + if err != nil { + return nil, err + } + + var candidateSchedulerClusters []models.SchedulerCluster + for _, candidateScheduler := range candidateSchedulers { + candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + } + + args, err := structure.StructToMap(json.Args) + if err != nil { + return nil, err + } + + job := models.Job{ + TaskID: groupJobState.GroupUUID, + BIO: json.BIO, + Type: json.Type, + State: groupJobState.State, + Args: args, + UserID: json.UserID, + SchedulerClusters: candidateSchedulerClusters, + } + + if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + return nil, err + } + + go s.pollingJob(context.Background(), job.ID, job.TaskID) + + return &job, nil + +} + func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { var candidateSchedulers []models.Scheduler if len(schedulerClusterIDs) != 0 { diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index 77eba9d9a72..81294cf9eab 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -169,6 +169,36 @@ func (m *MockService) CreateConfig(arg0 context.Context, arg1 
types.CreateConfig return ret0, ret1 } +// CreateDeleteTaskJob mocks base method. +func (m *MockService) CreateDeleteTaskJob(arg0 context.Context, arg1 types.CreateDeleteTaskJobRequest) (*models.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateDeleteTaskJob", arg0, arg1) + ret0, _ := ret[0].(*models.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateDeleteTaskJob indicates an expected call of CreateDeleteTaskJob. +func (mr *MockServiceMockRecorder) CreateDeleteTaskJob(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTaskJob", reflect.TypeOf((*MockService)(nil).CreateDeleteTaskJob), arg0, arg1) +} + +// CreateListTasksJob mocks base method. +func (m *MockService) CreateListTasksJob(arg0 context.Context, arg1 types.CreateListTasksJobRequest) (*models.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateListTasksJob", arg0, arg1) + ret0, _ := ret[0].(*models.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateListTasksJob indicates an expected call of CreateListTasksJob. +func (mr *MockServiceMockRecorder) CreateListTasksJob(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateListTasksJob", reflect.TypeOf((*MockService)(nil).CreateListTasksJob), arg0, arg1) +} + // CreateConfig indicates an expected call of CreateConfig. func (mr *MockServiceMockRecorder) CreateConfig(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() diff --git a/manager/types/job.go b/manager/types/job.go index 450ac2a1ccf..81cdb90eba4 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -58,31 +58,31 @@ type CreatePreheatJobRequest struct { } type CreateDeleteTaskJobRequest struct { - BIO string `json:"bio" binding:"omitempty"` - Type string `json:"type" binding:"required"` - Args DeleteTasksJobArgs `json:"args" binding:"omitempty"` - Result map[string]any `json:"result" binding:"omitempty"` - UserID uint `json:"user_id" binding:"omitempty"` - SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args DeleteTasksArgs `json:"args" binding:"omitempty"` + Result map[string]any `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } -type DeleteTasksJobArgs struct { - TaskID string `json:"taskID" binding:"required"` +type DeleteTasksArgs struct { + TaskID string `json:"task_id" binding:"required"` } type CreateListTasksJobRequest struct { - BIO string `json:"bio" binding:"omitempty"` - Type string `json:"type" binding:"required"` - Args ListTasksJobArgs `json:"args" binding:"omitempty"` - Result map[string]any `json:"result" binding:"omitempty"` - UserID uint `json:"user_id" binding:"omitempty"` - SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args ListTasksArgs `json:"args" binding:"omitempty"` + Result map[string]any `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } -type ListTasksJobArgs struct { - TaskID string `json:"taskID" binding:"required"` +type ListTasksArgs struct { + TaskID string `json:"task_id" binding:"required"` Page int 
`json:"page" binding:"omitempty,gte=1"` - PerPage int `json:"count" binding:"omitempty,gte=1,lte=10000000"` + PerPage int `json:"per_page" binding:"omitempty,gte=1,lte=10000000"` } type PreheatArgs struct { @@ -96,10 +96,10 @@ type PreheatArgs struct { Tag string `json:"tag" binding:"omitempty"` // FilteredQueryParams is the filtered query params for preheating. - FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"` + FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"` // PieceLength is the piece length for preheating. - PieceLength uint32 `json:"pieceLength" binding:"omitempty"` + PieceLength uint32 `json:"piece_length" binding:"omitempty"` // Headers is the http headers for authentication. Headers map[string]string `json:"headers" binding:"omitempty"` From df2ae9c0ec9c6fea39551f4fc7fb8436cfef9d2a Mon Sep 17 00:00:00 2001 From: Asklv Date: Tue, 23 Jul 2024 19:51:57 +0800 Subject: [PATCH 3/9] chore: fix lint issue and generate the mocked job file. Signed-off-by: Asklv --- manager/job/manager_tasks.go | 13 +++-- manager/job/mocks/manager_tasks_mock.go | 72 +++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 5 deletions(-) create mode 100644 manager/job/mocks/manager_tasks_mock.go diff --git a/manager/job/manager_tasks.go b/manager/job/manager_tasks.go index 44e45f5bc02..3e5bb4868dd 100644 --- a/manager/job/manager_tasks.go +++ b/manager/job/manager_tasks.go @@ -1,5 +1,5 @@ /* - * Copyright 2020 The Dragonfly Authors + * Copyright 2024 The Dragonfly Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -//go:generate mockgen -destination mocks/delete_task_mock.go -source delete_task.go -package mocks + +//go:generate mockgen -destination mocks/manager_tasks_mock.go -source manager_tasks.go -package mocks + package job import ( @@ -21,14 +23,15 @@ import ( "fmt" "time" + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" + "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" + logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" - machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" - "github.com/google/uuid" - "go.opentelemetry.io/otel/trace" ) // ManagerTask is an interface for delete and list tasks. diff --git a/manager/job/mocks/manager_tasks_mock.go b/manager/job/mocks/manager_tasks_mock.go new file mode 100644 index 00000000000..1d9338422de --- /dev/null +++ b/manager/job/mocks/manager_tasks_mock.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: manager_tasks.go +// +// Generated by this command: +// +// mockgen -destination mocks/manager_tasks_mock.go -source manager_tasks.go -package mocks +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + job "d7y.io/dragonfly/v2/internal/job" + models "d7y.io/dragonfly/v2/manager/models" + types "d7y.io/dragonfly/v2/manager/types" + gomock "go.uber.org/mock/gomock" +) + +// MockManagerTasks is a mock of ManagerTasks interface. +type MockManagerTasks struct { + ctrl *gomock.Controller + recorder *MockManagerTasksMockRecorder +} + +// MockManagerTasksMockRecorder is the mock recorder for MockManagerTasks. 
+type MockManagerTasksMockRecorder struct { + mock *MockManagerTasks +} + +// NewMockManagerTasks creates a new mock instance. +func NewMockManagerTasks(ctrl *gomock.Controller) *MockManagerTasks { + mock := &MockManagerTasks{ctrl: ctrl} + mock.recorder = &MockManagerTasksMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockManagerTasks) EXPECT() *MockManagerTasksMockRecorder { + return m.recorder +} + +// CreateDeleteTask mocks base method. +func (m *MockManagerTasks) CreateDeleteTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.DeleteTasksArgs) (*job.GroupJobState, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateDeleteTask", arg0, arg1, arg2) + ret0, _ := ret[0].(*job.GroupJobState) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateDeleteTask indicates an expected call of CreateDeleteTask. +func (mr *MockManagerTasksMockRecorder) CreateDeleteTask(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTask", reflect.TypeOf((*MockManagerTasks)(nil).CreateDeleteTask), arg0, arg1, arg2) +} + +// CreateListTasks mocks base method. +func (m *MockManagerTasks) CreateListTasks(arg0 context.Context, arg1 []models.Scheduler, arg2 types.ListTasksArgs) (*job.GroupJobState, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateListTasks", arg0, arg1, arg2) + ret0, _ := ret[0].(*job.GroupJobState) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateListTasks indicates an expected call of CreateListTasks. +func (mr *MockManagerTasksMockRecorder) CreateListTasks(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateListTasks", reflect.TypeOf((*MockManagerTasks)(nil).CreateListTasks), arg0, arg1, arg2) +} From dcbb0f7691f54fbf11aafc9be18777bfcd0718dd Mon Sep 17 00:00:00 2001 From: Asklv Date: Sun, 4 Aug 2024 10:08:08 +0800 Subject: [PATCH 4/9] feat: add delete task job and list tasks job in scheduler job implementation. Signed-off-by: Asklv --- internal/job/types.go | 38 ++++++++++++++++++++- manager/handlers/job_test.go | 66 ++++++++++++++++++++++++++++++++++++ manager/job/manager_tasks.go | 2 +- 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/internal/job/types.go b/internal/job/types.go index cdbdb55a760..35d1d842241 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -16,11 +16,13 @@ package job +import "d7y.io/dragonfly/v2/scheduler/resource" + type PreheatRequest struct { URL string `json:"url" validate:"required,url"` Tag string `json:"tag" validate:"omitempty"` Digest string `json:"digest" validate:"omitempty"` - FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"` + FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"` Headers map[string]string `json:"headers" validate:"omitempty"` Application string `json:"application" validate:"omitempty"` Priority int32 `json:"priority" validate:"omitempty"` @@ -28,4 +30,38 @@ type PreheatRequest struct { } type PreheatResponse struct { + TaskID string `json:"taskID"` +} + +// ListTasksRequest defines the request parameters for listing tasks. 
+type ListTasksRequest struct { + TaskID string `json:"task_id" validate:"required"` + Page int `json:"page" validate:"required"` + PerPage int `json:"per_page" validate:"required"` +} + +// ListTasksResponse defines the response parameters for listing tasks. +type ListTasksResponse struct { + Peers []*resource.Peer `json:"peers"` + Page int `json:"page"` + Total int `json:"total"` +} + +// DeleteTaskRequest defines the request parameters for deleting task. +type DeleteTaskRequest struct { + TaskID string `json:"task_id" validate:"required"` +} + +// TaskInfo includes information about a task along with peer details and a description. +type TaskInfo struct { + Task *resource.Task `json:"task"` + Peer *resource.Peer `json:"peer"` + Desc string `json:"desc"` +} + +// DeleteTaskResponse represents the response after attempting to delete tasks, +// categorizing them into successfully and unsuccessfully deleted. +type DeleteTaskResponse struct { + SuccessTasks []*TaskInfo `json:"success_tasks"` + FailureTasks []*TaskInfo `json:"failure_tasks"` } diff --git a/manager/handlers/job_test.go b/manager/handlers/job_test.go index b3d02668c59..2a7f2f53771 100644 --- a/manager/handlers/job_test.go +++ b/manager/handlers/job_test.go @@ -39,6 +39,18 @@ var ( "user_id": 4, "bio": "bio" }` + mockListTasksJobReqBody = ` + { + "type": "list_tasks", + "user_id": 4, + "bio": "bio" + }` + mockDeleteTaskJobReqBody = ` + { + "type": "delete_task", + "user_id": 4, + "bio": "bio" + }` mockOtherJobReqBody = ` { "type": "others", @@ -50,6 +62,16 @@ var ( Type: "preheat", BIO: "bio", } + mockListTasksCreateJobRequest = types.CreateListTasksJobRequest{ + UserID: 4, + Type: "list_tasks", + BIO: "bio", + } + mockDeleteTaskCreateJobRequest = types.CreateDeleteTaskJobRequest{ + UserID: 4, + Type: "delete_task", + BIO: "bio", + } mockUpdateJobRequest = types.UpdateJobRequest{ UserID: 4, BIO: "bio", @@ -61,6 +83,20 @@ var ( BIO: "bio", TaskID: "2", } + mockListTasksJobModel = &models.Job{ + BaseModel: mockBaseModel, + UserID: 4, + Type: "list_tasks", + BIO: "bio", + TaskID: "2", + } + mockDeleteTaskJobModel = &models.Job{ + BaseModel: mockBaseModel, + UserID: 4, + Type: "delete_task", + BIO: "bio", + TaskID: "2", + } ) func mockJobRouter(h *Handlers) *gin.Engine { @@ -115,6 +151,36 @@ func TestHandlers_CreateJob(t *testing.T) { assert.Equal(mockPreheatJobModel, &job) }, }, + { + name: "success", + req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockListTasksJobReqBody)), + mock: func(ms *mocks.MockServiceMockRecorder) { + ms.CreateListTasksJob(gomock.Any(), gomock.Eq(mockListTasksCreateJobRequest)).Return(mockListTasksJobModel, nil).Times(1) + }, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusOK, w.Code) + job := models.Job{} + err := json.Unmarshal(w.Body.Bytes(), &job) + assert.NoError(err) + assert.Equal(mockListTasksJobModel, &job) + }, + }, + { + name: "success", + req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)), + mock: func(ms *mocks.MockServiceMockRecorder) { + ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockDeleteTaskCreateJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1) + }, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusOK, w.Code) + job := models.Job{} + err := json.Unmarshal(w.Body.Bytes(), &job) + assert.NoError(err) + assert.Equal(mockDeleteTaskJobModel, &job) + }, + }, } for _, tc := 
range tests {
 		t.Run(tc.name, func(t *testing.T) {
diff --git a/manager/job/manager_tasks.go b/manager/job/manager_tasks.go
index 3e5bb4868dd..ba7e0840b56 100644
--- a/manager/job/manager_tasks.go
+++ b/manager/job/manager_tasks.go
@@ -118,7 +118,7 @@ func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []m
 	logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
 	if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
-		logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err)
+		logger.Errorf("create manager tasks group %s failed: %v", group.GroupUUID, err)
 		return nil, err
 	}

From 16cb22478eea0a41fcb9270740d6639bb980200d Mon Sep 17 00:00:00 2001
From: Asklv
Date: Sun, 4 Aug 2024 10:08:08 +0800
Subject: [PATCH 5/9] fix: update delete task job and list tasks job api args.

Signed-off-by: Asklv
---
 internal/job/types.go        | 20 ++++++++------------
 manager/job/manager_tasks.go |  2 --
 manager/types/job.go         |  4 +---
 3 files changed, 9 insertions(+), 17 deletions(-)

diff --git a/internal/job/types.go b/internal/job/types.go
index 35d1d842241..be1595d56a2 100644
--- a/internal/job/types.go
+++ b/internal/job/types.go
@@ -35,16 +35,12 @@ type PreheatResponse struct {
 
 // ListTasksRequest defines the request parameters for listing tasks.
 type ListTasksRequest struct {
-	TaskID string `json:"task_id" validate:"required"`
-	Page int `json:"page" validate:"required"`
-	PerPage int `json:"per_page" validate:"required"`
+	TaskID string `json:"task_id" validate:"required"`
 }
 
 // ListTasksResponse defines the response parameters for listing tasks.
 type ListTasksResponse struct {
 	Peers []*resource.Peer `json:"peers"`
-	Page int `json:"page"`
-	Total int `json:"total"`
 }
 
 // DeleteTaskRequest defines the request parameters for deleting task.
@@ -52,16 +48,16 @@ type DeleteTaskRequest struct {
 	TaskID string `json:"task_id" validate:"required"`
 }
 
-// TaskInfo includes information about a task along with peer details and a description.
-type TaskInfo struct {
-	Task *resource.Task `json:"task"`
-	Peer *resource.Peer `json:"peer"`
-	Desc string `json:"desc"`
+// Task includes information about a task along with peer details and a description.
+type Task struct {
+	Task *resource.Task `json:"task"`
+	Peer *resource.Peer `json:"peer"`
+	Description string `json:"description"`
 }
 
 // DeleteTaskResponse represents the response after attempting to delete tasks,
 // categorizing them into successfully and unsuccessfully deleted.
 type DeleteTaskResponse struct {
-	SuccessTasks []*TaskInfo `json:"success_tasks"`
-	FailureTasks []*TaskInfo `json:"failure_tasks"`
+	SuccessTasks []*Task `json:"success_tasks"`
+	FailureTasks []*Task `json:"failure_tasks"`
 }
diff --git a/manager/job/manager_tasks.go b/manager/job/manager_tasks.go
index ba7e0840b56..b42a5ce1734 100644
--- a/manager/job/manager_tasks.go
+++ b/manager/job/manager_tasks.go
@@ -79,8 +79,6 @@ func (m *managerTasks) CreateListTasks(ctx context.Context, schedulers []models.
var span trace.Span ctx, span = tracer.Start(ctx, config.SpanListTasks, trace.WithSpanKind(trace.SpanKindProducer)) span.SetAttributes(config.AttributeListTasksID.String(json.TaskID)) - span.SetAttributes(config.AttributeListTasksPage.Int(json.Page)) - span.SetAttributes(config.AttributeListTasksPerPage.Int(json.PerPage)) defer span.End() args, err := internaljob.MarshalRequest(json) diff --git a/manager/types/job.go b/manager/types/job.go index 81cdb90eba4..15e5265fd6a 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -80,9 +80,7 @@ type CreateListTasksJobRequest struct { } type ListTasksArgs struct { - TaskID string `json:"task_id" binding:"required"` - Page int `json:"page" binding:"omitempty,gte=1"` - PerPage int `json:"per_page" binding:"omitempty,gte=1,lte=10000000"` + TaskID string `json:"task_id" binding:"required"` } type PreheatArgs struct { From 16e73d6502dc3ed98e7370a742502503ad6aabb0 Mon Sep 17 00:00:00 2001 From: Asklv Date: Tue, 30 Jul 2024 13:29:44 +0800 Subject: [PATCH 6/9] feat: add delete task job and list tasks job in scheduler job implementation. Signed-off-by: Asklv --- scheduler/job/job.go | 125 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/scheduler/job/job.go b/scheduler/job/job.go index e497cfdbf0b..fe4e91b401f 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -46,6 +46,10 @@ import ( const ( // preheatTimeout is timeout of preheating. preheatTimeout = 20 * time.Minute + // listTasksTimeout is timeout of listing tasks. + listTasksTimeout = 10 * time.Minute + // deleteTaskTimeout is timeout of deleting task. + deleteTaskTimeout = 20 * time.Minute ) // Job is an interface for job. @@ -109,8 +113,10 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) { } namedJobFuncs := map[string]any{ - internaljob.PreheatJob: t.preheat, - internaljob.SyncPeersJob: t.syncPeers, + internaljob.PreheatJob: t.preheat, + internaljob.SyncPeersJob: t.syncPeers, + internaljob.ListTasksJob: t.listTasks, + internaljob.DeleteTaskJob: t.deleteTask, } if err := localJob.RegisterJob(namedJobFuncs); err != nil { @@ -297,3 +303,118 @@ func (j *job) syncPeers() (string, error) { return internaljob.MarshalResponse(hosts) } + +// listTasks is a job to list tasks. +func (j *job) listTasks(ctx context.Context, data string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, listTasksTimeout) + defer cancel() + + req := &internaljob.ListTasksRequest{} + if err := internaljob.UnmarshalRequest(data, req); err != nil { + logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data) + return "", err + } + + if err := validator.New().Struct(req); err != nil { + logger.Errorf("listTasks %s validate failed: %s", req.TaskID, err.Error()) + return "", err + } + + // Get all peers by task id + peers, err := j.getPeers(req.TaskID) + if err != nil { + logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error()) + return "", err + } + + // Return peers by page + listTaskResponse := &internaljob.ListTasksResponse{ + Total: len(peers), + Page: req.Page, + Peers: peers[req.Page*req.PerPage : (req.Page+1)*req.PerPage], + } + + return internaljob.MarshalResponse(listTaskResponse) +} + +// deleteTask is a job to delete task. 
+func (j *job) deleteTask(ctx context.Context, data string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout) + defer cancel() + + req := &internaljob.DeleteTaskRequest{} + if err := internaljob.UnmarshalRequest(data, req); err != nil { + logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data) + return "", err + } + + if err := validator.New().Struct(req); err != nil { + logger.Errorf("deleteTask %s validate failed: %s", req.TaskID, err.Error()) + return "", err + } + + // Get all peers by task id + peers, err := j.getPeers(req.TaskID) + if err != nil { + logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error()) + return "", err + } + + // Delete task by task id and host id + successTasks := make([]*internaljob.TaskInfo, 0) + failureTasks := make([]*internaljob.TaskInfo, 0) + + for _, peer := range peers { + // hostID := peer.Host.ID + // get task info by task id + task, ok := j.resource.TaskManager().Load(req.TaskID) + if !ok { + logger.Errorf("task %s not found", req.TaskID) + failureTasks = append(failureTasks, &internaljob.TaskInfo{ + Task: nil, + Peer: peer, + Desc: "task not found", + }) + continue + } + + // TODO: change to scheduler delete task grpc function + // and add batch delete + + successTasks = append(successTasks, &internaljob.TaskInfo{ + Task: task, + Peer: peer, + Desc: "success", + }) + } + + deleteTaskResponse := &internaljob.DeleteTaskResponse{ + SuccessTasks: successTasks, + FailureTasks: failureTasks, + } + + return internaljob.MarshalResponse(deleteTaskResponse) +} + +// getPeers try to get peers by task id +func (j *job) getPeers(taskID string) ([]*resource.Peer, error) { + // get task info by task id + task, ok := j.resource.TaskManager().Load(taskID) + if !ok { + logger.Errorf("task %s not found", taskID) + return nil, fmt.Errorf("task %s not found", taskID) + } + + // get peer info by task info + peers := make([]*resource.Peer, 0) + for _, vertex := range task.DAG.GetVertices() { + peer := vertex.Value + if peer == nil { + continue + } + + peers = append(peers, peer) + } + + return peers, nil +} From a2120600281692d7193caec8bcfde79d2fec2ec9 Mon Sep 17 00:00:00 2001 From: Asklv Date: Sun, 4 Aug 2024 10:08:08 +0800 Subject: [PATCH 7/9] feat: add delete task job and list tasks job scheduler define and basic implement. Signed-off-by: Asklv --- scheduler/job/job.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/scheduler/job/job.go b/scheduler/job/job.go index fe4e91b401f..3d40527ea9e 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -306,6 +306,9 @@ func (j *job) syncPeers() (string, error) { // listTasks is a job to list tasks. func (j *job) listTasks(ctx context.Context, data string) (string, error) { + // TODO: + // 1. query all peers with task id + // 2. delete current task by task id and host id ctx, cancel := context.WithTimeout(ctx, listTasksTimeout) defer cancel() @@ -339,6 +342,8 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) { // deleteTask is a job to delete task. func (j *job) deleteTask(ctx context.Context, data string) (string, error) { + // TODO: + // 1. 
query all peers with task id ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout) defer cancel() @@ -380,6 +385,9 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { // TODO: change to scheduler delete task grpc function // and add batch delete + j.resource.SeedPeer().Client().DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{ + TaskId: req.TaskID, + }) successTasks = append(successTasks, &internaljob.TaskInfo{ Task: task, From eae443a76e63c5ddb88c665bbbf15b3d0bc51720 Mon Sep 17 00:00:00 2001 From: Asklv Date: Thu, 8 Aug 2024 10:08:08 +0800 Subject: [PATCH 8/9] feat: update delete task job and list tasks with concurrency and filter valid tasks. Signed-off-by: Asklv --- scheduler/job/job.go | 134 +++++++++++++++++++++++++++++-------------- 1 file changed, 91 insertions(+), 43 deletions(-) diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 3d40527ea9e..84c9b410a3d 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -23,11 +23,18 @@ import ( "errors" "fmt" "io" + "math" "strings" + "sync" "time" "github.com/RichardKnop/machinery/v1" "github.com/go-playground/validator/v10" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,10 +53,14 @@ import ( const ( // preheatTimeout is timeout of preheating. preheatTimeout = 20 * time.Minute - // listTasksTimeout is timeout of listing tasks. - listTasksTimeout = 10 * time.Minute // deleteTaskTimeout is timeout of deleting task. deleteTaskTimeout = 20 * time.Minute + // deleteTaskConcurrency is the number of concurrent delete tasks. + deleteTaskConcurrency = 10 + // deleteTaskMaxRetries is the maximum number of retries for delete tasks. + deleteTaskMaxRetries = 3 + // deleteTaskBackoffWaitBetween is waiting for a fixed period of time between calls in backoff linear. + deleteTaskBackoffWaitBetween = 500 * time.Millisecond ) // Job is an interface for job. @@ -306,12 +317,6 @@ func (j *job) syncPeers() (string, error) { // listTasks is a job to list tasks. func (j *job) listTasks(ctx context.Context, data string) (string, error) { - // TODO: - // 1. query all peers with task id - // 2. delete current task by task id and host id - ctx, cancel := context.WithTimeout(ctx, listTasksTimeout) - defer cancel() - req := &internaljob.ListTasksRequest{} if err := internaljob.UnmarshalRequest(data, req); err != nil { logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data) @@ -324,17 +329,14 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) { } // Get all peers by task id - peers, err := j.getPeers(req.TaskID) + peers, err := j.getValidPeers(req.TaskID) if err != nil { logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error()) return "", err } - // Return peers by page listTaskResponse := &internaljob.ListTasksResponse{ - Total: len(peers), - Page: req.Page, - Peers: peers[req.Page*req.PerPage : (req.Page+1)*req.PerPage], + Peers: peers, } return internaljob.MarshalResponse(listTaskResponse) @@ -342,8 +344,6 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) { // deleteTask is a job to delete task. func (j *job) deleteTask(ctx context.Context, data string) (string, error) { - // TODO: - // 1. 
query all peers with task id ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout) defer cancel() @@ -359,43 +359,82 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { } // Get all peers by task id - peers, err := j.getPeers(req.TaskID) + peers, err := j.getValidPeers(req.TaskID) if err != nil { logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error()) return "", err } // Delete task by task id and host id - successTasks := make([]*internaljob.TaskInfo, 0) - failureTasks := make([]*internaljob.TaskInfo, 0) + successTasks := make([]*internaljob.Task, 0) + failureTasks := make([]*internaljob.Task, 0) + // Create a wait group to limit delete rpc concurrency + // and avoid too many rpc requests to the host. + wg := sync.WaitGroup{} + deleteTaskLimit := make(chan struct{}, deleteTaskConcurrency) for _, peer := range peers { - // hostID := peer.Host.ID - // get task info by task id - task, ok := j.resource.TaskManager().Load(req.TaskID) - if !ok { - logger.Errorf("task %s not found", req.TaskID) - failureTasks = append(failureTasks, &internaljob.TaskInfo{ - Task: nil, - Peer: peer, - Desc: "task not found", + wg.Add(1) + deleteTaskLimit <- struct{}{} + go func(peer *resource.Peer) { + defer func() { + wg.Done() + <-deleteTaskLimit + }() + + // Get dfdaemon client from host + target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port) + conn, err := grpc.DialContext( + ctx, + target, + grpc.WithIdleTimeout(0), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt32), + grpc.MaxCallSendMsgSize(math.MaxInt32), + ), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(deleteTaskMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(deleteTaskBackoffWaitBetween)), + ), + )), + ) + if err != nil { + logger.Errorf("create grpc client to %s failed: %s", target, err.Error()) + failureTasks = append(failureTasks, &internaljob.Task{ + Task: peer.Task, + Peer: peer, + Description: err.Error(), + }) + return + } + + dfdaemonUploadClient := dfdaemonv2.NewDfdaemonUploadClient(conn) + _, err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{ + TaskId: req.TaskID, }) - continue - } + if err != nil { + logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error()) + failureTasks = append(failureTasks, &internaljob.Task{ + Task: peer.Task, + Peer: peer, + Description: err.Error(), + }) + return + } - // TODO: change to scheduler delete task grpc function - // and add batch delete - j.resource.SeedPeer().Client().DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{ - TaskId: req.TaskID, - }) - - successTasks = append(successTasks, &internaljob.TaskInfo{ - Task: task, - Peer: peer, - Desc: "success", - }) + successTasks = append(successTasks, &internaljob.Task{ + Task: peer.Task, + Peer: peer, + Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target), + }) + }(peer) } + wg.Wait() + deleteTaskResponse := &internaljob.DeleteTaskResponse{ SuccessTasks: successTasks, FailureTasks: failureTasks, @@ -404,8 +443,8 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { return internaljob.MarshalResponse(deleteTaskResponse) } -// getPeers try to get peers by task id -func (j *job) getPeers(taskID string) ([]*resource.Peer, error) { +// getValidPeers try to get valid peers by task 
id +func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) { // get task info by task id task, ok := j.resource.TaskManager().Load(taskID) if !ok { @@ -424,5 +463,14 @@ func (j *job) getPeers(taskID string) ([]*resource.Peer, error) { peers = append(peers, peer) } - return peers, nil + // Choose finished peers as list tasks result + finishedPeers := make([]*resource.Peer, len(peers)) + for _, peer := range peers { + currentState := peer.FSM.Current() + if currentState == resource.PeerStateSucceeded || currentState == resource.PeerStateFailed { + finishedPeers = append(finishedPeers, peer) + } + } + + return finishedPeers, nil } From 915d8808f9b85cdd67f9329072f652583c6fce40 Mon Sep 17 00:00:00 2001 From: Asklv Date: Sat, 10 Aug 2024 21:24:34 +0800 Subject: [PATCH 9/9] fix: update getFinishedPeers function and remove concurrency feature. Signed-off-by: Asklv --- pkg/rpc/dfdaemon/client/client_v2.go | 35 ++++++++ scheduler/job/job.go | 129 +++++++-------------------- scheduler/resource/task.go | 19 ++++ 3 files changed, 87 insertions(+), 96 deletions(-) diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go index f4c0c136f16..a92f1ea4eec 100644 --- a/pkg/rpc/dfdaemon/client/client_v2.go +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -83,6 +83,41 @@ func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grp }, nil } +// GetV2ByAddr returns v2 version of the dfdaemon client by address. +func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) { + conn, err := grpc.DialContext( + ctx, + target, + append([]grpc.DialOption{ + grpc.WithIdleTimeout(0), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt32), + grpc.MaxCallSendMsgSize(math.MaxInt32), + ), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), + }, opts...)..., + ) + if err != nil { + return nil, err + } + + return &v2{ + DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn), + ClientConn: conn, + }, nil +} + // V2 is the interface for v2 version of the grpc client. type V2 interface { // SyncPieces syncs pieces from the other peers. 
diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 84c9b410a3d..82c56a5a6d7 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -23,18 +23,11 @@ import ( "errors" "fmt" "io" - "math" "strings" - "sync" "time" "github.com/RichardKnop/machinery/v1" "github.com/go-playground/validator/v10" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,6 +39,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/pkg/idgen" + dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" ) @@ -55,12 +49,6 @@ const ( preheatTimeout = 20 * time.Minute // deleteTaskTimeout is timeout of deleting task. deleteTaskTimeout = 20 * time.Minute - // deleteTaskConcurrency is the number of concurrent delete tasks. - deleteTaskConcurrency = 10 - // deleteTaskMaxRetries is the maximum number of retries for delete tasks. - deleteTaskMaxRetries = 3 - // deleteTaskBackoffWaitBetween is waiting for a fixed period of time between calls in backoff linear. - deleteTaskBackoffWaitBetween = 500 * time.Millisecond ) // Job is an interface for job. @@ -329,7 +317,7 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) { } // Get all peers by task id - peers, err := j.getValidPeers(req.TaskID) + peers, err := j.getFinishedPeers(req.TaskID) if err != nil { logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error()) return "", err @@ -359,7 +347,7 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { } // Get all peers by task id - peers, err := j.getValidPeers(req.TaskID) + peers, err := j.getFinishedPeers(req.TaskID) if err != nil { logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error()) return "", err @@ -369,71 +357,40 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { successTasks := make([]*internaljob.Task, 0) failureTasks := make([]*internaljob.Task, 0) - // Create a wait group to limit delete rpc concurrency + // TODO: Create a limiter to limit delete rpc concurrency // and avoid too many rpc requests to the host. 
- wg := sync.WaitGroup{} - deleteTaskLimit := make(chan struct{}, deleteTaskConcurrency) for _, peer := range peers { - wg.Add(1) - deleteTaskLimit <- struct{}{} - go func(peer *resource.Peer) { - defer func() { - wg.Done() - <-deleteTaskLimit - }() - - // Get dfdaemon client from host - target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port) - conn, err := grpc.DialContext( - ctx, - target, - grpc.WithIdleTimeout(0), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt32), - grpc.MaxCallSendMsgSize(math.MaxInt32), - ), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - grpc_prometheus.UnaryClientInterceptor, - grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), - grpc_retry.UnaryClientInterceptor( - grpc_retry.WithMax(deleteTaskMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffLinear(deleteTaskBackoffWaitBetween)), - ), - )), - ) - if err != nil { - logger.Errorf("create grpc client to %s failed: %s", target, err.Error()) - failureTasks = append(failureTasks, &internaljob.Task{ - Task: peer.Task, - Peer: peer, - Description: err.Error(), - }) - return - } - - dfdaemonUploadClient := dfdaemonv2.NewDfdaemonUploadClient(conn) - _, err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{ - TaskId: req.TaskID, + // Get dfdaemon client from host + target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port) + dfdaemonUploadClient, err := dfdaemonclient.GetV2ByAddr(ctx, target) + if err != nil { + logger.Errorf("get dfdaemon client from %s failed: %s", target, err.Error()) + failureTasks = append(failureTasks, &internaljob.Task{ + Task: peer.Task, + Peer: peer, + Description: err.Error(), }) - if err != nil { - logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error()) - failureTasks = append(failureTasks, &internaljob.Task{ - Task: peer.Task, - Peer: peer, - Description: err.Error(), - }) - return - } - - successTasks = append(successTasks, &internaljob.Task{ + continue + } + err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{ + TaskId: req.TaskID, + }) + if err != nil { + logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error()) + failureTasks = append(failureTasks, &internaljob.Task{ Task: peer.Task, Peer: peer, - Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target), + Description: err.Error(), }) - }(peer) - } + continue + } - wg.Wait() + successTasks = append(successTasks, &internaljob.Task{ + Task: peer.Task, + Peer: peer, + Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target), + }) + } deleteTaskResponse := &internaljob.DeleteTaskResponse{ SuccessTasks: successTasks, @@ -443,8 +400,8 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { return internaljob.MarshalResponse(deleteTaskResponse) } -// getValidPeers try to get valid peers by task id -func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) { +// getFinishedPeers try to get valid peers by task id +func (j *job) getFinishedPeers(taskID string) ([]*resource.Peer, error) { // get task info by task id task, ok := j.resource.TaskManager().Load(taskID) if !ok { @@ -452,25 +409,5 @@ func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) { return nil, fmt.Errorf("task %s not found", taskID) } - // get peer info by task info - peers := make([]*resource.Peer, 0) - for _, vertex := range task.DAG.GetVertices() { - peer := vertex.Value - if peer == nil { - continue - } - - peers = 
append(peers, peer)
-	}
-
-	// Choose finished peers as list tasks result
-	finishedPeers := make([]*resource.Peer, len(peers))
-	for _, peer := range peers {
-		currentState := peer.FSM.Current()
-		if currentState == resource.PeerStateSucceeded || currentState == resource.PeerStateFailed {
-			finishedPeers = append(finishedPeers, peer)
-		}
-	}
-
-	return finishedPeers, nil
+	return task.LoadFinishedPeers(), nil
 }
diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go
index b5a3ac864dd..a9d0caa4c8c 100644
--- a/scheduler/resource/task.go
+++ b/scheduler/resource/task.go
@@ -249,6 +249,25 @@ func (t *Task) LoadRandomPeers(n uint) []*Peer {
 	return peers
 }
 
+// LoadFinishedPeers returns finished peers.
+func (t *Task) LoadFinishedPeers() []*Peer {
+	// Choose finished peers
+	var finishedPeers []*Peer
+	for _, vertex := range t.DAG.GetVertices() {
+		peer := vertex.Value
+		if peer == nil {
+			continue
+		}
+
+		currentState := peer.FSM.Current()
+		if currentState == PeerStateSucceeded || currentState == PeerStateFailed {
+			finishedPeers = append(finishedPeers, peer)
+		}
+	}
+
+	return finishedPeers
+}
+
 // StorePeer set peer.
 func (t *Task) StorePeer(peer *Peer) {
 	t.DAG.AddVertex(peer.ID, peer) // nolint: errcheck