Skip to content

Commit e7fdf2a

Browse files
committed
feat: add peer manager for persistent cache task
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 395fc6c commit e7fdf2a

File tree

5 files changed

+198
-62
lines changed

5 files changed

+198
-62
lines changed

internal/dflog/logger.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,15 @@ func WithTask(taskID, url string) *SugaredLoggerOnWith {
141141
}
142142
}
143143

144-
func WithPersistentCacheTask(taskID string) *SugaredLoggerOnWith {
144+
func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith {
145145
return &SugaredLoggerOnWith{
146-
withArgs: []any{"taskID", taskID},
146+
withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip},
147147
}
148148
}
149149

150-
func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith {
150+
func WithPeerID(peerID string) *SugaredLoggerOnWith {
151151
return &SugaredLoggerOnWith{
152-
withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip},
152+
withArgs: []any{"peerID", peerID},
153153
}
154154
}
155155

pkg/redis/redis.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,31 @@ func MakePersistentCacheTasksInScheduler(schedulerClusterID uint) string {
141141
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheTasksNamespace))
142142
}
143143

144+
// MakePersistentCachePeersOfPersistentCacheTaskInScheduler make persistent cache peers of persistent cache task in scheduler.
145+
func MakePersistentCachePeersOfPersistentCacheTaskInScheduler(schedulerClusterID uint, taskID string) string {
146+
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID, PersistentCachePeersNamespace))
147+
}
148+
144149
// MakePersistentCachePeerKeyInScheduler make persistent cache peer key in scheduler.
145150
func MakePersistentCachePeerKeyInScheduler(schedulerClusterID uint, peerID string) string {
146151
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID))
147152
}
148153

154+
// MakePersistentCachePeersInScheduler make persistent cache peers in scheduler.
149155
func MakePersistentCachePeersInScheduler(schedulerClusterID uint) string {
150156
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCachePeersNamespace))
151157
}
152158

159+
// MakePersistentCacheHostsOfPersistentCachePeerInScheduler make persistent cache hosts of persistent cache peer in scheduler.
160+
func MakePersistentCacheHostsOfPersistentCachePeerInScheduler(schedulerClusterID uint, peerID string) string {
161+
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID, PersistentCacheHostsNamespace))
162+
}
163+
164+
// MakePersistentCacheTasksOfPersistentCachePeerInScheduler make persistent cache tasks of persistent cache peer in scheduler.
165+
func MakePersistentCacheTasksOfPersistentCachePeerInScheduler(schedulerClusterID uint, peerID string) string {
166+
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID, PersistentCacheTasksNamespace))
167+
}
168+
153169
// MakePersistentCacheHostKeyInScheduler make persistent cache host key in scheduler.
154170
func MakePersistentCacheHostKeyInScheduler(schedulerClusterID uint, hostID string) string {
155171
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID))
@@ -159,3 +175,8 @@ func MakePersistentCacheHostKeyInScheduler(schedulerClusterID uint, hostID strin
159175
func MakePersistentCacheHostsInScheduler(schedulerClusterID uint) string {
160176
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheHostsNamespace))
161177
}
178+
179+
// MakePersistentCachePeersOfPersistentCacheHostInScheduler make persistent cache peers of persistent cache host in scheduler.
180+
func MakePersistentCachePeersOfPersistentCacheHostInScheduler(schedulerClusterID uint, hostID string) string {
181+
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID, PersistentCachePeersNamespace))
182+
}

scheduler/resource/persistentcache/host_manager.go

Lines changed: 46 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package persistentcache
2020

2121
import (
2222
"context"
23-
"fmt"
2423
"strconv"
2524
"time"
2625

@@ -65,138 +64,139 @@ func newHostManager(cfg *config.Config, rdb redis.UniversalClient) HostManager {
6564

6665
// Load returns host for a key.
6766
func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
67+
log := logger.WithHostID(hostID)
6868
rawHost, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result()
6969
if err != nil {
70-
fmt.Println("getting host failed from Redis:", err)
70+
log.Errorf("getting host failed from redis:", err)
7171
return nil, false
7272
}
7373

7474
// Set integer fields from raw host.
7575
port, err := strconv.ParseInt(rawHost["port"], 10, 32)
7676
if err != nil {
77-
fmt.Println("parsing port failed:", err)
77+
log.Errorf("parsing port failed:", err)
7878
return nil, false
7979
}
8080

8181
downloadPort, err := strconv.ParseInt(rawHost["download_port"], 10, 32)
8282
if err != nil {
83-
fmt.Println("parsing download port failed:", err)
83+
log.Errorf("parsing download port failed:", err)
8484
return nil, false
8585
}
8686

8787
concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32)
8888
if err != nil {
89-
fmt.Println("parsing concurrent upload limit failed:", err)
89+
log.Errorf("parsing concurrent upload limit failed:", err)
9090
return nil, false
9191
}
9292

9393
concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32)
9494
if err != nil {
95-
fmt.Println("parsing concurrent upload count failed:", err)
95+
log.Errorf("parsing concurrent upload count failed:", err)
9696
return nil, false
9797
}
9898

