Skip to content

Commit 3646f17

Browse files
committed
feat: Moved preheatImage struct and parseManifestURL from manager/job/types.go to internal/job/image.go for better encapsulation
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 1de067f commit 3646f17

File tree

6 files changed

+406
-370
lines changed

6 files changed

+406
-370
lines changed

internal/job/image.go

Lines changed: 397 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,397 @@
1+
/*
2+
* Copyright 2025 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package job
18+
19+
import (
20+
"context"
21+
"crypto/tls"
22+
"crypto/x509"
23+
"errors"
24+
"fmt"
25+
"io"
26+
"net/http"
27+
"net/url"
28+
"regexp"
29+
"time"
30+
31+
"github.com/containerd/containerd/platforms"
32+
"github.com/docker/distribution"
33+
"github.com/docker/distribution/manifest/manifestlist"
34+
"github.com/docker/distribution/manifest/ocischema"
35+
"github.com/docker/distribution/manifest/schema1"
36+
"github.com/docker/distribution/manifest/schema2"
37+
registryclient "github.com/docker/distribution/registry/client"
38+
"github.com/docker/distribution/registry/client/auth"
39+
"github.com/docker/distribution/registry/client/transport"
40+
typesregistry "github.com/docker/docker/api/types/registry"
41+
"github.com/docker/docker/registry"
42+
specs "github.com/opencontainers/image-spec/specs-go/v1"
43+
"go.opentelemetry.io/otel/trace"
44+
45+
"d7y.io/dragonfly/v2/manager/config"
46+
"d7y.io/dragonfly/v2/manager/types"
47+
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
48+
)
49+
50+
// defaultHTTPTransport holds the default connection-pool limits for registry
// access. CreatePreheatRequestsByManifestURL copies these values into the
// transport it builds for each request rather than using this instance directly.
var defaultHTTPTransport = &http.Transport{
	// How long an idle keep-alive connection may stay in the pool.
	IdleConnTimeout: 2 * time.Minute,

	// Upper bound on connections (dialing, active and idle) per host.
	MaxConnsPerHost: 50,

	// Upper bounds on idle keep-alive connections, overall and per host.
	MaxIdleConns:        400,
	MaxIdleConnsPerHost: 20,
}
57+
58+
// accessURLRegexp is the regular expression for parsing a manifest access url
// of the form <protocol>://<domain>/v2/<name>/manifests/<tag>. The four capture
// groups are, in order: protocol, domain, image name and tag.
//
// MustCompile is used so that an invalid pattern fails loudly at package
// initialization instead of leaving a nil *regexp.Regexp behind.
var accessURLRegexp = regexp.MustCompile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")
60+
61+
// preheatImage describes the registry location of an image to preheat.
type preheatImage struct {
	// protocol is the url scheme in front of "://".
	protocol string

	// domain is the registry host between "://" and "/v2/".
	domain string

	// name is the repository name between "/v2/" and "/manifests/".
	name string

	// tag is the manifest reference that follows "/manifests/".
	tag string
}

// manifestURL returns the registry v2 url of the manifest identified by the
// image name and tag.
func (img *preheatImage) manifestURL() string {
	const layout = "%s://%s/v2/%s/manifests/%s"
	return fmt.Sprintf(layout, img.protocol, img.domain, img.name, img.tag)
}

// blobsURL returns the registry v2 url of the blob with the given digest in the
// image repository.
func (img *preheatImage) blobsURL(digest string) string {
	const layout = "%s://%s/v2/%s/blobs/%s"
	return fmt.Sprintf(layout, img.protocol, img.domain, img.name, digest)
}
76+
77+
// parseManifestURL parses a container image manifest URL into its components.
78+
// It extracts the protocol, domain, image name, and tag from the provided URL
79+
// using a regular expression.
80+
func parseManifestURL(url string) (*preheatImage, error) {
81+
r := accessURLRegexp.FindStringSubmatch(url)
82+
if len(r) != 5 {
83+
return nil, errors.New("parse access url failed")
84+
}
85+
86+
return &preheatImage{
87+
protocol: r[1],
88+
domain: r[2],
89+
name: r[3],
90+
tag: r[4],
91+
}, nil
92+
}
93+
94+
// CreatePreheatRequestsByManifestURL generates a list of preheat requests for a container image
95+
// by fetching and parsing its manifest from a registry. It handles authentication, platform-specific
96+
// manifest filtering, and layer extraction for preheating.
97+
func CreatePreheatRequestsByManifestURL(ctx context.Context, args types.PreheatArgs, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]PreheatRequest, error) {
98+
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
99+
defer span.End()
100+
101+
// Parse image manifest url.
102+
image, err := parseManifestURL(args.URL)
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
// Background:
108+
// Harbor uses the V1 preheat request and will carry the auth info in the headers.
109+
options := []imageAuthClientOption{}
110+
header := nethttp.MapToHeader(args.Headers)
111+
if token := header.Get("Authorization"); len(token) > 0 {
112+
options = append(options, withIssuedToken(token))
113+
header.Set("Authorization", token)
114+
}
115+
116+
httpClient := &http.Client{
117+
Timeout: registryTimeout,
118+
Transport: &http.Transport{
119+
DialContext: nethttp.NewSafeDialer().DialContext,
120+
TLSClientConfig: &tls.Config{RootCAs: rootCAs, InsecureSkipVerify: insecureSkipVerify},
121+
MaxIdleConns: defaultHTTPTransport.MaxIdleConns,
122+
MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost,
123+
MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost,
124+
IdleConnTimeout: defaultHTTPTransport.IdleConnTimeout,
125+
},
126+
}
127+
128+
// Init docker auth client.
129+
client, err := newImageAuthClient(image, httpClient, &typesregistry.AuthConfig{Username: args.Username, Password: args.Password}, options...)
130+
if err != nil {
131+
return nil, err
132+
}
133+
134+
// Get platform.
135+
platform := platforms.DefaultSpec()
136+
if args.Platform != "" {
137+
platform, err = platforms.Parse(args.Platform)
138+
if err != nil {
139+
return nil, err
140+
}
141+
}
142+
143+
// Resolve manifests for the image.
144+
manifests, err := resolveManifests(ctx, client, image, header.Clone(), platform)
145+
if err != nil {
146+
return nil, err
147+
}
148+
149+
// No matching manifest for platform in the manifest list entries
150+
if len(manifests) == 0 {
151+
return nil, fmt.Errorf("no matching manifest for platform %s", platforms.Format(platform))
152+
}
153+
154+
// Set authorization header
155+
header.Set("Authorization", client.GetAuthToken())
156+
157+
// Build preheat requests for container image layers from the provided manifests.
158+
req, err := buildPreheatRequestFromManifests(manifests, args, header.Clone(), image, rootCAs, insecureSkipVerify)
159+
if err != nil {
160+
return nil, err
161+
}
162+
163+
return req, nil
164+
}
165+
166+
// resolveManifests fetches and resolves container image manifests from a registry for a specified platform.
167+
// It constructs an HTTP request to retrieve the manifest, handles authentication via headers, and processes the response
168+
// to return manifests matching the given platform. Supports single manifests and manifest lists.
169+
func resolveManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) {
170+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, image.manifestURL(), nil)
171+
if err != nil {
172+
return nil, err
173+
}
174+
175+
// Set header from the user request.
176+
for key, values := range header {
177+
for _, value := range values {
178+
req.Header.Add(key, value)
179+
}
180+
}
181+
182+
// Set accept header with media types.
183+
for _, mediaType := range distribution.ManifestMediaTypes() {
184+
req.Header.Add("Accept", mediaType)
185+
}
186+
187+
resp, err := client.Do(req)
188+
if err != nil {
189+
return nil, err
190+
}
191+
defer resp.Body.Close()
192+
193+
// Handle response.
194+
if resp.StatusCode == http.StatusNotModified {
195+
return nil, distribution.ErrManifestNotModified
196+
} else if !registryclient.SuccessStatus(resp.StatusCode) {
197+
return nil, registryclient.HandleErrorResponse(resp)
198+
}
199+
200+
ctHeader := resp.Header.Get("Content-Type")
201+
body, err := io.ReadAll(resp.Body)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
// Unmarshal manifest.
207+
manifest, _, err := distribution.UnmarshalManifest(ctHeader, body)
208+
if err != nil {
209+
return nil, err
210+
}
211+
212+
switch v := manifest.(type) {
213+
case *schema1.SignedManifest, *schema2.DeserializedManifest, *ocischema.DeserializedManifest:
214+
return []distribution.Manifest{v}, nil
215+
case *manifestlist.DeserializedManifestList:
216+
var result []distribution.Manifest
217+
for _, v := range filterManifests(v.Manifests, platform) {
218+
image.tag = v.Digest.String()
219+
manifests, err := resolveManifests(ctx, client, image, header.Clone(), platform)
220+
if err != nil {
221+
return nil, err
222+
}
223+
224+
result = append(result, manifests...)
225+
}
226+
227+
return result, nil
228+
}
229+
230+
return nil, errors.New("unknown manifest type")
231+
}
232+
233+
// filterManifests filters a list of manifest descriptors to return only those matching the specified platform.
234+
// It compares the architecture and operating system of each manifest descriptor against the target platform.
235+
func filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor {
236+
var matches []manifestlist.ManifestDescriptor
237+
for _, desc := range manifests {
238+
if desc.Platform.Architecture == platform.Architecture && desc.Platform.OS == platform.OS {
239+
matches = append(matches, desc)
240+
}
241+
}
242+
243+
return matches
244+
}
245+
246+
// buildPreheatRequestFromManifests constructs preheat requests for container image layers from
247+
// the provided manifests. It extracts layer URLs from the manifests and builds a PreheatRequest
248+
// using the specified arguments, headers, and TLS settings.
249+
func buildPreheatRequestFromManifests(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]PreheatRequest, error) {
250+
var certificateChain [][]byte
251+
if rootCAs != nil {
252+
certificateChain = rootCAs.Subjects()
253+
}
254+
255+
var layerURLs []string
256+
for _, m := range manifests {
257+
for _, v := range m.References() {
258+
header.Set("Accept", v.MediaType)
259+
layerURLs = append(layerURLs, image.blobsURL(v.Digest.String()))
260+
}
261+
}
262+
263+
layers := PreheatRequest{
264+
URLs: layerURLs,
265+
PieceLength: args.PieceLength,
266+
Tag: args.Tag,
267+
Application: args.Application,
268+
FilteredQueryParams: args.FilteredQueryParams,
269+
Headers: nethttp.HeaderToMap(header),
270+
Scope: args.Scope,
271+
IPs: args.IPs,
272+
Percentage: args.Percentage,
273+
Count: args.Count,
274+
ConcurrentTaskCount: args.ConcurrentTaskCount,
275+
ConcurrentPeerCount: args.ConcurrentPeerCount,
276+
CertificateChain: certificateChain,
277+
InsecureSkipVerify: insecureSkipVerify,
278+
Timeout: args.Timeout,
279+
}
280+
281+
return []PreheatRequest{layers}, nil
282+
}
283+
284+
// imageAuthClientOption is an option for imageAuthClient.
285+
type imageAuthClientOption func(*imageAuthClient)
286+
287+
// withIssuedToken sets the issuedToken for imageAuthClient.
288+
func withIssuedToken(token string) imageAuthClientOption {
289+
return func(c *imageAuthClient) {
290+
c.issuedToken = token
291+
}
292+
}
293+
294+
// imageAuthClient is a client for image authentication.
295+
type imageAuthClient struct {
296+
// issuedToken is the issued token specified in header from user request,
297+
// there is no need to go through v2 authentication to get a new token
298+
// if the token is not empty, just use this token to access v2 API directly.
299+
issuedToken string
300+
301+
// httpClient is the http client.
302+
httpClient *http.Client
303+
304+
// authConfig is the auth config.
305+
authConfig *typesregistry.AuthConfig
306+
307+
// interceptorTokenHandler is the token interceptor.
308+
interceptorTokenHandler *interceptorTokenHandler
309+
}
310+
311+
// newImageAuthClient creates a new imageAuthClient.
312+
func newImageAuthClient(image *preheatImage, httpClient *http.Client, authConfig *typesregistry.AuthConfig, opts ...imageAuthClientOption) (*imageAuthClient, error) {
313+
d := &imageAuthClient{
314+
httpClient: httpClient,
315+
authConfig: authConfig,
316+
interceptorTokenHandler: newInterceptorTokenHandler(),
317+
}
318+
319+
for _, opt := range opts {
320+
opt(d)
321+
}
322+
323+
if len(d.issuedToken) > 0 {
324+
return d, nil
325+
}
326+
327+
// New a challenge manager for the supported authentication types.
328+
challengeManager, err := registry.PingV2Registry(&url.URL{Scheme: image.protocol, Host: image.domain}, d.httpClient.Transport)
329+
if err != nil {
330+
return nil, err
331+
}
332+
333+
// New a credential store which always returns the same credential values.
334+
creds := registry.NewStaticCredentialStore(d.authConfig)
335+
336+
// Transport with authentication.
337+
d.httpClient.Transport = transport.NewTransport(
338+
d.httpClient.Transport,
339+
auth.NewAuthorizer(
340+
challengeManager,
341+
auth.NewTokenHandlerWithOptions(auth.TokenHandlerOptions{
342+
Transport: d.httpClient.Transport,
343+
Credentials: creds,
344+
Scopes: []auth.Scope{auth.RepositoryScope{
345+
Repository: image.name,
346+
Actions: []string{"pull"},
347+
}},
348+
ClientID: registry.AuthClientID,
349+
}),
350+
d.interceptorTokenHandler,
351+
auth.NewBasicHandler(creds),
352+
),
353+
)
354+
355+
return d, nil
356+
}
357+
358+
// Do sends an HTTP request and returns an HTTP response.
359+
func (d *imageAuthClient) Do(req *http.Request) (*http.Response, error) {
360+
return d.httpClient.Do(req)
361+
}
362+
363+
// GetAuthToken returns the bearer token.
364+
func (d *imageAuthClient) GetAuthToken() string {
365+
if len(d.issuedToken) > 0 {
366+
return d.issuedToken
367+
}
368+
369+
return d.interceptorTokenHandler.GetAuthToken()
370+
}
371+
372+
// interceptorTokenHandler is a token interceptor intercept bearer token from auth handler.
373+
type interceptorTokenHandler struct {
374+
auth.AuthenticationHandler
375+
token string
376+
}
377+
378+
// NewInterceptorTokenHandler returns a new InterceptorTokenHandler.
379+
func newInterceptorTokenHandler() *interceptorTokenHandler {
380+
return &interceptorTokenHandler{}
381+
}
382+
383+
// Scheme returns the authentication scheme.
384+
func (h *interceptorTokenHandler) Scheme() string {
385+
return "bearer"
386+
}
387+
388+
// AuthorizeRequest sets the Authorization header on the request.
389+
func (h *interceptorTokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
390+
h.token = req.Header.Get("Authorization")
391+
return nil
392+
}
393+
394+
// GetAuthToken returns the bearer token.
395+
func (h *interceptorTokenHandler) GetAuthToken() string {
396+
return h.token
397+
}

0 commit comments

Comments
 (0)