Skip to content

Commit ff59f21

Browse files
authored
feat(manager): support preheating by percentage (#4053)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent bba5c52 commit ff59f21

File tree

4 files changed

+63
-12
lines changed

4 files changed

+63
-12
lines changed

internal/job/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type PreheatRequest struct {
3030
Application string `json:"application" validate:"omitempty"`
3131
Priority int32 `json:"priority" validate:"omitempty"`
3232
Scope string `json:"scope" validate:"omitempty"`
33+
Percentage *uint8 `json:"percentage" validate:"omitempty,gte=1,lte=100"`
3334
ConcurrentCount int64 `json:"concurrent_count" validate:"omitempty"`
3435
CertificateChain [][]byte `json:"certificate_chain" validate:"omitempty"`
3536
InsecureSkipVerify bool `json:"insecure_skip_verify" validate:"omitempty"`

manager/job/preheat.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
144144
FilteredQueryParams: json.FilteredQueryParams,
145145
Headers: json.Headers,
146146
Scope: json.Scope,
147+
Percentage: json.Percentage,
147148
ConcurrentCount: json.ConcurrentCount,
148149
CertificateChain: p.certificateChain,
149150
InsecureSkipVerify: p.insecureSkipVerify,
@@ -370,6 +371,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
370371
FilteredQueryParams: args.FilteredQueryParams,
371372
Headers: nethttp.HeaderToMap(header),
372373
Scope: args.Scope,
374+
Percentage: args.Percentage,
373375
ConcurrentCount: args.ConcurrentCount,
374376
CertificateChain: p.certificateChain,
375377
InsecureSkipVerify: p.insecureSkipVerify,

manager/types/job.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ type PreheatArgs struct {
145145
// Scope is the scope for preheating, default is single_seed_peer.
146146
Scope string `json:"scope" binding:"omitempty"`
147147

148+
// Percentage is the percentage of the peer to be preheated.
149+
Percentage *uint8 `json:"percentage" binding:"omitempty,gte=1,lte=100"`
150+
148151
// BatchSize is the batch size for preheating all peers, default is 50.
149152
ConcurrentCount int64 `json:"concurrent_count" binding:"omitempty,gte=1,lte=500"`
150153

scheduler/job/job.go

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
4040
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
4141
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
42+
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
4243

4344
logger "d7y.io/dragonfly/v2/internal/dflog"
4445
internaljob "d7y.io/dragonfly/v2/internal/job"
@@ -256,15 +257,10 @@ func (j *job) preheatSinglePeer(ctx context.Context, taskID string, req *interna
256257
// If all the seed peers download task failed, return error. If some of the seed peers download task failed, return success tasks and failure tasks.
257258
// Notify the client that the preheat is successful.
258259
func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
259-
// If seed peer is disabled, return error.
260-
if !j.config.SeedPeer.Enable {
261-
return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
262-
}
263-
264260
// If scheduler has no available seed peer, return error.
265-
seedPeers := j.resource.SeedPeer().Client().SeedPeers()
266-
if len(seedPeers) == 0 {
267-
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
261+
seedPeers, err := j.selectSeedPeers(req.Percentage, log)
262+
if err != nil {
263+
return nil, err
268264
}
269265

270266
var (
@@ -401,14 +397,40 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
401397
return nil, fmt.Errorf("all peers preheat failed: %s", msg)
402398
}
403399

400+
// selectSeedPeers selects seed peers by percentage.
401+
func (j *job) selectSeedPeers(percentage *uint8, log *logger.SugaredLoggerOnWith) ([]*managerv2.SeedPeer, error) {
402+
if !j.config.SeedPeer.Enable {
403+
return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
404+
}
405+
406+
seedPeers := j.resource.SeedPeer().Client().SeedPeers()
407+
if len(seedPeers) == 0 {
408+
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
409+
}
410+
411+
if percentage == nil {
412+
log.Infof("percentage is nil, select all seed peers, length is %d", len(seedPeers))
413+
return seedPeers, nil
414+
}
415+
416+
count := (len(seedPeers) * int(*percentage)) / 100
417+
418+
// Ensure at least one peer is selected if percentage > 0.
419+
if count == 0 && *percentage > 0 {
420+
count = 1
421+
}
422+
423+
log.Infof("select %d seed peers from %d seed peers, percentage is %d", count, len(seedPeers), *percentage)
424+
return seedPeers[:count], nil
425+
}
426+
404427
// preheatAllPeers preheats job by all peers, only suoported by v2 protocol. Scheduler will trigger all peers to download task.
405428
// If all the peers download task failed, return error. If some of the peers download task failed, return success tasks and
406429
// failure tasks. Notify the client that the preheat is successful.
407430
func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
408-
// If scheduler has no available peer, return error.
409-
peers := j.resource.HostManager().LoadAll()
410-
if len(peers) == 0 {
411-
return nil, fmt.Errorf("cluster %d scheduler %s has no available peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
431+
peers, err := j.selectPeers(req.Percentage, log)
432+
if err != nil {
433+
return nil, err
412434
}
413435

414436
var (
@@ -545,6 +567,29 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
545567
return nil, fmt.Errorf("all peers preheat failed: %s", msg)
546568
}
547569

570+
// selectPeers selects peers by percentage.
571+
func (j *job) selectPeers(percentage *uint8, log *logger.SugaredLoggerOnWith) ([]*resource.Host, error) {
572+
peers := j.resource.HostManager().LoadAll()
573+
if len(peers) == 0 {
574+
return nil, fmt.Errorf("cluster %d scheduler %s has no available peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
575+
}
576+
577+
if percentage == nil {
578+
log.Infof("percentage is nil, select all peers, length is %d", len(peers))
579+
return peers, nil
580+
}
581+
582+
count := (len(peers) * int(*percentage)) / 100
583+
584+
// Ensure at least one peer is selected if percentage > 0.
585+
if count == 0 && *percentage > 0 {
586+
count = 1
587+
}
588+
589+
log.Infof("select %d peers from %d peers, percentage is %d", count, len(peers), *percentage)
590+
return peers[:count], nil
591+
}
592+
548593
// preheatV1 preheats job by v1 grpc protocol.
549594
func (j *job) preheatV1(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
550595
urlMeta := &commonv1.UrlMeta{

0 commit comments

Comments
 (0)