Skip to content

Commit cdd49b5

Browse files
committed
fix: add unit tests for AnnouncePeers
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
1 parent 5826a1f commit cdd49b5

File tree

2 files changed

+273
-8
lines changed

2 files changed

+273
-8
lines changed

scheduler/service/service_v2.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
9393
for {
9494
select {
9595
case <-ctx.Done():
96-
logger.Info("context was done")
96+
logger.Info("announce peer context was done")
9797
return ctx.Err()
9898
default:
9999
}
@@ -141,29 +141,29 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
141141
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
142142
downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
143143
log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
144-
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
144+
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
145145
if err := v.handleDownloadPeerFinishedRequest(context.Background(), req.GetPeerId()); err != nil {
146146
log.Error(err)
147147
return err
148148
}
149149
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
150150
downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
151151
log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
152-
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
152+
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
153153
if err := v.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
154154
log.Error(err)
155155
return err
156156
}
157157
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
158158
log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
159-
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
159+
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
160160
if err := v.handleDownloadPeerFailedRequest(context.Background(), req.GetPeerId()); err != nil {
161161
log.Error(err)
162162
return err
163163
}
164164
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
165165
log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())
166-
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
166+
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
167167
if err := v.handleDownloadPeerBackToSourceFailedRequest(context.Background(), req.GetPeerId()); err != nil {
168168
log.Error(err)
169169
return err
@@ -870,7 +870,7 @@ func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) err
870870
for {
871871
select {
872872
case <-ctx.Done():
873-
logger.Info("context was done")
873+
logger.Info("announce peers context was done")
874874
return ctx.Err()
875875
default:
876876
}
@@ -1490,10 +1490,13 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
14901490
for _, p := range request.Peers {
14911491
hostID := p.GetHost().GetId()
14921492
peerTask := p.GetTask()
1493+
if peerTask == nil {
1494+
return nil, status.Error(codes.InvalidArgument, "request is invalid and doesn't contain a task")
1495+
}
14931496
taskID := peerTask.GetId()
14941497
peerID := p.GetId()
14951498
download := &commonv2.Download{
1496-
PieceLength: peerTask.GetPieceLength(),
1499+
PieceLength: &peerTask.PieceLength,
14971500
Digest: peerTask.Digest,
14981501
Url: peerTask.GetUrl(),
14991502
Tag: peerTask.Tag,
@@ -1586,5 +1589,6 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
15861589
peers = append(peers, peer)
15871590
}
15881591

1592+
logger.Infof("announce %d peer(s)", len(peers))
15891593
return peers, nil
15901594
}

scheduler/service/service_v2_test.go

Lines changed: 262 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
schedulerv2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"
4545

4646
managertypes "d7y.io/dragonfly/v2/manager/types"
47+
"d7y.io/dragonfly/v2/pkg/idgen"
4748
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
4849
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
4950
"d7y.io/dragonfly/v2/scheduler/config"
@@ -3098,7 +3099,7 @@ func TestServiceV2_handleResource(t *testing.T) {
30983099
assert.Equal(peer.Priority, download.Priority)
30993100
assert.Equal(peer.Range.Start, int64(download.Range.Start))
31003101
assert.Equal(peer.Range.Length, int64(download.Range.Length))
3101-
assert.NotNil(peer.AnnouncePeerStream)
3102+
assert.NotNil(peer.AnnouncePeerStream.Load())
31023103
assert.EqualValues(peer.Host, mockHost)
31033104
assert.EqualValues(peer.Task, mockTask)
31043105
},
@@ -3407,3 +3408,263 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
34073408
})
34083409
}
34093410
}
3411+
3412+
func TestServiceV2_handleAnnouncePeersRequest(t *testing.T) {
3413+
tests := []struct {
3414+
name string
3415+
request *schedulerv2.AnnouncePeersRequest
3416+
run func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3417+
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3418+
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder)
3419+
}{
3420+
{
3421+
name: "task and host exist in scheduler, peer does not",
3422+
request: &schedulerv2.AnnouncePeersRequest{
3423+
Peers: []*commonv2.Peer{
3424+
{
3425+
Id: mockPeerID,
3426+
Pieces: []*commonv2.Piece{
3427+
{
3428+
Number: uint32(mockPiece.Number),
3429+
ParentId: &mockPiece.ParentID,
3430+
Offset: mockPiece.Offset,
3431+
Length: mockPiece.Length,
3432+
Digest: mockPiece.Digest.String(),
3433+
TrafficType: &mockPiece.TrafficType,
3434+
Cost: durationpb.New(mockPiece.Cost),
3435+
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
3436+
},
3437+
},
3438+
Task: &commonv2.Task{
3439+
Id: mockTaskID,
3440+
PieceLength: uint64(mockTaskPieceLength),
3441+
ContentLength: uint64(1024),
3442+
},
3443+
Host: &commonv2.Host{
3444+
Id: mockHostID,
3445+
},
3446+
},
3447+
},
3448+
},
3449+
run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3450+
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3451+
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3452+
gomock.InOrder(
3453+
mr.HostManager().Return(hostManager).Times(1),
3454+
mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3455+
mr.TaskManager().Return(taskManager).Times(1),
3456+
mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3457+
mr.PeerManager().Return(peerManager).Times(1),
3458+
mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1),
3459+
mr.PeerManager().Return(peerManager).Times(1),
3460+
mp.Store(gomock.Any()).Return().Times(1),
3461+
)
3462+
3463+
assert := assert.New(t)
3464+
peers, err := svc.handleAnnouncePeersRequest(context.Background(), request)
3465+
assert.NoError(err)
3466+
peer := peers[0]
3467+
assert.Equal(peer.ID, mockPeer.ID)
3468+
assert.Nil(peer.AnnouncePeerStream.Load())
3469+
assert.True(peer.FSM.Is(resource.PeerStateSucceeded))
3470+
assert.True(peer.Task.FSM.Is(resource.PeerStateSucceeded))
3471+
assert.EqualValues(peer.Host, mockHost)
3472+
assert.EqualValues(peer.Task, mockTask)
3473+
piece, _ := peer.Pieces.Load(mockPiece.Number)
3474+
assert.EqualValues(piece.(*resource.Piece).Digest, mockPiece.Digest)
3475+
},
3476+
},
3477+
{
3478+
name: "invalid request with no task",
3479+
request: &schedulerv2.AnnouncePeersRequest{
3480+
Peers: []*commonv2.Peer{
3481+
{
3482+
Id: mockPeerID,
3483+
Pieces: []*commonv2.Piece{
3484+
{
3485+
Number: uint32(mockPiece.Number),
3486+
ParentId: &mockPiece.ParentID,
3487+
Offset: mockPiece.Offset,
3488+
Length: mockPiece.Length,
3489+
Digest: mockPiece.Digest.String(),
3490+
TrafficType: &mockPiece.TrafficType,
3491+
Cost: durationpb.New(mockPiece.Cost),
3492+
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
3493+
},
3494+
},
3495+
Host: &commonv2.Host{
3496+
Id: mockHostID,
3497+
},
3498+
},
3499+
},
3500+
},
3501+
run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3502+
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3503+
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3504+
3505+
assert := assert.New(t)
3506+
_, err := svc.handleAnnouncePeersRequest(context.Background(), request)
3507+
assert.ErrorIs(err, status.Error(codes.InvalidArgument, "request is invalid and doesn't contain a task"))
3508+
},
3509+
},
3510+
{
3511+
name: "host does not exist in scheduler",
3512+
request: &schedulerv2.AnnouncePeersRequest{
3513+
Peers: []*commonv2.Peer{
3514+
{
3515+
Id: mockPeerID,
3516+
Task: &commonv2.Task{
3517+
Id: mockTaskID,
3518+
PieceLength: uint64(mockTaskPieceLength),
3519+
ContentLength: uint64(1024),
3520+
},
3521+
Host: &commonv2.Host{
3522+
Id: mockHostID,
3523+
},
3524+
},
3525+
},
3526+
},
3527+
run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3528+
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3529+
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3530+
gomock.InOrder(
3531+
mr.HostManager().Return(hostManager).Times(1),
3532+
mh.Load(gomock.Eq(mockHost.ID)).Return(nil, false).Times(1),
3533+
)
3534+
3535+
assert := assert.New(t)
3536+
_, err := svc.handleAnnouncePeersRequest(context.Background(), request)
3537+
assert.ErrorIs(err, status.Errorf(codes.NotFound, "host %s not found", mockHost.ID))
3538+
},
3539+
},
3540+
{
3541+
name: "task dag size exceeds the limit",
3542+
request: &schedulerv2.AnnouncePeersRequest{
3543+
Peers: []*commonv2.Peer{
3544+
{
3545+
Id: mockPeerID,
3546+
Pieces: []*commonv2.Piece{
3547+
{
3548+
Number: uint32(mockPiece.Number),
3549+
ParentId: &mockPiece.ParentID,
3550+
Offset: mockPiece.Offset,
3551+
Length: mockPiece.Length,
3552+
Digest: mockPiece.Digest.String(),
3553+
TrafficType: &mockPiece.TrafficType,
3554+
Cost: durationpb.New(mockPiece.Cost),
3555+
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
3556+
},
3557+
},
3558+
Task: &commonv2.Task{
3559+
Id: mockTaskID,
3560+
PieceLength: uint64(mockTaskPieceLength),
3561+
ContentLength: uint64(1024),
3562+
},
3563+
Host: &commonv2.Host{
3564+
Id: mockHostID,
3565+
},
3566+
},
3567+
},
3568+
},
3569+
run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3570+
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3571+
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3572+
gomock.InOrder(
3573+
mr.HostManager().Return(hostManager).Times(1),
3574+
mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3575+
mr.TaskManager().Return(taskManager).Times(1),
3576+
mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3577+
mr.PeerManager().Return(peerManager).Times(1),
3578+
mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1),
3579+
mr.PeerManager().Return(peerManager).Times(1),
3580+
mp.Store(gomock.Any()).Return().Times(1),
3581+
mr.PeerManager().Return(peerManager).Times(1),
3582+
mp.Delete(gomock.Eq(mockPeer.ID)).Return().Times(1),
3583+
mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1),
3584+
)
3585+
for i := 0; i < resource.PeerCountLimitForTask+1; i++ {
3586+
peer := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost)
3587+
mockTask.StorePeer(peer)
3588+
}
3589+
3590+
assert := assert.New(t)
3591+
_, err := svc.handleAnnouncePeersRequest(context.Background(), request)
3592+
assert.NoError(err)
3593+
_, loaded := peerManager.Load(mockPeer.ID)
3594+
assert.Equal(loaded, false)
3595+
},
3596+
},
3597+
{
3598+
name: "construct piece fails due to invalid digest",
3599+
request: &schedulerv2.AnnouncePeersRequest{
3600+
Peers: []*commonv2.Peer{
3601+
{
3602+
Id: mockPeerID,
3603+
Pieces: []*commonv2.Piece{
3604+
{
3605+
Number: uint32(mockPiece.Number),
3606+
ParentId: &mockPiece.ParentID,
3607+
Offset: mockPiece.Offset,
3608+
Length: mockPiece.Length,
3609+
Digest: mockPiece.Digest.String() + ":",
3610+
TrafficType: &mockPiece.TrafficType,
3611+
Cost: durationpb.New(mockPiece.Cost),
3612+
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
3613+
},
3614+
},
3615+
Task: &commonv2.Task{
3616+
Id: mockTaskID,
3617+
PieceLength: uint64(mockTaskPieceLength),
3618+
ContentLength: uint64(1024),
3619+
},
3620+
Host: &commonv2.Host{
3621+
Id: mockHostID,
3622+
},
3623+
},
3624+
},
3625+
},
3626+
run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3627+
hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3628+
mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3629+
gomock.InOrder(
3630+
mr.HostManager().Return(hostManager).Times(1),
3631+
mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3632+
mr.TaskManager().Return(taskManager).Times(1),
3633+
mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3634+
mr.PeerManager().Return(peerManager).Times(1),
3635+
mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1),
3636+
mr.PeerManager().Return(peerManager).Times(1),
3637+
mp.Store(gomock.Any()).Return().Times(1),
3638+
)
3639+
3640+
assert := assert.New(t)
3641+
_, err := svc.handleAnnouncePeersRequest(context.Background(), request)
3642+
assert.ErrorIs(err, status.Errorf(codes.InvalidArgument, "invalid digest"))
3643+
},
3644+
},
3645+
}
3646+
3647+
for _, tc := range tests {
3648+
t.Run(tc.name, func(t *testing.T) {
3649+
ctl := gomock.NewController(t)
3650+
defer ctl.Finish()
3651+
scheduling := schedulingmocks.NewMockScheduling(ctl)
3652+
res := resource.NewMockResource(ctl)
3653+
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
3654+
storage := storagemocks.NewMockStorage(ctl)
3655+
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
3656+
hostManager := resource.NewMockHostManager(ctl)
3657+
taskManager := resource.NewMockTaskManager(ctl)
3658+
peerManager := resource.NewMockPeerManager(ctl)
3659+
3660+
mockHost := resource.NewHost(
3661+
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
3662+
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
3663+
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
3664+
mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
3665+
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
3666+
3667+
tc.run(t, svc, tc.request, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
3668+
})
3669+
}
3670+
}

0 commit comments

Comments
 (0)