9999
uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64)
100100
if err != nil {
101-
fmt.Println("parsing upload count failed:", err)
101+
log.Errorf("parsing upload count failed:", err)
102102
return nil, false
103103
}
104104

105105
uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64)
106106
if err != nil {
107-
fmt.Println("parsing upload failed count failed:", err)
107+
log.Errorf("parsing upload failed count failed:", err)
108108
return nil, false
109109
}
110110

111111
// Set boolean fields from raw host.
112112
diableShared, err := strconv.ParseBool(rawHost["disable_shared"])
113113
if err != nil {
114-
fmt.Println("parsing disable shared failed:", err)
114+
log.Errorf("parsing disable shared failed:", err)
115115
return nil, false
116116
}
117117

118118
// Set cpu fields from raw host.
119119
cpuLogicalCount, err := strconv.ParseUint(rawHost["cpu_logical_count"], 10, 32)
120120
if err != nil {
121-
fmt.Println("parsing cpu logical count failed:", err)
121+
log.Errorf("parsing cpu logical count failed:", err)
122122
return nil, false
123123
}
124124

125125
cpuPhysicalCount, err := strconv.ParseUint(rawHost["cpu_physical_count"], 10, 32)
126126
if err != nil {
127-
fmt.Println("parsing cpu physical count failed:", err)
127+
log.Errorf("parsing cpu physical count failed:", err)
128128
return nil, false
129129
}
130130

131131
cpuPercent, err := strconv.ParseFloat(rawHost["cpu_percent"], 64)
132132
if err != nil {
133-
fmt.Println("parsing cpu percent failed:", err)
133+
log.Errorf("parsing cpu percent failed:", err)
134134
return nil, false
135135
}
136136

137137
cpuProcessPercent, err := strconv.ParseFloat(rawHost["cpu_processe_percent"], 64)
138138
if err != nil {
139-
fmt.Println("parsing cpu process percent failed:", err)
139+
log.Errorf("parsing cpu process percent failed:", err)
140140
return nil, false
141141
}
142142

143143
cpuTimesUser, err := strconv.ParseFloat(rawHost["cpu_times_user"], 64)
144144
if err != nil {
145-
fmt.Println("parsing cpu times user failed:", err)
145+
log.Errorf("parsing cpu times user failed:", err)
146146
return nil, false
147147
}
148148

149149
cpuTimesSystem, err := strconv.ParseFloat(rawHost["cpu_times_system"], 64)
150150
if err != nil {
151-
fmt.Println("parsing cpu times system failed:", err)
151+
log.Errorf("parsing cpu times system failed:", err)
152152
return nil, false
153153
}
154154

155155
cpuTimesIdle, err := strconv.ParseFloat(rawHost["cpu_times_idle"], 64)
156156
if err != nil {
157-
fmt.Println("parsing cpu times idle failed:", err)
157+
log.Errorf("parsing cpu times idle failed:", err)
158158
return nil, false
159159
}
160160

161161
cpuTimesNice, err := strconv.ParseFloat(rawHost["cpu_times_nice"], 64)
162162
if err != nil {
163-
fmt.Println("parsing cpu times nice failed:", err)
163+
log.Errorf("parsing cpu times nice failed:", err)
164164
return nil, false
165165
}
166166

167167
cpuTimesIowait, err := strconv.ParseFloat(rawHost["cpu_times_iowait"], 64)
168168
if err != nil {
169-
fmt.Println("parsing cpu times iowait failed:", err)
169+
log.Errorf("parsing cpu times iowait failed:", err)
170170
return nil, false
171171
}
172172

