Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/compatibility-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.3.1-rc.1
image-tag: v2.3.1-rc.2
chart-name: manager
skip: "Rate Limit | preheat files in cache"
- module: scheduler
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.8

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.47
d7y.io/api/v2 v2.1.55
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4=
cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI=
d7y.io/api/v2 v2.1.47 h1:HpH8iPpaVmIP5c01yaJzMM1HytkRttij91SU8/+b4u0=
d7y.io/api/v2 v2.1.47/go.mod h1:IbhylQWRkqRka+oUl73Fzz331fHFIAwS2m4cMNpFWdk=
d7y.io/api/v2 v2.1.55 h1:MM0Zpk6KqWAhwwHsGXfaCNt5sgJta6J8u5ewLVH03pQ=
d7y.io/api/v2 v2.1.55/go.mod h1:E3XBK7gLlGEGezrmQ328zYzd23js8Os3MESzaNvqrgw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
Expand Down
18 changes: 18 additions & 0 deletions internal/dflog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,24 @@ func WithDeleteTaskJobAndPeer(groupUUID, taskUUID, hostID, taskID, peerID string
}
}

func WithPreheatImage(url string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"url", url},
}
}

func WithStatImage(url string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"url", url},
}
}

func WithStatImageAndTaskID(url, taskID string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"url", url, "taskID", taskID},
}
}

