Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.16
d7y.io/api/v2 v2.1.18
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.16 h1:ql4PaC17eG0NSteu+4cijrQ7vA/P/Xki4we66Q7FbQw=
d7y.io/api/v2 v2.1.16/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.18 h1:5fpA94N7CihRdQxWPzUFx3qO7ScY76d0R2oxBtOt9PE=
d7y.io/api/v2 v2.1.18/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
8 changes: 4 additions & 4 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
port = seedPeer.Port
)

target := fmt.Sprintf("%s:%d", ip, port)
addr := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(idgen.HostIDV2(ip, hostname, true), hostname, ip)

eg.Go(func() error {
log.Info("preheat started")
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
Expand Down Expand Up @@ -416,13 +416,13 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
port = peer.Port
)

target := fmt.Sprintf("%s:%d", ip, port)
addr := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(peer.ID, hostname, ip)

eg.Go(func() error {
log.Info("preheat started")
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
Expand Down
66 changes: 48 additions & 18 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type PeerManager interface {
// LoadAllByTaskID returns all peers by task id.
LoadAllByTaskID(context.Context, string) ([]*Peer, error)

// LoadAllIDsByTaskID returns all peer ids by task id.
LoadAllIDsByTaskID(context.Context, string) ([]string, error)

// LoadPersistentAllByTaskID returns all persistent peers by task id.
LoadPersistentAllByTaskID(context.Context, string) ([]*Peer, error)

Expand All @@ -60,6 +63,9 @@ type PeerManager interface {
// LoadAllByHostID returns all peers by host id.
LoadAllByHostID(context.Context, string) ([]*Peer, error)

// LoadAllIDsByHostID returns all peer ids by host id.
LoadAllIDsByHostID(context.Context, string) ([]string, error)

// DeleteAllByHostID deletes all peers by host id.
DeleteAllByHostID(context.Context, string) error
}
Expand Down Expand Up @@ -301,7 +307,7 @@ func (p *peerManager) LoadAll(ctx context.Context) ([]*Peer, error) {

peerKeys, cursor, err = p.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCachePeersInScheduler(p.config.Manager.SchedulerClusterID), 10).Result()
if err != nil {
logger.Error("scan tasks failed")
logger.Errorf("scan tasks failed: %v", err)
return nil, err
}

Expand All @@ -328,15 +334,15 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Error("get peer ids failed")
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
log.Errorf("load peer %s failed: %v", peerID, err)
continue
}

Expand All @@ -346,20 +352,32 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe
return peers, nil
}

// LoadAllIDsByTaskID returns the ids of every persistent cache peer that
// belongs to the given task, read from the task's peer set in redis.
func (p *peerManager) LoadAllIDsByTaskID(ctx context.Context, taskID string) ([]string, error) {
	log := logger.WithTaskID(taskID)

	// Membership of the task's peer set holds one entry per peer id.
	key := pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)
	ids, err := p.rdb.SMembers(ctx, key).Result()
	if err != nil {
		log.Errorf("get peer ids failed: %v", err)
		return nil, err
	}

	return ids, nil
}

// LoadPersistentAllByTaskID returns all persistent cache peers by task id.
func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Error("get peer ids failed")
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
log.Errorf("load peer %s failed: %v", peerID, err)
continue
}

