Skip to content

Commit 3f7e48e

Browse files
authored
feat: when peer disabled shared, scheduler will skip peer in filterCandidateParents (#3506)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 76cf35f commit 3f7e48e

File tree

9 files changed

+88
-3
lines changed

9 files changed

+88
-3
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
33
go 1.21
44

55
require (
6-
d7y.io/api/v2 v2.0.156
6+
d7y.io/api/v2 v2.0.157
77
github.com/MysteriousPotato/go-lockable v1.0.0
88
github.com/RichardKnop/machinery v1.10.8
99
github.com/Showmax/go-fqdn v1.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
5353
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
5454
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
5555
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
56-
d7y.io/api/v2 v2.0.156 h1:KjKnhCeSQaxugUCiZcq5BkulmKgqKHBh0iZDvU5eDZM=
57-
d7y.io/api/v2 v2.0.156/go.mod h1:wvA3IIh6Gjwsh1HsxujyELTh4cqlNtXIaYxDeVobN/w=
56+
d7y.io/api/v2 v2.0.157 h1:nzVAY1TBazmYzXwbNuGl//leUvsyYEt/fqObNEJ9CEw=
57+
d7y.io/api/v2 v2.0.157/go.mod h1:wvA3IIh6Gjwsh1HsxujyELTh4cqlNtXIaYxDeVobN/w=
5858
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
5959
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
6060
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

scheduler/resource/host.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ func WithConcurrentUploadLimit(limit int32) HostOption {
5252
}
5353
}
5454

55+
// WithDisableShared sets host's DisableShared.
56+
func WithDisableShared(disableShared bool) HostOption {
57+
return func(h *Host) {
58+
h.DisableShared = disableShared
59+
}
60+
}
61+
5562
// WithOS sets host's os.
5663
func WithOS(os string) HostOption {
5764
return func(h *Host) {
@@ -152,6 +159,10 @@ type Host struct {
152159
// ObjectStoragePort is object storage port.
153160
ObjectStoragePort int32
154161

162+
// DisableShared is whether the host is disabled for
163+
// shared with other peers.
164+
DisableShared bool
165+
155166
// Host OS.
156167
OS string
157168

@@ -363,6 +374,7 @@ func NewHost(
363374
Hostname: hostname,
364375
Port: port,
365376
DownloadPort: downloadPort,
377+
DisableShared: false,
366378
ConcurrentUploadLimit: atomic.NewInt32(int32(concurrentUploadLimit)),
367379
ConcurrentUploadCount: atomic.NewInt32(0),
368380
UploadCount: atomic.NewInt64(0),

scheduler/resource/peer_manager.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,18 @@ func (p *peerManager) RunGC() error {
167167
return true
168168
}
169169

170+
// If host is disabled shared, then set the peer state to PeerStateLeave and then delete peer.
171+
// Avoid the disabled shared host to be scheduled, and store the unused peer in the peer manager.
172+
if peer.Host.DisableShared {
173+
peer.Log.Info("peer host is disabled shared, causing the peer to leave")
174+
if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
175+
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
176+
return true
177+
}
178+
179+
return true
180+
}
181+
170182
// If the peer's elapsed of downloading piece exceeds the pieceDownloadTimeout,
171183
// then sets the peer state to PeerStateLeave and then delete peer.
172184
if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateBackToSource) {

scheduler/resource/peer_manager_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,35 @@ func TestPeerManager_RunGC(t *testing.T) {
347347
assert.Equal(peer.FSM.Current(), PeerStateLeave)
348348
},
349349
},
350+
{
351+
name: "peer reclaimed with disabled shared",
352+
gcConfig: &config.GCConfig{
353+
PieceDownloadTimeout: 5 * time.Minute,
354+
PeerGCInterval: 1 * time.Second,
355+
PeerTTL: 1 * time.Microsecond,
356+
HostTTL: 10 * time.Second,
357+
},
358+
mock: func(m *gc.MockGCMockRecorder) {
359+
m.Add(gomock.Any()).Return(nil).Times(1)
360+
},
361+
expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) {
362+
assert := assert.New(t)
363+
peerManager.Store(mockPeer)
364+
mockPeer.Host.DisableShared = true
365+
err := peerManager.RunGC()
366+
assert.NoError(err)
367+
368+
peer, loaded := peerManager.Load(mockPeer.ID)
369+
assert.Equal(loaded, true)
370+
assert.Equal(peer.FSM.Current(), PeerStateLeave)
371+
372+
err = peerManager.RunGC()
373+
assert.NoError(err)
374+
375+
_, loaded = peerManager.Load(mockPeer.ID)
376+
assert.Equal(loaded, false)
377+
},
378+
},
350379
{
351380
name: "peer download piece timeout and peer state is PeerStateRunning",
352381
gcConfig: &config.GCConfig{

scheduler/scheduling/scheduling.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,12 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
516516
continue
517517
}
518518

519+
// Candidate parent is disable shared.
520+
if candidateParent.Host.DisableShared {
521+
peer.Log.Debugf("parent %s host %s is not selected because it is disable shared", candidateParent.ID, candidateParent.Host.ID)
522+
continue
523+
}
524+
519525
// Candidate parent host is not allowed to be the same as the peer host,
520526
// because dfdaemon cannot handle the situation
521527
// where two tasks are downloading and downloading each other.

scheduler/scheduling/scheduling_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,22 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
838838
assert.False(ok)
839839
},
840840
},
841+
{
842+
name: "parent is disabled share data with other peers",
843+
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
844+
peer.FSM.SetState(resource.PeerStateReceivedNormal)
845+
mockPeers[0].FSM.SetState(resource.PeerStateRunning)
846+
peer.Task.StorePeer(peer)
847+
peer.Task.StorePeer(mockPeers[0])
848+
mockPeers[0].Host.DisableShared = true
849+
850+
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
851+
},
852+
expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) {
853+
assert := assert.New(t)
854+
assert.False(ok)
855+
},
856+
},
841857
{
842858
name: "find back-to-source parent",
843859
mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {

scheduler/service/service_v2.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
513513
host, loaded := v.resource.HostManager().Load(req.Host.GetId())
514514
if !loaded {
515515
options := []resource.HostOption{
516+
resource.WithDisableShared(req.Host.GetDisableShared()),
516517
resource.WithOS(req.Host.GetOs()),
517518
resource.WithPlatform(req.Host.GetPlatform()),
518519
resource.WithPlatformFamily(req.Host.GetPlatformFamily()),
@@ -610,6 +611,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
610611
host.Port = req.Host.GetPort()
611612
host.DownloadPort = req.Host.GetDownloadPort()
612613
host.Type = types.HostType(req.Host.GetType())
614+
host.DisableShared = req.Host.GetDisableShared()
613615
host.OS = req.Host.GetOs()
614616
host.Platform = req.Host.GetPlatform()
615617
host.PlatformFamily = req.Host.GetPlatformFamily()

scheduler/service/service_v2_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
435435
Ip: "127.0.0.1",
436436
Port: 8003,
437437
DownloadPort: 8001,
438+
DisableShared: true,
438439
Os: "darwin",
439440
Platform: "darwin",
440441
PlatformFamily: "Standalone Workstation",
@@ -505,6 +506,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
505506
assert.Equal(host.IP, req.Host.Ip)
506507
assert.Equal(host.Port, req.Host.Port)
507508
assert.Equal(host.DownloadPort, req.Host.DownloadPort)
509+
assert.Equal(host.DisableShared, req.Host.DisableShared)
508510
assert.Equal(host.OS, req.Host.Os)
509511
assert.Equal(host.Platform, req.Host.Platform)
510512
assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
@@ -541,6 +543,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
541543
Ip: "127.0.0.1",
542544
Port: 8003,
543545
DownloadPort: 8001,
546+
DisableShared: false,
544547
Os: "darwin",
545548
Platform: "darwin",
546549
PlatformFamily: "Standalone Workstation",
@@ -611,6 +614,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
611614
assert.Equal(host.IP, req.Host.Ip)
612615
assert.Equal(host.Port, req.Host.Port)
613616
assert.Equal(host.DownloadPort, req.Host.DownloadPort)
617+
assert.Equal(host.DisableShared, req.Host.DisableShared)
614618
assert.Equal(host.OS, req.Host.Os)
615619
assert.Equal(host.Platform, req.Host.Platform)
616620
assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
@@ -647,6 +651,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
647651
Ip: "127.0.0.1",
648652
Port: 8003,
649653
DownloadPort: 8001,
654+
DisableShared: true,
650655
Os: "darwin",
651656
Platform: "darwin",
652657
PlatformFamily: "Standalone Workstation",
@@ -718,6 +723,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
718723
assert.Equal(host.IP, req.Host.Ip)
719724
assert.Equal(host.Port, req.Host.Port)
720725
assert.Equal(host.DownloadPort, req.Host.DownloadPort)
726+
assert.Equal(host.DisableShared, req.Host.DisableShared)
721727
assert.Equal(host.OS, req.Host.Os)
722728
assert.Equal(host.Platform, req.Host.Platform)
723729
assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
@@ -749,6 +755,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
749755
Ip: "127.0.0.1",
750756
Port: 8003,
751757
DownloadPort: 8001,
758+
DisableShared: false,
752759
Os: "darwin",
753760
Platform: "darwin",
754761
PlatformFamily: "Standalone Workstation",
@@ -820,6 +827,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
820827
assert.Equal(host.IP, req.Host.Ip)
821828
assert.Equal(host.Port, req.Host.Port)
822829
assert.Equal(host.DownloadPort, req.Host.DownloadPort)
830+
assert.Equal(host.DisableShared, req.Host.DisableShared)
823831
assert.Equal(host.OS, req.Host.Os)
824832
assert.Equal(host.Platform, req.Host.Platform)
825833
assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)

0 commit comments

Comments
 (0)