Skip to content

Commit 3f0b7b0

Browse files
committed
feat: enhance preheat job concurrency control (#4233)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 6c0ebc4 commit 3f0b7b0

File tree

7 files changed

+221
-168
lines changed

7 files changed

+221
-168
lines changed

.github/workflows/compatibility-e2e.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,22 @@ jobs:
3131
include:
3232
- module: manager
3333
image: manager
34-
image-tag: v2.3.1-beta.0
34+
image-tag: v2.3.1-beta.1
3535
chart-name: manager
3636
skip: "Rate Limit | preheat files in cache"
3737
- module: scheduler
3838
image: scheduler
39-
image-tag: v2.3.1-beta.0
39+
image-tag: v2.3.1-beta.1
4040
chart-name: scheduler
4141
skip: "Rate Limit | preheat files in cache"
4242
- module: client
4343
image: client
44-
image-tag: v1.0.1
44+
image-tag: v1.0.5
4545
chart-name: client
4646
skip: "Rate Limit"
4747
- module: seed-client
4848
image: client
49-
image-tag: v1.0.1
49+
image-tag: v1.0.5
5050
chart-name: seed-client
5151
skip: "Rate Limit"
5252

internal/job/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ type PreheatRequest struct {
3737
IPs []string `json:"ips" validate:"omitempty"`
3838
Percentage *uint8 `json:"percentage" validate:"omitempty,gte=1,lte=100"`
3939
Count *uint32 `json:"count" validate:"omitempty,gte=1,lte=200"`
40-
ConcurrentCount int64 `json:"concurrent_count" validate:"omitempty"`
40+
ConcurrentTaskCount int64 `json:"concurrent_task_count" validate:"omitempty"`
41+
ConcurrentPeerCount int64 `json:"concurrent_peer_count" validate:"omitempty"`
4142
CertificateChain [][]byte `json:"certificate_chain" validate:"omitempty"`
4243
InsecureSkipVerify bool `json:"insecure_skip_verify" validate:"omitempty"`
4344
Timeout time.Duration `json:"timeout" validate:"omitempty"`

manager/job/preheat.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
149149
IPs: json.IPs,
150150
Percentage: json.Percentage,
151151
Count: json.Count,
152-
ConcurrentCount: json.ConcurrentCount,
152+
ConcurrentTaskCount: json.ConcurrentTaskCount,
153+
ConcurrentPeerCount: json.ConcurrentPeerCount,
153154
CertificateChain: certificateChain,
154155
InsecureSkipVerify: p.insecureSkipVerify,
155156
Timeout: json.Timeout,
@@ -390,7 +391,8 @@ func parseLayers(manifests []distribution.Manifest, args types.PreheatArgs, head
390391
IPs: args.IPs,
391392
Percentage: args.Percentage,
392393
Count: args.Count,
393-
ConcurrentCount: args.ConcurrentCount,
394+
ConcurrentTaskCount: args.ConcurrentTaskCount,
395+
ConcurrentPeerCount: args.ConcurrentPeerCount,
394396
CertificateChain: certificateChain,
395397
InsecureSkipVerify: insecureSkipVerify,
396398
Timeout: args.Timeout,

manager/service/job.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ const (
5050

5151
// DefaultGCJobPollingInterval is the default interval for polling GC job.
5252
DefaultGCJobPollingInterval = 5 * time.Second
53-
54-
// DefaultGetImageDistributionJobConcurrentCount is the default concurrent count for getting image distribution job.
55-
DefaultGetImageDistributionJobConcurrentCount = 12
5653
)
5754

5855
func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest) (*models.Job, error) {
@@ -119,8 +116,12 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
119116
json.Args.Scope = types.SingleSeedPeerScope
120117
}
121118

122-
if json.Args.ConcurrentCount == 0 {
123-
json.Args.ConcurrentCount = types.DefaultPreheatConcurrentCount
119+
if json.Args.ConcurrentTaskCount == 0 {
120+
json.Args.ConcurrentTaskCount = types.DefaultPreheatConcurrentTaskCount
121+
}
122+
123+
if json.Args.ConcurrentPeerCount == 0 {
124+
json.Args.ConcurrentPeerCount = types.DefaultPreheatConcurrentPeerCount
124125
}
125126

126127
if json.Args.Timeout == 0 {
@@ -222,6 +223,10 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask
222223
}
223224

224225
func (s *service) CreateGetImageDistributionJob(ctx context.Context, json types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error) {
226+
if json.Args.ConcurrentLayerCount == 0 {
227+
json.Args.ConcurrentLayerCount = types.DefaultPreheatConcurrentLayerCount
228+
}
229+
225230
imageLayers, err := s.getImageLayers(ctx, json)
226231
if err != nil {
227232
err = fmt.Errorf("get image layers failed: %w", err)
@@ -312,7 +317,7 @@ func (s *service) createGetTaskJobsSync(ctx context.Context, layers []internaljo
312317
var mu sync.Mutex
313318
jobs := make([]*models.Job, 0, len(layers))
314319
eg, ctx := errgroup.WithContext(ctx)
315-
eg.SetLimit(DefaultGetImageDistributionJobConcurrentCount)
320+
eg.SetLimit(int(json.Args.ConcurrentLayerCount))
316321
for _, file := range layers {
317322
eg.Go(func() error {
318323
job, err := s.createGetTaskJobSync(ctx, types.CreateGetTaskJobRequest{

manager/types/job.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,14 @@ const (
3030
)
3131

3232
const (
33-
// DefaultPreheatConcurrentCount is the default concurrent count for preheating all peers.
34-
DefaultPreheatConcurrentCount = 1000
33+
// DefaultPreheatConcurrentPeerCount is the default concurrent peer count for preheating all peers.
34+
DefaultPreheatConcurrentPeerCount = 500
35+
36+
// DefaultPreheatConcurrentTaskCount is the default concurrent task count for preheating all peers.
37+
DefaultPreheatConcurrentTaskCount = 8
38+
39+
// DefaultPreheatConcurrentLayerCount is the default concurrent layer count for getting image distribution.
40+
DefaultPreheatConcurrentLayerCount = 8
3541

3642
// DefaultJobTimeout is the default timeout for executing job.
3743
DefaultJobTimeout = 60 * time.Minute
@@ -161,8 +167,16 @@ type PreheatArgs struct {
161167
// Applies to 'all_peers' and 'all_seed_peers' scopes.
162168
Count *uint32 `json:"count" binding:"omitempty,gte=1,lte=200"`
163169

164-
// BatchSize is the batch size for preheating all peers, default is 50.
165-
ConcurrentCount int64 `json:"concurrent_count" binding:"omitempty,gte=1,lte=500"`
170+
// ConcurrentTaskCount specifies the maximum number of tasks (e.g., image layers) to preheat concurrently.
171+
// For example, if preheating 100 layers with ConcurrentTaskCount set to 10, up to 10 layers are processed simultaneously.
172+
// If ConcurrentPeerCount is 10 for 1000 peers, each layer is preheated by 10 peers concurrently.
173+
// Default is 8, maximum is 100.
174+
ConcurrentTaskCount int64 `json:"concurrent_task_count" binding:"omitempty,gte=1,lte=100"`
175+
176+
// ConcurrentPeerCount specifies the maximum number of peers to preheat concurrently for a single task (e.g., an image layer).
177+
// For example, if preheating a layer with ConcurrentPeerCount set to 10, up to 10 peers process that layer simultaneously.
178+
// Default is 500, maximum is 1000.
179+
ConcurrentPeerCount int64 `json:"concurrent_peer_count" binding:"omitempty,gte=1,lte=1000"`
166180

167181
// Timeout is the timeout for preheating, default is 30 minutes.
168182
Timeout time.Duration `json:"timeout" binding:"omitempty"`
@@ -277,6 +291,9 @@ type GetImageDistributionArgs struct {
277291

278292
// The image type preheating task can specify the image architecture type. eg: linux/amd64.
279293
Platform string `json:"platform" binding:"omitempty"`
294+
295+
// ConcurrentLayerCount specifies the maximum number of layers to get concurrently.
296+
ConcurrentLayerCount int64 `json:"concurrent_layer_count" binding:"omitempty,gte=1,lte=100"`
280297
}
281298

282299
// CreateGetImageDistributionJobResponse is the response for creating a get image job.

0 commit comments

Comments
 (0)