
Commit eae443a

feat: update delete task job and list tasks with concurrency and filter valid tasks.
Signed-off-by: Asklv <boironic@gmail.com>
1 parent a212060 commit eae443a

File tree: 1 file changed, +91 −43 lines


scheduler/job/job.go

Lines changed: 91 additions & 43 deletions
@@ -23,11 +23,18 @@ import (
     "errors"
     "fmt"
     "io"
+    "math"
     "strings"
+    "sync"
     "time"

     "github.com/RichardKnop/machinery/v1"
     "github.com/go-playground/validator/v10"
+    grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+    grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
+    grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
+    grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+    "google.golang.org/grpc"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"

@@ -46,10 +53,14 @@ import (
 const (
     // preheatTimeout is timeout of preheating.
     preheatTimeout = 20 * time.Minute
-    // listTasksTimeout is timeout of listing tasks.
-    listTasksTimeout = 10 * time.Minute
     // deleteTaskTimeout is timeout of deleting task.
     deleteTaskTimeout = 20 * time.Minute
+    // deleteTaskConcurrency is the number of concurrent delete tasks.
+    deleteTaskConcurrency = 10
+    // deleteTaskMaxRetries is the maximum number of retries for delete tasks.
+    deleteTaskMaxRetries = 3
+    // deleteTaskBackoffWaitBetween is waiting for a fixed period of time between calls in backoff linear.
+    deleteTaskBackoffWaitBetween = 500 * time.Millisecond
 )

 // Job is an interface for job.
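
For context on what the new constants configure: deleteTaskConcurrency bounds how many per-peer delete RPCs run at once, while deleteTaskMaxRetries and deleteTaskBackoffWaitBetween feed the gRPC retry interceptor used when dialing each dfdaemon. Below is a minimal, standalone sketch of the buffered-channel-as-semaphore pattern that the deleteTask hunk further down applies; the task names are illustrative, not from the commit.

package main

import (
    "fmt"
    "sync"
)

// Same value as the commit's deleteTaskConcurrency constant.
const deleteTaskConcurrency = 10

func main() {
    tasks := []string{"task-a", "task-b", "task-c"}

    var wg sync.WaitGroup
    // A buffered channel works as a counting semaphore: sends block once
    // deleteTaskConcurrency goroutines are in flight, so at most that many
    // deletions run at the same time.
    limit := make(chan struct{}, deleteTaskConcurrency)

    for _, task := range tasks {
        wg.Add(1)
        limit <- struct{}{} // acquire a slot; blocks when the limit is reached
        go func(task string) {
            defer func() {
                wg.Done()
                <-limit // release the slot
            }()
            fmt.Println("deleting", task) // placeholder for the real RPC
        }(task)
    }

    wg.Wait()
}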
@@ -306,12 +317,6 @@ func (j *job) syncPeers() (string, error) {

 // listTasks is a job to list tasks.
 func (j *job) listTasks(ctx context.Context, data string) (string, error) {
-    // TODO:
-    // 1. query all peers with task id
-    // 2. delete current task by task id and host id
-    ctx, cancel := context.WithTimeout(ctx, listTasksTimeout)
-    defer cancel()
-
     req := &internaljob.ListTasksRequest{}
     if err := internaljob.UnmarshalRequest(data, req); err != nil {
         logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data)
@@ -324,26 +329,21 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) {
     }

     // Get all peers by task id
-    peers, err := j.getPeers(req.TaskID)
+    peers, err := j.getValidPeers(req.TaskID)
     if err != nil {
         logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
         return "", err
     }

-    // Return peers by page
     listTaskResponse := &internaljob.ListTasksResponse{
-        Total: len(peers),
-        Page:  req.Page,
-        Peers: peers[req.Page*req.PerPage : (req.Page+1)*req.PerPage],
+        Peers: peers,
     }

     return internaljob.MarshalResponse(listTaskResponse)
 }

 // deleteTask is a job to delete task.
 func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
-    // TODO:
-    // 1. query all peers with task id
     ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout)
     defer cancel()

@@ -359,43 +359,82 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
     }

     // Get all peers by task id
-    peers, err := j.getPeers(req.TaskID)
+    peers, err := j.getValidPeers(req.TaskID)
     if err != nil {
         logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
         return "", err
     }

     // Delete task by task id and host id
-    successTasks := make([]*internaljob.TaskInfo, 0)
-    failureTasks := make([]*internaljob.TaskInfo, 0)
+    successTasks := make([]*internaljob.Task, 0)
+    failureTasks := make([]*internaljob.Task, 0)

+    // Create a wait group to limit delete rpc concurrency
+    // and avoid too many rpc requests to the host.
+    wg := sync.WaitGroup{}
+    deleteTaskLimit := make(chan struct{}, deleteTaskConcurrency)
     for _, peer := range peers {
-        // hostID := peer.Host.ID
-        // get task info by task id
-        task, ok := j.resource.TaskManager().Load(req.TaskID)
-        if !ok {
-            logger.Errorf("task %s not found", req.TaskID)
-            failureTasks = append(failureTasks, &internaljob.TaskInfo{
-                Task: nil,
-                Peer: peer,
-                Desc: "task not found",
+        wg.Add(1)
+        deleteTaskLimit <- struct{}{}
+        go func(peer *resource.Peer) {
+            defer func() {
+                wg.Done()
+                <-deleteTaskLimit
+            }()
+
+            // Get dfdaemon client from host
+            target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
+            conn, err := grpc.DialContext(
+                ctx,
+                target,
+                grpc.WithIdleTimeout(0),
+                grpc.WithDefaultCallOptions(
+                    grpc.MaxCallRecvMsgSize(math.MaxInt32),
+                    grpc.MaxCallSendMsgSize(math.MaxInt32),
+                ),
+                grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
+                    grpc_prometheus.UnaryClientInterceptor,
+                    grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
+                    grpc_retry.UnaryClientInterceptor(
+                        grpc_retry.WithMax(deleteTaskMaxRetries),
+                        grpc_retry.WithBackoff(grpc_retry.BackoffLinear(deleteTaskBackoffWaitBetween)),
+                    ),
+                )),
+            )
+            if err != nil {
+                logger.Errorf("create grpc client to %s failed: %s", target, err.Error())
+                failureTasks = append(failureTasks, &internaljob.Task{
+                    Task:        peer.Task,
+                    Peer:        peer,
+                    Description: err.Error(),
+                })
+                return
+            }
+
+            dfdaemonUploadClient := dfdaemonv2.NewDfdaemonUploadClient(conn)
+            _, err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
+                TaskId: req.TaskID,
             })
-            continue
-        }
+            if err != nil {
+                logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error())
+                failureTasks = append(failureTasks, &internaljob.Task{
+                    Task:        peer.Task,
+                    Peer:        peer,
+                    Description: err.Error(),
+                })
+                return
+            }

-        // TODO: change to scheduler delete task grpc function
-        // and add batch delete
-        j.resource.SeedPeer().Client().DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
-            TaskId: req.TaskID,
-        })
-
-        successTasks = append(successTasks, &internaljob.TaskInfo{
-            Task: task,
-            Peer: peer,
-            Desc: "success",
-        })
+            successTasks = append(successTasks, &internaljob.Task{
+                Task:        peer.Task,
+                Peer:        peer,
+                Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target),
+            })
+        }(peer)
     }

+    wg.Wait()
+
     deleteTaskResponse := &internaljob.DeleteTaskResponse{
         SuccessTasks: successTasks,
         FailureTasks: failureTasks,
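
The fan-out above appends to the shared successTasks and failureTasks slices from inside the goroutines. As a general Go note, concurrent appends to a shared slice need a mutex (or a results channel) to be race-free; the standalone sketch below shows that mutex-guarded variant of the same bounded fan-out. The result type and fanOut helper are simplified stand-ins, not code from this commit.

package main

import (
    "fmt"
    "sync"
)

// result is a simplified stand-in for the job's per-peer outcome.
type result struct {
    target string
    err    error
}

// fanOut runs call against every target with at most `concurrency` goroutines
// in flight and collects outcomes under a mutex so the appends are race-free.
func fanOut(targets []string, concurrency int, call func(string) error) (succeeded, failed []result) {
    var (
        wg    sync.WaitGroup
        mu    sync.Mutex // guards succeeded and failed
        limit = make(chan struct{}, concurrency)
    )

    for _, target := range targets {
        wg.Add(1)
        limit <- struct{}{} // acquire a concurrency slot
        go func(target string) {
            defer func() {
                wg.Done()
                <-limit // release the slot
            }()

            err := call(target)

            mu.Lock()
            defer mu.Unlock()
            if err != nil {
                failed = append(failed, result{target: target, err: err})
                return
            }
            succeeded = append(succeeded, result{target: target})
        }(target)
    }

    wg.Wait()
    return succeeded, failed
}

func main() {
    succeeded, failed := fanOut([]string{"10.0.0.1:4000", "10.0.0.2:4000"}, 10, func(target string) error {
        // Placeholder for the real delete RPC against the dfdaemon at target.
        fmt.Println("delete task on", target)
        return nil
    })
    fmt.Printf("%d succeeded, %d failed\n", len(succeeded), len(failed))
}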
@@ -404,8 +443,8 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
     return internaljob.MarshalResponse(deleteTaskResponse)
 }

-// getPeers try to get peers by task id
-func (j *job) getPeers(taskID string) ([]*resource.Peer, error) {
+// getValidPeers try to get valid peers by task id
+func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) {
     // get task info by task id
     task, ok := j.resource.TaskManager().Load(taskID)
     if !ok {
@@ -424,5 +463,14 @@ func (j *job) getPeers(taskID string) ([]*resource.Peer, error) {
         peers = append(peers, peer)
     }

-    return peers, nil
+    // Choose finished peers as list tasks result
+    finishedPeers := make([]*resource.Peer, len(peers))
+    for _, peer := range peers {
+        currentState := peer.FSM.Current()
+        if currentState == resource.PeerStateSucceeded || currentState == resource.PeerStateFailed {
+            finishedPeers = append(finishedPeers, peer)
+        }
+    }
+
+    return finishedPeers, nil
 }
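
On the new filtering loop in getValidPeers: the usual Go idiom for filtering into a fresh slice is to pre-allocate capacity rather than length, since make([]*resource.Peer, len(peers)) followed by append leaves nil placeholders at the front. The standalone sketch below (simplified peer type, not the scheduler's resource.Peer) shows that idiom with the same state check.

package main

import "fmt"

// Simplified stand-ins for the scheduler's peer FSM states; the real
// constants live in the scheduler's resource package.
const (
    PeerStateSucceeded = "Succeeded"
    PeerStateFailed    = "Failed"
    PeerStateRunning   = "Running"
)

type peer struct {
    id    string
    state string
}

func main() {
    peers := []*peer{
        {id: "p1", state: PeerStateSucceeded},
        {id: "p2", state: PeerStateRunning},
        {id: "p3", state: PeerStateFailed},
    }

    // Pre-allocate capacity, not length: make([]*peer, len(peers)) followed by
    // append would leave nil placeholders at the front of the result.
    finished := make([]*peer, 0, len(peers))
    for _, p := range peers {
        if p.state == PeerStateSucceeded || p.state == PeerStateFailed {
            finished = append(finished, p)
        }
    }

    fmt.Println(len(finished), "finished peers") // prints: 2 finished peers
}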
