Skip to content

Commit 75d6242

Browse files
authored
feat: add ratelimit for job in manager (#3480)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 2c0ae78 commit 75d6242

File tree

6 files changed

+139
-2
lines changed

6 files changed

+139
-2
lines changed

manager/config/config.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,17 @@ type GRPCConfig struct {
283283
}
284284

285285
type TCPListenPortRange struct {
286+
// Start is the start port.
286287
Start int
287-
End int
288+
289+
// End is the end port.
290+
End int
288291
}
289292

290293
type JobConfig struct {
294+
// RateLimit configuration.
295+
RateLimit RateLimitConfig `yaml:"rateLimit" mapstructure:"rateLimit"`
296+
291297
// Preheat configuration.
292298
Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"`
293299

@@ -303,6 +309,18 @@ type PreheatConfig struct {
303309
TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"`
304310
}
305311

312+
// RateLimitConfig is the configuration for rate limit.
313+
type RateLimitConfig struct {
314+
// FillInterval is the interval between each token added to the bucket.
315+
FillInterval time.Duration `yaml:"fillInterval" mapstructure:"fillInterval"`
316+
317+
// Capacity is the maximum number of tokens in the bucket.
318+
Capacity int64 `yaml:"capacity" mapstructure:"capacity"`
319+
320+
// Quantum is the number of tokens taken from the bucket for each request.
321+
Quantum int64 `yaml:"quantum" mapstructure:"quantum"`
322+
}
323+
306324
type SyncPeersConfig struct {
307325
// Interval is the interval for syncing all peers information from the scheduler and
308326
// display peers information in the manager console.
@@ -437,6 +455,11 @@ func New() *Config {
437455
},
438456
},
439457
Job: JobConfig{
458+
RateLimit: RateLimitConfig{
459+
FillInterval: DefaultJobRateLimitFillInterval,
460+
Capacity: DefaultJobRateLimitCapacity,
461+
Quantum: DefaultJobRateLimitQuantum,
462+
},
440463
Preheat: PreheatConfig{
441464
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
442465
},
@@ -606,6 +629,18 @@ func (cfg *Config) Validate() error {
606629
return errors.New("local requires parameter ttl")
607630
}
608631

632+
if cfg.Job.RateLimit.FillInterval == 0 {
633+
return errors.New("rateLimit requires parameter fillInterval")
634+
}
635+
636+
if cfg.Job.RateLimit.Capacity == 0 {
637+
return errors.New("rateLimit requires parameter capacity")
638+
}
639+
640+
if cfg.Job.RateLimit.Quantum == 0 {
641+
return errors.New("rateLimit requires parameter quantum")
642+
}
643+
609644
if cfg.Job.Preheat.TLS != nil {
610645
if cfg.Job.Preheat.TLS.CACert == "" {
611646
return errors.New("preheat requires parameter caCert")

manager/config/config_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ func TestConfig_Load(t *testing.T) {
187187
},
188188
},
189189
Job: JobConfig{
190+
RateLimit: RateLimitConfig{
191+
FillInterval: 1 * time.Second,
192+
Capacity: 1000,
193+
Quantum: 1000,
194+
},
190195
Preheat: PreheatConfig{
191196
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
192197
TLS: &PreheatTLSClientConfig{
@@ -697,6 +702,51 @@ func TestConfig_Validate(t *testing.T) {
697702
assert.EqualError(err, "local requires parameter ttl")
698703
},
699704
},
705+
{
706+
name: "rateLimit requires parameter fillInterval",
707+
config: New(),
708+
mock: func(cfg *Config) {
709+
cfg.Auth.JWT = mockJWTConfig
710+
cfg.Database.Type = DatabaseTypeMysql
711+
cfg.Database.Mysql = mockMysqlConfig
712+
cfg.Database.Redis = mockRedisConfig
713+
cfg.Job.RateLimit.FillInterval = 0
714+
},
715+
expect: func(t *testing.T, err error) {
716+
assert := assert.New(t)
717+
assert.EqualError(err, "rateLimit requires parameter fillInterval")
718+
},
719+
},
720+
{
721+
name: "rateLimit requires parameter capacity",
722+
config: New(),
723+
mock: func(cfg *Config) {
724+
cfg.Auth.JWT = mockJWTConfig
725+
cfg.Database.Type = DatabaseTypeMysql
726+
cfg.Database.Mysql = mockMysqlConfig
727+
cfg.Database.Redis = mockRedisConfig
728+
cfg.Job.RateLimit.Capacity = 0
729+
},
730+
expect: func(t *testing.T, err error) {
731+
assert := assert.New(t)
732+
assert.EqualError(err, "rateLimit requires parameter capacity")
733+
},
734+
},
735+
{
736+
name: "rateLimit requires parameter quantum",
737+
config: New(),
738+
mock: func(cfg *Config) {
739+
cfg.Auth.JWT = mockJWTConfig
740+
cfg.Database.Type = DatabaseTypeMysql
741+
cfg.Database.Mysql = mockMysqlConfig
742+
cfg.Database.Redis = mockRedisConfig
743+
cfg.Job.RateLimit.Quantum = 0
744+
},
745+
expect: func(t *testing.T, err error) {
746+
assert := assert.New(t)
747+
assert.EqualError(err, "rateLimit requires parameter quantum")
748+
},
749+
},
700750
{
701751
name: "preheat requires parameter caCert",
702752
config: New(),

manager/config/constants.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ const (
8787
)
8888

8989
const (
90+
// DefaultJobRateLimitFillInterval is the default fill interval for job rate limit.
91+
DefaultJobRateLimitFillInterval = 1 * time.Minute
92+
93+
// DefaultJobRateLimitCapacity is the default capacity for job rate limit.
94+
DefaultJobRateLimitCapacity = 100
95+
96+
// DefaultJobRateLimitQuantum is the default quantum for job rate limit.
97+
DefaultJobRateLimitQuantum = 100
98+
9099
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
91100
DefaultJobPreheatRegistryTimeout = 1 * time.Minute
92101

manager/config/testdata/manager.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ cache:
6666
ttl: 1s
6767

6868
job:
69+
rateLimit:
70+
fillInterval: 1s
71+
capacity: 1000
72+
quantum: 1000
6973
preheat:
7074
registryTimeout: 1m
7175
tls:

manager/middlewares/rate_limit.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2024 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package middlewares
18+
19+
import (
20+
"net/http"
21+
"time"
22+
23+
"github.com/gin-gonic/gin"
24+
"github.com/juju/ratelimit"
25+
)
26+
27+
func RateLimit(fillInterval time.Duration, capacity, quantum int64) gin.HandlerFunc {
28+
bucket := ratelimit.NewBucketWithQuantum(fillInterval, capacity, quantum)
29+
30+
return func(c *gin.Context) {
31+
if bucket.TakeAvailable(1) < 1 {
32+
c.String(http.StatusTooManyRequests, "rate limit exceeded")
33+
c.Abort()
34+
return
35+
}
36+
37+
c.Next()
38+
}
39+
}

0 commit comments

Comments
 (0)