Skip to content

Commit b94a1c4

Browse files
authored
feat(manager): change refresh interval of the job rate limiter (#4063)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 6a6ea2a commit b94a1c4

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

internal/ratelimiter/job_ratelimiter.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const (
3737
jobRateLimiterSuffix = "job"
3838

3939
// defaultRefreshInterval is the default interval to refresh the rate limiters.
40-
defaultRefreshInterval = 10 * time.Minute
40+
defaultRefreshInterval = 3 * time.Minute
4141
)
4242

4343
// JobRateLimiter is an interface for a job rate limiter.
@@ -94,11 +94,13 @@ func NewJobRateLimiter(database *database.Database) (JobRateLimiter, error) {
9494
func (j *jobRateLimiter) TakeByClusterID(ctx context.Context, clusterID uint, tokens int64) (time.Duration, error) {
9595
rawLimiter, loaded := j.clusters.Load(clusterID)
9696
if !loaded {
97+
logger.Errorf("[job-rate-limiter]: cluster %d not found", clusterID)
9798
return 0, fmt.Errorf("cluster %d not found", clusterID)
9899
}
99100

100101
limiter, ok := rawLimiter.(*limiters.TokenBucket)
101102
if !ok {
103+
logger.Errorf("[job-rate-limiter]: cluster %d is not a distributed rate limiter", clusterID)
102104
return 0, fmt.Errorf("cluster %d is not a distributed rate limiter", clusterID)
103105
}
104106

@@ -123,9 +125,9 @@ func (j *jobRateLimiter) Serve() {
123125
for {
124126
select {
125127
case <-tick.C:
126-
logger.Infof("refresh job rate limiter started")
128+
logger.Infof("[job-rate-limiter]: refresh job rate limiter started")
127129
if err := j.refresh(context.Background()); err != nil {
128-
logger.Errorf("refresh job rate limiter failed: %v", err)
130+
logger.Errorf("[job-rate-limiter]: refresh job rate limiter failed: %v", err)
129131
}
130132
case <-j.done:
131133
return
@@ -149,13 +151,13 @@ func (j *jobRateLimiter) refresh(ctx context.Context) error {
149151
for _, schedulerCluster := range schedulerClusters {
150152
b, err := schedulerCluster.Config.MarshalJSON()
151153
if err != nil {
152-
logger.Errorf("marshal scheduler cluster %d config failed: %v", schedulerCluster.ID, err)
154+
logger.Errorf("[job-rate-limiter]: marshal scheduler cluster %d config failed: %v", schedulerCluster.ID, err)
153155
return err
154156
}
155157

156158
var schedulerClusterConfig types.SchedulerClusterConfig
157159
if err := json.Unmarshal(b, &schedulerClusterConfig); err != nil {
158-
logger.Errorf("unmarshal scheduler cluster %d config failed: %v", schedulerCluster.ID, err)
160+
logger.Errorf("[job-rate-limiter]: unmarshal scheduler cluster %d config failed: %v", schedulerCluster.ID, err)
159161
return err
160162
}
161163

@@ -165,7 +167,7 @@ func (j *jobRateLimiter) refresh(ctx context.Context) error {
165167
jobRateLimit = int(schedulerClusterConfig.JobRateLimit)
166168
}
167169

168-
logger.Debugf("create job rate limiter for scheduler cluster %d with rate limit %d", schedulerCluster.ID, jobRateLimit)
170+
logger.Debugf("[job-rate-limiter]: create job rate limiter for scheduler cluster %d with rate limit %d", schedulerCluster.ID, jobRateLimit)
169171
j.clusters.Store(schedulerCluster.ID,
170172
NewDistributedRateLimiter(j.database.RDB, j.key(schedulerCluster.ID)).TokenBucket(ctx, int64(jobRateLimit), time.Second))
171173
}

manager/config/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ const (
7878
DefaultLFUCacheTTL = 3 * time.Minute
7979

8080
// DefaultLFUCacheSize is default size for lfu cache.
81-
DefaultLFUCacheSize = 30 * 1000
81+
DefaultLFUCacheSize = 10 * 1000
8282
)
8383

8484
const (

0 commit comments

Comments
 (0)