173173
cpuTimesIrq, err := strconv.ParseFloat(rawHost["cpu_times_irq"], 64)
174174
if err != nil {
175-
fmt.Println("parsing cpu times irq failed:", err)
175+
log.Errorf("parsing cpu times irq failed:", err)
176176
return nil, false
177177
}
178178

179179
cpuTimesSoftirq, err := strconv.ParseFloat(rawHost["cpu_times_softirq"], 64)
180180
if err != nil {
181-
fmt.Println("parsing cpu times softirq failed:", err)
181+
log.Errorf("parsing cpu times softirq failed:", err)
182182
return nil, false
183183
}
184184

185185
cpuTimesSteal, err := strconv.ParseFloat(rawHost["cpu_times_steal"], 64)
186186
if err != nil {
187-
fmt.Println("parsing cpu times steal failed:", err)
187+
log.Errorf("parsing cpu times steal failed:", err)
188188
return nil, false
189189
}
190190

191191
cpuTimesGuest, err := strconv.ParseFloat(rawHost["cpu_times_guest"], 64)
192192
if err != nil {
193-
fmt.Println("parsing cpu times guest failed:", err)
193+
log.Errorf("parsing cpu times guest failed:", err)
194194
return nil, false
195195
}
196196

197197
cpuTimesGuestNice, err := strconv.ParseFloat(rawHost["cpu_times_guest_nice"], 64)
198198
if err != nil {
199-
fmt.Println("parsing cpu times guest nice failed:", err)
199+
log.Errorf("parsing cpu times guest nice failed:", err)
200200
return nil, false
201201
}
202202

@@ -222,37 +222,37 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
222222
// Set memory fields from raw host.
223223
memoryTotal, err := strconv.ParseUint(rawHost["memory_total"], 10, 64)
224224
if err != nil {
225-
fmt.Println("parsing memory total failed:", err)
225+
log.Errorf("parsing memory total failed:", err)
226226
return nil, false
227227
}
228228

229229
memoryAvailable, err := strconv.ParseUint(rawHost["memory_available"], 10, 64)
230230
if err != nil {
231-
fmt.Println("parsing memory available failed:", err)
231+
log.Errorf("parsing memory available failed:", err)
232232
return nil, false
233233
}
234234

235235
memoryUsed, err := strconv.ParseUint(rawHost["memory_used"], 10, 64)
236236
if err != nil {
237-
fmt.Println("parsing memory used failed:", err)
237+
log.Errorf("parsing memory used failed:", err)
238238
return nil, false
239239
}
240240

241241
memoryUsedPercent, err := strconv.ParseFloat(rawHost["memory_used_percent"], 64)
242242
if err != nil {
243-
fmt.Println("parsing memory used percent failed:", err)
243+
log.Errorf("parsing memory used percent failed:", err)
244244
return nil, false
245245
}
246246

247247
memoryProcessUsedPercent, err := strconv.ParseFloat(rawHost["memory_processe_used_percent"], 64)
248248
if err != nil {
249-
fmt.Println("parsing memory process used percent failed:", err)
249+
log.Errorf("parsing memory process used percent failed:", err)
250250
return nil, false
251251
}
252252

253253
memoryFree, err := strconv.ParseUint(rawHost["memory_free"], 10, 64)
254254
if err != nil {
255-
fmt.Println("parsing memory free failed:", err)
255+
log.Errorf("parsing memory free failed:", err)
256256
return nil, false
257257
}
258258

@@ -268,37 +268,37 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
268268
// Set network fields from raw host.
269269
networkTCPConnectionCount, err := strconv.ParseUint(rawHost["network_tcp_connection_count"], 10, 32)
270270
if err != nil {
271-
fmt.Println("parsing network tcp connection count failed:", err)
271+
log.Errorf("parsing network tcp connection count failed:", err)
272272
return nil, false
273273
}
274274

275275
networkUploadTCPConnectionCount, err := strconv.ParseUint(rawHost["network_upload_tcp_connection_count"], 10, 32)
276276
if err != nil {
277-
fmt.Println("parsing network upload tcp connection count failed:", err)
277+
log.Errorf("parsing network upload tcp connection count failed:", err)
278278
return nil, false
279279
}
280280

281281
downloadRate, err := strconv.ParseUint(rawHost["network_download_rate"], 10, 64)
282282
if err != nil {
283-
fmt.Println("parsing download rate failed:", err)
283+
log.Errorf("parsing download rate failed:", err)
284284
return nil, false
285285
}
286286