Expand All @@ -372,15 +390,15 @@ func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID stri
// DeleteAllByTaskID deletes all persistent cache peers by task id.
func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error {
log := logger.WithTaskID(taskID)
peers, err := p.LoadAllByTaskID(ctx, taskID)
ids, err := p.LoadAllIDsByTaskID(ctx, taskID)
if err != nil {
log.Error("load peers failed")
log.Errorf("load peers failed: %v", err)
return err
}

for _, peer := range peers {
if err := p.Delete(ctx, peer.ID); err != nil {
log.Errorf("delete peer %s failed", peer.ID)
for _, id := range ids {
if err := p.Delete(ctx, id); err != nil {
log.Errorf("delete peer %s failed: %v", id, err)
continue
}
}
Expand All @@ -393,15 +411,15 @@ func (p *peerManager) LoadAllByHostID(ctx context.Context, hostID string) ([]*Pe
log := logger.WithHostID(hostID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, hostID)).Result()
if err != nil {
log.Error("get peer ids failed")
log.Errorf("get peer ids failed: %v", err)
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
log.Errorf("load peer %s failed: %v", peerID, err)
continue
}

Expand All @@ -411,18 +429,30 @@ func (p *peerManager) LoadAllByHostID(ctx context.Context, hostID string) ([]*Pe
return peers, nil
}

// LoadAllIDsByHostID returns all persistent cache peer ids by host id.
//
// NOTE: the comment previously claimed this returns peers; it returns only
// the peer ids stored in the host's peer set, matching the PeerManager
// interface documentation.
func (p *peerManager) LoadAllIDsByHostID(ctx context.Context, hostID string) ([]string, error) {
	log := logger.WithHostID(hostID)
	peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, hostID)).Result()
	if err != nil {
		log.Errorf("get peer ids failed: %v", err)
		return nil, err
	}

	return peerIDs, nil
}

// DeleteAllByHostID deletes all persistent cache peers by host id.
func (p *peerManager) DeleteAllByHostID(ctx context.Context, hostID string) error {
log := logger.WithTaskID(hostID)
peers, err := p.LoadAllByHostID(ctx, hostID)
ids, err := p.LoadAllIDsByHostID(ctx, hostID)
if err != nil {
log.Error("load peers failed")
log.Errorf("load peers failed: %v", err)
return err
}

for _, peer := range peers {
if err := p.Delete(ctx, peer.ID); err != nil {
log.Errorf("delete peer %s failed", peer.ID)
for _, id := range ids {
if err := p.Delete(ctx, id); err != nil {
log.Errorf("delete peer %s failed: %v", id, err)
continue
}
}
Expand Down
52 changes: 45 additions & 7 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2322,25 +2322,38 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
log.Info("delete persistent cache peer")

task, founded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !founded {
log.Errorf("persistent cache task %s not found", req.GetTaskId())
return status.Errorf(codes.NotFound, "persistent cache task %s not found", req.GetTaskId())
peer, found := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !found {
log.Errorf("persistent cache peer %s not found", req.GetPeerId())
return status.Errorf(codes.NotFound, "persistent cache peer %s not found", req.GetPeerId())
}

if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil {
log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err)
return status.Error(codes.Internal, err.Error())
}

// Delete the persistent cache task from the peer, if delete failed, skip it.
addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.DownloadPort)
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
peer.Log.Errorf("get dfdaemon client failed %s", err)
return err
}

if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
peer.Log.Errorf("delete persistent cache task %s from peer %s failed %s", peer.Task.ID, peer.ID, err)
}

// Select the remote peer to copy the replica and trigger the download task with asynchronous.
blocklist := set.NewSafeSet[string]()
blocklist.Add(req.GetHostId())
go func(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) {
if err := v.replicatePersistentCacheTask(ctx, task, blocklist); err != nil {
log.Errorf("replicate persistent cache task failed %s", err)
}
}(context.Background(), task, blocklist)
}(context.Background(), peer.Task, blocklist)

return nil
}
Expand Down Expand Up @@ -2630,10 +2643,35 @@ func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.Del
log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
log.Info("delete persistent cache task")

if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err)
// Delete the persistent cache peers in the redis and peer.
peers, err := v.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, req.GetTaskId())
if err != nil {
log.Errorf("load persistent cache peers by task %s error %s", req.GetTaskId(), err)
return status.Error(codes.Internal, err.Error())
}

for _, peer := range peers {
if err := v.persistentCacheResource.PeerManager().Delete(ctx, peer.ID); err != nil {
log.Errorf("delete persistent cache peer %s error %s", peer.ID, err)
continue
}

// Delete the persistent cache task from the peer, if delete failed, skip it.
addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.DownloadPort)
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
if err != nil {
peer.Log.Errorf("get dfdaemon client failed %s", err)
continue
}

if err := dfdaemonClient.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
peer.Log.Errorf("delete persistent cache task %s from peer %s failed %s", peer.Task.ID, peer.ID, err)
continue
}
}

// Delete the persistent cache task in the redis.
if err := v.persistentCacheResource.TaskManager().Delete(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache task %s error %s", req.GetTaskId(), err)
return status.Error(codes.Internal, err.Error())
Expand Down
Loading