From 6b36dc1be967e08084ddaddd54dcf1690a510865 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 29 Jul 2025 14:50:29 +0800 Subject: [PATCH] feat: Moved preheatImage struct and parseManifestURL from manager/job/types.go to internal/job/image.go for better encapsulation Signed-off-by: Gaius --- internal/job/image.go | 397 ++++++++++++++++++++++++++++++++++++ internal/job/image_test.go | 69 +++++++ internal/job/job.go | 4 + manager/job/preheat.go | 326 +---------------------------- manager/job/preheat_test.go | 54 ----- manager/job/types.go | 41 ---- manager/service/job.go | 6 +- 7 files changed, 474 insertions(+), 423 deletions(-) create mode 100644 internal/job/image.go create mode 100644 internal/job/image_test.go delete mode 100644 manager/job/preheat_test.go delete mode 100644 manager/job/types.go diff --git a/internal/job/image.go b/internal/job/image.go new file mode 100644 index 00000000000..0ead44619e5 --- /dev/null +++ b/internal/job/image.go @@ -0,0 +1,397 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package job + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "regexp" + "time" + + "github.com/containerd/containerd/platforms" + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/ocischema" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + registryclient "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/auth" + "github.com/docker/distribution/registry/client/transport" + typesregistry "github.com/docker/docker/api/types/registry" + "github.com/docker/docker/registry" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "go.opentelemetry.io/otel/trace" + + "d7y.io/dragonfly/v2/manager/config" + "d7y.io/dragonfly/v2/manager/types" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" +) + +// defaultHTTPTransport is the default http transport. +var defaultHTTPTransport = &http.Transport{ + MaxIdleConns: 400, + MaxIdleConnsPerHost: 20, + MaxConnsPerHost: 50, + IdleConnTimeout: 120 * time.Second, +} + +// accessURLRegexp is the regular expression for parsing access url. +var accessURLRegexp, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)") + +// preheatImage is image information for preheat. +type preheatImage struct { + protocol string + domain string + name string + tag string +} + +func (p *preheatImage) manifestURL() string { + return fmt.Sprintf("%s://%s/v2/%s/manifests/%s", p.protocol, p.domain, p.name, p.tag) +} + +func (p *preheatImage) blobsURL(digest string) string { + return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", p.protocol, p.domain, p.name, digest) +} + +// parseManifestURL parses a container image manifest URL into its components. +// It extracts the protocol, domain, image name, and tag from the provided URL +// using a regular expression. +func parseManifestURL(url string) (*preheatImage, error) { + r := accessURLRegexp.FindStringSubmatch(url) + if len(r) != 5 { + return nil, errors.New("parse access url failed") + } + + return &preheatImage{ + protocol: r[1], + domain: r[2], + name: r[3], + tag: r[4], + }, nil +} + +// CreatePreheatRequestsByManifestURL generates a list of preheat requests for a container image +// by fetching and parsing its manifest from a registry. It handles authentication, platform-specific +// manifest filtering, and layer extraction for preheating. +func CreatePreheatRequestsByManifestURL(ctx context.Context, args types.PreheatArgs, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]PreheatRequest, error) { + ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer)) + defer span.End() + + // Parse image manifest url. + image, err := parseManifestURL(args.URL) + if err != nil { + return nil, err + } + + // Background: + // Harbor uses the V1 preheat request and will carry the auth info in the headers. + options := []imageAuthClientOption{} + header := nethttp.MapToHeader(args.Headers) + if token := header.Get("Authorization"); len(token) > 0 { + options = append(options, withIssuedToken(token)) + header.Set("Authorization", token) + } + + httpClient := &http.Client{ + Timeout: registryTimeout, + Transport: &http.Transport{ + DialContext: nethttp.NewSafeDialer().DialContext, + TLSClientConfig: &tls.Config{RootCAs: rootCAs, InsecureSkipVerify: insecureSkipVerify}, + MaxIdleConns: defaultHTTPTransport.MaxIdleConns, + MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost, + MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost, + IdleConnTimeout: defaultHTTPTransport.IdleConnTimeout, + }, + } + + // Init docker auth client. + client, err := newImageAuthClient(image, httpClient, &typesregistry.AuthConfig{Username: args.Username, Password: args.Password}, options...) + if err != nil { + return nil, err + } + + // Get platform. + platform := platforms.DefaultSpec() + if args.Platform != "" { + platform, err = platforms.Parse(args.Platform) + if err != nil { + return nil, err + } + } + + // Resolve manifests for the image. + manifests, err := resolveManifests(ctx, client, image, header.Clone(), platform) + if err != nil { + return nil, err + } + + // No matching manifest for platform in the manifest list entries + if len(manifests) == 0 { + return nil, fmt.Errorf("no matching manifest for platform %s", platforms.Format(platform)) + } + + // Set authorization header + header.Set("Authorization", client.GetAuthToken()) + + // Build preheat requests for container image layers from the provided manifests. + req, err := buildPreheatRequestFromManifests(manifests, args, header.Clone(), image, rootCAs, insecureSkipVerify) + if err != nil { + return nil, err + } + + return req, nil +} + +// resolveManifests fetches and resolves container image manifests from a registry for a specified platform. +// It constructs an HTTP request to retrieve the manifest, handles authentication via headers, and processes the response +// to return manifests matching the given platform. Supports single manifests and manifest lists. +func resolveManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, image.manifestURL(), nil) + if err != nil { + return nil, err + } + + // Set header from the user request. + for key, values := range header { + for _, value := range values { + req.Header.Add(key, value) + } + } + + // Set accept header with media types. + for _, mediaType := range distribution.ManifestMediaTypes() { + req.Header.Add("Accept", mediaType) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Handle response. + if resp.StatusCode == http.StatusNotModified { + return nil, distribution.ErrManifestNotModified + } else if !registryclient.SuccessStatus(resp.StatusCode) { + return nil, registryclient.HandleErrorResponse(resp) + } + + ctHeader := resp.Header.Get("Content-Type") + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal manifest. + manifest, _, err := distribution.UnmarshalManifest(ctHeader, body) + if err != nil { + return nil, err + } + + switch v := manifest.(type) { + case *schema1.SignedManifest, *schema2.DeserializedManifest, *ocischema.DeserializedManifest: + return []distribution.Manifest{v}, nil + case *manifestlist.DeserializedManifestList: + var result []distribution.Manifest + for _, v := range filterManifests(v.Manifests, platform) { + image.tag = v.Digest.String() + manifests, err := resolveManifests(ctx, client, image, header.Clone(), platform) + if err != nil { + return nil, err + } + + result = append(result, manifests...) + } + + return result, nil + } + + return nil, errors.New("unknown manifest type") +} + +// filterManifests filters a list of manifest descriptors to return only those matching the specified platform. +// It compares the architecture and operating system of each manifest descriptor against the target platform. +func filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor { + var matches []manifestlist.ManifestDescriptor + for _, desc := range manifests { + if desc.Platform.Architecture == platform.Architecture && desc.Platform.OS == platform.OS { + matches = append(matches, desc) + } + } + + return matches +} + +// buildPreheatRequestFromManifests constructs preheat requests for container image layers from +// the provided manifests. It extracts layer URLs from the manifests and builds a PreheatRequest +// using the specified arguments, headers, and TLS settings. +func buildPreheatRequestFromManifests(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]PreheatRequest, error) { + var certificateChain [][]byte + if rootCAs != nil { + certificateChain = rootCAs.Subjects() + } + + var layerURLs []string + for _, m := range manifests { + for _, v := range m.References() { + header.Set("Accept", v.MediaType) + layerURLs = append(layerURLs, image.blobsURL(v.Digest.String())) + } + } + + layers := PreheatRequest{ + URLs: layerURLs, + PieceLength: args.PieceLength, + Tag: args.Tag, + Application: args.Application, + FilteredQueryParams: args.FilteredQueryParams, + Headers: nethttp.HeaderToMap(header), + Scope: args.Scope, + IPs: args.IPs, + Percentage: args.Percentage, + Count: args.Count, + ConcurrentTaskCount: args.ConcurrentTaskCount, + ConcurrentPeerCount: args.ConcurrentPeerCount, + CertificateChain: certificateChain, + InsecureSkipVerify: insecureSkipVerify, + Timeout: args.Timeout, + } + + return []PreheatRequest{layers}, nil +} + +// imageAuthClientOption is an option for imageAuthClient. +type imageAuthClientOption func(*imageAuthClient) + +// withIssuedToken sets the issuedToken for imageAuthClient. +func withIssuedToken(token string) imageAuthClientOption { + return func(c *imageAuthClient) { + c.issuedToken = token + } +} + +// imageAuthClient is a client for image authentication. +type imageAuthClient struct { + // issuedToken is the issued token specified in header from user request, + // there is no need to go through v2 authentication to get a new token + // if the token is not empty, just use this token to access v2 API directly. + issuedToken string + + // httpClient is the http client. + httpClient *http.Client + + // authConfig is the auth config. + authConfig *typesregistry.AuthConfig + + // interceptorTokenHandler is the token interceptor. + interceptorTokenHandler *interceptorTokenHandler +} + +// newImageAuthClient creates a new imageAuthClient. +func newImageAuthClient(image *preheatImage, httpClient *http.Client, authConfig *typesregistry.AuthConfig, opts ...imageAuthClientOption) (*imageAuthClient, error) { + d := &imageAuthClient{ + httpClient: httpClient, + authConfig: authConfig, + interceptorTokenHandler: newInterceptorTokenHandler(), + } + + for _, opt := range opts { + opt(d) + } + + if len(d.issuedToken) > 0 { + return d, nil + } + + // New a challenge manager for the supported authentication types. + challengeManager, err := registry.PingV2Registry(&url.URL{Scheme: image.protocol, Host: image.domain}, d.httpClient.Transport) + if err != nil { + return nil, err + } + + // New a credential store which always returns the same credential values. + creds := registry.NewStaticCredentialStore(d.authConfig) + + // Transport with authentication. + d.httpClient.Transport = transport.NewTransport( + d.httpClient.Transport, + auth.NewAuthorizer( + challengeManager, + auth.NewTokenHandlerWithOptions(auth.TokenHandlerOptions{ + Transport: d.httpClient.Transport, + Credentials: creds, + Scopes: []auth.Scope{auth.RepositoryScope{ + Repository: image.name, + Actions: []string{"pull"}, + }}, + ClientID: registry.AuthClientID, + }), + d.interceptorTokenHandler, + auth.NewBasicHandler(creds), + ), + ) + + return d, nil +} + +// Do sends an HTTP request and returns an HTTP response. +func (d *imageAuthClient) Do(req *http.Request) (*http.Response, error) { + return d.httpClient.Do(req) +} + +// GetAuthToken returns the bearer token. +func (d *imageAuthClient) GetAuthToken() string { + if len(d.issuedToken) > 0 { + return d.issuedToken + } + + return d.interceptorTokenHandler.GetAuthToken() +} + +// interceptorTokenHandler is a token interceptor intercept bearer token from auth handler. +type interceptorTokenHandler struct { + auth.AuthenticationHandler + token string +} + +// NewInterceptorTokenHandler returns a new InterceptorTokenHandler. +func newInterceptorTokenHandler() *interceptorTokenHandler { + return &interceptorTokenHandler{} +} + +// Scheme returns the authentication scheme. +func (h *interceptorTokenHandler) Scheme() string { + return "bearer" +} + +// AuthorizeRequest sets the Authorization header on the request. +func (h *interceptorTokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error { + h.token = req.Header.Get("Authorization") + return nil +} + +// GetAuthToken returns the bearer token. +func (h *interceptorTokenHandler) GetAuthToken() string { + return h.token +} diff --git a/internal/job/image_test.go b/internal/job/image_test.go new file mode 100644 index 00000000000..48d8c6ad933 --- /dev/null +++ b/internal/job/image_test.go @@ -0,0 +1,69 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package job + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/manager/types" +) + +func TestPreheat_CreatePreheatRequestsByManifestURL(t *testing.T) { + tests := []struct { + name string + args types.PreheatArgs + expect func(t *testing.T, layers []PreheatRequest) + }{ + { + name: "get image layers with manifest url", + args: types.PreheatArgs{ + URL: "https://registry-1.docker.io/v2/dragonflyoss/busybox/manifests/1.35.0", + Type: "image", + }, + expect: func(t *testing.T, layers []PreheatRequest) { + assert := assert.New(t) + assert.Equal(2, len(layers[0].URLs)) + }, + }, + { + name: "get image layers with multi arch image layers", + args: types.PreheatArgs{ + URL: "https://registry-1.docker.io/v2/dragonflyoss/scheduler/manifests/v2.1.0", + Platform: "linux/amd64", + }, + expect: func(t *testing.T, layers []PreheatRequest) { + assert := assert.New(t) + assert.Equal(5, len(layers[0].URLs)) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + layers, err := CreatePreheatRequestsByManifestURL(context.Background(), tc.args, 30*time.Second, nil, true) + if err != nil { + t.Fatal(err) + } + + tc.expect(t, layers) + }) + } +} diff --git a/internal/job/job.go b/internal/job/job.go index 14503417fb3..df54db900a7 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -31,10 +31,14 @@ import ( machineryv1log "github.com/dragonflyoss/machinery/v1/log" machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks" "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" logger "d7y.io/dragonfly/v2/internal/dflog" ) +// tracer is a global tracer for job. +var tracer = otel.Tracer("job") + type Config struct { Addrs []string MasterName string diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 18fda067b4e..4f492589125 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -20,29 +20,13 @@ package job import ( "context" - "crypto/tls" "crypto/x509" "errors" "fmt" - "io" - "net/http" - "net/url" "time" - "github.com/containerd/containerd/platforms" - "github.com/docker/distribution" - "github.com/docker/distribution/manifest/manifestlist" - "github.com/docker/distribution/manifest/ocischema" - "github.com/docker/distribution/manifest/schema1" - "github.com/docker/distribution/manifest/schema2" - registryclient "github.com/docker/distribution/registry/client" - "github.com/docker/distribution/registry/client/auth" - "github.com/docker/distribution/registry/client/transport" - typesregistry "github.com/docker/docker/api/types/registry" - "github.com/docker/docker/registry" machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks" "github.com/google/uuid" - specs "github.com/opencontainers/image-spec/specs-go/v1" "go.opentelemetry.io/otel/trace" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -50,7 +34,6 @@ import ( "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" - nethttp "d7y.io/dragonfly/v2/pkg/net/http" ) // preheatImage is an image for preheat. @@ -69,14 +52,6 @@ const ( PreheatFileType PreheatType = "file" ) -// defaultHTTPTransport is the default http transport. -var defaultHTTPTransport = &http.Transport{ - MaxIdleConns: 400, - MaxIdleConnsPerHost: 20, - MaxConnsPerHost: 50, - IdleConnTimeout: 120 * time.Second, -} - // Preheat is an interface for preheat job. type Preheat interface { // CreatePreheat creates a preheat job. @@ -119,7 +94,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul return nil, errors.New("invalid params: url is required") } - files, err = GetImageLayers(ctx, json, p.registryTimeout, p.rootCAs, p.insecureSkipVerify) + files, err = internaljob.CreatePreheatRequestsByManifestURL(ctx, json, p.registryTimeout, p.rootCAs, p.insecureSkipVerify) if err != nil { return nil, err } @@ -218,302 +193,3 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea CreatedAt: time.Now(), }, nil } - -func GetImageLayers(ctx context.Context, args types.PreheatArgs, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]internaljob.PreheatRequest, error) { - ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer)) - defer span.End() - - // Parse image manifest url. - image, err := parseManifestURL(args.URL) - if err != nil { - return nil, err - } - - // Background: - // Harbor uses the V1 preheat request and will carry the auth info in the headers. - options := []imageAuthClientOption{} - header := nethttp.MapToHeader(args.Headers) - if token := header.Get("Authorization"); len(token) > 0 { - options = append(options, withIssuedToken(token)) - header.Set("Authorization", token) - } - - httpClient := &http.Client{ - Timeout: registryTimeout, - Transport: &http.Transport{ - DialContext: nethttp.NewSafeDialer().DialContext, - TLSClientConfig: &tls.Config{RootCAs: rootCAs, InsecureSkipVerify: insecureSkipVerify}, - MaxIdleConns: defaultHTTPTransport.MaxIdleConns, - MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost, - MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost, - IdleConnTimeout: defaultHTTPTransport.IdleConnTimeout, - }, - } - - // Init docker auth client. - client, err := newImageAuthClient(image, httpClient, &typesregistry.AuthConfig{Username: args.Username, Password: args.Password}, options...) - if err != nil { - return nil, err - } - - // Get platform. - platform := platforms.DefaultSpec() - if args.Platform != "" { - platform, err = platforms.Parse(args.Platform) - if err != nil { - return nil, err - } - } - - // Get manifests. - manifests, err := getManifests(ctx, client, image, header.Clone(), platform) - if err != nil { - return nil, err - } - - // no matching manifest for platform in the manifest list entries - if len(manifests) == 0 { - return nil, fmt.Errorf("no matching manifest for platform %s", platforms.Format(platform)) - } - - // set authorization header - header.Set("Authorization", client.GetAuthToken()) - - // parse image layers to preheat - layers, err := parseLayers(manifests, args, header.Clone(), image, rootCAs, insecureSkipVerify) - if err != nil { - return nil, err - } - - return layers, nil -} - -// getManifests gets manifests of image. -func getManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, image.manifestURL(), nil) - if err != nil { - return nil, err - } - - // Set header from the user request. - for key, values := range header { - for _, value := range values { - req.Header.Add(key, value) - } - } - - // Set accept header with media types. - for _, mediaType := range distribution.ManifestMediaTypes() { - req.Header.Add("Accept", mediaType) - } - - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - // Handle response. - if resp.StatusCode == http.StatusNotModified { - return nil, distribution.ErrManifestNotModified - } else if !registryclient.SuccessStatus(resp.StatusCode) { - return nil, registryclient.HandleErrorResponse(resp) - } - - ctHeader := resp.Header.Get("Content-Type") - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - // Unmarshal manifest. - manifest, _, err := distribution.UnmarshalManifest(ctHeader, body) - if err != nil { - return nil, err - } - - switch v := manifest.(type) { - case *schema1.SignedManifest, *schema2.DeserializedManifest, *ocischema.DeserializedManifest: - return []distribution.Manifest{v}, nil - case *manifestlist.DeserializedManifestList: - var result []distribution.Manifest - for _, v := range filterManifests(v.Manifests, platform) { - image.tag = v.Digest.String() - manifests, err := getManifests(ctx, client, image, header.Clone(), platform) - if err != nil { - return nil, err - } - - result = append(result, manifests...) - } - - return result, nil - } - - return nil, errors.New("unknown manifest type") -} - -// filterManifests filters manifests with platform. -func filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor { - var matches []manifestlist.ManifestDescriptor - for _, desc := range manifests { - if desc.Platform.Architecture == platform.Architecture && desc.Platform.OS == platform.OS { - matches = append(matches, desc) - } - } - - return matches -} - -// parseLayers parses layers of image. -func parseLayers(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]internaljob.PreheatRequest, error) { - var certificateChain [][]byte - if rootCAs != nil { - certificateChain = rootCAs.Subjects() - } - - var layerURLs []string - for _, m := range manifests { - for _, v := range m.References() { - header.Set("Accept", v.MediaType) - layerURLs = append(layerURLs, image.blobsURL(v.Digest.String())) - } - } - - layers := internaljob.PreheatRequest{ - URLs: layerURLs, - PieceLength: args.PieceLength, - Tag: args.Tag, - Application: args.Application, - FilteredQueryParams: args.FilteredQueryParams, - Headers: nethttp.HeaderToMap(header), - Scope: args.Scope, - IPs: args.IPs, - Percentage: args.Percentage, - Count: args.Count, - ConcurrentTaskCount: args.ConcurrentTaskCount, - ConcurrentPeerCount: args.ConcurrentPeerCount, - CertificateChain: certificateChain, - InsecureSkipVerify: insecureSkipVerify, - Timeout: args.Timeout, - } - - return []internaljob.PreheatRequest{layers}, nil -} - -// imageAuthClientOption is an option for imageAuthClient. -type imageAuthClientOption func(*imageAuthClient) - -// withIssuedToken sets the issuedToken for imageAuthClient. -func withIssuedToken(token string) imageAuthClientOption { - return func(c *imageAuthClient) { - c.issuedToken = token - } -} - -// imageAuthClient is a client for image authentication. -type imageAuthClient struct { - // issuedToken is the issued token specified in header from user request, - // there is no need to go through v2 authentication to get a new token - // if the token is not empty, just use this token to access v2 API directly. - issuedToken string - - // httpClient is the http client. - httpClient *http.Client - - // authConfig is the auth config. - authConfig *typesregistry.AuthConfig - - // interceptorTokenHandler is the token interceptor. - interceptorTokenHandler *interceptorTokenHandler -} - -// newImageAuthClient creates a new imageAuthClient. -func newImageAuthClient(image *preheatImage, httpClient *http.Client, authConfig *typesregistry.AuthConfig, opts ...imageAuthClientOption) (*imageAuthClient, error) { - d := &imageAuthClient{ - httpClient: httpClient, - authConfig: authConfig, - interceptorTokenHandler: newInterceptorTokenHandler(), - } - - for _, opt := range opts { - opt(d) - } - - // Return earlier if issued token is not empty. - if len(d.issuedToken) > 0 { - return d, nil - } - - // New a challenge manager for the supported authentication types. - challengeManager, err := registry.PingV2Registry(&url.URL{Scheme: image.protocol, Host: image.domain}, d.httpClient.Transport) - if err != nil { - return nil, err - } - - // New a credential store which always returns the same credential values. - creds := registry.NewStaticCredentialStore(d.authConfig) - - // Transport with authentication. - d.httpClient.Transport = transport.NewTransport( - d.httpClient.Transport, - auth.NewAuthorizer( - challengeManager, - auth.NewTokenHandlerWithOptions(auth.TokenHandlerOptions{ - Transport: d.httpClient.Transport, - Credentials: creds, - Scopes: []auth.Scope{auth.RepositoryScope{ - Repository: image.name, - Actions: []string{"pull"}, - }}, - ClientID: registry.AuthClientID, - }), - d.interceptorTokenHandler, - auth.NewBasicHandler(creds), - ), - ) - - return d, nil -} - -// Do sends an HTTP request and returns an HTTP response. -func (d *imageAuthClient) Do(req *http.Request) (*http.Response, error) { - return d.httpClient.Do(req) -} - -// GetAuthToken returns the bearer token. -func (d *imageAuthClient) GetAuthToken() string { - if len(d.issuedToken) > 0 { - return d.issuedToken - } - - return d.interceptorTokenHandler.GetAuthToken() -} - -// interceptorTokenHandler is a token interceptor -// intercept bearer token from auth handler. -type interceptorTokenHandler struct { - auth.AuthenticationHandler - token string -} - -// NewInterceptorTokenHandler returns a new InterceptorTokenHandler. -func newInterceptorTokenHandler() *interceptorTokenHandler { - return &interceptorTokenHandler{} -} - -// Scheme returns the authentication scheme. -func (h *interceptorTokenHandler) Scheme() string { - return "bearer" -} - -// AuthorizeRequest sets the Authorization header on the request. -func (h *interceptorTokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error { - h.token = req.Header.Get("Authorization") - return nil -} - -// GetAuthToken returns the bearer token. -func (h *interceptorTokenHandler) GetAuthToken() string { - return h.token -} diff --git a/manager/job/preheat_test.go b/manager/job/preheat_test.go deleted file mode 100644 index d96aca7ffc5..00000000000 --- a/manager/job/preheat_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package job - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - internaljob "d7y.io/dragonfly/v2/internal/job" - "d7y.io/dragonfly/v2/manager/types" -) - -func TestPreheat_GetImageLayers(t *testing.T) { - tests := []struct { - name string - args types.PreheatArgs - expect func(t *testing.T, layers []internaljob.PreheatRequest) - }{ - { - name: "get image layers with manifest url", - args: types.PreheatArgs{ - URL: "https://registry-1.docker.io/v2/dragonflyoss/busybox/manifests/1.35.0", - Type: "image", - }, - expect: func(t *testing.T, layers []internaljob.PreheatRequest) { - assert := assert.New(t) - assert.Equal(2, len(layers[0].URLs)) - }, - }, - { - name: "get image layers with multi arch image layers", - args: types.PreheatArgs{ - URL: "https://registry-1.docker.io/v2/dragonflyoss/scheduler/manifests/v2.1.0", - Platform: "linux/amd64", - }, - expect: func(t *testing.T, layers []internaljob.PreheatRequest) { - assert := assert.New(t) - assert.Equal(5, len(layers[0].URLs)) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - layers, err := GetImageLayers(context.Background(), tc.args, 30*time.Second, nil, true) - if err != nil { - t.Fatal(err) - } - - tc.expect(t, layers) - }) - } -} diff --git a/manager/job/types.go b/manager/job/types.go deleted file mode 100644 index 0d829b9ada4..00000000000 --- a/manager/job/types.go +++ /dev/null @@ -1,41 +0,0 @@ -package job - -import ( - "errors" - "fmt" - "regexp" -) - -// accessURLRegexp is the regular expression for parsing access url. -var accessURLRegexp, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)") - -// preheatImage is image information for preheat. -type preheatImage struct { - protocol string - domain string - name string - tag string -} - -func (p *preheatImage) manifestURL() string { - return fmt.Sprintf("%s://%s/v2/%s/manifests/%s", p.protocol, p.domain, p.name, p.tag) -} - -func (p *preheatImage) blobsURL(digest string) string { - return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", p.protocol, p.domain, p.name, digest) -} - -// parseManifestURL parses manifest url. -func parseManifestURL(url string) (*preheatImage, error) { - r := accessURLRegexp.FindStringSubmatch(url) - if len(r) != 5 { - return nil, errors.New("parse access url failed") - } - - return &preheatImage{ - protocol: r[1], - domain: r[2], - name: r[3], - tag: r[4], - }, nil -} diff --git a/manager/service/job.go b/manager/service/job.go index 536628a7a88..f3da1ce4f39 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -227,7 +227,7 @@ func (s *service) CreateGetImageDistributionJob(ctx context.Context, json types. json.Args.ConcurrentLayerCount = types.DefaultPreheatConcurrentLayerCount } - imageLayers, err := s.getImageLayers(ctx, json) + imageLayers, err := s.createPreheatRequestsByManifestURL(ctx, json) if err != nil { err = fmt.Errorf("get image layers failed: %w", err) logger.Error(err) @@ -281,7 +281,7 @@ func (s *service) CreateGetImageDistributionJob(ctx context.Context, json types. }, nil } -func (s *service) getImageLayers(ctx context.Context, json types.CreateGetImageDistributionJobRequest) ([]internaljob.PreheatRequest, error) { +func (s *service) createPreheatRequestsByManifestURL(ctx context.Context, json types.CreateGetImageDistributionJobRequest) ([]internaljob.PreheatRequest, error) { var certPool *x509.CertPool if len(s.config.Job.Preheat.TLS.CACert) != 0 { certPool = x509.NewCertPool() @@ -290,7 +290,7 @@ func (s *service) getImageLayers(ctx context.Context, json types.CreateGetImageD } } - layers, err := job.GetImageLayers(ctx, types.PreheatArgs{ + layers, err := internaljob.CreatePreheatRequestsByManifestURL(ctx, types.PreheatArgs{ Type: job.PreheatImageType.String(), URL: json.Args.URL, PieceLength: json.Args.PieceLength,