Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.2.1-rc.0
image-tag: v2.2.1-rc.2
chart-name: manager
skip: "Rate Limit"
- module: scheduler
image: scheduler
image-tag: v2.2.1-rc.0
image-tag: v2.2.1-rc.2
chart-name: scheduler
skip: "Rate Limit"
- module: client
image: client
image-tag: v0.2.11
image-tag: v0.2.14
chart-name: client
skip: "Rate Limit"
- module: seed-client
image: client
image-tag: v0.2.11
image-tag: v0.2.14
chart-name: seed-client
skip: "Rate Limit"

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.27
d7y.io/api/v2 v2.1.30
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.27 h1:bEiAnNNs944DrV1rZP4m1BR9vh8KkBKLwlyVv+kOQoQ=
d7y.io/api/v2 v2.1.27/go.mod h1:RFsSiRgEBfMrSHlrZWbkJ3xil8fnbbGSj0UMwbUUC5I=
d7y.io/api/v2 v2.1.30 h1:KCHcg2oWX8jSFwJXuxgmNCPlPx29/2tcMXHT7wVKqWo=
d7y.io/api/v2 v2.1.30/go.mod h1:RFsSiRgEBfMrSHlrZWbkJ3xil8fnbbGSj0UMwbUUC5I=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
1 change: 1 addition & 0 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// PreheatRequest defines the request parameters for preheating.
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
PieceLength *uint64 `json:"pieceLength" binding:"omitempty,gte=4194304"`
Tag string `json:"tag" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
files = []internaljob.PreheatRequest{
{
URL: json.URL,
PieceLength: json.PieceLength,
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
Expand Down Expand Up @@ -349,6 +350,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
header.Set("Accept", v.MediaType)
layer := internaljob.PreheatRequest{
URL: image.blobsURL(v.Digest.String()),
PieceLength: args.PieceLength,
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
Expand Down
4 changes: 2 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
Expand Down
18 changes: 18 additions & 0 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ type PreheatArgs struct {
// URL is the image url for preheating.
URL string `json:"url" binding:"required"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag for preheating.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -177,6 +183,12 @@ type GetTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -211,6 +223,12 @@ type DeleteTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down
10 changes: 8 additions & 2 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package idgen

import (
"strconv"
"strings"

commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
Expand Down Expand Up @@ -91,11 +92,16 @@ func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url string, pieceLength *uint64, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}

return pkgdigest.SHA256FromStrings(url, tag, application)
params := []string{url, tag, application}
if pieceLength != nil {
params = append(params, strconv.FormatUint(*pieceLength, 10))
}

return pkgdigest.SHA256FromStrings(params...)
}
25 changes: 24 additions & 1 deletion pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ func TestTaskIDV1(t *testing.T) {
}

func TestTaskIDV2(t *testing.T) {
pieceLength := uint64(1024)

tests := []struct {
name string
url string
pieceLength *uint64
tag string
application string
filters []string
Expand All @@ -118,9 +121,20 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
pieceLength: &pieceLength,
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221")
},
},
{
name: "generate taskID with tag and application",
url: "https://example.com",
tag: "foo",
application: "bar",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
Expand All @@ -144,6 +158,15 @@ func TestTaskIDV2(t *testing.T) {
assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d")
},
},
{
name: "generate taskID with pieceLength",
url: "https://example.com",
pieceLength: &pieceLength,
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38")
},
},
{
name: "generate taskID with filters",
url: "https://example.com?foo=foo&bar=bar",
Expand All @@ -157,7 +180,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.pieceLength, tc.tag, tc.application, tc.filters))
})
}
}
7 changes: 5 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)
log.Infof("preheat %s %d request: %#v", req.URL, req.PieceLength, req)

ctx, cancel := context.WithTimeout(ctx, req.Timeout)
defer cancel()
Expand Down Expand Up @@ -299,6 +299,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -440,6 +441,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -583,6 +585,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
10 changes: 0 additions & 10 deletions scheduler/resource/standard/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ const (
// TaskOption is a functional option for task.
type TaskOption func(task *Task)

// WithPieceLength set PieceLength for task.
func WithPieceLength(pieceLength int32) TaskOption {
return func(t *Task) {
t.PieceLength = pieceLength
}
}

// WithDigest set Digest for task.
func WithDigest(d *digest.Digest) TaskOption {
return func(t *Task) {
Expand Down Expand Up @@ -127,9 +120,6 @@ type Task struct {
// Task request headers.
Header map[string]string

// Task piece length.
PieceLength int32

// DirectPiece is tiny piece data.
DirectPiece []byte

Expand Down
25 changes: 11 additions & 14 deletions scheduler/resource/standard/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ var (
CreatedAt: time.Now(),
}

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3")
mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskPieceLength uint64 = 2048
mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3")
)

func TestTask_NewTask(t *testing.T) {
Expand All @@ -78,7 +78,6 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceLength, int32(0))
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
Expand All @@ -94,7 +93,7 @@ func TestTask_NewTask(t *testing.T) {
},
{
name: "new task with piece length",
options: []TaskOption{WithPieceLength(mockTaskPieceLength)},
options: []TaskOption{},
expect: func(t *testing.T, task *Task) {
assert := assert.New(t)
assert.Equal(task.ID, mockTaskID)
Expand All @@ -105,7 +104,6 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceLength, mockTaskPieceLength)
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
Expand All @@ -132,7 +130,6 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceLength, int32(0))
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
Expand Down
Loading
Loading