Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions manager/models/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package models

import "time"

const (
// SchedulerStateActive represents the scheduler whose state is active.
SchedulerStateActive = "active"
Expand All @@ -34,6 +36,7 @@ type Scheduler struct {
Port int32 `gorm:"column:port;not null;comment:grpc service listening port" json:"port"`
State string `gorm:"column:state;type:varchar(256);default:'inactive';comment:service state" json:"state"`
Features Array `gorm:"column:features;comment:feature flags" json:"features"`
LastKeepAliveAt time.Time `gorm:"column:last_keep_alive_at;comment:last keep alive time" json:"last_keep_alive_at"`
SchedulerClusterID uint `gorm:"index:uk_scheduler,unique;not null;comment:scheduler cluster id" json:"scheduler_cluster_id"`
SchedulerCluster SchedulerCluster `json:"scheduler_cluster"`
}
20 changes: 18 additions & 2 deletions manager/rpcserver/manager_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"io"
"time"

cachev9 "github.com/go-redis/cache/v9"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -476,6 +477,7 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up
Port: req.GetPort(),
SchedulerClusterID: uint(req.GetSchedulerClusterId()),
Features: schedulerFeatures,
LastKeepAliveAt: time.Now(),
}).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -522,6 +524,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up
Port: req.GetPort(),
Features: schedulerFeatures,
SchedulerClusterID: uint(req.GetSchedulerClusterId()),
LastKeepAliveAt: time.Now(),
}

if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
Expand Down Expand Up @@ -816,8 +819,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
}

for {
_, err := stream.Recv()
if err != nil {
if _, err := stream.Recv(); err != nil {
// Inactive scheduler.
if sourceType == managerv2.SourceType_SCHEDULER_SOURCE {
scheduler := models.Scheduler{}
Expand Down Expand Up @@ -868,5 +870,19 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
log.Errorf("keepalive failed: %s", err.Error())
return status.Error(codes.Unknown, err.Error())
}

// Keepalive successful, update last heartbeat time.
if sourceType == managerv2.SourceType_SCHEDULER_SOURCE {
scheduler := models.Scheduler{}
if err := s.db.First(&scheduler, models.Scheduler{
Hostname: hostname,
IP: ip,
SchedulerClusterID: clusterID,
}).Updates(models.Scheduler{
LastKeepAliveAt: time.Now(),
}).Error; err != nil {
log.Errorf("update scheduler last heartbeat failed: %s", err.Error())
}
}
}
}