
Commit a52074b

fix: port allocator issues

1 parent 56fe9aa

15 files changed: +179 −74 lines

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
@@ -65,6 +65,7 @@
   "schedulingconfigtemplates",
   "schedulingcorev",
   "shirou",
+  "strategicpatches",
   "subresource",
   "tensorfusion",
   "tensorfusionaiv",

charts/tensor-fusion/crds/tensor-fusion.ai_gpupools.yaml

Lines changed: 2 additions & 1 deletion
@@ -546,7 +546,8 @@ spec:
   description: Default requests and limitsOverRequests are
     same, indicates normal on-demand serverless GPU usage,
     in hands-on lab low QoS case, limitsOverRequests should
-    be cheaper, for example Low QoS, ratio should be 0.5
+    be lower, so that user can get burstable GPU resources
+    with very low cost
   type: string
 qos:
   enum:

charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionclusters.yaml

Lines changed: 2 additions & 2 deletions
@@ -650,8 +650,8 @@ spec:
   description: Default requests and limitsOverRequests
     are same, indicates normal on-demand serverless
     GPU usage, in hands-on lab low QoS case, limitsOverRequests
-    should be cheaper, for example Low QoS, ratio
-    should be 0.5
+    should be lower, so that user can get burstable
+    GPU resources with very low cost
   type: string
 qos:
   enum:

cmd/main.go

Lines changed: 22 additions & 13 deletions
@@ -92,8 +92,11 @@ func main() {
 	flag.StringVar(&gpuInfoConfig, "gpu-info-config",
 		"/etc/tensor-fusion/gpu-info.yaml", "specify the path to gpuInfoConfig file")
 	flag.StringVar(&metricsPath, "metrics-path", "/logs/metrics.log", "specify the path to metrics file")
-	flag.StringVar(&nodeLevelPortRange, "host-port-range", "40000-42000", "specify the port range for assigning ports to pre-scheduled Pods such as vGPU workers")
-	flag.StringVar(&clusterLevelPortRange, "cluster-host-port-range", "42000-62000", "specify the port range for assigning ports to random Pods marked with `tensor-fusion.ai/host-port: auto` and `tensor-fusion.ai/port-name: ssh`")
+	flag.StringVar(&nodeLevelPortRange, "host-port-range", "40000-42000",
+		"specify the port range for assigning ports to pre-scheduled Pods such as vGPU workers")
+	flag.StringVar(&clusterLevelPortRange, "cluster-host-port-range", "42000-62000",
+		"specify the port range for assigning ports to random Pods"+
+			" marked with `tensor-fusion.ai/host-port: auto` and `tensor-fusion.ai/port-name: ssh`")
 	opts := zap.Options{
 		Development: true,
 	}
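With these defaults, node-level worker ports come from 40000-42000 and cluster-level auto-assigned ports from 42000-62000; an operator deployment could narrow either range, e.g. by passing --host-port-range=40000-41000 (the flag names are from this diff; the invocation itself is only illustrative).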
@@ -192,14 +195,8 @@ func main() {
 		// Key is poolName, second level key is QoS level
 		WorkerUnitPriceMap: make(map[string]map[string]metrics.RawBillingPricing),
 	}
-	if enableLeaderElection {
-		go func() {
-			<-mgr.Elected()
-			metricsRecorder.Start()
-		}()
-	} else {
-		go metricsRecorder.Start()
-	}
+
+	startMetricsRecorder(enableLeaderElection, mgr, metricsRecorder)
 
 	// Initialize GPU allocator and set up watches
 	allocator := gpuallocator.NewGpuAllocator(ctx, mgr.GetClient(), 10*time.Second)
@@ -235,7 +232,7 @@ func main() {
 
 	// nolint:goconst
 	if os.Getenv("ENABLE_WEBHOOKS") != "false" {
-		if err = webhookcorev1.SetupPodWebhookWithManager(mgr); err != nil {
+		if err = webhookcorev1.SetupPodWebhookWithManager(mgr, portAllocator); err != nil {
 			setupLog.Error(err, "unable to create webhook", "webhook", "Pod")
 			os.Exit(1)
 		}
@@ -291,8 +288,9 @@ func main() {
 		os.Exit(1)
 	}
 	if err = (&controller.PodReconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
+		Client:        mgr.GetClient(),
+		Scheme:        mgr.GetScheme(),
+		PortAllocator: portAllocator,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "Pod")
 		os.Exit(1)
@@ -376,6 +374,17 @@ func main() {
 	}
 }
 
+func startMetricsRecorder(enableLeaderElection bool, mgr manager.Manager, metricsRecorder metrics.MetricsRecorder) {
+	if enableLeaderElection {
+		go func() {
+			<-mgr.Elected()
+			metricsRecorder.Start()
+		}()
+	} else {
+		go metricsRecorder.Start()
+	}
+}
+
 func watchGPUInfoChanges(gpuInfoConfig string, gpuInfos *[]config.GpuInfo, gpuPricingMap map[string]float64) {
 	var lastModTime time.Time
 	if fileInfo, err := os.Stat(gpuInfoConfig); err == nil {
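The extracted startMetricsRecorder preserves the single-writer guarantee: with leader election enabled, the recorder only starts once this replica wins the election. A minimal generic sketch of the same gating pattern, assuming only the controller-runtime Manager interface (startWhenElected and the start callback are illustrative names, not part of this codebase):

package metricsutil

import (
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// startWhenElected runs start() in a goroutine, delayed until this replica
// becomes leader when leader election is enabled. Sketch only.
func startWhenElected(enableLeaderElection bool, mgr manager.Manager, start func()) {
	if enableLeaderElection {
		go func() {
			// mgr.Elected() returns a channel that is closed once this
			// manager instance wins the leader election.
			<-mgr.Elected()
			start()
		}()
	} else {
		// Single-replica mode: no election, start immediately.
		go start()
	}
}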

config/crd/bases/tensor-fusion.ai_gpupools.yaml

Lines changed: 2 additions & 1 deletion
@@ -546,7 +546,8 @@ spec:
   description: Default requests and limitsOverRequests are
     same, indicates normal on-demand serverless GPU usage,
     in hands-on lab low QoS case, limitsOverRequests should
-    be cheaper, for example Low QoS, ratio should be 0.5
+    be lower, so that user can get burstable GPU resources
+    with very low cost
   type: string
 qos:
   enum:

config/crd/bases/tensor-fusion.ai_tensorfusionclusters.yaml

Lines changed: 2 additions & 2 deletions
@@ -650,8 +650,8 @@ spec:
   description: Default requests and limitsOverRequests
     are same, indicates normal on-demand serverless
     GPU usage, in hands-on lab low QoS case, limitsOverRequests
-    should be cheaper, for example Low QoS, ratio
-    should be 0.5
+    should be lower, so that user can get burstable
+    GPU resources with very low cost
   type: string
 qos:
   enum:

internal/cloudprovider/common/utils.go

Lines changed: 4 additions & 2 deletions
@@ -8,9 +8,10 @@ import (
 	"strings"
 	"time"
 
+	"math/rand"
+
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/cloudprovider/types"
-	"golang.org/x/exp/rand"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"sigs.k8s.io/controller-runtime/pkg/log"
@@ -205,7 +206,8 @@ func contains(slice []string, item string) bool {
 
 func generateRandomString(length int) string {
 	const charset = "abcdefghijklmnopqrstuvwxyz"
-	rand.Seed(uint64(time.Now().UnixNano()))
+	source := rand.NewSource(time.Now().UnixNano())
+	rand := rand.New(source)
 
 	result := make([]byte, length)
 	for i := range result {
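golang.org/x/exp/rand exposed a package-global Seed; the replacement builds a private math/rand generator instead, so callers no longer mutate shared RNG state. A self-contained sketch of the pattern (here the local generator is named rng rather than shadowing the rand package, a stylistic assumption):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// generateRandomString draws length lowercase letters from a locally
// seeded generator; no package-global RNG state is touched.
func generateRandomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyz"
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	result := make([]byte, length)
	for i := range result {
		result[i] = charset[rng.Intn(len(charset))]
	}
	return string(result)
}

func main() {
	fmt.Println(generateRandomString(8)) // e.g. "kqzwhbtu"
}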

internal/controller/pod_controller.go

Lines changed: 13 additions & 1 deletion
@@ -19,9 +19,11 @@ package controller
 import (
 	"context"
 	"fmt"
+	"strconv"
 
 	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
+	"github.com/NexusGPU/tensor-fusion/internal/portallocator"
 	"github.com/NexusGPU/tensor-fusion/internal/utils"
 	v1 "github.com/NexusGPU/tensor-fusion/internal/webhook/v1"
 	"github.com/samber/lo"
@@ -40,7 +42,8 @@ import (
 // PodReconciler reconciles a Pod object
 type PodReconciler struct {
 	client.Client
-	Scheme *runtime.Scheme
+	Scheme        *runtime.Scheme
+	PortAllocator *portallocator.PortAllocator
 }
 
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete;deletecollection
@@ -59,6 +62,15 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		log.Error(err, "Failed to get Pod")
 		return ctrl.Result{}, err
 	}
+
+	// Release cluster level port when Pod deleted
+	if !pod.DeletionTimestamp.IsZero() {
+		if pod.Annotations[constants.GenHostPortLabel] == constants.GenHostPortLabelValue {
+			podPortNumber, _ := strconv.Atoi(pod.Annotations[constants.GenPortNumberAnnotation])
+			_ = r.PortAllocator.ReleaseClusterLevelHostPort(pod.Name, podPortNumber)
+			log.Info("Released port", "pod", pod.Name, "port", podPortNumber)
+		}
+	}
 	// generate tensor fusion connections and apply to cluster
 	tfConnection := generateTensorFusionConnection(pod)
 	if tfConnection == nil {
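Allocation and release are split: the Pod webhook (now handed the portAllocator in cmd/main.go) presumably reserves a cluster-level port at admission and records it in the tensor-fusion.ai annotations, while this reconciler returns it when the Pod terminates. The PortAllocator implementation is not part of this diff; a minimal mutex-guarded sketch of what ReleaseClusterLevelHostPort could sit on top of (every name here except the two methods referenced in the diff is hypothetical):

package portallocator

import (
	"fmt"
	"sync"
)

// clusterAllocator hands out host ports from a fixed cluster-level range,
// tracking which pod owns each port. Sketch only.
type clusterAllocator struct {
	mu       sync.Mutex
	from, to int
	used     map[int]string // port -> owner pod name
}

func newClusterAllocator(from, to int) *clusterAllocator {
	return &clusterAllocator{from: from, to: to, used: map[int]string{}}
}

// AssignClusterLevelHostPort reserves the first free port for the given pod.
func (p *clusterAllocator) AssignClusterLevelHostPort(pod string) (int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for port := p.from; port < p.to; port++ {
		if _, taken := p.used[port]; !taken {
			p.used[port] = pod
			return port, nil
		}
	}
	return 0, fmt.Errorf("no free port in range %d-%d", p.from, p.to)
}

// ReleaseClusterLevelHostPort frees the port if it is still owned by the pod,
// so a stale release from a recreated pod cannot free someone else's port.
func (p *clusterAllocator) ReleaseClusterLevelHostPort(pod string, port int) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if owner, ok := p.used[port]; ok && owner == pod {
		delete(p.used, port)
	}
	return nil
}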

internal/controller/tensorfusioncluster_controller.go

Lines changed: 3 additions & 0 deletions
@@ -420,6 +420,7 @@ func (r *TensorFusionClusterReconciler) SetupWithManager(mgr ctrl.Manager) error
 
 // Update metrics recorder's raw billing map
 func (r *TensorFusionClusterReconciler) updateMetricsRecorder(ctx context.Context, pool *tfv1.GPUPool) {
+	log := log.FromContext(ctx)
 	qosConfig := pool.Spec.QosConfig
 	if _, ok := r.MetricsRecorder.WorkerUnitPriceMap[pool.Name]; !ok {
 		r.MetricsRecorder.WorkerUnitPriceMap[pool.Name] = make(map[string]metrics.RawBillingPricing)
@@ -438,4 +439,6 @@ func (r *TensorFusionClusterReconciler) updateMetricsRecorder(ctx context.Contex
 			VramOverRequestPerSecond: vramPerSecond / 3600 * limitOverRequestChargingRatio,
 		}
 	}
+
+	log.V(5).Info("Updated metrics recorder", "pool", pool.Name, "pricing", pricingDetail)
 }

internal/controller/tensorfusionworkload_controller.go

Lines changed: 16 additions & 8 deletions
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"strconv"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -58,6 +59,8 @@ type TensorFusionWorkloadReconciler struct {
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=tensorfusionworkloads/finalizers,verbs=update
 
 // TensorFusionWorkload Reconciler
+//
+//nolint:gocyclo
 func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	log := log.FromContext(ctx)
 	log.Info("Reconciling TensorFusionWorkload", "request", req)
@@ -110,6 +113,8 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 
 	if deleted {
 		metrics.RemoveWorkerMetrics(pod.Name, pod.DeletionTimestamp.Time)
+		podPort, _ := strconv.Atoi(pod.Annotations[constants.GenPortNumberAnnotation])
+		_ = r.PortAllocator.ReleaseHostPort(pod.Spec.NodeName, podPort)
 	}
 
 	// Handle our GPU resource cleanup finalizer
115120
// Handle our GPU resource cleanup finalizer
@@ -127,12 +132,7 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 		return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
 	}
 
-	// init metrics map if needed
-	now := time.Now()
-	for i := range podList.Items {
-		pod := &podList.Items[i]
-		metrics.SetWorkerMetricsByWorkload(pod, workload, now)
-	}
+	handleMetricsRecorder(podList, workload)
 
 	// Fetch the GPUPool
 	pool := &tfv1.GPUPool{}
@@ -228,14 +228,22 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 	return ctrl.Result{}, nil
 }
 
+func handleMetricsRecorder(podList *corev1.PodList, workload *tfv1.TensorFusionWorkload) {
+	now := time.Now()
+	for i := range podList.Items {
+		pod := &podList.Items[i]
+		metrics.SetWorkerMetricsByWorkload(pod, workload, now)
+	}
+}
+
 func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 	ctx context.Context,
 	workerGenerator *worker.WorkerGenerator,
 	gpu *tfv1.GPU,
 	workload *tfv1.TensorFusionWorkload,
 	hash string,
 ) (*corev1.Pod, error) {
-	port, err := r.PortAllocator.GetHostPort(gpu.Status.NodeSelector[constants.KubernetesHostNameLabel])
+	port, err := r.PortAllocator.AssignHostPort(gpu.Status.NodeSelector[constants.KubernetesHostNameLabel])
 	if err != nil {
 		return nil, fmt.Errorf("get host port %w", err)
 	}
@@ -270,7 +278,7 @@ func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, w
 
 	for i := range pods {
 		podToDelete := &pods[i]
-		log.Info("Scaling down worker pod", "name", podToDelete.Name)
+		log.Info("Scaling down worker pod", "name", podToDelete.Name, "workload", workload.Name)
 		// Delete the pod with foreground deletion policy
 		// The finalizer will handle GPU resource cleanup
 		if err := r.deletePod(ctx, podToDelete); err != nil {
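Unlike the cluster-level range, AssignHostPort and ReleaseHostPort are keyed by node name, so every node can hand out the same 40000-42000 range independently. A hedged sketch of that per-node bookkeeping (all identifiers illustrative; the real allocator in internal/portallocator is not shown in this commit):

package portallocator

import (
	"fmt"
	"sync"
)

// nodeAllocator tracks used host ports per node. Sketch only.
type nodeAllocator struct {
	mu       sync.Mutex
	from, to int
	used     map[string]map[int]bool // node name -> set of used ports
}

func newNodeAllocator(from, to int) *nodeAllocator {
	return &nodeAllocator{from: from, to: to, used: map[string]map[int]bool{}}
}

// AssignHostPort reserves the first free port on the given node.
func (a *nodeAllocator) AssignHostPort(node string) (int, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.used[node] == nil {
		a.used[node] = map[int]bool{}
	}
	for port := a.from; port < a.to; port++ {
		if !a.used[node][port] {
			a.used[node][port] = true
			return port, nil
		}
	}
	return 0, fmt.Errorf("node %s: no free port in range %d-%d", node, a.from, a.to)
}

// ReleaseHostPort frees a previously assigned port on the node; releasing an
// unknown port is a no-op, matching the ignored error in the reconciler above.
func (a *nodeAllocator) ReleaseHostPort(node string, port int) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.used[node], port)
	return nil
}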

internal/controller/workloadprofile_controller.go

Lines changed: 1 addition & 12 deletions
@@ -37,20 +37,9 @@ type WorkloadProfileReconciler struct {
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=workloadprofiles/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=tensor-fusion.ai,resources=workloadprofiles/finalizers,verbs=update
 
-// Reconcile is part of the main kubernetes reconciliation loop which aims to
-// move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
-// the WorkloadProfile object against the actual cluster state, and then
-// perform operations to make the cluster state reflect the state specified by
-// the user.
-//
-// For more details, check Reconcile and its Result here:
-// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
+// WorkloadProfile is a template to be referred by TensorFusionWorkload, no logic for reconcile
 func (r *WorkloadProfileReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	_ = log.FromContext(ctx)
-
-	// TODO(user): your logic here
-
 	return ctrl.Result{}, nil
 }
