Skip to content

Commit b94e596

Browse files
committed
fix: hypervisor permission issue; assign port from leader
1 parent 6a3a365 commit b94e596

File tree

13 files changed

+215
-36
lines changed

13 files changed

+215
-36
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"AMDCDNA",
77
"AMDRDNA",
88
"apimachinery",
9+
"automount",
910
"AWSGPU",
1011
"batchv",
1112
"burstable",
@@ -41,6 +42,7 @@
4142
"greptime",
4243
"greptimedb",
4344
"healthz",
45+
"iface",
4446
"karpenter",
4547
"kubebuilder",
4648
"KUBECONFIG",
@@ -71,6 +73,7 @@
7173
"tensorfusionaiv",
7274
"tensorfusioncluster",
7375
"tensorfusionclusters",
76+
"tensorfusionworkload",
7477
"Tera",
7578
"tflops",
7679
"Tmpl",

charts/tensor-fusion/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type: application
1515
# This is the chart version. This version number should be incremented each time you make changes
1616
# to the chart and its templates, including the app version.
1717
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18-
version: 1.3.1
18+
version: 1.3.2
1919

2020
# This is the version number of the application being deployed. This version number should be
2121
# incremented each time you make changes to the application. Versions are not expected to

charts/tensor-fusion/templates/controller-deployment.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ spec:
9191
requests:
9292
cpu: 50m
9393
memory: 64Mi
94+
limits:
95+
cpu: 1000m
96+
memory: 512Mi
9497
volumeMounts:
9598
- name: logs
9699
mountPath: /logs
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: rbac.authorization.k8s.io/v1
2+
kind: ClusterRole
3+
metadata:
4+
name: tensor-fusion-hypervisor-role
5+
rules:
6+
- apiGroups:
7+
- ""
8+
resources:
9+
- nodes
10+
- pods
11+
verbs:
12+
- get
13+
- list
14+
- watch
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
apiVersion: v1
2+
kind: ServiceAccount
3+
metadata:
4+
# Service account used to watch vGPU worker auto-scaling events and collect Pod log metadata
5+
# The name is fixed and only needs pods/nodes read permission
6+
name: tensor-fusion-hypervisor-sa
7+
namespace: {{ include "tensor-fusion.namespace" . }}
8+
labels:
9+
{{- include "tensor-fusion.labels" . | nindent 4 }}
10+
automountServiceAccountToken: true