func (log *SugaredLoggerOnWith) With(args ...any) *SugaredLoggerOnWith {
args = append(args, log.withArgs...)
return &SugaredLoggerOnWith{
Expand Down
148 changes: 120 additions & 28 deletions internal/job/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

//go:generate mockgen -destination mocks/job_mock.go -source image.go -package mocks

package job

import (
Expand Down Expand Up @@ -43,10 +45,12 @@ import (
"go.opentelemetry.io/otel/trace"

"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/types"
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
)

// defaultRegistryTimeout is the default timeout for registry requests.
const defaultRegistryTimeout = 1 * time.Minute

// defaultHTTPTransport is the default http transport.
var defaultHTTPTransport = &http.Transport{
MaxIdleConns: 400,
Expand Down Expand Up @@ -91,33 +95,121 @@ func parseManifestURL(url string) (*preheatImage, error) {
}, nil
}

type ManifestRequest struct {
// URL is the image manifest url for preheating.
URL string

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than 4MiB (4,194,304 bytes) and less than 64MiB (67,108,864 bytes),
// for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
// the piece length will be calculated according to the file size.
PieceLength *uint64

// Tag is the tag for preheating.
Tag string

// Application is the application string for preheating.
Application string

// FilteredQueryParams is the filtered query params for preheating.
FilteredQueryParams string

// Headers is the http headers for authentication.
Headers map[string]string

// Username is the username for authentication.
Username string

// Password is the password for authentication.
Password string

// The image type preheating task can specify the image architecture type. eg: linux/amd64.
Platform string

// Scope is the scope for preheating, default is single_seed_peer.
Scope string

// IPs is a list of specific peer IPs for preheating.
// This field has the highest priority: if provided, both 'Count' and 'Percentage' will be ignored.
// Applies to 'all_peers' and 'all_seed_peers' scopes.
IPs []string

// Percentage is the percentage of available peers to preheat.
// This field has the lowest priority and is only used if both 'IPs' and 'Count' are not provided.
// It must be a value between 1 and 100 (inclusive) if provided.
// Applies to 'all_peers' and 'all_seed_peers' scopes.
Percentage *uint32

// Count is the desired number of peers to preheat.
// This field is used only when 'IPs' is not specified. It has priority over 'Percentage'.
// It must be a value between 1 and 200 (inclusive) if provided.
// Applies to 'all_peers' and 'all_seed_peers' scopes.
Count *uint32

// ConcurrentTaskCount specifies the maximum number of tasks (e.g., image layers) to preheat concurrently.
// For example, if preheating 100 layers with ConcurrentTaskCount set to 10, up to 10 layers are processed simultaneously.
// If ConcurrentPeerCount is 10 for 1000 peers, each layer is preheated by 10 peers concurrently.
// Default is 8, maximum is 100.
ConcurrentTaskCount int64

// ConcurrentPeerCount specifies the maximum number of peers to preheat concurrently for a single task (e.g., an image layer).
// For example, if preheating a layer with ConcurrentPeerCount set to 10, up to 10 peers process that layer simultaneously.
// Default is 500, maximum is 1000.
ConcurrentPeerCount int64

// Timeout is the timeout for preheating, default is 30 minutes.
Timeout time.Duration

// RootCAs is the root CAs for TLS verification.
RootCAs *x509.CertPool

// InsecureSkipVerify indicates whether to skip TLS verification.
InsecureSkipVerify bool
}

// Image implements the interface for handling container images.
type Image interface {
// CreatePreheatRequestsByManifestURL generates a list of preheat requests for a container image
// by fetching and parsing its manifest from a registry. It handles authentication, platform-specific
// manifest filtering, and layer extraction for preheating.
CreatePreheatRequestsByManifestURL(ctx context.Context, req *ManifestRequest) ([]*PreheatRequest, error)
}

// image is the implementation of the Image interface.
type image struct{}

// NewImage creates a new instance of the Image interface.
func NewImage() Image {
return &image{}
}

// CreatePreheatRequestsByManifestURL generates a list of preheat requests for a container image
// by fetching and parsing its manifest from a registry. It handles authentication, platform-specific
// manifest filtering, and layer extraction for preheating.
func CreatePreheatRequestsByManifestURL(ctx context.Context, args types.PreheatArgs, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]PreheatRequest, error) {
func (i *image) CreatePreheatRequestsByManifestURL(ctx context.Context, req *ManifestRequest) ([]*PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()

// Parse image manifest url.
image, err := parseManifestURL(args.URL)
image, err := parseManifestURL(req.URL)
if err != nil {
return nil, err
}

// Background:
// Harbor uses the V1 preheat request and will carry the auth info in the headers.
options := []imageAuthClientOption{}
header := nethttp.MapToHeader(args.Headers)
header := nethttp.MapToHeader(req.Headers)
if token := header.Get("Authorization"); len(token) > 0 {
options = append(options, withIssuedToken(token))
header.Set("Authorization", token)
}

httpClient := &http.Client{
Timeout: registryTimeout,
Timeout: defaultRegistryTimeout,
Transport: &http.Transport{
DialContext: nethttp.NewSafeDialer().DialContext,
TLSClientConfig: &tls.Config{RootCAs: rootCAs, InsecureSkipVerify: insecureSkipVerify},
TLSClientConfig: &tls.Config{RootCAs: req.RootCAs, InsecureSkipVerify: req.InsecureSkipVerify},
MaxIdleConns: defaultHTTPTransport.MaxIdleConns,
MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost,
MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost,
Expand All @@ -126,15 +218,15 @@ func CreatePreheatRequestsByManifestURL(ctx context.Context, args types.PreheatA
}

// Init docker auth client.
client, err := newImageAuthClient(image, httpClient, &typesregistry.AuthConfig{Username: args.Username, Password: args.Password}, options...)
client, err := newImageAuthClient(image, httpClient, &typesregistry.AuthConfig{Username: req.Username, Password: req.Password}, options...)
if err != nil {
return nil, err
}

// Get platform.
platform := platforms.DefaultSpec()
if args.Platform != "" {
platform, err = platforms.Parse(args.Platform)
if req.Platform != "" {
platform, err = platforms.Parse(req.Platform)
if err != nil {
return nil, err
}
Expand All @@ -155,12 +247,12 @@ func CreatePreheatRequestsByManifestURL(ctx context.Context, args types.PreheatA
header.Set("Authorization", client.GetAuthToken())

// Build preheat requests for container image layers from the provided manifests.
req, err := buildPreheatRequestFromManifests(manifests, args, header.Clone(), image, rootCAs, insecureSkipVerify)
preheatRequest, err := buildPreheatRequestFromManifests(manifests, req, header.Clone(), image)
if err != nil {
return nil, err
}

return req, nil
return preheatRequest, nil
}

// resolveManifests fetches and resolves container image manifests from a registry for a specified platform.
Expand Down Expand Up @@ -246,10 +338,10 @@ func filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs
// buildPreheatRequestFromManifests constructs preheat requests for container image layers from
// the provided manifests. It extracts layer URLs from the manifests and builds a PreheatRequest
// using the specified arguments, headers, and TLS settings.
func buildPreheatRequestFromManifests(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]PreheatRequest, error) {
func buildPreheatRequestFromManifests(manifests []distribution.Manifest, req *ManifestRequest, header http.Header, image *preheatImage) ([]*PreheatRequest, error) {
var certificateChain [][]byte
if rootCAs != nil {
certificateChain = rootCAs.Subjects()
if req.RootCAs != nil {
certificateChain = req.RootCAs.Subjects()
}

var layerURLs []string
Expand All @@ -260,25 +352,25 @@ func buildPreheatRequestFromManifests(manifests []distribution.Manifest, args ty
}
}

layers := PreheatRequest{
layers := &PreheatRequest{
URLs: layerURLs,
PieceLength: args.PieceLength,
Tag: args.Tag,
Application: args.Application,
FilteredQueryParams: args.FilteredQueryParams,
PieceLength: req.PieceLength,
Tag: req.Tag,
Application: req.Application,
FilteredQueryParams: req.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
Scope: args.Scope,
IPs: args.IPs,
Percentage: args.Percentage,
Count: args.Count,
ConcurrentTaskCount: args.ConcurrentTaskCount,
ConcurrentPeerCount: args.ConcurrentPeerCount,
Scope: req.Scope,
IPs: req.IPs,
Percentage: req.Percentage,
Count: req.Count,
ConcurrentTaskCount: req.ConcurrentTaskCount,
ConcurrentPeerCount: req.ConcurrentPeerCount,
CertificateChain: certificateChain,
InsecureSkipVerify: insecureSkipVerify,
Timeout: args.Timeout,
InsecureSkipVerify: req.InsecureSkipVerify,
Timeout: req.Timeout,
}

return []PreheatRequest{layers}, nil
return []*PreheatRequest{layers}, nil
}

// imageAuthClientOption is an option for imageAuthClient.
Expand Down
27 changes: 14 additions & 13 deletions internal/job/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,35 @@ import (
"time"

"github.com/stretchr/testify/assert"

"d7y.io/dragonfly/v2/manager/types"
)

func TestPreheat_CreatePreheatRequestsByManifestURL(t *testing.T) {
tests := []struct {
name string
args types.PreheatArgs
expect func(t *testing.T, layers []PreheatRequest)
req *ManifestRequest
expect func(t *testing.T, layers []*PreheatRequest)
}{
{
name: "get image layers with manifest url",
args: types.PreheatArgs{
URL: "https://registry-1.docker.io/v2/dragonflyoss/busybox/manifests/1.35.0",
Type: "image",
req: &ManifestRequest{
URL: "https://registry-1.docker.io/v2/dragonflyoss/busybox/manifests/1.35.0",
Timeout: 30 * time.Second,
InsecureSkipVerify: true,
},
expect: func(t *testing.T, layers []PreheatRequest) {
expect: func(t *testing.T, layers []*PreheatRequest) {
assert := assert.New(t)
assert.Equal(2, len(layers[0].URLs))
},
},
{
name: "get image layers with multi arch image layers",
args: types.PreheatArgs{
URL: "https://registry-1.docker.io/v2/dragonflyoss/scheduler/manifests/v2.1.0",
Platform: "linux/amd64",
req: &ManifestRequest{
URL: "https://registry-1.docker.io/v2/dragonflyoss/scheduler/manifests/v2.1.0",
Platform: "linux/amd64",
Timeout: 30 * time.Second,
InsecureSkipVerify: true,
},
expect: func(t *testing.T, layers []PreheatRequest) {
expect: func(t *testing.T, layers []*PreheatRequest) {
assert := assert.New(t)
assert.Equal(5, len(layers[0].URLs))
},
Expand All @@ -58,7 +59,7 @@ func TestPreheat_CreatePreheatRequestsByManifestURL(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
layers, err := CreatePreheatRequestsByManifestURL(context.Background(), tc.args, 30*time.Second, nil, true)
layers, err := NewImage().CreatePreheatRequestsByManifestURL(context.Background(), tc.req)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading