4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v1.yml
@@ -29,11 +29,11 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.0.9
image-tag: v2.1.55-alpha
chart-name: manager
- module: scheduler
image: scheduler
image-tag: v2.0.9
image-tag: v2.1.55-alpha
chart-name: scheduler
- module: dfdaemon
image: dfdaemon
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v2.yml
@@ -28,11 +28,11 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.1.55
image-tag: v2.1.55-alpha
chart-name: manager
- module: scheduler
image: scheduler
image-tag: v2.1.55
image-tag: v2.1.55-alpha
chart-name: scheduler
- module: client
image: client
76 changes: 67 additions & 9 deletions internal/job/job.go
@@ -106,14 +106,24 @@ func (t *Job) LaunchWorker(consumerTag string, concurrency int) error {
}

type GroupJobState struct {
GroupUUID string
State string
CreatedAt time.Time
UpdatedAt time.Time
JobStates []*machineryv1tasks.TaskState
GroupUUID string `json:"group_uuid"`
State string `json:"state"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
JobStates []jobState `json:"job_states"`
}

func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
type jobState struct {
TaskUUID string `json:"task_uuid"`
TaskName string `json:"task_name"`
State string `json:"state"`
Results []any `json:"results"`
Error string `json:"error"`
CreatedAt time.Time `json:"created_at"`
TTL int64 `json:"ttl"`
}

func (t *Job) GetGroupJobState(name string, groupID string) (*GroupJobState, error) {
taskStates, err := t.Server.GetBackend().GroupTaskStates(groupID, 0)
if err != nil {
return nil, err
@@ -123,6 +133,45 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
return nil, errors.New("empty group")
}

jobStates := make([]jobState, 0, len(taskStates))
for _, taskState := range taskStates {
var results []any
for _, result := range taskState.Results {
switch name {
case PreheatJob:
var resp PreheatResponse
if err := UnmarshalTaskResult(result.Value, &resp); err != nil {
return nil, err
}
results = append(results, resp)
case GetTaskJob:
var resp GetTaskResponse
if err := UnmarshalTaskResult(result.Value, &resp); err != nil {
return nil, err
}
results = append(results, resp)
case DeleteTaskJob:
var resp DeleteTaskResponse
if err := UnmarshalTaskResult(result.Value, &resp); err != nil {
return nil, err
}
results = append(results, resp)
default:
return nil, errors.New("unsupported unmarshal task result")
}
}

jobStates = append(jobStates, jobState{
TaskUUID: taskState.TaskUUID,
TaskName: taskState.TaskName,
State: taskState.State,
Results: results,
Error: taskState.Error,
CreatedAt: taskState.CreatedAt,
TTL: taskState.TTL,
})
}

for _, taskState := range taskStates {
if taskState.IsFailure() {
logger.WithGroupAndTaskID(groupID, taskState.TaskUUID).Errorf("task is failed: %#v", taskState)
@@ -131,7 +180,7 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
State: machineryv1tasks.StateFailure,
CreatedAt: taskState.CreatedAt,
UpdatedAt: time.Now(),
JobStates: taskStates,
JobStates: jobStates,
}, nil
}
}
@@ -144,7 +193,7 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
State: machineryv1tasks.StatePending,
CreatedAt: taskState.CreatedAt,
UpdatedAt: time.Now(),
JobStates: taskStates,
JobStates: jobStates,
}, nil
}
}
@@ -154,7 +203,7 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
State: machineryv1tasks.StateSuccess,
CreatedAt: taskStates[0].CreatedAt,
UpdatedAt: time.Now(),
JobStates: taskStates,
JobStates: jobStates,
}, nil
}

@@ -179,6 +228,15 @@ func MarshalRequest(v any) ([]machineryv1tasks.Arg, error) {
}}, nil
}

func UnmarshalTaskResult(data any, v any) error {
s, ok := data.(string)
if !ok {
return errors.New("invalid task result")
}

return json.Unmarshal([]byte(s), v)
}

func UnmarshalResponse(data []reflect.Value, v any) error {
if len(data) == 0 {
return errors.New("empty data is not specified")
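A minimal sketch of how a caller inside the d7y.io/dragonfly/v2 module might use the new `GetGroupJobState(name, groupID)` signature, where the job name selects how each task result is decoded; the helper name and print logic are illustrative only, not part of this PR:

```go
package main

import (
	"fmt"

	internaljob "d7y.io/dragonfly/v2/internal/job"
)

// printPreheatGroup is a hypothetical helper: it fetches the state of a
// preheat group and prints each task's state and result count. It assumes a
// *internaljob.Job that has already been constructed elsewhere.
func printPreheatGroup(j *internaljob.Job, groupID string) error {
	// Passing the job name lets GetGroupJobState unmarshal each task result
	// into the matching response type (here, PreheatResponse).
	group, err := j.GetGroupJobState(internaljob.PreheatJob, groupID)
	if err != nil {
		return err
	}

	fmt.Printf("group %s is %s\n", group.GroupUUID, group.State)
	for _, task := range group.JobStates {
		fmt.Printf("  task %s (%s): %s, %d result(s)\n",
			task.TaskUUID, task.TaskName, task.State, len(task.Results))
	}

	return nil
}

func main() {
	// Wiring up a real *internaljob.Job requires a machinery server and its
	// backend, which is out of scope for this sketch.
}
```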
31 changes: 28 additions & 3 deletions internal/job/types.go
@@ -16,7 +16,11 @@

package job

import "d7y.io/dragonfly/v2/scheduler/resource"
import (
"time"

"d7y.io/dragonfly/v2/scheduler/resource"
)

// PreheatRequest defines the request parameters for preheating.
type PreheatRequest struct {
@@ -27,11 +31,31 @@ type PreheatRequest struct {
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
Scope string `json:"scope" validate:"omitempty"`
ConcurrentCount int64 `json:"concurrent_count" validate:"omitempty"`
Timeout time.Duration `json:"timeout" validate:"omitempty"`
}

// PreheatResponse defines the response parameters for preheating.
type PreheatResponse struct {
TaskID string `json:"task_id"`
SuccessTasks []*PreheatSuccessTask `json:"success_tasks"`
FailureTasks []*PreheatFailureTask `json:"failure_tasks"`
SchedulerClusterID uint `json:"scheduler_cluster_id"`
}

// PreheatSuccessTask defines the response parameters for preheating successfully.
type PreheatSuccessTask struct {
URL string `json:"url"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
}

// PreheatFailureTask defines the response parameters for preheating failed.
type PreheatFailureTask struct {
URL string `json:"url"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
Description string `json:"description"`
}

// GetTaskRequest defines the request parameters for getting task.
Expand All @@ -46,7 +70,8 @@ type GetTaskResponse struct {

// DeleteTaskRequest defines the request parameters for deleting task.
type DeleteTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
TaskID string `json:"task_id" validate:"required"`
Timeout time.Duration `json:"timeout" validate:"omitempty"`
}

// DeleteTaskResponse defines the response parameters for deleting task.
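For reference, a sketch of how the extended PreheatRequest serializes with the new fields; note that time.Duration is encoded as an integer nanosecond count under the `timeout` tag. The field values, including the "single_peer" scope string, are illustrative assumptions rather than values taken from this PR:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	internaljob "d7y.io/dragonfly/v2/internal/job"
)

func main() {
	// Illustrative values only; "single_peer" stands in for whatever the
	// manager-side scope constant resolves to.
	req := internaljob.PreheatRequest{
		Headers:         map[string]string{"Authorization": "Bearer <token>"},
		Scope:           "single_peer",
		ConcurrentCount: 8,
		Timeout:         30 * time.Minute, // marshals as 1800000000000 (nanoseconds)
	}

	b, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}

	// The timeout appears as an integer nanosecond count in the JSON payload.
	fmt.Println(string(b))
}
```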
4 changes: 2 additions & 2 deletions manager/handlers/application_test.go
@@ -94,7 +94,7 @@ var (
URL: "http://example.com/foo",
BIO: "bio",
UserID: 4,
Priority: models.JSONMap{"value": 20, "urls": []interface{}{map[string]interface{}{"regex": "regex", "value": 20}}},
Priority: models.JSONMap{"value": 20, "urls": []any{map[string]any{"regex": "regex", "value": 20}}},
}
mockUnmarshalApplicationModel = models.Application{
BaseModel: mockBaseModel,
@@ -103,7 +103,7 @@ var (
BIO: "bio",
UserID: 4,
// when w.Body.Bytes() is unmarshal to models.Application, the value of Priority will be float64
Priority: models.JSONMap{"value": float64(20), "urls": []interface{}{map[string]interface{}{"regex": "regex", "value": float64(20)}}},
Priority: models.JSONMap{"value": float64(20), "urls": []any{map[string]any{"regex": "regex", "value": float64(20)}}},
}
)

6 changes: 6 additions & 0 deletions manager/job/preheat.go
@@ -110,6 +110,9 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
Scope: json.Scope,
ConcurrentCount: json.ConcurrentCount,
Timeout: json.Timeout,
},
}
default:
@@ -323,6 +326,9 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
Scope: args.Scope,
ConcurrentCount: args.ConcurrentCount,
Timeout: args.Timeout,
}

layers = append(layers, layer)
50 changes: 32 additions & 18 deletions manager/service/job.go
@@ -24,6 +24,7 @@ import (
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/retry"
@@ -32,6 +33,23 @@
)

func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
if json.Args.Scope == "" {
json.Args.Scope = types.SinglePeerScope
}

if json.Args.ConcurrentCount == 0 {
json.Args.ConcurrentCount = types.DefaultPreheatConcurrentCount
}

if json.Args.Timeout == 0 {
json.Args.Timeout = types.DefaultJobTimeout
}

args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}

candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
if err != nil {
return nil, err
@@ -47,11 +65,6 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
}

args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}

job := models.Job{
TaskID: groupJobState.GroupUUID,
BIO: json.BIO,
@@ -66,12 +79,20 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
return nil, err
}

go s.pollingJob(context.Background(), job.ID, job.TaskID)

go s.pollingJob(context.Background(), internaljob.PreheatJob, job.ID, job.TaskID)
return &job, nil
}

func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) {
if json.Args.Timeout == 0 {
json.Args.Timeout = types.DefaultJobTimeout
}

args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}

candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
if err != nil {
return nil, err
@@ -87,11 +108,6 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele
candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
}

args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}

job := models.Job{
TaskID: groupJobState.GroupUUID,
BIO: json.BIO,
@@ -106,8 +122,7 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele
return nil, err
}

go s.pollingJob(context.Background(), job.ID, job.TaskID)

go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID)
return &job, nil
}

@@ -146,8 +161,7 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask
return nil, err
}

go s.pollingJob(context.Background(), job.ID, job.TaskID)

go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID)
return &job, nil
}

@@ -210,13 +224,13 @@ func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterI
return candidateSchedulers, nil
}

func (s *service) pollingJob(ctx context.Context, id uint, groupID string) {
func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID string) {
var (
job models.Job
log = logger.WithGroupAndJobID(groupID, fmt.Sprint(id))
)
if _, _, err := retry.Run(ctx, 5, 10, 480, func() (any, bool, error) {
groupJob, err := s.job.GetGroupJobState(groupID)
groupJob, err := s.job.GetGroupJobState(name, groupID)
if err != nil {
log.Errorf("polling group failed: %s", err.Error())
return nil, false, err
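The defaulting added at the top of CreatePreheatJob can be read in isolation as the sketch below; the helper name is hypothetical, it only touches fields that appear in the diff, and the concrete values of the types.* constants are defined elsewhere in the manager package rather than in this diff:

```go
package service

import (
	"d7y.io/dragonfly/v2/manager/types"
)

// applyPreheatDefaults is a hypothetical helper mirroring the defaulting now
// done in CreatePreheatJob: an empty scope falls back to the single-peer
// scope, and a zero concurrent count or timeout falls back to the defaults.
func applyPreheatDefaults(req *types.CreatePreheatJobRequest) {
	if req.Args.Scope == "" {
		req.Args.Scope = types.SinglePeerScope
	}

	if req.Args.ConcurrentCount == 0 {
		req.Args.ConcurrentCount = types.DefaultPreheatConcurrentCount
	}

	if req.Args.Timeout == 0 {
		req.Args.Timeout = types.DefaultJobTimeout
	}
}
```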