Skip to content

Commit 597320a

Browse files
committed
refactor: refacts the seed peer selection logic from grpc resolver to self pick
Signed-off-by: chlins <chlins.zhang@gmail.com>
1 parent 88dc8e1 commit 597320a

13 files changed

+295
-1035
lines changed

scheduler/job/job.go

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,13 @@ import (
3939
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
4040
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
4141
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
42-
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
4342

4443
logger "d7y.io/dragonfly/v2/internal/dflog"
4544
internaljob "d7y.io/dragonfly/v2/internal/job"
4645
managertypes "d7y.io/dragonfly/v2/manager/types"
46+
"d7y.io/dragonfly/v2/pkg/dfnet"
4747
"d7y.io/dragonfly/v2/pkg/idgen"
48+
cndsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
4849
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
4950
"d7y.io/dragonfly/v2/scheduler/config"
5051
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
@@ -79,10 +80,11 @@ type job struct {
7980
localJob *internaljob.Job
8081
resource resource.Resource
8182
config *config.Config
83+
dialOptions []grpc.DialOption
8284
}
8385

8486
// New creates a new Job.
85-
func New(cfg *config.Config, resource resource.Resource) (Job, error) {
87+
func New(cfg *config.Config, resource resource.Resource, dialOptions ...grpc.DialOption) (Job, error) {
8688
redisConfig := &internaljob.Config{
8789
Addrs: cfg.Database.Redis.Addrs,
8890
MasterName: cfg.Database.Redis.MasterName,
@@ -127,6 +129,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
127129
localJob: localJob,
128130
resource: resource,
129131
config: cfg,
132+
dialOptions: dialOptions,
130133
}
131134

132135
namedJobFuncs := map[string]any{
@@ -241,11 +244,6 @@ func (j *job) PreheatSingleSeedPeer(ctx context.Context, req *internaljob.Prehea
241244
return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
242245
}
243246

244-
// If scheduler has no available seed peer, return error.
245-
if len(j.resource.SeedPeer().Client().Addrs()) == 0 {
246-
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
247-
}
248-
249247
// Preheat by v2 grpc protocol. If seed peer does not support
250248
// v2 protocol, preheat by v1 grpc protocol.
251249
resp, err := j.preheatV2SingleSeedPeer(ctx, req, log)
@@ -275,8 +273,22 @@ func (j *job) preheatV1SingleSeedPeer(ctx context.Context, req *internaljob.Preh
275273
Priority: commonv1.Priority(req.Priority),
276274
}
277275

276+
selectedSeedPeer, err := j.resource.SeedPeer().SelectSeedPeer(ctx, taskID)
277+
if err != nil {
278+
return nil, err
279+
}
280+
281+
log.Infof("[preheat]: selected seed peer %s", selectedSeedPeer)
282+
283+
// TODO(chlins): reuse the client if we encounter the performance issue in future.
284+
client, err := cndsystemclient.GetClientByAddr(ctx, dfnet.NetAddr{Type: dfnet.TCP, Addr: selectedSeedPeer}, j.dialOptions...)
285+
if err != nil {
286+
return nil, err
287+
}
288+
defer client.Close()
289+
278290
// Trigger seed peer download seeds.
279-
stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
291+
stream, err := client.ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
280292
TaskId: taskID,
281293
Url: req.URL,
282294
UrlMeta: urlMeta,
@@ -343,7 +355,22 @@ func (j *job) preheatV2SingleSeedPeerByURL(ctx context.Context, url string, req
343355
filteredQueryParams := idgen.ParseFilteredQueryParams(req.FilteredQueryParams)
344356
taskID := idgen.TaskIDV2ByURLBased(url, req.PieceLength, req.Tag, req.Application, filteredQueryParams)
345357
advertiseIP := j.config.Server.AdvertiseIP.String()
346-
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
358+
359+
selectedSeedPeer, err := j.resource.SeedPeer().SelectSeedPeer(ctx, taskID)
360+
if err != nil {
361+
return nil, err
362+
}
363+
364+
log.Infof("[preheat]: selected seed peer %s", selectedSeedPeer)
365+
366+
// TODO(chlins): reuse the client if we encounter the performance issue in future.
367+
client, err := dfdaemonclient.GetV2ByAddr(ctx, selectedSeedPeer, j.dialOptions...)
368+
if err != nil {
369+
return nil, err
370+
}
371+
defer client.Close()
372+
373+
stream, err := client.DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
347374
Download: &commonv2.Download{
348375
Url: url,
349376
PieceLength: req.PieceLength,
@@ -410,7 +437,7 @@ func (j *job) PreheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
410437
for _, seedPeer := range seedPeers {
411438
var (
412439
hostname = seedPeer.Hostname
413-
ip = seedPeer.Ip
440+
ip = seedPeer.IP
414441
port = seedPeer.Port
415442
)
416443

@@ -554,20 +581,20 @@ func (j *job) PreheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
554581
// 2. Count: If count is provided, selects up to the specified number of seed peers. If count exceeds the number of available seed peers, all seed peers are selected.
555582
// 3. Percentage: If percentage is provided, selects a proportional number of seed peers (rounded down). Ensures at least one seed peer is selected if percentage > 0.
556583
// Priority: IPs > Count > Percentage
557-
func (j *job) selectSeedPeers(ips []string, count *uint32, percentage *uint32, log *logger.SugaredLoggerOnWith) ([]*managerv2.SeedPeer, error) {
584+
func (j *job) selectSeedPeers(ips []string, count *uint32, percentage *uint32, log *logger.SugaredLoggerOnWith) ([]*resource.Host, error) {
558585
if !j.config.SeedPeer.Enable {
559586
return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
560587
}
561588

562-
seedPeers := j.resource.SeedPeer().Client().SeedPeers()
589+
seedPeers := j.resource.HostManager().LoadAllSeeds()
563590
if len(seedPeers) == 0 {
564591
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
565592
}
566593

567594
if len(ips) > 0 {
568-
selectedSeedPeers := make([]*managerv2.SeedPeer, 0, len(ips))
595+
selectedSeedPeers := make([]*resource.Host, 0, len(ips))
569596
for _, seedPeer := range seedPeers {
570-
if slices.Contains(ips, seedPeer.Ip) {
597+
if slices.Contains(ips, seedPeer.IP) {
571598
selectedSeedPeers = append(selectedSeedPeers, seedPeer)
572599
continue
573600
}

scheduler/resource/standard/host_manager.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ type HostManager interface {
6363
// LoadAllNormals loads all normal hosts through the Range of sync.Map.
6464
LoadAllNormals() []*Host
6565

66+
// LoadAllSeeds loads all seed hosts through the Range of sync.Map.
67+
LoadAllSeeds() []*Host
68+
6669
// Try to reclaim host.
6770
RunGC(context.Context) error
6871
}
@@ -163,6 +166,27 @@ func (h *hostManager) LoadAllNormals() []*Host {
163166
return hosts
164167
}
165168

169+
// LoadAllSeeds loads all seed hosts through the Range of sync.Map.
170+
func (h *hostManager) LoadAllSeeds() []*Host {
171+
hosts := make([]*Host, 0)
172+
h.Map.Range(func(key, value any) bool {
173+
host, ok := value.(*Host)
174+
if !ok {
175+
host.Log.Error("invalid host")
176+
return true
177+
}
178+
179+
if host.Type == types.HostTypeNormal {
180+
return true
181+
}
182+
183+
hosts = append(hosts, host)
184+
return true
185+
})
186+
187+
return hosts
188+
}
189+
166190
// LoadRandom loads host randomly through the Range of sync.Map.
167191
func (h *hostManager) LoadRandom(n int, blocklist set.SafeSet[string]) []*Host {
168192
hosts := make([]*Host, 0, n)

scheduler/resource/standard/host_manager_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/resource/standard/resource.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type Resource interface {
4141
// Task manager interface.
4242
TaskManager() TaskManager
4343

44+
// Serve serves resource service.
45+
Serve() error
46+
4447
// Stop resource service.
4548
Stop() error
4649
}
@@ -64,7 +67,7 @@ type resource struct {
6467
}
6568

6669
// New returns Resource interface.
67-
func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, transportCredentials credentials.TransportCredentials) (Resource, error) {
70+
func New(cfg *config.Config, gc gc.GC, transportCredentials credentials.TransportCredentials) (Resource, error) {
6871
resource := &resource{config: cfg}
6972

7073
// Initialize host manager interface.
@@ -91,12 +94,7 @@ func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, tran
9194
// Initialize seed peer interface.
9295
if cfg.SeedPeer.Enable {
9396
dialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithTransportCredentials(transportCredentials)}
94-
client, err := newSeedPeerClient(dynconfig, hostManager, dialOptions...)
95-
if err != nil {
96-
return nil, err
97-
}
98-
99-
resource.seedPeer = newSeedPeer(client, peerManager, hostManager)
97+
resource.seedPeer = newSeedPeer(peerManager, hostManager, dialOptions...)
10098
}
10199

102100
return resource, nil
@@ -122,6 +120,15 @@ func (r *resource) TaskManager() TaskManager {
122120
return r.taskManager
123121
}
124122

123+
// Serve serves resource service.
124+
func (r *resource) Serve() error {
125+
if r.config.SeedPeer.Enable {
126+
return r.seedPeer.Serve()
127+
}
128+
129+
return nil
130+
}
131+
125132
// Stop resource service.
126133
func (r *resource) Stop() error {
127134
if r.config.SeedPeer.Enable {

scheduler/resource/standard/resource_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scheduler/resource/standard/resource_test.go

Lines changed: 9 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -23,43 +23,25 @@ import (
2323

2424
"github.com/stretchr/testify/assert"
2525
"go.uber.org/mock/gomock"
26-
"google.golang.org/grpc/resolver"
27-
28-
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
2926

3027
"d7y.io/dragonfly/v2/pkg/gc"
3128
"d7y.io/dragonfly/v2/pkg/rpc"
3229
"d7y.io/dragonfly/v2/scheduler/config"
33-
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
3430
)
3531

3632
func TestResource_New(t *testing.T) {
3733
tests := []struct {
3834
name string
3935
config *config.Config
40-
mock func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
36+
mock func(mg *gc.MockGCMockRecorder)
4137
expect func(t *testing.T, resource Resource, err error)
4238
}{
4339
{
4440
name: "new resource",
4541
config: config.New(),
46-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
42+
mock: func(mg *gc.MockGCMockRecorder) {
4743
gomock.InOrder(
4844
mg.Add(gomock.Any()).Return(nil).Times(3),
49-
md.Get().Return(&config.DynconfigData{
50-
Scheduler: &managerv2.Scheduler{
51-
SeedPeers: []*managerv2.SeedPeer{
52-
{
53-
Id: 1,
54-
},
55-
},
56-
},
57-
}, nil).Times(1),
58-
md.Register(gomock.Any()).Return().Times(1),
59-
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
60-
md.Register(gomock.Any()).Return().Times(1),
61-
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
62-
md.Register(gomock.Any()).Return().Times(1),
6345
)
6446
},
6547
expect: func(t *testing.T, resource Resource, err error) {
@@ -71,7 +53,7 @@ func TestResource_New(t *testing.T) {
7153
{
7254
name: "new resource failed because of host manager error",
7355
config: config.New(),
74-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
56+
mock: func(mg *gc.MockGCMockRecorder) {
7557
mg.Add(gomock.Any()).Return(errors.New("foo")).Times(1)
7658
},
7759
expect: func(t *testing.T, resource Resource, err error) {
@@ -82,7 +64,7 @@ func TestResource_New(t *testing.T) {
8264
{
8365
name: "new resource failed because of task manager error",
8466
config: config.New(),
85-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
67+
mock: func(mg *gc.MockGCMockRecorder) {
8668
gomock.InOrder(
8769
mg.Add(gomock.Any()).Return(nil).Times(1),
8870
mg.Add(gomock.Any()).Return(errors.New("foo")).Times(1),
@@ -96,7 +78,7 @@ func TestResource_New(t *testing.T) {
9678
{
9779
name: "new resource failed because of peer manager error",
9880
config: config.New(),
99-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
81+
mock: func(mg *gc.MockGCMockRecorder) {
10082
gomock.InOrder(
10183
mg.Add(gomock.Any()).Return(nil).Times(2),
10284
mg.Add(gomock.Any()).Return(errors.New("foo")).Times(1),
@@ -107,36 +89,12 @@ func TestResource_New(t *testing.T) {
10789
assert.EqualError(err, "foo")
10890
},
10991
},
110-
{
111-
name: "new resource faild because of dynconfig get error",
112-
config: config.New(),
113-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
114-
gomock.InOrder(
115-
mg.Add(gomock.Any()).Return(nil).Times(3),
116-
md.Get().Return(&config.DynconfigData{}, errors.New("foo")).Times(1),
117-
)
118-
},
119-
expect: func(t *testing.T, resource Resource, err error) {
120-
assert := assert.New(t)
121-
assert.EqualError(err, "foo")
122-
},
123-
},
12492
{
12593
name: "new resource faild because of seed peer list is empty",
12694
config: config.New(),
127-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
95+
mock: func(mg *gc.MockGCMockRecorder) {
12896
gomock.InOrder(
12997
mg.Add(gomock.Any()).Return(nil).Times(3),
130-
md.Get().Return(&config.DynconfigData{
131-
Scheduler: &managerv2.Scheduler{
132-
SeedPeers: []*managerv2.SeedPeer{},
133-
},
134-
}, nil).Times(1),
135-
md.Register(gomock.Any()).Return().Times(1),
136-
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
137-
md.Register(gomock.Any()).Return().Times(1),
138-
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
139-
md.Register(gomock.Any()).Return().Times(1),
14098
)
14199
},
142100
expect: func(t *testing.T, resource Resource, err error) {
@@ -159,7 +117,7 @@ func TestResource_New(t *testing.T) {
159117
Enable: false,
160118
},
161119
},
162-
mock: func(mg *gc.MockGCMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
120+
mock: func(mg *gc.MockGCMockRecorder) {
163121
mg.Add(gomock.Any()).Return(nil).Times(3)
164122
},
165123
expect: func(t *testing.T, resource Resource, err error) {
@@ -175,10 +133,9 @@ func TestResource_New(t *testing.T) {
175133
ctl := gomock.NewController(t)
176134
defer ctl.Finish()
177135
gc := gc.NewMockGC(ctl)
178-
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
179-
tc.mock(gc.EXPECT(), dynconfig.EXPECT())
136+
tc.mock(gc.EXPECT())
180137

181-
resource, err := New(tc.config, gc, dynconfig, rpc.NewInsecureCredentials())
138+
resource, err := New(tc.config, gc, rpc.NewInsecureCredentials())
182139
tc.expect(t, resource, err)
183140
})
184141
}

0 commit comments

Comments
 (0)