
Commit 2b2d3fd

committed
fix: update getFinishedPeers function and remove concurrency feature.
Signed-off-by: Asklv <boironic@gmail.com>
1 parent eae443a · commit 2b2d3fd

File tree: 3 files changed, +87 −96 lines changed


pkg/rpc/dfdaemon/client/client_v2.go

Lines changed: 35 additions & 0 deletions
@@ -83,6 +83,41 @@ func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grp
 	}, nil
 }
 
+// GetV2ByAddr returns v2 version of the dfdaemon client by address.
+func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
+	conn, err := grpc.DialContext(
+		ctx,
+		target,
+		append([]grpc.DialOption{
+			grpc.WithIdleTimeout(0),
+			grpc.WithDefaultCallOptions(
+				grpc.MaxCallRecvMsgSize(math.MaxInt32),
+				grpc.MaxCallSendMsgSize(math.MaxInt32),
+			),
+			grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
+				grpc_prometheus.UnaryClientInterceptor,
+				grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
+				grpc_retry.UnaryClientInterceptor(
+					grpc_retry.WithMax(maxRetries),
+					grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
+				),
+			)),
+			grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
+				grpc_prometheus.StreamClientInterceptor,
+				grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
+			)),
+		}, opts...)...,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return &v2{
+		DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn),
+		ClientConn:           conn,
+	}, nil
+}
+
 // V2 is the interface for v2 version of the grpc client.
 type V2 interface {
 	// SyncPieces syncs pieces from the other peers.
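
A minimal usage sketch of the new GetV2ByAddr helper, mirroring how scheduler/job/job.go calls it below. The deleteCacheTaskOnPeer wrapper, the 10-second timeout, and the error wrapping are illustrative assumptions, not part of this commit; the "ip:port" target format and the DeleteCacheTask signature come from the job.go diff.

// Sketch only: a hypothetical wrapper around GetV2ByAddr.
package example

import (
	"context"
	"fmt"
	"time"

	dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
)

func deleteCacheTaskOnPeer(ip string, port int32, taskID string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Dial the peer's dfdaemon directly by "ip:port", as deleteTask does below.
	target := fmt.Sprintf("%s:%d", ip, port)
	client, err := dfdaemonclient.GetV2ByAddr(ctx, target)
	if err != nil {
		return fmt.Errorf("get dfdaemon client from %s: %w", target, err)
	}

	// Ask the peer to drop its cached copy of the task.
	return client.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{TaskId: taskID})
}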

scheduler/job/job.go

Lines changed: 33 additions & 96 deletions
@@ -23,25 +23,19 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"math"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/RichardKnop/machinery/v1"
 	"github.com/go-playground/validator/v10"
-	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
-	grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
-	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
-	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
 	cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1"
 	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
 	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
 	dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
+	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
 
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	internaljob "d7y.io/dragonfly/v2/internal/job"
@@ -55,12 +49,6 @@ const (
 	preheatTimeout = 20 * time.Minute
 	// deleteTaskTimeout is timeout of deleting task.
 	deleteTaskTimeout = 20 * time.Minute
-	// deleteTaskConcurrency is the number of concurrent delete tasks.
-	deleteTaskConcurrency = 10
-	// deleteTaskMaxRetries is the maximum number of retries for delete tasks.
-	deleteTaskMaxRetries = 3
-	// deleteTaskBackoffWaitBetween is waiting for a fixed period of time between calls in backoff linear.
-	deleteTaskBackoffWaitBetween = 500 * time.Millisecond
 )
 
 // Job is an interface for job.
@@ -329,7 +317,7 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) {
 	}
 
 	// Get all peers by task id
-	peers, err := j.getValidPeers(req.TaskID)
+	peers, err := j.getFinishedPeers(req.TaskID)
 	if err != nil {
 		logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
 		return "", err
@@ -359,7 +347,7 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
 	}
 
 	// Get all peers by task id
-	peers, err := j.getValidPeers(req.TaskID)
+	peers, err := j.getFinishedPeers(req.TaskID)
	if err != nil {
 		logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
 		return "", err
@@ -369,71 +357,40 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
 	successTasks := make([]*internaljob.Task, 0)
 	failureTasks := make([]*internaljob.Task, 0)
 
-	// Create a wait group to limit delete rpc concurrency
+	// TODO: Create a limiter to limit delete rpc concurrency
 	// and avoid too many rpc requests to the host.
-	wg := sync.WaitGroup{}
-	deleteTaskLimit := make(chan struct{}, deleteTaskConcurrency)
 	for _, peer := range peers {
-		wg.Add(1)
-		deleteTaskLimit <- struct{}{}
-		go func(peer *resource.Peer) {
-			defer func() {
-				wg.Done()
-				<-deleteTaskLimit
-			}()
-
-			// Get dfdaemon client from host
-			target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
-			conn, err := grpc.DialContext(
-				ctx,
-				target,
-				grpc.WithIdleTimeout(0),
-				grpc.WithDefaultCallOptions(
-					grpc.MaxCallRecvMsgSize(math.MaxInt32),
-					grpc.MaxCallSendMsgSize(math.MaxInt32),
-				),
-				grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
-					grpc_prometheus.UnaryClientInterceptor,
-					grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
-					grpc_retry.UnaryClientInterceptor(
-						grpc_retry.WithMax(deleteTaskMaxRetries),
-						grpc_retry.WithBackoff(grpc_retry.BackoffLinear(deleteTaskBackoffWaitBetween)),
-					),
-				)),
-			)
-			if err != nil {
-				logger.Errorf("create grpc client to %s failed: %s", target, err.Error())
-				failureTasks = append(failureTasks, &internaljob.Task{
-					Task:        peer.Task,
-					Peer:        peer,
-					Description: err.Error(),
-				})
-				return
-			}
-
-			dfdaemonUploadClient := dfdaemonv2.NewDfdaemonUploadClient(conn)
-			_, err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
-				TaskId: req.TaskID,
-			})
-			if err != nil {
-				logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error())
-				failureTasks = append(failureTasks, &internaljob.Task{
-					Task:        peer.Task,
-					Peer:        peer,
-					Description: err.Error(),
-				})
-				return
-			}
-
-			successTasks = append(successTasks, &internaljob.Task{
-				Task:        peer.Task,
-				Peer:        peer,
-				Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target),
-			})
-		}(peer)
-	}
-
-	wg.Wait()
+		// Get dfdaemon client from host
+		target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
+		dfdaemonUploadClient, err := dfdaemonclient.GetV2ByAddr(ctx, target)
+		if err != nil {
+			logger.Errorf("get dfdaemon client from %s failed: %s", target, err.Error())
+			failureTasks = append(failureTasks, &internaljob.Task{
+				Task:        peer.Task,
+				Peer:        peer,
+				Description: err.Error(),
+			})
+			continue
+		}
+		err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
+			TaskId: req.TaskID,
+		})
+		if err != nil {
+			logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error())
+			failureTasks = append(failureTasks, &internaljob.Task{
+				Task:        peer.Task,
+				Peer:        peer,
+				Description: err.Error(),
+			})
+			continue
+		}
+
+		successTasks = append(successTasks, &internaljob.Task{
+			Task:        peer.Task,
+			Peer:        peer,
+			Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target),
+		})
+	}
 
 	deleteTaskResponse := &internaljob.DeleteTaskResponse{
 		SuccessTasks: successTasks,
@@ -443,34 +400,14 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
 	return internaljob.MarshalResponse(deleteTaskResponse)
 }
 
-// getValidPeers try to get valid peers by task id
-func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) {
+// getFinishedPeers try to get valid peers by task id
+func (j *job) getFinishedPeers(taskID string) ([]*resource.Peer, error) {
 	// get task info by task id
 	task, ok := j.resource.TaskManager().Load(taskID)
 	if !ok {
 		logger.Errorf("task %s not found", taskID)
 		return nil, fmt.Errorf("task %s not found", taskID)
 	}
 
-	// get peer info by task info
-	peers := make([]*resource.Peer, 0)
-	for _, vertex := range task.DAG.GetVertices() {
-		peer := vertex.Value
-		if peer == nil {
-			continue
-		}
-
-		peers = append(peers, peer)
-	}
-
-	// Choose finished peers as list tasks result
-	finishedPeers := make([]*resource.Peer, len(peers))
-	for _, peer := range peers {
-		currentState := peer.FSM.Current()
-		if currentState == resource.PeerStateSucceeded || currentState == resource.PeerStateFailed {
-			finishedPeers = append(finishedPeers, peer)
-		}
-	}
-
-	return finishedPeers, nil
+	return task.LoadFinishedPeers(), nil
 }
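
The loop now issues delete RPCs sequentially, and the TODO left in deleteTask asks for a limiter to bound the fan-out instead of the removed WaitGroup/channel semaphore. One possible shape is sketched below using golang.org/x/sync/errgroup's SetLimit; the deleteTaskFromPeers helper, the limit parameter, and the failure-string return are illustrative assumptions, not part of this commit.

// Sketch of the TODO above: bound concurrent DeleteCacheTask RPCs with errgroup.SetLimit.
package example

import (
	"context"
	"fmt"

	dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
	"golang.org/x/sync/errgroup"

	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
)

// deleteTaskFromPeers issues DeleteCacheTask to each peer address with at most
// `limit` RPCs in flight and returns one message per failed peer.
func deleteTaskFromPeers(ctx context.Context, taskID string, targets []string, limit int) []string {
	failures := make([]string, len(targets)) // index-addressed, so no mutex is needed
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(limit)
	for i, target := range targets {
		i, target := i, target
		g.Go(func() error {
			client, err := dfdaemonclient.GetV2ByAddr(ctx, target)
			if err == nil {
				err = client.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{TaskId: taskID})
			}
			if err != nil {
				failures[i] = fmt.Sprintf("%s: %s", target, err)
			}
			// Collect per-peer failures instead of cancelling the whole group.
			return nil
		})
	}
	_ = g.Wait()

	var out []string
	for _, f := range failures {
		if f != "" {
			out = append(out, f)
		}
	}
	return out
}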

scheduler/resource/task.go

Lines changed: 19 additions & 0 deletions
@@ -249,6 +249,25 @@ func (t *Task) LoadRandomPeers(n uint) []*Peer {
 	return peers
 }
 
+// LoadFinishedPeers return finished peers.
+func (t *Task) LoadFinishedPeers() []*Peer {
+	// Choose finished peers
+	var finishedPeers []*Peer
+	for _, vertex := range t.DAG.GetVertices() {
+		peer := vertex.Value
+		if peer == nil {
+			continue
+		}
+
+		currentState := peer.FSM.Current()
+		if currentState == PeerStateSucceeded || currentState == PeerStateFailed {
+			finishedPeers = append(finishedPeers, peer)
+		}
+	}
+
+	return finishedPeers
+}
+
 // StorePeer set peer.
 func (t *Task) StorePeer(peer *Peer) {
 	t.DAG.AddVertex(peer.ID, peer) // nolint: errcheck
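
LoadFinishedPeers returns peers in either PeerStateSucceeded or PeerStateFailed. A hedged sketch of how a caller outside the resource package might narrow that to succeeded peers only; the succeededOnly helper and the import path are illustrative assumptions, not part of this commit.

// Sketch only: keep the peers that actually completed the download.
package example

import (
	"d7y.io/dragonfly/v2/scheduler/resource"
)

func succeededOnly(task *resource.Task) []*resource.Peer {
	var peers []*resource.Peer
	for _, peer := range task.LoadFinishedPeers() {
		// LoadFinishedPeers yields both succeeded and failed peers; filter here.
		if peer.FSM.Current() == resource.PeerStateSucceeded {
			peers = append(peers, peer)
		}
	}
	return peers
}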
