Skip to content

Commit a092c3d

Browse files
committed
refactor: refacts the seed peer selection logic from grpc resolver to self pick
Signed-off-by: chlins <chlins.zhang@gmail.com>
1 parent a7efec4 commit a092c3d

File tree

11 files changed

+170
-981
lines changed

11 files changed

+170
-981
lines changed

scheduler/job/job.go

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ import (
4040
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
4141
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
4242
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
43-
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
4443

4544
logger "d7y.io/dragonfly/v2/internal/dflog"
4645
internaljob "d7y.io/dragonfly/v2/internal/job"
4746
managertypes "d7y.io/dragonfly/v2/manager/types"
47+
"d7y.io/dragonfly/v2/pkg/dfnet"
4848
"d7y.io/dragonfly/v2/pkg/idgen"
49+
cndsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
4950
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
5051
"d7y.io/dragonfly/v2/scheduler/config"
5152
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
@@ -63,10 +64,11 @@ type job struct {
6364
localJob *internaljob.Job
6465
resource resource.Resource
6566
config *config.Config
67+
dialOptions []grpc.DialOption
6668
}
6769

6870
// New creates a new Job.
69-
func New(cfg *config.Config, resource resource.Resource) (Job, error) {
71+
func New(cfg *config.Config, resource resource.Resource, dialOptions ...grpc.DialOption) (Job, error) {
7072
redisConfig := &internaljob.Config{
7173
Addrs: cfg.Database.Redis.Addrs,
7274
MasterName: cfg.Database.Redis.MasterName,
@@ -111,6 +113,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
111113
localJob: localJob,
112114
resource: resource,
113115
config: cfg,
116+
dialOptions: dialOptions,
114117
}
115118

116119
namedJobFuncs := map[string]any{
@@ -230,8 +233,8 @@ func (j *job) preheatSingleSeedPeer(ctx context.Context, req *internaljob.Prehea
230233
}
231234

232235
// If scheduler has no available seed peer, return error.
233-
if len(j.resource.SeedPeer().Client().Addrs()) == 0 {
234-
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
236+
if _, err := j.resource.SeedPeer().SelectSeedPeer(ctx, req.TaskUUID); err != nil {
237+
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer: %w", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP, err)
235238
}
236239

237240
// Preheat by v2 grpc protocol. If seed peer does not support
@@ -262,8 +265,20 @@ func (j *job) preheatV1SingleSeedPeer(ctx context.Context, req *internaljob.Preh
262265
Priority: commonv1.Priority(req.Priority),
263266
}
264267

268+
selectedSeedPeer, err := j.resource.SeedPeer().SelectSeedPeer(ctx, taskID)
269+
if err != nil {
270+
return nil, err
271+
}
272+
273+
// TODO: reuse the client if we encounter the performance issue in future.
274+
client, err := cndsystemclient.GetClientByAddr(ctx, dfnet.NetAddr{Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", selectedSeedPeer.IP, selectedSeedPeer.Port)}, j.dialOptions...)
275+
if err != nil {
276+
return nil, err
277+
}
278+
defer client.Close()
279+
265280
// Trigger seed peer download seeds.
266-
stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
281+
stream, err := client.ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
267282
TaskId: taskID,
268283
Url: req.URL,
269284
UrlMeta: urlMeta,
@@ -330,7 +345,20 @@ func (j *job) preheatV2SingleSeedPeerByURL(ctx context.Context, url string, req
330345
taskID := idgen.TaskIDV2ByURLBased(url, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
331346
filteredQueryParams := strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)
332347
advertiseIP := j.config.Server.AdvertiseIP.String()
333-
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
348+
349+
selectedSeedPeer, err := j.resource.SeedPeer().SelectSeedPeer(ctx, taskID)
350+
if err != nil {
351+
return nil, err
352+
}
353+
354+
// TODO: reuse the client if we encounter the performance issue in future.
355+
client, err := dfdaemonclient.GetV2ByAddr(ctx, fmt.Sprintf("%s:%d", selectedSeedPeer.IP, selectedSeedPeer.Port), j.dialOptions...)
356+
if err != nil {
357+
return nil, err
358+
}
359+
defer client.Close()
360+
361+
stream, err := client.DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
334362
Download: &commonv2.Download{
335363
Url: url,
336364
PieceLength: req.PieceLength,
@@ -398,7 +426,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
398426
for _, seedPeer := range seedPeers {
399427
var (
400428
hostname = seedPeer.Hostname
401-
ip = seedPeer.Ip
429+
ip = seedPeer.IP
402430
port = seedPeer.Port
403431
)
404432

@@ -542,20 +570,20 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
542570
// 2. Count: If count is provided, selects up to the specified number of seed peers. If count exceeds the number of available seed peers, all seed peers are selected.
543571
// 3. Percentage: If percentage is provided, selects a proportional number of seed peers (rounded down). Ensures at least one seed peer is selected if percentage > 0.
544572
// Priority: IPs > Count > Percentage
545-
func (j *job) selectSeedPeers(ips []string, count *uint32, percentage *uint8, log *logger.SugaredLoggerOnWith) ([]*managerv2.SeedPeer, error) {
573+
func (j *job) selectSeedPeers(ips []string, count *uint32, percentage *uint8, log *logger.SugaredLoggerOnWith) ([]*resource.Host, error) {
546574
if !j.config.SeedPeer.Enable {
547575
return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
548576
}
549577

550-
seedPeers := j.resource.SeedPeer().Client().SeedPeers()
578+
seedPeers := j.resource.HostManager().LoadAllNonNormals()
551579
if len(seedPeers) == 0 {
552580
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
553581
}
554582

555583
if len(ips) > 0 {
556-
selectedSeedPeers := make([]*managerv2.SeedPeer, 0, len(ips))
584+
selectedSeedPeers := make([]*resource.Host, 0, len(ips))
557585
for _, seedPeer := range seedPeers {
558-
if slices.Contains(ips, seedPeer.Ip) {
586+
if slices.Contains(ips, seedPeer.IP) {
559587
selectedSeedPeers = append(selectedSeedPeers, seedPeer)
560588
continue
561589
}

scheduler/resource/standard/host_manager.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ type HostManager interface {
6363
// LoadAllNormals loads all normal hosts through the Range of sync.Map.
6464
LoadAllNormals() []*Host
6565

66+
// LoadAllNonNormals loads all non-normal hosts through the Range of sync.Map.
67+
LoadAllNonNormals() []*Host
68+
6669
// Try to reclaim host.
6770
RunGC(context.Context) error
6871
}
@@ -163,6 +166,27 @@ func (h *hostManager) LoadAllNormals() []*Host {
163166
return hosts
164167
}
165168

169+
// LoadAllNonNormals loads all non-normal hosts through the Range of sync.Map.
170+
func (h *hostManager) LoadAllNonNormals() []*Host {
171+
hosts := make([]*Host, 0)
172+
h.Map.Range(func(key, value any) bool {
173+
host, ok := value.(*Host)
174+
if !ok {
175+
host.Log.Error("invalid host")
176+
return true
177+
}
178+
179+
if host.Type == types.HostTypeNormal {
180+
return true
181+
}
182+
183+
hosts = append(hosts, host)
184+
return true
185+
})
186+
187+
return hosts
188+
}
189+
166190
// LoadRandom loads host randomly through the Range of sync.Map.
167191
func (h *hostManager) LoadRandom(n int, blocklist set.SafeSet[string]) []*Host {
168192
hosts := make([]*Host, 0, n)

scheduler/resource/standard/host_manager_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/resource/standard/resource.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,7 @@ func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, tran
9191
// Initialize seed peer interface.
9292
if cfg.SeedPeer.Enable {
9393
dialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithTransportCredentials(transportCredentials)}
94-
client, err := newSeedPeerClient(dynconfig, hostManager, dialOptions...)
95-
if err != nil {
96-
return nil, err
97-
}
98-
99-
resource.seedPeer = newSeedPeer(client, peerManager, hostManager)
94+
resource.seedPeer = newSeedPeer(peerManager, hostManager, dialOptions...)
10095
}
10196

10297
return resource, nil

scheduler/resource/standard/seed_peer.go

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,22 @@ import (
2626
"time"
2727

2828
"go.opentelemetry.io/otel/trace"
29+
"google.golang.org/grpc"
30+
"stathat.com/c/consistent"
2931

3032
cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1"
3133
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
3234
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
3335
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
3436
schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
3537

38+
"d7y.io/dragonfly/v2/pkg/dfnet"
3639
"d7y.io/dragonfly/v2/pkg/digest"
3740
"d7y.io/dragonfly/v2/pkg/idgen"
3841
"d7y.io/dragonfly/v2/pkg/net/http"
42+
cndsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
3943
"d7y.io/dragonfly/v2/pkg/rpc/common"
44+
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
4045
"d7y.io/dragonfly/v2/scheduler/metrics"
4146
)
4247

@@ -55,31 +60,31 @@ type SeedPeer interface {
5560
// Used only in v1 version of the grpc.
5661
TriggerTask(context.Context, *http.Range, *Task) (*Peer, *schedulerv1.PeerResult, error)
5762

58-
// Client returns grpc client of seed peer.
59-
Client() SeedPeerClient
63+
// SelectSeedPeer selects a seed peer by the task id.
64+
SelectSeedPeer(context.Context, string) (*Host, error)
6065

6166
// Stop seed peer service.
6267
Stop() error
6368
}
6469

6570
// seedPeer contains content for seed peer.
6671
type seedPeer struct {
67-
// client is the dynamic client of seed peer.
68-
client SeedPeerClient
69-
7072
// peerManager is PeerManager interface.
7173
peerManager PeerManager
7274

7375
// hostManager is HostManager interface.
7476
hostManager HostManager
77+
78+
// dialOpts is the options for grpc dial.
79+
dialOptions []grpc.DialOption
7580
}
7681

7782
// New SeedPeer interface.
78-
func newSeedPeer(client SeedPeerClient, peerManager PeerManager, hostManager HostManager) SeedPeer {
83+
func newSeedPeer(peerManager PeerManager, hostManager HostManager, dialOptions ...grpc.DialOption) SeedPeer {
7984
return &seedPeer{
80-
client: client,
8185
peerManager: peerManager,
8286
hostManager: hostManager,
87+
dialOptions: dialOptions,
8388
}
8489
}
8590

@@ -89,7 +94,19 @@ func (s *seedPeer) TriggerDownloadTask(ctx context.Context, taskID string, req *
8994
ctx, cancel := context.WithCancel(trace.ContextWithSpan(ctx, trace.SpanFromContext(ctx)))
9095
defer cancel()
9196

92-
stream, err := s.client.DownloadTask(ctx, taskID, req)
97+
selectedSeedPeer, err := s.SelectSeedPeer(ctx, taskID)
98+
if err != nil {
99+
return err
100+
}
101+
102+
// TODO: reuse the client if we encounter the performance issue in future.
103+
client, err := dfdaemonclient.GetV2ByAddr(ctx, fmt.Sprintf("%s:%d", selectedSeedPeer.IP, selectedSeedPeer.Port), s.dialOptions...)
104+
if err != nil {
105+
return err
106+
}
107+
defer client.Close()
108+
109+
stream, err := client.DownloadTask(ctx, taskID, req)
93110
if err != nil {
94111
return err
95112
}
@@ -126,7 +143,19 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
126143
urlMeta.Range = rg.URLMetaString()
127144
}
128145

129-
stream, err := s.client.ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
146+
selectedSeedPeer, err := s.SelectSeedPeer(ctx, task.ID)
147+
if err != nil {
148+
return nil, nil, err
149+
}
150+
151+
// TODO: reuse the client if we encounter the performance issue in future.
152+
client, err := cndsystemclient.GetClientByAddr(ctx, dfnet.NetAddr{Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", selectedSeedPeer.IP, selectedSeedPeer.Port)}, s.dialOptions...)
153+
if err != nil {
154+
return nil, nil, err
155+
}
156+
defer client.Close()
157+
158+
stream, err := client.ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
130159
TaskId: task.ID,
131160
Url: task.URL,
132161
UrlMeta: urlMeta,
@@ -222,6 +251,32 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
222251
}
223252
}
224253

254+
// SelectSeedPeer selects a seed peer by the task id.
255+
func (s *seedPeer) SelectSeedPeer(ctx context.Context, taskID string) (*Host, error) {
256+
// Currently non normal host is seed peer.
257+
seedPeers := s.hostManager.LoadAllNonNormals()
258+
if len(seedPeers) == 0 {
259+
return nil, fmt.Errorf("no seed peer found in host manager")
260+
}
261+
262+
// Build the hash ring by adding all the seed peers.
263+
hashring := consistent.New()
264+
hosts := make(map[string]*Host, len(seedPeers))
265+
for _, host := range seedPeers {
266+
hashKey := fmt.Sprintf("%s:%d:%s", host.IP, host.Port, host.Hostname)
267+
hashring.Add(hashKey)
268+
hosts[hashKey] = host
269+
}
270+
271+
// Pick one seed peer from the hash ring by the task id.
272+
hashKey, err := hashring.Get(taskID)
273+
if err != nil {
274+
return nil, fmt.Errorf("failed to pick the seed peer: %w", err)
275+
}
276+
277+
return hosts[hashKey], nil
278+
}
279+
225280
// Initialize seed peer.
226281
func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task, hostID string, peerID string) (*Peer, error) {
227282
// Load host from manager.
@@ -256,12 +311,7 @@ func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task,
256311
return peer, nil
257312
}
258313

259-
// Client is seed peer grpc client.
260-
func (s *seedPeer) Client() SeedPeerClient {
261-
return s.client
262-
}
263-
264314
// Stop seed peer service.
265315
func (s *seedPeer) Stop() error {
266-
return s.client.Close()
316+
return nil
267317
}

0 commit comments

Comments
 (0)