Skip to content

Commit 41716e4

Browse files
committed
feat(persistentcache): add concurrent piece count option for peers
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 18695a8 commit 41716e4

File tree

12 files changed

+264
-150
lines changed

12 files changed

+264
-150
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.8
44

55
require (
66
cloud.google.com/go/storage v1.50.0
7-
d7y.io/api/v2 v2.1.60
7+
d7y.io/api/v2 v2.1.65
88
github.com/MysteriousPotato/go-lockable v1.0.0
99
github.com/Showmax/go-fqdn v1.0.0
1010
github.com/VividCortex/mysqlerr v1.0.0
@@ -96,7 +96,7 @@ require (
9696
golang.org/x/time v0.12.0
9797
google.golang.org/api v0.248.0
9898
google.golang.org/grpc v1.75.0
99-
google.golang.org/protobuf v1.36.7
99+
google.golang.org/protobuf v1.36.8
100100
gopkg.in/natefinch/lumberjack.v2 v2.0.0
101101
gopkg.in/yaml.v3 v3.0.1
102102
gorm.io/driver/mysql v1.4.7

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
6363
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
6464
cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4=
6565
cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI=
66-
d7y.io/api/v2 v2.1.60 h1:Imk1mw30wmTJNdqKFF5VO/eqI/t1H5PfIPPK4ryKX2U=
67-
d7y.io/api/v2 v2.1.60/go.mod h1:WzakPywEgs27gr/TwrlRarPRRK2o5nN0YrTzHDdFii8=
66+
d7y.io/api/v2 v2.1.65 h1:1o13i+aF3E566tu6zrDTl3kV3/l09VkhSgythVFc2wM=
67+
d7y.io/api/v2 v2.1.65/go.mod h1:SQgYPAV1yW4Jo0cXFE7lJyGFy4CBn5cLpQTBLed6+6o=
6868
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
6969
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
7070
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
@@ -2186,8 +2186,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
21862186
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
21872187
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
21882188
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
2189-
google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
2190-
google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
2189+
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
2190+
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
21912191
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
21922192
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
21932193
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

scheduler/resource/persistentcache/peer.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ import (
2626
logger "d7y.io/dragonfly/v2/internal/dflog"
2727
)
2828

29+
const (
30+
// defaultConcurrentPieceCount is the fallback value for concurrent pieces per peer
31+
// when ConcurrentPieceCount is not reported by the client.
32+
defaultConcurrentPieceCount uint32 = 8
33+
)
34+
2935
const (
3036
// Peer has been created but did not start running.
3137
PeerStatePending = "Pending"
@@ -69,6 +75,16 @@ const (
6975
PeerEventFailed = "Failed"
7076
)
7177

78+
// PeerOption is a functional option for persistent cache peer.
79+
type PeerOption func(peer *Peer)
80+
81+
// WithConcurrentPieceCount set ConcurrentPieceCount for peer.
82+
func WithConcurrentPieceCount(count uint32) PeerOption {
83+
return func(p *Peer) {
84+
p.ConcurrentPieceCount = count
85+
}
86+
}
87+
7288
// Peer contains content for persistent cache peer.
7389
type Peer struct {
7490
// ID is persistent cache peer id.
@@ -77,6 +93,9 @@ type Peer struct {
7793
// Persistent is whether the peer is persistent.
7894
Persistent bool
7995

96+
// ConcurrentPieceCount is the count of pieces that can be downloaded concurrently.
97+
ConcurrentPieceCount uint32
98+
8099
// Pieces is finished pieces bitset.
81100
FinishedPieces *bitset.BitSet
82101

@@ -107,18 +126,19 @@ type Peer struct {
107126

108127
// New persistent cache peer instance.
109128
func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
110-
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
129+
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, options ...PeerOption) *Peer {
111130
p := &Peer{
112-
ID: id,
113-
Persistent: persistent,
114-
FinishedPieces: finishedPieces,
115-
Task: task,
116-
Host: host,
117-
BlockParents: blockParents,
118-
Cost: cost,
119-
CreatedAt: createdAt,
120-
UpdatedAt: updatedAt,
121-
Log: logger.WithPeer(host.ID, task.ID, id),
131+
ID: id,
132+
Persistent: persistent,
133+
ConcurrentPieceCount: defaultConcurrentPieceCount,
134+
FinishedPieces: finishedPieces,
135+
Task: task,
136+
Host: host,
137+
BlockParents: blockParents,
138+
Cost: cost,
139+
CreatedAt: createdAt,
140+
UpdatedAt: updatedAt,
141+
Log: logger.WithPeer(host.ID, task.ID, id),
122142
}
123143

124144
// Initialize state machine.
@@ -155,5 +175,9 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
155175
)
156176
p.FSM.SetState(state)
157177

178+
for _, opt := range options {
179+
opt(p)
180+
}
181+
158182
return p
159183
}

scheduler/resource/persistentcache/peer_manager.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
157157
return nil, false
158158
}
159159

160+
concurrentPieceCount, err := strconv.ParseUint(rawPeer["concurrent_piece_count"], 10, 32)
161+
if err != nil {
162+
log.Errorf("parsing concurrent piece count failed: %v", err)
163+
return nil, false
164+
}
165+
160166
return NewPeer(
161167
rawPeer["id"],
162168
rawPeer["state"],
@@ -169,6 +175,7 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
169175
createdAt,
170176
updatedAt,
171177
logger.WithPeer(host.ID, task.ID, rawPeer["id"]),
178+
WithConcurrentPieceCount(uint32(concurrentPieceCount)),
172179
), true
173180
}
174181

@@ -211,6 +218,7 @@ local cost = ARGV[8]
211218
local created_at = ARGV[9]
212219
local updated_at = ARGV[10]
213220
local ttl_seconds = tonumber(ARGV[11])
221+
local concurrent_piece_count = ARGV[12]
214222
215223
-- Store peer information
216224
redis.call("HSET", peer_key,
@@ -223,7 +231,8 @@ redis.call("HSET", peer_key,
223231
"host_id", host_id,
224232
"cost", cost,
225233
"created_at", created_at,
226-
"updated_at", updated_at)
234+
"updated_at", updated_at,
235+
"concurrent_piece_count", concurrent_piece_count)
227236
228237
-- Set expiration for the peer key
229238
redis.call("EXPIRE", peer_key, ttl_seconds)
@@ -268,6 +277,7 @@ return true
268277
peer.Cost.Nanoseconds(),
269278
peer.CreatedAt.Format(time.RFC3339),
270279
peer.UpdatedAt.Format(time.RFC3339),
280+
peer.ConcurrentPieceCount,
271281
remainingTTLSeconds,
272282
}
273283

0 commit comments

Comments
 (0)