287287
downloadRateLimit, err := strconv.ParseUint(rawHost["network_download_rate_limit"], 10, 64)
288288
if err != nil {
289-
fmt.Println("parsing download rate limit failed:", err)
289+
log.Errorf("parsing download rate limit failed:", err)
290290
return nil, false
291291
}
292292

293293
uploadRate, err := strconv.ParseUint(rawHost["network_upload_rate"], 10, 64)
294294
if err != nil {
295-
fmt.Println("parsing upload rate failed:", err)
295+
log.Errorf("parsing upload rate failed:", err)
296296
return nil, false
297297
}
298298

299299
uploadRateLimit, err := strconv.ParseUint(rawHost["network_upload_rate_limit"], 10, 64)
300300
if err != nil {
301-
fmt.Println("parsing upload rate limit failed:", err)
301+
log.Errorf("parsing upload rate limit failed:", err)
302302
return nil, false
303303
}
304304

@@ -316,49 +316,49 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
316316
// Set disk fields from raw host.
317317
diskTotal, err := strconv.ParseUint(rawHost["disk_total"], 10, 64)
318318
if err != nil {
319-
fmt.Println("parsing disk total failed:", err)
319+
log.Errorf("parsing disk total failed:", err)
320320
return nil, false
321321
}
322322

323323
diskFree, err := strconv.ParseUint(rawHost["disk_free"], 10, 64)
324324
if err != nil {
325-
fmt.Println("parsing disk free failed:", err)
325+
log.Errorf("parsing disk free failed:", err)
326326
return nil, false
327327
}
328328

329329
diskUsed, err := strconv.ParseUint(rawHost["disk_used"], 10, 64)
330330
if err != nil {
331-
fmt.Println("parsing disk used failed:", err)
331+
log.Errorf("parsing disk used failed:", err)
332332
return nil, false
333333
}
334334

335335
diskUsedPercent, err := strconv.ParseFloat(rawHost["disk_used_percent"], 64)
336336
if err != nil {
337-
fmt.Println("parsing disk used percent failed:", err)
337+
log.Errorf("parsing disk used percent failed:", err)
338338
return nil, false
339339
}
340340

341341
diskInodesTotal, err := strconv.ParseUint(rawHost["disk_inodes_total"], 10, 64)
342342
if err != nil {
343-
fmt.Println("parsing disk inodes total failed:", err)
343+
log.Errorf("parsing disk inodes total failed:", err)
344344
return nil, false
345345
}
346346

347347
diskInodesUsed, err := strconv.ParseUint(rawHost["disk_inodes_used"], 10, 64)
348348
if err != nil {
349-
fmt.Println("parsing disk inodes used failed:", err)
349+
log.Errorf("parsing disk inodes used failed:", err)
350350
return nil, false
351351
}
352352

353353
diskInodesFree, err := strconv.ParseUint(rawHost["disk_inodes_free"], 10, 64)
354354
if err != nil {
355-
fmt.Println("parsing disk inodes free failed:", err)
355+
log.Errorf("parsing disk inodes free failed:", err)
356356
return nil, false
357357
}
358358

359359
diskInodesUsedPercent, err := strconv.ParseFloat(rawHost["disk_inodes_used_percent"], 64)
360360
if err != nil {
361-
fmt.Println("parsing disk inodes used percent failed:", err)
361+
log.Errorf("parsing disk inodes used percent failed:", err)
362362
return nil, false
363363
}
364364

@@ -383,19 +383,19 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
383383
// Set time fields from raw host.
384384
announceInterval, err := strconv.ParseInt(rawHost["announce_interval"], 10, 32)
385385
if err != nil {
386-
fmt.Println("parsing announce interval failed:", err)
386+
log.Errorf("parsing announce interval failed:", err)
387387
return nil, false
388388
}
389389

390390
createdAt, err := time.Parse(time.RFC3339, rawHost["created_at"])
391391
if err != nil {
392-
fmt.Println("parsing created at failed:", err)
392+
log.Errorf("parsing created at failed:", err)
393393
return nil, false
394394
}
395395

396396
updatedAt, err := time.Parse(time.RFC3339, rawHost["updated_at"])
397397
if err != nil {
398-
fmt.Println("parsing updated at failed:", err)
398+
log.Errorf("parsing updated at failed:", err)
399399
return nil, false
400400
}
401401

0 commit comments

Comments
 (0)