cmd/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,12 @@ func main() {
347347
setupLog.Error(err, "failed to create connection router")
348348
os.Exit(1)
349349
}
350-
httpServer := server.NewHTTPServer(connectionRouter)
350+
assignHostPortRouter, err := router.NewAssignHostPortRouter(ctx, portAllocator)
351+
if err != nil {
352+
setupLog.Error(err, "failed to create assign host port router")
353+
os.Exit(1)
354+
}
355+
httpServer := server.NewHTTPServer(connectionRouter, assignHostPortRouter)
351356
go func() {
352357
err := httpServer.Run()
353358
if err != nil {

internal/constants/constants.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,9 @@ const (
8585
NamespaceEnv = "OPERATOR_NAMESPACE"
8686
NamespaceDefaultVal = "tensor-fusion-sys"
8787

88-
KubernetesHostNameLabel = "kubernetes.io/hostname"
89-
GiBToBytes = 1024 * 1024 * 1024
88+
KubernetesHostNameLabel = "kubernetes.io/hostname"
89+
GiBToBytes = 1024 * 1024 * 1024
90+
HypervisorServiceAccountName = "tensor-fusion-hypervisor-sa"
9091
)
9192

9293
const (
@@ -138,3 +139,8 @@ const (
138139
const TFDataPath = "/tmp/tensor-fusion/data"
139140
const DataVolumeName = "tf-data"
140141
const TensorFusionPoolManualCompaction = Domain + "/manual-compaction"
142+
143+
const (
144+
LeaderInfoConfigMapName = "tensor-fusion-operator-leader-info"
145+
LeaderInfoConfigMapLeaderIPKey = "leader-ip"
146+
)

internal/controller/gpunode_controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ func (r *GPUNodeReconciler) createHypervisorPod(ctx context.Context, key client.
469469
ReadOnly: false,
470470
MountPath: constants.TFDataPath,
471471
})
472+
spec.ServiceAccountName = constants.HypervisorServiceAccountName
472473
newPod := &corev1.Pod{
473474
ObjectMeta: metav1.ObjectMeta{
474475
Name: key.Name,

internal/portallocator/portallocator.go

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,33 @@ import (
99
"sync"
1010

1111
"github.com/NexusGPU/tensor-fusion/internal/constants"
12+
"github.com/NexusGPU/tensor-fusion/internal/utils"
1213
v1 "k8s.io/api/core/v1"
14+
"k8s.io/client-go/util/retry"
1315

16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1417
"sigs.k8s.io/controller-runtime/pkg/client"
18+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1519
"sigs.k8s.io/controller-runtime/pkg/log"
1620
"sigs.k8s.io/controller-runtime/pkg/manager"
1721
)
1822

19-
// offer API for host port allocation, range from user configured port range
20-
// when started, fetch all allocated TENSOR_FUSION_WORKER_PORT
21-
22-
// - label: tensor-fusion.ai/component=worker
23-
// portStart: 40000
24-
// portEnd: 41000
25-
// byNode: true
26-
// - label: tensor-fusion.ai/workload-type=lab
27-
// portStart: 41001
28-
// portEnd: 60000
29-
// byNode: false
30-
31-
// PodLabel => NodeName => HostPort
32-
// Annotation: tensor-fusion.ai/host-port: assigned port
33-
// Annotation: tensor-fusion.ai/host-port: assigned pod name
23+
// Offer API for host port allocation, range from user configured port range
24+
// Use label `tensor-fusion.ai/host-port: auto` to have a port assigned at cluster level
25+
// vGPU worker's hostPort will be managed by operator
3426
type PortAllocator struct {
3527
PortRangeStartNode int
3628
PortRangeEndNode int
3729

3830
PortRangeStartCluster int
3931
PortRangeEndCluster int
4032

41-
client client.Client
33+
IsLeader bool
4234

4335
BitmapPerNode map[string][]uint64
4436
BitmapCluster []uint64
37+
38+
Client client.Client
4539
}
4640

4741
var storeMutexNode sync.RWMutex
@@ -72,10 +66,10 @@ func NewPortAllocator(ctx context.Context, client client.Client, nodeLevelPortRa
7266
PortRangeEndNode: portRangeEndNode,
7367
PortRangeStartCluster: portRangeStartCluster,
7468
PortRangeEndCluster: portRangeEndCluster,
75-
client: client,
76-
77-
BitmapPerNode: make(map[string][]uint64),
78-
BitmapCluster: make([]uint64, (portRangeEndCluster-portRangeStartCluster)/64+1),
69+
Client: client,
70+
IsLeader: false,
71+
BitmapPerNode: make(map[string][]uint64),
72+
BitmapCluster: make([]uint64, (portRangeEndCluster-portRangeStartCluster)/64+1),
7973
}
8074

8175
return allocator, nil
@@ -85,6 +79,23 @@ func (s *PortAllocator) SetupWithManager(ctx context.Context, mgr manager.Manage
8579
go func() {
8680
<-mgr.Elected()
8781

82+
s.IsLeader = true
83+
leaderInfo := &v1.ConfigMap{
84+
ObjectMeta: metav1.ObjectMeta{
85+
Name: constants.LeaderInfoConfigMapName,
86+
Namespace: utils.CurrentNamespace(),
87+
},
88+
}
89+
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
90+
_, err := controllerutil.CreateOrUpdate(ctx, s.Client, leaderInfo, func() error {
91+
leaderInfo.Data = map[string]string{
92+
constants.LeaderInfoConfigMapLeaderIPKey: utils.CurrentIP(),
93+
}
94+
return nil
95+
})
96+
return err
97+
})
98+
8899
storeMutexNode.Lock()
89100
storeMutexCluster.Lock()
90101
defer storeMutexNode.Unlock()
@@ -98,6 +109,18 @@ func (s *PortAllocator) SetupWithManager(ctx context.Context, mgr manager.Manage
98109
}()
99110
}
100111

112+
func (s *PortAllocator) GetLeaderIP() string {
113+
leaderInfo := &v1.ConfigMap{}
114+
s.Client.Get(context.Background(), client.ObjectKey{
115+
Name: constants.LeaderInfoConfigMapName,
116+
Namespace: utils.CurrentNamespace(),
117+
}, leaderInfo)
118+
if leaderInfo.Data == nil {
119+
return ""
120+
}
121+
return leaderInfo.Data[constants.LeaderInfoConfigMapLeaderIPKey]
122+
}
123+
101124
// AssignHostPort is always called by the operator itself, so there is no leader-follower inconsistency issue
102125
func (s *PortAllocator) AssignHostPort(nodeName string) (int, error) {
103126
if nodeName == "" {
@@ -179,7 +202,7 @@ func (s *PortAllocator) ReleaseClusterLevelHostPort(podName string, port int) er
179202
func (s *PortAllocator) initBitMapForClusterLevelPortAssign(ctx context.Context) {
180203
log := log.FromContext(ctx)
181204
podList := &v1.PodList{}
182-
err := s.client.List(ctx, podList, client.MatchingLabels{constants.GenHostPortLabel: constants.GenHostPortLabelValue})
205+
err := s.Client.List(ctx, podList, client.MatchingLabels{constants.GenHostPortLabel: constants.GenHostPortLabelValue})
183206
if err != nil {
184207
log.Error(err, "failed to list pods with port allocation label")
185208
return
@@ -199,7 +222,7 @@ func (s *PortAllocator) initBitMapForClusterLevelPortAssign(ctx context.Context)
199222
func (s *PortAllocator) initBitMapForNodeLevelPortAssign(ctx context.Context) {
200223
log := log.FromContext(ctx)
201224
podList := &v1.PodList{}
202-
err := s.client.List(ctx, podList, client.MatchingLabels{constants.LabelComponent: constants.ComponentWorker})
225+
err := s.Client.List(ctx, podList, client.MatchingLabels{constants.LabelComponent: constants.ComponentWorker})
203226
if err != nil {
204227
log.Error(err, "failed to list pods with port allocation label")
205228
return
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package router
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
8+
"github.com/NexusGPU/tensor-fusion/internal/portallocator"
9+
"github.com/gin-gonic/gin"
10+
"sigs.k8s.io/controller-runtime/pkg/log"
11+
)
12+
13+
type AssignHostPortRouter struct {
14+
allocator *portallocator.PortAllocator
15+
}
16+
17+
func NewAssignHostPortRouter(ctx context.Context, allocator *portallocator.PortAllocator) (*AssignHostPortRouter, error) {
18+
return &AssignHostPortRouter{allocator: allocator}, nil
19+
}
20+
21+
func (r *AssignHostPortRouter) AssignHostPort(ctx *gin.Context) {
22+
// TODO verify service account token, issuer must be the same as current instance
23+
// namely the request must comes from peer operator Pod
24+
25+
podName := ctx.Query("podName")
26+
port, err := r.allocator.AssignClusterLevelHostPort(podName)
27+
if err != nil {
28+
ctx.String(http.StatusInternalServerError, err.Error())
29+
return
30+
}
31+
log.FromContext(ctx).Info("assigned host port", "podName", podName, "port", port)
32+
ctx.String(http.StatusOK, fmt.Sprintf("%d", port))
33+
}

internal/server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
func NewHTTPServer(
1010
cr *router.ConnectionRouter,
11+
ahp *router.AssignHostPortRouter,
1112
) *gin.Engine {
1213

1314
r := gin.New()
@@ -17,5 +18,6 @@ func NewHTTPServer(
1718

1819
apiGroup := r.Group("/api")
1920
apiGroup.GET("/connection", cr.Get)
21+
apiGroup.POST("/assign-host-port", ahp.AssignHostPort)
2022
return r
2123
}

internal/utils/net.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package utils
2+
3+
import "net"
4+
5+
// CurrentIP returns the first IPv4 address found on an interface that is up
// and is not a loopback interface, in the order reported by net.Interfaces.
//
// Loopback and link-local (169.254.0.0/16) addresses are skipped because
// peers cannot reach them. Interfaces whose addresses cannot be listed are
// ignored.
//
// CurrentIP panics if the interfaces cannot be enumerated or if no suitable
// address exists.
func CurrentIP() string {
	interfaces, err := net.Interfaces()
	if err != nil {
		panic(err)
	}

	for _, iface := range interfaces {
		if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
			continue
		}

		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}

		for _, addr := range addrs {
			ipNet, ok := addr.(*net.IPNet)
			if !ok {
				continue
			}

			// To4 returns nil for anything that is not an IPv4 address.
			ip := ipNet.IP.To4()
			if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
				continue
			}

			return ip.String()
		}
	}

	panic("no internal IP address found")
}

0 commit comments

Comments
 (0)