From cb73ffba68ea3a1c36eb9a8897674826ce5968cf Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 24 Feb 2025 18:03:53 +0800 Subject: [PATCH] feat: support to set piece length for preheat Signed-off-by: Gaius --- client-rs | 2 +- internal/job/types.go | 1 + manager/job/preheat.go | 2 + manager/job/task.go | 4 +- manager/types/job.go | 18 ++++ pkg/idgen/task_id.go | 10 +- pkg/idgen/task_id_test.go | 25 ++++- scheduler/job/job.go | 7 +- .../resource/standard/host_manager_test.go | 2 +- scheduler/resource/standard/host_test.go | 10 +- .../resource/standard/peer_manager_test.go | 10 +- scheduler/resource/standard/peer_test.go | 32 +++---- scheduler/resource/standard/seed_peer_test.go | 2 +- scheduler/resource/standard/task.go | 7 +- .../resource/standard/task_manager_test.go | 10 +- scheduler/resource/standard/task_test.go | 92 ++++++++++--------- .../evaluator/evaluator_base_test.go | 76 +++++++-------- scheduler/scheduling/scheduling_test.go | 46 +++++----- scheduler/service/service_v1.go | 6 +- scheduler/service/service_v1_test.go | 74 +++++++-------- scheduler/service/service_v2.go | 4 +- scheduler/service/service_v2_test.go | 36 ++++---- test/e2e/v2/util/file.go | 2 +- test/e2e/v2/util/task.go | 9 ++ 24 files changed, 278 insertions(+), 209 deletions(-) diff --git a/client-rs b/client-rs index 3339d5ca2fe..259ebc0d523 160000 --- a/client-rs +++ b/client-rs @@ -1 +1 @@ -Subproject commit 3339d5ca2feac41a63f0472811cb38205e5fbaf2 +Subproject commit 259ebc0d5233f56397429bc43cdb2cd3689b20e2 diff --git a/internal/job/types.go b/internal/job/types.go index d54f9a20eb5..d1111f882c1 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -23,6 +23,7 @@ import ( // PreheatRequest defines the request parameters for preheating. type PreheatRequest struct { URL string `json:"url" validate:"required,url"` + PieceLength *uint64 `json:"pieceLength" binding:"omitempty,gte=4194304"` Tag string `json:"tag" validate:"omitempty"` FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"` Headers map[string]string `json:"headers" validate:"omitempty"` diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 50c61cfd241..e5d08ef9793 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -128,6 +128,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul files = []internaljob.PreheatRequest{ { URL: json.URL, + PieceLength: json.PieceLength, Tag: json.Tag, FilteredQueryParams: json.FilteredQueryParams, Headers: json.Headers, @@ -349,6 +350,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh header.Set("Accept", v.MediaType) layer := internaljob.PreheatRequest{ URL: image.blobsURL(v.Digest.String()), + PieceLength: args.PieceLength, Tag: args.Tag, FilteredQueryParams: args.FilteredQueryParams, Headers: nethttp.HeaderToMap(header), diff --git a/manager/job/task.go b/manager/job/task.go index de078f50511..2974893f49a 100644 --- a/manager/job/task.go +++ b/manager/job/task.go @@ -63,7 +63,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, taskID := json.TaskID if json.URL != "" { - taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) + taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) } args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{ @@ -121,7 +121,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul taskID := json.TaskID if json.URL != "" { - taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) + taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) } args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{ diff --git a/manager/types/job.go b/manager/types/job.go index f9421196feb..fa982b738d3 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -111,6 +111,12 @@ type PreheatArgs struct { // URL is the image url for preheating. URL string `json:"url" binding:"required"` + // PieceLength is the piece length(bytes) for downloading file. The value needs to + // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). + // If the piece length is not specified, the piece length will be calculated + // according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // Tag is the tag for preheating. Tag string `json:"tag" binding:"omitempty"` @@ -177,6 +183,12 @@ type GetTaskArgs struct { // URL is the download url of the task. URL string `json:"url" binding:"omitempty"` + // PieceLength is the piece length(bytes) for downloading file. The value needs to + // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). + // If the piece length is not specified, the piece length will be calculated + // according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // Tag is the tag of the task. Tag string `json:"tag" binding:"omitempty"` @@ -211,6 +223,12 @@ type DeleteTaskArgs struct { // URL is the download url of the task. URL string `json:"url" binding:"omitempty"` + // PieceLength is the piece length(bytes) for downloading file. The value needs to + // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). + // If the piece length is not specified, the piece length will be calculated + // according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // Tag is the tag of the task. Tag string `json:"tag" binding:"omitempty"` diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go index 44f82847296..39e6a48999a 100644 --- a/pkg/idgen/task_id.go +++ b/pkg/idgen/task_id.go @@ -17,6 +17,7 @@ package idgen import ( + "strconv" "strings" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" @@ -91,11 +92,16 @@ func ParseFilteredQueryParams(rawFilteredQueryParams string) []string { } // TaskIDV2 generates v2 version of task id. -func TaskIDV2(url, tag, application string, filteredQueryParams []string) string { +func TaskIDV2(url string, pieceLength *uint64, tag, application string, filteredQueryParams []string) string { url, err := neturl.FilterQueryParams(url, filteredQueryParams) if err != nil { url = "" } - return pkgdigest.SHA256FromStrings(url, tag, application) + params := []string{url, tag, application} + if pieceLength != nil { + params = append(params, strconv.FormatUint(*pieceLength, 10)) + } + + return pkgdigest.SHA256FromStrings(params...) } diff --git a/pkg/idgen/task_id_test.go b/pkg/idgen/task_id_test.go index 6b7e95f9a26..bef59ee4afb 100644 --- a/pkg/idgen/task_id_test.go +++ b/pkg/idgen/task_id_test.go @@ -107,9 +107,12 @@ func TestTaskIDV1(t *testing.T) { } func TestTaskIDV2(t *testing.T) { + pieceLength := uint64(1024) + tests := []struct { name string url string + pieceLength *uint64 tag string application string filters []string @@ -118,9 +121,20 @@ func TestTaskIDV2(t *testing.T) { { name: "generate taskID", url: "https://example.com", + pieceLength: &pieceLength, tag: "foo", application: "bar", filters: []string{}, + expect: func(t *testing.T, d any) { + assert := assert.New(t) + assert.Equal(d, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221") + }, + }, + { + name: "generate taskID with tag and application", + url: "https://example.com", + tag: "foo", + application: "bar", expect: func(t *testing.T, d any) { assert := assert.New(t) assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d") @@ -144,6 +158,15 @@ func TestTaskIDV2(t *testing.T) { assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d") }, }, + { + name: "generate taskID with pieceLength", + url: "https://example.com", + pieceLength: &pieceLength, + expect: func(t *testing.T, d any) { + assert := assert.New(t) + assert.Equal(d, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38") + }, + }, { name: "generate taskID with filters", url: "https://example.com?foo=foo&bar=bar", @@ -157,7 +180,7 @@ func TestTaskIDV2(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters)) + tc.expect(t, TaskIDV2(tc.url, tc.pieceLength, tc.tag, tc.application, tc.filters)) }) } } diff --git a/scheduler/job/job.go b/scheduler/job/job.go index b6e370a9231..55c67260cf3 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -170,9 +170,9 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) { return "", err } - taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)) + taskID := idgen.TaskIDV2(req.URL, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)) log := logger.WithTask(taskID, req.URL) - log.Infof("preheat %s request: %#v", req.URL, req) + log.Infof("preheat %s %d request: %#v", req.URL, req.PieceLength, req) ctx, cancel := context.WithTimeout(ctx, req.Timeout) defer cancel() @@ -299,6 +299,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter taskID, &dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{ Url: req.URL, + PieceLength: req.PieceLength, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, @@ -440,6 +441,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj taskID, &dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{ Url: req.URL, + PieceLength: req.PieceLength, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, @@ -583,6 +585,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{ Download: &commonv2.Download{ Url: req.URL, + PieceLength: req.PieceLength, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, diff --git a/scheduler/resource/standard/host_manager_test.go b/scheduler/resource/standard/host_manager_test.go index e683a0a38e4..6c5320fc87f 100644 --- a/scheduler/resource/standard/host_manager_test.go +++ b/scheduler/resource/standard/host_manager_test.go @@ -538,7 +538,7 @@ func TestHostManager_RunGC(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) hostManager, err := newHostManager(mockHostGCConfig, gc) if err != nil { diff --git a/scheduler/resource/standard/host_test.go b/scheduler/resource/standard/host_test.go index d2edc49c2c2..0430a0ffe57 100644 --- a/scheduler/resource/standard/host_test.go +++ b/scheduler/resource/standard/host_test.go @@ -620,7 +620,7 @@ func TestHost_LoadPeer(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, host) host.StorePeer(mockPeer) @@ -665,7 +665,7 @@ func TestHost_StorePeer(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(tc.peerID, mockTask, host) host.StorePeer(mockPeer) @@ -711,7 +711,7 @@ func TestHost_DeletePeer(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, host) host.StorePeer(mockPeer) @@ -763,7 +763,7 @@ func TestHost_LeavePeers(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, host) tc.expect(t, host, mockPeer) @@ -815,7 +815,7 @@ func TestHost_FreeUploadCount(t *testing.T) { host := NewHost( tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, host) tc.expect(t, host, mockTask, mockPeer) diff --git a/scheduler/resource/standard/peer_manager_test.go b/scheduler/resource/standard/peer_manager_test.go index 84c1a59b065..770e1bf1175 100644 --- a/scheduler/resource/standard/peer_manager_test.go +++ b/scheduler/resource/standard/peer_manager_test.go @@ -136,7 +136,7 @@ func TestPeerManager_Load(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -193,7 +193,7 @@ func TestPeerManager_Store(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -248,7 +248,7 @@ func TestPeerManager_LoadOrStore(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -305,7 +305,7 @@ func TestPeerManager_Delete(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) if err != nil { @@ -578,7 +578,7 @@ func TestPeerManager_RunGC(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(tc.gcConfig, gc) if err != nil { diff --git a/scheduler/resource/standard/peer_test.go b/scheduler/resource/standard/peer_test.go index bc4bc37f767..f8abbb5da05 100644 --- a/scheduler/resource/standard/peer_test.go +++ b/scheduler/resource/standard/peer_test.go @@ -169,7 +169,7 @@ func TestPeer_NewPeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) tc.expect(t, NewPeer(tc.id, mockTask, mockHost, tc.options...), mockTask, mockHost) }) } @@ -204,7 +204,7 @@ func TestPeer_AppendPieceCost(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer) @@ -241,7 +241,7 @@ func TestPeer_PieceCosts(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer) @@ -283,7 +283,7 @@ func TestPeer_LoadReportPieceResultStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -316,7 +316,7 @@ func TestPeer_StoreReportPieceResultStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -349,7 +349,7 @@ func TestPeer_DeleteReportPieceResultStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -390,7 +390,7 @@ func TestPeer_LoadAnnouncePeerStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -423,7 +423,7 @@ func TestPeer_StoreAnnouncePeerStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -456,7 +456,7 @@ func TestPeer_DeleteAnnouncePeerStream(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) }) @@ -512,7 +512,7 @@ func TestPeer_LoadPiece(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) peer.StorePiece(tc.piece) @@ -553,7 +553,7 @@ func TestPeer_StorePiece(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) peer.StorePiece(tc.piece) @@ -598,7 +598,7 @@ func TestPeer_DeletePiece(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) peer.StorePiece(tc.piece) @@ -646,7 +646,7 @@ func TestPeer_Parents(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost) tc.expect(t, peer, seedPeer, stream) @@ -692,7 +692,7 @@ func TestPeer_Children(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost) tc.expect(t, peer, seedPeer, stream) @@ -774,7 +774,7 @@ func TestPeer_DownloadTinyFile(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) if tc.mockServer == nil { @@ -926,7 +926,7 @@ func TestPeer_CalculatePriority(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.mock(peer, dynconfig.EXPECT()) tc.expect(t, peer.CalculatePriority(dynconfig)) diff --git a/scheduler/resource/standard/seed_peer_test.go b/scheduler/resource/standard/seed_peer_test.go index 46bcab160c0..f40ab39dedc 100644 --- a/scheduler/resource/standard/seed_peer_test.go +++ b/scheduler/resource/standard/seed_peer_test.go @@ -118,7 +118,7 @@ func TestSeedPeer_TriggerTask(t *testing.T) { tc.mock(client.EXPECT()) seedPeer := newSeedPeer(client, peerManager, hostManager) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer, result, err := seedPeer.TriggerTask(context.Background(), nil, mockTask) tc.expect(t, peer, result, err) }) diff --git a/scheduler/resource/standard/task.go b/scheduler/resource/standard/task.go index 7ccdcf2e8ca..3fbca931300 100644 --- a/scheduler/resource/standard/task.go +++ b/scheduler/resource/standard/task.go @@ -88,7 +88,7 @@ const ( type TaskOption func(task *Task) // WithPieceLength set PieceLength for task. -func WithPieceLength(pieceLength int32) TaskOption { +func WithPieceLength(pieceLength uint64) TaskOption { return func(t *Task) { t.PieceLength = pieceLength } @@ -128,7 +128,7 @@ type Task struct { Header map[string]string // Task piece length. - PieceLength int32 + PieceLength uint64 // DirectPiece is tiny piece data. DirectPiece []byte @@ -169,12 +169,13 @@ type Task struct { } // New task instance. -func NewTask(id, url, tag, application string, typ commonv2.TaskType, filteredQueryParams []string, +func NewTask(id, url string, pieceLength uint64, tag, application string, typ commonv2.TaskType, filteredQueryParams []string, header map[string]string, backToSourceLimit int32, options ...TaskOption) *Task { t := &Task{ ID: id, Type: typ, URL: url, + PieceLength: pieceLength, Tag: tag, Application: application, FilteredQueryParams: filteredQueryParams, diff --git a/scheduler/resource/standard/task_manager_test.go b/scheduler/resource/standard/task_manager_test.go index d61e183fd4b..72cd920acd1 100644 --- a/scheduler/resource/standard/task_manager_test.go +++ b/scheduler/resource/standard/task_manager_test.go @@ -131,7 +131,7 @@ func TestTaskManager_Load(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -184,7 +184,7 @@ func TestTaskManager_Store(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -235,7 +235,7 @@ func TestTaskManager_LoadOrStore(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -288,7 +288,7 @@ func TestTaskManager_Delete(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { t.Fatal(err) @@ -349,7 +349,7 @@ func TestTaskManager_RunGC(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) taskManager, err := newTaskManager(mockTaskGCConfig, gc) if err != nil { diff --git a/scheduler/resource/standard/task_test.go b/scheduler/resource/standard/task_test.go index 0547d5c435a..2a7d99ae849 100644 --- a/scheduler/resource/standard/task_test.go +++ b/scheduler/resource/standard/task_test.go @@ -47,27 +47,29 @@ var ( CreatedAt: time.Now(), } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3") + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3") ) func TestTask_NewTask(t *testing.T) { tests := []struct { - name string - options []TaskOption - expect func(t *testing.T, task *Task) + name string + options []TaskOption + PieceLength uint64 + expect func(t *testing.T, task *Task) }{ { - name: "new task", - options: []TaskOption{}, + name: "new task", + PieceLength: 0, + options: []TaskOption{}, expect: func(t *testing.T, task *Task) { assert := assert.New(t) assert.Equal(task.ID, mockTaskID) @@ -78,7 +80,7 @@ func TestTask_NewTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, int32(0)) + assert.Equal(task.PieceLength, uint64(0)) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) @@ -93,8 +95,9 @@ func TestTask_NewTask(t *testing.T) { }, }, { - name: "new task with piece length", - options: []TaskOption{WithPieceLength(mockTaskPieceLength)}, + name: "new task with piece length", + PieceLength: mockTaskPieceLength, + options: []TaskOption{}, expect: func(t *testing.T, task *Task) { assert := assert.New(t) assert.Equal(task.ID, mockTaskID) @@ -120,8 +123,9 @@ func TestTask_NewTask(t *testing.T) { }, }, { - name: "new task with digest", - options: []TaskOption{WithDigest(mockTaskDigest)}, + name: "new task with digest", + PieceLength: 0, + options: []TaskOption{WithDigest(mockTaskDigest)}, expect: func(t *testing.T, task *Task) { assert := assert.New(t) assert.Equal(task.ID, mockTaskID) @@ -132,7 +136,7 @@ func TestTask_NewTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, int32(0)) + assert.Equal(task.PieceLength, uint64(0)) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) @@ -150,7 +154,7 @@ func TestTask_NewTask(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, tc.options...)) + tc.expect(t, NewTask(mockTaskID, mockTaskURL, tc.PieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, tc.options...)) }) } } @@ -193,7 +197,7 @@ func TestTask_LoadPeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) task.StorePeer(mockPeer) @@ -262,7 +266,7 @@ func TestTask_LoadRandomPeers(t *testing.T) { host := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, task, host) }) @@ -300,7 +304,7 @@ func TestTask_StorePeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(tc.peerID, task, mockHost) task.StorePeer(mockPeer) @@ -342,7 +346,7 @@ func TestTask_DeletePeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) task.StorePeer(mockPeer) @@ -381,7 +385,7 @@ func TestTask_PeerCount(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) tc.expect(t, mockPeer, task) @@ -479,7 +483,7 @@ func TestTask_AddPeerEdge(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -583,7 +587,7 @@ func TestTask_DeletePeerInEdges(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -685,7 +689,7 @@ func TestTask_DeletePeerOutEdges(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -772,7 +776,7 @@ func TestTask_CanAddPeerEdge(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -835,7 +839,7 @@ func TestTask_PeerDegree(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -898,7 +902,7 @@ func TestTask_PeerInDegree(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -961,7 +965,7 @@ func TestTask_PeerOutDegree(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, mockHost, task) }) @@ -1032,7 +1036,7 @@ func TestTask_HasAvailablePeer(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) tc.expect(t, task, mockPeer) @@ -1099,7 +1103,7 @@ func TestTask_LoadSeedPeer(t *testing.T) { mockSeedHost := NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) mockSeedPeer := NewPeer(mockSeedPeerID, task, mockSeedHost) @@ -1166,7 +1170,7 @@ func TestTask_IsSeedPeerFailed(t *testing.T) { mockSeedHost := NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) mockSeedPeer := NewPeer(mockSeedPeerID, task, mockSeedHost) @@ -1221,7 +1225,7 @@ func TestTask_LoadPiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.StorePiece(tc.piece) piece, loaded := task.LoadPiece(tc.pieceNumber) @@ -1258,7 +1262,7 @@ func TestTask_StorePiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.StorePiece(tc.piece) piece, loaded := task.LoadPiece(tc.pieceNumber) @@ -1299,7 +1303,7 @@ func TestTask_DeletePiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.StorePiece(tc.piece) task.DeletePiece(tc.pieceNumber) @@ -1379,7 +1383,7 @@ func TestTask_SizeScope(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) task.ContentLength.Store(tc.contentLength) task.TotalPieceCount.Store(tc.totalPieceCount) tc.expect(t, task) @@ -1431,7 +1435,7 @@ func TestTask_CanBackToSource(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, tc.backToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, tc.backToSourceLimit) tc.run(t, task) }) } @@ -1472,7 +1476,7 @@ func TestTask_CanReuseDirectPiece(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) tc.expect(t, task) }) } @@ -1553,7 +1557,7 @@ func TestTask_ReportPieceResultToPeers(t *testing.T) { mockHost := NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) + task := NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) mockPeer := NewPeer(mockPeerID, task, mockHost) task.StorePeer(mockPeer) tc.run(t, task, mockPeer, stream, stream.EXPECT()) diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 6a0b868770b..8495e38be56 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -134,20 +134,20 @@ var ( Platform: "darwin", } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) - mockHostLocation = "bas" - mockHostIDC = "baz" - mockPeerID = idgen.PeerIDV2() + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "bas" + mockHostIDC = "baz" + mockPeerID = idgen.PeerIDV2() ) func TestEvaluatorBase_newEvaluatorBase(t *testing.T) { @@ -184,7 +184,7 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "parents is empty", parents: []*standard.Peer{}, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -201,13 +201,13 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate single parent", parents: []*standard.Peer{ standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -226,33 +226,33 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate parents with free upload count", parents: []*standard.Peer{ standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -277,33 +277,33 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate parents with pieces", parents: []*standard.Peer{ standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -347,12 +347,12 @@ func TestEvaluatorBase_evaluate(t *testing.T) { { name: "evaluate parent", parent: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -367,12 +367,12 @@ func TestEvaluatorBase_evaluate(t *testing.T) { { name: "evaluate parent with pieces", parent: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -400,7 +400,7 @@ func TestEvaluatorBase_calculatePieceScore(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) tests := []struct { name string @@ -562,7 +562,7 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) { host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) mockPeer := standard.NewPeer(mockPeerID, mockTask, host) e := newEvaluatorBase() tc.mock(host) @@ -612,7 +612,7 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) { host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) mockPeer := standard.NewPeer(mockPeerID, mockTask, host) e := newEvaluatorBase() tc.mock(host, mockPeer) @@ -664,7 +664,7 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) e := newEvaluatorBase() tc.mock(peer) @@ -878,7 +878,7 @@ func TestEvaluatorBase_IsBadParent(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) tests := []struct { name string diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 5f97b01ad5a..1a96f2f6f02 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -162,22 +162,22 @@ var ( Platform: "darwin", } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) - mockHostLocation = "baz" - mockHostIDC = "bas" - mockPeerID = idgen.PeerIDV2() - mockSeedPeerID = idgen.PeerIDV2() - mockPiece = standard.Piece{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "baz" + mockHostIDC = "bas" + mockPeerID = idgen.PeerIDV2() + mockSeedPeerID = idgen.PeerIDV2() + mockPiece = standard.Piece{ Number: 1, ParentID: "foo", Offset: 2, @@ -439,7 +439,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) mockSeedHost := standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -710,7 +710,7 @@ func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) mockSeedHost := standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -1039,7 +1039,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*standard.Peer @@ -1357,7 +1357,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*standard.Peer @@ -1619,7 +1619,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*standard.Peer @@ -1764,7 +1764,7 @@ func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) candidateParents := []*standard.Peer{standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost, standard.WithRange(nethttp.Range{ Start: 1, Length: 10, @@ -1814,7 +1814,7 @@ func TestScheduling_constructSuccessPeerPacket(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) parent := standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost) diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 50c9590e361..462ab72ac60 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -340,7 +340,8 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ options = append(options, resource.WithDigest(d)) } - task := resource.NewTask(taskID, req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), types.TaskTypeV1ToV2(req.GetTaskType()), + // Piece length is not supported in Protocol V1, use default value 0. + task := resource.NewTask(taskID, req.GetUrl(), 0, req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), types.TaskTypeV1ToV2(req.GetTaskType()), strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator), req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) task, _ = v.resource.TaskManager().LoadOrStore(task) host := v.storeHost(ctx, req.GetPeerHost()) @@ -806,7 +807,8 @@ func (v *V1) storeTask(_ context.Context, req *schedulerv1.PeerTaskRequest, typ options = append(options, resource.WithDigest(d)) } - task := resource.NewTask(req.GetTaskId(), req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), + // Piece length is not supported in Protocol V1, use default value 0. + task := resource.NewTask(req.GetTaskId(), req.GetUrl(), 0, req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), typ, filteredQueryParams, req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) v.resource.TaskManager().Store(task) task.Log.Info("create new task") diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index ec051f5a57c..5f6740e5e07 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -82,22 +82,22 @@ var ( Idc: mockHostIDC, } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} - mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) - mockHostLocation = "bas" - mockHostIDC = "baz" - mockPeerID = idgen.PeerIDV2() - mockSeedPeerID = idgen.PeerIDV2() - mockPeerRange = nethttp.Range{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "bas" + mockHostIDC = "baz" + mockPeerID = idgen.PeerIDV2() + mockSeedPeerID = idgen.PeerIDV2() + mockPeerRange = nethttp.Range{ Start: 0, Length: 10, } @@ -806,7 +806,7 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -1069,7 +1069,7 @@ func TestServiceV1_ReportPieceResult(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), stream.EXPECT()) tc.expect(t, mockPeer, svc.ReportPieceResult(stream)) @@ -1218,7 +1218,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -1277,7 +1277,7 @@ func TestServiceV1_StatTask(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) taskManager := resource.NewMockTaskManager(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT()) task, err := svc.StatTask(context.Background(), &schedulerv1.StatTaskRequest{TaskId: mockTaskID}) @@ -1575,7 +1575,7 @@ func TestServiceV1_AnnounceTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) @@ -1771,7 +1771,7 @@ func TestServiceV1_LeaveTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) @@ -2433,7 +2433,7 @@ func TestServiceV1_LeaveHost(t *testing.T) { host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) @@ -2537,7 +2537,7 @@ func TestServiceV1_prefetchTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, task, mockHost) svc := NewV1(tc.config, res, scheduling, dynconfig) taskManager := resource.NewMockTaskManager(ctl) @@ -3006,7 +3006,7 @@ func TestServiceV1_triggerTask(t *testing.T) { mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost) seedPeer := resource.NewMockSeedPeer(ctl) @@ -3023,7 +3023,7 @@ func TestServiceV1_storeTask(t *testing.T) { { name: "task already exists", run: func(t *testing.T, svc *V1, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) { - mockTask := resource.NewTask(mockTaskID, "", mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, nil, nil, mockTaskBackToSourceLimit) + mockTask := resource.NewTask(mockTaskID, "", mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, nil, nil, mockTaskBackToSourceLimit) gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), @@ -3078,7 +3078,7 @@ func TestServiceV1_storeTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, int32(0)) + assert.Equal(task.PieceLength, uint64(0)) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) @@ -3202,7 +3202,7 @@ func TestServiceV1_storePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) gomock.InOrder( @@ -3222,7 +3222,7 @@ func TestServiceV1_storePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1), @@ -3323,7 +3323,7 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, task, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig) @@ -3403,7 +3403,7 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) @@ -3418,7 +3418,7 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tests := []struct { name string @@ -3730,7 +3730,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) seedPeer := resource.NewMockSeedPeer(ctl) @@ -3854,7 +3854,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) @@ -3934,7 +3934,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) child := resource.NewPeer(mockPeerID, mockTask, mockHost) @@ -4016,7 +4016,7 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) svc.handleTaskSuccess(context.Background(), task, tc.result) @@ -4154,7 +4154,7 @@ func TestServiceV1_handleTaskFail(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) svc.handleTaskFailure(context.Background(), task, tc.backToSourceErr, tc.seedPeerErr) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 8514c8f36f8..33304f85157 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -1537,7 +1537,7 @@ func (v *V2) handleResource(_ context.Context, stream schedulerv2.Scheduler_Anno // Store new task or update task. task, loaded := v.resource.TaskManager().Load(taskID) if !loaded { - options := []standard.TaskOption{standard.WithPieceLength(int32(download.GetPieceLength()))} + options := []standard.TaskOption{} if download.GetDigest() != "" { d, err := digest.Parse(download.GetDigest()) if err != nil { @@ -1548,7 +1548,7 @@ func (v *V2) handleResource(_ context.Context, stream schedulerv2.Scheduler_Anno options = append(options, standard.WithDigest(d)) } - task = standard.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(), + task = standard.NewTask(taskID, download.GetUrl(), download.GetPieceLength(), download.GetTag(), download.GetApplication(), download.GetType(), download.GetFilteredQueryParams(), download.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) v.resource.TaskManager().Store(task) } else { diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 675ca1049ad..a6528683053 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -427,7 +427,7 @@ func TestServiceV2_StatPeer(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) @@ -498,7 +498,7 @@ func TestServiceV2_DeletePeer(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) @@ -584,7 +584,7 @@ func TestServiceV2_StatTask(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) taskManager := standard.NewMockTaskManager(ctl) - task := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + task := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(task, taskManager, resource.EXPECT(), taskManager.EXPECT()) @@ -1608,7 +1608,7 @@ func TestServiceV2_DeleteHost(t *testing.T) { host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) mockPeer := standard.NewPeer(mockSeedPeerID, mockTask, host) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, nil, scheduling, dynconfig) @@ -1910,7 +1910,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) seedPeer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2004,7 +2004,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2097,7 +2097,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2169,7 +2169,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2243,7 +2243,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2443,7 +2443,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { mockHost.IP = ip mockHost.DownloadPort = int32(port) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2516,7 +2516,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2636,7 +2636,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2795,7 +2795,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2921,7 +2921,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3032,7 +3032,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3098,7 +3098,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3312,7 +3312,7 @@ func TestServiceV2_handleResource(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) mockPeer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3589,7 +3589,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskPieceLength, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&tc.config, resource, persistentCacheResource, scheduling, dynconfig) diff --git a/test/e2e/v2/util/file.go b/test/e2e/v2/util/file.go index c529594ea84..56c65d47fc1 100644 --- a/test/e2e/v2/util/file.go +++ b/test/e2e/v2/util/file.go @@ -117,7 +117,7 @@ func (f *File) GetTaskID(opts ...TaskIDOption) string { opt(taskIDOptions) } - return idgen.TaskIDV2(taskIDOptions.url, taskIDOptions.tag, taskIDOptions.application, taskIDOptions.filteredQueryParams) + return idgen.TaskIDV2(taskIDOptions.url, taskIDOptions.pieceLength, taskIDOptions.tag, taskIDOptions.application, taskIDOptions.filteredQueryParams) } // GetOutputPath returns the output path of the file. diff --git a/test/e2e/v2/util/task.go b/test/e2e/v2/util/task.go index 0793b0c59b3..71990e1446c 100644 --- a/test/e2e/v2/util/task.go +++ b/test/e2e/v2/util/task.go @@ -132,6 +132,8 @@ func CalculateSha256ByOutput(pods []*PodExec, output string) (string, error) { type taskID struct { // url is the url of the download task. url string + // pieceLength is the piece length of the download task. + pieceLength *uint64 // tag is the tag of the download task. tag string // appliccation is the application of the download task. @@ -150,6 +152,13 @@ func WithTaskIDURL(url string) TaskIDOption { } } +// WithTaskIDPieceLength sets the piece length of the download task. +func WithTaskIDPieceLength(pieceLength uint64) TaskIDOption { + return func(o *taskID) { + o.pieceLength = &pieceLength + } +} + // WithTaskIDTag sets the tag of the download task. func WithTaskIDTag(tag string) TaskIDOption { return func(o *taskID) {