Skip to content

Commit 41da9b7

Browse files
authored
feat: add RemoteIp for calling client (#4231)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 688f64c commit 41da9b7

File tree

4 files changed

+20
-6
lines changed

4 files changed

+20
-6
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.8
44

55
require (
66
cloud.google.com/go/storage v1.50.0
7-
d7y.io/api/v2 v2.1.44
7+
d7y.io/api/v2 v2.1.47
88
github.com/MysteriousPotato/go-lockable v1.0.0
99
github.com/Showmax/go-fqdn v1.0.0
1010
github.com/VividCortex/mysqlerr v1.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
6363
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
6464
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
6565
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
66-
d7y.io/api/v2 v2.1.44 h1:h762BncKXXKBd+LgU58wzFgNwLpqPxW7wrgcZJNcV+4=
67-
d7y.io/api/v2 v2.1.44/go.mod h1:IbhylQWRkqRka+oUl73Fzz331fHFIAwS2m4cMNpFWdk=
66+
d7y.io/api/v2 v2.1.47 h1:HpH8iPpaVmIP5c01yaJzMM1HytkRttij91SU8/+b4u0=
67+
d7y.io/api/v2 v2.1.47/go.mod h1:IbhylQWRkqRka+oUl73Fzz331fHFIAwS2m4cMNpFWdk=
6868
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
6969
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
7070
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=

scheduler/job/job.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ func (j *job) preheatV2SingleSeedPeer(ctx context.Context, req *internaljob.Preh
329329
func (j *job) preheatV2SingleSeedPeerByURL(ctx context.Context, url string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
330330
taskID := idgen.TaskIDV2ByURLBased(url, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
331331
filteredQueryParams := strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)
332+
advertiseIP := j.config.Server.AdvertiseIP.String()
332333
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
333334
Download: &commonv2.Download{
334335
Url: url,
@@ -341,6 +342,7 @@ func (j *job) preheatV2SingleSeedPeerByURL(ctx context.Context, url string, req
341342
RequestHeader: req.Headers,
342343
CertificateChain: req.CertificateChain,
343344
LoadToCache: req.LoadToCache,
345+
RemoteIp: &advertiseIP,
344346
}})
345347
if err != nil {
346348
log.Errorf("[preheat]: preheat failed: %s", err.Error())
@@ -421,6 +423,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
421423
return err
422424
}
423425

426+
advertiseIP := j.config.Server.AdvertiseIP.String()
424427
stream, err := dfdaemonClient.DownloadTask(
425428
ctx,
426429
taskID,
@@ -436,6 +439,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
436439
Timeout: durationpb.New(req.Timeout),
437440
CertificateChain: req.CertificateChain,
438441
LoadToCache: req.LoadToCache,
442+
RemoteIp: &advertiseIP,
439443
}})
440444
if err != nil {
441445
log.Errorf("[preheat]: preheat failed: %s", err.Error())
@@ -626,6 +630,7 @@ func (j *job) preheatAllPeers(ctx context.Context, req *internaljob.PreheatReque
626630
return err
627631
}
628632

633+
advertiseIP := j.config.Server.AdvertiseIP.String()
629634
stream, err := dfdaemonClient.DownloadTask(
630635
ctx,
631636
taskID,
@@ -641,6 +646,7 @@ func (j *job) preheatAllPeers(ctx context.Context, req *internaljob.PreheatReque
641646
Timeout: durationpb.New(req.Timeout),
642647
CertificateChain: req.CertificateChain,
643648
LoadToCache: req.LoadToCache,
649+
RemoteIp: &advertiseIP,
644650
}})
645651
if err != nil {
646652
log.Errorf("[preheat]: preheat failed: %s", err.Error())
@@ -890,8 +896,10 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
890896
continue
891897
}
892898

899+
advertiseIP := j.config.Server.AdvertiseIP.String()
893900
if err = dfdaemonClient.DeleteTask(ctx, &dfdaemonv2.DeleteTaskRequest{
894-
TaskId: req.TaskID,
901+
TaskId: req.TaskID,
902+
RemoteIp: &advertiseIP,
895903
}); err != nil {
896904
log.Errorf("delete task failed: %s", err.Error())
897905
failureTasks = append(failureTasks, &internaljob.DeleteFailureTask{

scheduler/service/service_v2.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2447,7 +2447,8 @@ func (v *V2) DeletePersistentCachePeer(_ctx context.Context, req *schedulerv2.De
24472447
return err
24482448
}
24492449

2450-
if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
2450+
advertiseIP := v.config.Server.AdvertiseIP.String()
2451+
if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID, RemoteIp: &advertiseIP}); err != nil {
24512452
peer.Log.Errorf("delete persistent cache task %s from peer %s failed %s", peer.Task.ID, peer.ID, err)
24522453
}
24532454

@@ -2653,11 +2654,13 @@ func (v *V2) downloadPersistentCacheTaskByPeer(ctx context.Context, task *persis
26532654
return err
26542655
}
26552656

2657+
advertiseIP := v.config.Server.AdvertiseIP.String()
26562658
stream, err := dfdaemonClient.DownloadPersistentCacheTask(ctx, &dfdaemonv2.DownloadPersistentCacheTaskRequest{
26572659
TaskId: task.ID,
26582660
Persistent: true,
26592661
Tag: &task.Tag,
26602662
Application: &task.Application,
2663+
RemoteIp: &advertiseIP,
26612664
})
26622665
if err != nil {
26632666
task.Log.Errorf("download persistent cache task failed %s", err)
@@ -2689,9 +2692,11 @@ func (v *V2) persistPersistentCacheTaskByPeer(ctx context.Context, peer *persist
26892692
return err
26902693
}
26912694

2695+
advertiseIP := v.config.Server.AdvertiseIP.String()
26922696
if err := dfdaemonClient.UpdatePersistentCacheTask(ctx, &dfdaemonv2.UpdatePersistentCacheTaskRequest{
26932697
TaskId: peer.Task.ID,
26942698
Persistent: true,
2699+
RemoteIp: &advertiseIP,
26952700
}); err != nil {
26962701
peer.Log.Errorf("update persistent cache task failed %s", err)
26972702
return err
@@ -2826,7 +2831,8 @@ func (v *V2) DeletePersistentCacheTask(_ctx context.Context, req *schedulerv2.De
28262831
continue
28272832
}
28282833

2829-
if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
2834+
advertiseIP := v.config.Server.AdvertiseIP.String()
2835+
if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID, RemoteIp: &advertiseIP}); err != nil {
28302836
peer.Log.Errorf("delete persistent cache task %s from peer %s failed %s", peer.Task.ID, peer.ID, err)
28312837
continue
28322838
}

0 commit comments

Comments
 (0)