Skip to content

Commit f01949c

Browse files
committed
feat: seed max concurrent
Signed-off-by: Jim Ma <majinjing3@gmail.com>
1 parent 75d6242 commit f01949c

File tree

5 files changed

+40
-5
lines changed

5 files changed

+40
-5
lines changed

client/daemon/daemon.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
360360
}
361361

362362
rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, peerExchangeRPC, schedulerClient,
363-
opt.Download.RecursiveConcurrent.GoroutineCount, opt.Download.CacheRecursiveMetadata, downloadServerOption, peerServerOption)
363+
opt.Download.RecursiveConcurrent.GoroutineCount, opt.Download.SeedConcurrent,
364+
opt.Download.CacheRecursiveMetadata, downloadServerOption, peerServerOption)
364365
if err != nil {
365366
return nil, err
366367
}

client/daemon/rpcserver/rpcserver.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ func init() {
103103
}
104104

105105
func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager,
106-
storageManager storage.Manager, peerExchanger pex.PeerExchangeRPC, schedulerClient schedulerclient.V1, recursiveConcurrent int, cacheRecursiveMetadata time.Duration,
106+
storageManager storage.Manager, peerExchanger pex.PeerExchangeRPC, schedulerClient schedulerclient.V1,
107+
recursiveConcurrent int, seedConcurrent int64, cacheRecursiveMetadata time.Duration,
107108
downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) {
108109
s := &server{
109110
KeepAlive: util.NewKeepAlive("rpc server"),
@@ -120,7 +121,9 @@ func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager,
120121
}
121122

122123
sd := &seeder{
123-
server: s,
124+
server: s,
125+
maxConcurrent: seedConcurrent,
126+
concurrent: atomic.NewInt64(0),
124127
}
125128

126129
// set not serving by default

client/daemon/rpcserver/rpcserver_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestServer_New(t *testing.T) {
7676
mockSchedulerClient := schedulerclientmocks.NewMockV1(ctrl)
7777
var mockdownloadOpts []grpc.ServerOption
7878
var mockpeerOpts []grpc.ServerOption
79-
_, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, nil, mockSchedulerClient, 16, 0, mockdownloadOpts, mockpeerOpts)
79+
_, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, nil, mockSchedulerClient, 16, 0, 0, mockdownloadOpts, mockpeerOpts)
8080
tc.expect(t, err)
8181
})
8282
}

client/daemon/rpcserver/seeder.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"math"
2323
"time"
2424

25+
"go.uber.org/atomic"
2526
"google.golang.org/grpc/codes"
2627
"google.golang.org/grpc/status"
2728

@@ -39,7 +40,9 @@ import (
3940
)
4041

4142
type seeder struct {
42-
server *server
43+
maxConcurrent int64
44+
concurrent *atomic.Int64
45+
server *server
4346
}
4447

4548
func (s *seeder) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
@@ -55,6 +58,16 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystemv1.SeedRequest, seedsServer c
5558
printAuthInfo(seedsServer.Context())
5659
}
5760

61+
if s.maxConcurrent > 0 {
62+
if s.concurrent.Inc() > s.maxConcurrent {
63+
s.concurrent.Dec()
64+
logger.Infof("seed peer is busying, return ResourceExhausted")
65+
return status.Errorf(codes.ResourceExhausted, "seed peer is busying, limit is %d", s.maxConcurrent)
66+
}
67+
68+
defer s.concurrent.Dec()
69+
}
70+
5871
metrics.SeedPeerConcurrentDownloadGauge.Inc()
5972
defer metrics.SeedPeerConcurrentDownloadGauge.Dec()
6073
metrics.SeedPeerDownloadCount.Add(1)

client/daemon/rpcserver/seeder_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ package rpcserver
1919
import (
2020
"context"
2121
"fmt"
22+
"go.uber.org/atomic"
23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
2225
"io"
2326
"net"
2427
"sync"
@@ -396,3 +399,18 @@ func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *t
396399

397400
return port, client
398401
}
402+
403+
func Test_ObtainSeedsResourceExhausted(t *testing.T) {
404+
sd := &seeder{
405+
maxConcurrent: 10,
406+
concurrent: atomic.NewInt64(10),
407+
}
408+
409+
assert := testifyassert.New(t)
410+
411+
err := sd.ObtainSeeds(nil, nil)
412+
assert.Error(err, "ObtainSeeds should return error")
413+
st, ok := status.FromError(err)
414+
assert.True(ok, "error should be status")
415+
assert.Equal(codes.ResourceExhausted, st.Code())
416+
}

0 commit comments

Comments
 (0)