From 59f2c907d12c6b7fae639797ab09abe734320a47 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 16 May 2025 22:38:55 +0800 Subject: [PATCH 01/12] feat: add GPUCount field to TensorFusionWorkload and WorkloadProfile specs, update GPU allocation logic --- api/v1/workloadprofile_types.go | 5 +- .../tensorfusionworkload_controller.go | 64 ++++++------ internal/gpuallocator/gpuallocator.go | 27 ++--- internal/gpuallocator/gpuallocator_test.go | 66 ++++++++++++- internal/metrics/worker.go | 18 +++- internal/webhook/v1/pod_webhook.go | 1 + internal/webhook/v1/tf_parser.go | 2 +- internal/worker/worker.go | 99 +++++++++---------- 8 files changed, 176 insertions(+), 106 deletions(-) diff --git a/api/v1/workloadprofile_types.go b/api/v1/workloadprofile_types.go index 9e20610..812f51e 100644 --- a/api/v1/workloadprofile_types.go +++ b/api/v1/workloadprofile_types.go @@ -39,8 +39,8 @@ type WorkloadProfileSpec struct { PoolName string `json:"poolName,omitempty"` // +optional - Resources Resources `json:"resources,omitempty"` + Resources Resources `json:"resources,omitempty"` // +optional // Qos defines the quality of service level for the client. Qos QoSLevel `json:"qos,omitempty"` @@ -50,9 +50,8 @@ type WorkloadProfileSpec struct { IsLocalGPU bool `json:"isLocalGPU,omitempty"` // +optional - // TODO, not implemented // The number of GPUs to be used by the workload, default to 1 - GPUCount int `json:"gpuCount,omitempty"` + GPUCount uint `json:"gpuCount,omitempty"` // +optional // TODO, not implemented diff --git a/internal/controller/tensorfusionworkload_controller.go b/internal/controller/tensorfusionworkload_controller.go index 472d12b..1512f38 100644 --- a/internal/controller/tensorfusionworkload_controller.go +++ b/internal/controller/tensorfusionworkload_controller.go @@ -20,11 +20,13 @@ import ( "context" "fmt" "sort" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -217,12 +219,12 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl func (r *TensorFusionWorkloadReconciler) tryStartWorker( ctx context.Context, workerGenerator *worker.WorkerGenerator, - gpu *tfv1.GPU, + gpus []*tfv1.GPU, workload *tfv1.TensorFusionWorkload, hash string, ) (*corev1.Pod, error) { port := workerGenerator.AllocPort() - pod, hash, err := workerGenerator.GenerateWorkerPod(gpu, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits, hash) + pod, hash, err := workerGenerator.GenerateWorkerPod(gpus, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits, hash) if err != nil { return nil, fmt.Errorf("generate worker pod %w", err) } @@ -231,8 +233,13 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker( if pod.Labels == nil { pod.Labels = make(map[string]string) } + + gpuNames := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string { + return gpu.Name + }) + pod.Labels[constants.WorkloadKey] = workload.Name - pod.Labels[constants.GpuKey] = gpu.Name + pod.Labels[constants.GpuKey] = strings.Join(gpuNames, ",") pod.Labels[constants.LabelKeyPodTemplateHash] = hash // Add finalizer for GPU resource cleanup @@ -269,6 +276,7 @@ func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, w 
metrics.GpuTflopsLimit.Delete(labels) metrics.VramBytesRequest.Delete(labels) metrics.VramBytesLimit.Delete(labels) + metrics.GpuCount.Delete(labels) } return nil } @@ -279,26 +287,24 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context log.Info("Processing pod with GPU resource cleanup finalizer", "pod", pod.Name) - // Get GPU name from pod label - gpuName, ok := pod.Labels[constants.GpuKey] + // Get GPU names from pod label + gpuNamesStr, ok := pod.Labels[constants.GpuKey] if !ok { log.Info("Pod has finalizer but no GPU label", "pod", pod.Name) return true, nil } - // Get the GPU - gpu := &tfv1.GPU{} - if err := r.Get(ctx, client.ObjectKey{Name: gpuName}, gpu); err != nil { - if errors.IsNotFound(err) { - // GPU not found, just continue - log.Info("GPU not found", "gpu", gpuName, "pod", pod.Name) - return true, nil - } - // Error getting GPU, retry later - log.Error(err, "Failed to get GPU", "gpu", gpuName, "pod", pod.Name) + // Split GPU names by comma + gpuNames := strings.Split(gpuNamesStr, ",") + gpus := lo.Map(gpuNames, func(gpuName string, _ int) types.NamespacedName { + return types.NamespacedName{Name: gpuName} + }) + // Release GPU resources + if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpus); err != nil { + log.Error(err, "Failed to release GPU resources, will retry", "gpus", gpus, "pod", pod.Name) return false, err } - + log.Info("Released GPU resources via finalizer", "gpus", gpus, "pod", pod.Name) if pod.Annotations == nil { pod.Annotations = make(map[string]string) } @@ -310,17 +316,10 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context // not yet reflecting the finalizer's removal), Then this r.Update pod will fail. // Will not cause duplicate releases if err := r.Update(ctx, pod); err != nil { - log.Error(err, "Failed to mark that GPU cleanup of pod", "gpu", gpuName, "pod", pod.Name) - return false, err - } - - // Release GPU resources - if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu); err != nil { - log.Error(err, "Failed to release GPU resources, will retry", "gpu", gpuName, "pod", pod.Name) + log.Error(err, "Failed to mark that GPU cleanup of pod") return false, err } - log.Info("Released GPU resources via finalizer", "gpu", gpuName, "pod", pod.Name) return true, nil } @@ -344,21 +343,21 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor // Create worker pods for range count { // Schedule GPU for the worker - gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1) + gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, workload.Spec.GPUCount) if err != nil { r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err) return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil } - // Use the first GPU from the allocated array - gpu := gpus[0] - - pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload, hash) + pod, err := r.tryStartWorker(ctx, workerGenerator, gpus, workload, hash) if err != nil { - // Try to release the GPU resource if pod creation fails - releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu) + // Try to release all allocated GPUs if pod creation fails + gpus := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName { + return client.ObjectKeyFromObject(gpu) + }) + releaseErr := r.Allocator.Dealloc(ctx, 
workload.Spec.Resources.Requests, gpus) if releaseErr != nil { - log.Error(releaseErr, "Failed to release GPU after pod creation failure") + log.Error(releaseErr, "Failed to release GPU after pod creation failure", "gpus", gpus) } return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err) } @@ -372,6 +371,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor metrics.GpuTflopsLimit.With(labels).Set(workload.Spec.Resources.Limits.Tflops.AsApproximateFloat64()) metrics.VramBytesRequest.With(labels).Set(workload.Spec.Resources.Requests.Vram.AsApproximateFloat64()) metrics.VramBytesLimit.With(labels).Set(workload.Spec.Resources.Limits.Vram.AsApproximateFloat64()) + metrics.GpuCount.With(labels).Set(float64(workload.Spec.GPUCount)) } return ctrl.Result{}, nil diff --git a/internal/gpuallocator/gpuallocator.go b/internal/gpuallocator/gpuallocator.go index 9aa7980..eb33110 100644 --- a/internal/gpuallocator/gpuallocator.go +++ b/internal/gpuallocator/gpuallocator.go @@ -121,25 +121,26 @@ func (s *GpuAllocator) Alloc( return result, nil } -// Dealloc deallocates a request from a gpu. -func (s *GpuAllocator) Dealloc(ctx context.Context, request tfv1.Resource, gpu *tfv1.GPU) error { +// Dealloc deallocates a request from one or multiple gpus. +func (s *GpuAllocator) Dealloc(ctx context.Context, request tfv1.Resource, gpus []types.NamespacedName) error { log := log.FromContext(ctx) s.storeMutex.Lock() defer s.storeMutex.Unlock() - // Get the GPU from the store - key := types.NamespacedName{Name: gpu.Name, Namespace: gpu.Namespace} - storeGPU, exists := s.gpuStore[key] - if !exists { - log.Info("GPU not found in store during deallocation", "name", key.String()) - return fmt.Errorf("GPU %s not found in store", key.String()) - } + for _, gpu := range gpus { + // Get the GPU from the store + storeGPU, exists := s.gpuStore[gpu] + if !exists { + log.Error(fmt.Errorf("GPU not found in store"), "Failed to deallocate GPU", "name", gpu.String()) + continue + } - // Add resources back to the GPU - storeGPU.Status.Available.Tflops.Add(request.Tflops) - storeGPU.Status.Available.Vram.Add(request.Vram) + // Add resources back to the GPU + storeGPU.Status.Available.Tflops.Add(request.Tflops) + storeGPU.Status.Available.Vram.Add(request.Vram) - s.markGPUDirty(key) + s.markGPUDirty(gpu) + } return nil } diff --git a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go index 5f7d472..ae66662 100644 --- a/internal/gpuallocator/gpuallocator_test.go +++ b/internal/gpuallocator/gpuallocator_test.go @@ -23,9 +23,11 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/constants" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/samber/lo" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = Describe("GPU Allocator", func() { @@ -138,7 +140,7 @@ var _ = Describe("GPU Allocator", func() { allocatedVram := allocatedGPU.Status.Available.Vram.DeepCopy() // Now deallocate - err = allocator.Dealloc(ctx, request, allocatedGPU) + err = allocator.Dealloc(ctx, request, []types.NamespacedName{client.ObjectKeyFromObject(gpus[0])}) Expect(err).NotTo(HaveOccurred()) // Verify resources were restored @@ -148,9 +150,69 @@ var _ = Describe("GPU Allocator", func() { expectedTflops.Add(request.Tflops) expectedVram.Add(request.Vram) - Expect(deallocatedGPU.Status.Available.Tflops.Cmp(allocatedTflops)).To(Equal(1)) + Expect(deallocatedGPU.Status.Available.Tflops.Cmp(expectedTflops)).To(Equal(0)) + Expect(deallocatedGPU.Status.Available.Vram.Cmp(expectedVram)).To(Equal(0)) Expect(deallocatedGPU.Status.Available.Vram.Cmp(allocatedVram)).To(Equal(1)) }) + + It("should continue deallocating when some GPUs don't exist", func() { + // First allocate resources to multiple GPUs + request := tfv1.Resource{ + Tflops: resource.MustParse("20"), + Vram: resource.MustParse("4Gi"), + } + + // Allocate 2 GPUs + allocatedGPUs, err := allocator.Alloc(ctx, "test-pool", request, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(allocatedGPUs).To(HaveLen(2)) + + // Create a non-existent GPU + nonExistentGPU := &tfv1.GPU{ + ObjectMeta: metav1.ObjectMeta{ + Name: "non-existent-gpu", + Namespace: "default", + }, + } + + // Add the non-existent GPU to the list + gpusToDealloc := append(allocatedGPUs, nonExistentGPU) + + // Store the allocated values for existing GPUs + initialStates := make(map[string]struct { + tflops resource.Quantity + vram resource.Quantity + }) + for _, gpu := range allocatedGPUs { + initialStates[gpu.Name] = struct { + tflops resource.Quantity + vram resource.Quantity + }{ + tflops: gpu.Status.Available.Tflops.DeepCopy(), + vram: gpu.Status.Available.Vram.DeepCopy(), + } + } + gpusToDeallocKeys := lo.Map(gpusToDealloc, func(gpu *tfv1.GPU, _ int) types.NamespacedName { + return client.ObjectKeyFromObject(gpu) + }) + // Now deallocate all GPUs including the non-existent one + err = allocator.Dealloc(ctx, request, gpusToDeallocKeys) + Expect(err).NotTo(HaveOccurred()) + + // Verify resources were restored for existing GPUs + for _, allocatedGPU := range allocatedGPUs { + deallocatedGPU := getGPU(allocatedGPU.Name, allocatedGPU.Namespace) + initialState := initialStates[allocatedGPU.Name] + + expectedTflops := initialState.tflops.DeepCopy() + expectedVram := initialState.vram.DeepCopy() + expectedTflops.Add(request.Tflops) + expectedVram.Add(request.Vram) + + Expect(deallocatedGPU.Status.Available.Tflops.Cmp(initialState.tflops)).To(Equal(1)) + Expect(deallocatedGPU.Status.Available.Vram.Cmp(initialState.vram)).To(Equal(1)) + } + }) }) Context("Event Handling", func() { diff --git a/internal/metrics/worker.go b/internal/metrics/worker.go index f28d7d8..3e5bf84 100644 --- a/internal/metrics/worker.go +++ b/internal/metrics/worker.go @@ -42,6 +42,14 @@ var ( labels, ) + GpuCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gpu_count", + Help: "Number of GPUs allocated to the workload", + }, + labels, + ) + AllocatedTflopsPercent = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "allocated_compute_percentage", @@ -58,5 +66,13 @@ var ( ) func init() { - 
metrics.Registry.MustRegister(GpuTflopsRequest, GpuTflopsLimit, VramBytesRequest, VramBytesLimit) + metrics.Registry.MustRegister( + GpuTflopsRequest, + GpuTflopsLimit, + VramBytesRequest, + VramBytesLimit, + AllocatedTflopsPercent, + AllocatedVramBytes, + GpuCount, + ) } diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 9f4ecd4..53e7f62 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -181,6 +181,7 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(ctx context.Context, pod Replicas: &replicas, PoolName: tfInfo.Profile.PoolName, Resources: tfInfo.Profile.Resources, + GPUCount: tfInfo.Profile.GPUCount, Qos: tfInfo.Profile.Qos, IsLocalGPU: tfInfo.Profile.IsLocalGPU, }, diff --git a/internal/webhook/v1/tf_parser.go b/internal/webhook/v1/tf_parser.go index 0b50992..0a965be 100644 --- a/internal/webhook/v1/tf_parser.go +++ b/internal/webhook/v1/tf_parser.go @@ -106,7 +106,7 @@ func ParseTensorFusionInfo(ctx context.Context, k8sClient client.Client, pod *co if err != nil { return info, fmt.Errorf("invalid gpuCount value: %w", err) } - workloadProfile.Spec.GPUCount = int(val) + workloadProfile.Spec.GPUCount = uint(val) } localGPU, ok := pod.Annotations[constants.IsLocalGPUAnnotation] diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 08a27d1..75ce011 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "strconv" + "strings" "time" tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" @@ -56,7 +57,7 @@ func (wg *WorkerGenerator) PodTemplateHash(workloadSpec any) (string, error) { } func (wg *WorkerGenerator) GenerateWorkerPod( - gpu *tfv1.GPU, + gpus []*tfv1.GPU, generateName string, namespace string, port int, @@ -69,10 +70,10 @@ func (wg *WorkerGenerator) GenerateWorkerPod( return nil, "", fmt.Errorf("failed to unmarshal pod template: %w", err) } spec := podTmpl.Template.Spec - if spec.NodeSelector == nil { - spec.NodeSelector = make(map[string]string) - } - spec.NodeSelector = gpu.Status.NodeSelector + + // all the gpus are on the same node + spec.NodeSelector = gpus[0].Status.NodeSelector + spec.Volumes = append(spec.Volumes, corev1.Volume{ Name: constants.DataVolumeName, VolumeSource: corev1.VolumeSource{ @@ -81,22 +82,28 @@ func (wg *WorkerGenerator) GenerateWorkerPod( }, }, }) + spec.Containers[0].VolumeMounts = append(spec.Containers[0].VolumeMounts, corev1.VolumeMount{ Name: constants.DataVolumeName, MountPath: constants.TFDataPath, SubPathExpr: fmt.Sprintf("${%s}", constants.WorkerPodNameEnv), }) + firstGPU := gpus[0] info, ok := lo.Find(*wg.GpuInfos, func(info config.GpuInfo) bool { - return info.FullModelName == gpu.Status.GPUModel + return info.FullModelName == firstGPU.Status.GPUModel }) if !ok { - return nil, "", fmt.Errorf("gpu info(%s) not found", gpu.Status.GPUModel) + return nil, "", fmt.Errorf("gpu info(%s) not found", firstGPU.Status.GPUModel) } + gpuUUIDs := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string { + return gpu.Status.UUID + }) + spec.Containers[0].Env = append(spec.Containers[0].Env, corev1.EnvVar{ Name: "NVIDIA_VISIBLE_DEVICES", - Value: gpu.Status.UUID, + Value: strings.Join(gpuUUIDs, ","), }, corev1.EnvVar{ Name: constants.WorkerPortEnv, Value: strconv.Itoa(port), @@ -118,6 +125,7 @@ func (wg *WorkerGenerator) GenerateWorkerPod( }, }, }) + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ GenerateName: generateName, @@ -136,63 +144,46 @@ func SelectWorker( if len(workload.Status.WorkerStatuses) == 0 { 
return nil, fmt.Errorf("no available worker") } - usageMapping := make(map[string]int, len(workload.Status.WorkerStatuses)) - for _, workerStatus := range workload.Status.WorkerStatuses { - usageMapping[workerStatus.WorkerName] = 0 - } + + usageMapping := lo.SliceToMap(workload.Status.WorkerStatuses, func(status tfv1.WorkerStatus) (string, int) { + return status.WorkerName, 0 + }) connectionList := tfv1.TensorFusionConnectionList{} if err := k8sClient.List(ctx, &connectionList, client.MatchingLabels{constants.WorkloadKey: workload.Name}); err != nil { return nil, fmt.Errorf("list TensorFusionConnection: %w", err) } - for _, connection := range connectionList.Items { - if connection.Status.WorkerName != "" { - usageMapping[connection.Status.WorkerName]++ - } - } - - // First find the minimum usage - minUsage := int(^uint(0) >> 1) - // Initialize with max int value - for _, workerStatus := range workload.Status.WorkerStatuses { - if workerStatus.WorkerPhase == tfv1.WorkerFailed { - continue - } - usage := usageMapping[workerStatus.WorkerName] - if usage < minUsage { - minUsage = usage + lo.ForEach(connectionList.Items, func(conn tfv1.TensorFusionConnection, _ int) { + if conn.Status.WorkerName != "" { + usageMapping[conn.Status.WorkerName]++ } - } + }) - // Collect all eligible workers that are within maxSkew of the minimum usage - var eligibleWorkers []*tfv1.WorkerStatus - for _, workerStatus := range workload.Status.WorkerStatuses { - if workerStatus.WorkerPhase == tfv1.WorkerFailed { - continue - } - usage := usageMapping[workerStatus.WorkerName] - // Worker is eligible if its usage is within maxSkew of the minimum usage - if usage <= minUsage+int(maxSkew) { - eligibleWorkers = append(eligibleWorkers, &workerStatus) - } - } + // filter out failed workers and get the usage of available workers + activeWorkers := lo.Filter(workload.Status.WorkerStatuses, func(status tfv1.WorkerStatus, _ int) bool { + return status.WorkerPhase != tfv1.WorkerFailed + }) - if len(eligibleWorkers) == 0 { + if len(activeWorkers) == 0 { return nil, fmt.Errorf("no available worker") } - // Choose the worker with the minimum usage among eligible workers - selectedWorker := eligibleWorkers[0] - selectedUsage := usageMapping[selectedWorker.WorkerName] - for i := 1; i < len(eligibleWorkers); i++ { - worker := eligibleWorkers[i] - usage := usageMapping[worker.WorkerName] - if usage < selectedUsage { - selectedWorker = worker - selectedUsage = usage - } - } + // find the worker with the minimum usage + minUsage := lo.MinBy(activeWorkers, func(a, b tfv1.WorkerStatus) bool { + return usageMapping[a.WorkerName] < usageMapping[b.WorkerName] + }) + minUsageValue := usageMapping[minUsage.WorkerName] + + // collect all workers within the minimum usage plus maxSkew range + eligibleWorkers := lo.Filter(activeWorkers, func(status tfv1.WorkerStatus, _ int) bool { + return usageMapping[status.WorkerName] <= minUsageValue+int(maxSkew) + }) + + // select the worker with the minimum usage among eligible workers + selectedWorker := lo.MinBy(eligibleWorkers, func(a, b tfv1.WorkerStatus) bool { + return usageMapping[a.WorkerName] < usageMapping[b.WorkerName] + }) - return selectedWorker, nil + return &selectedWorker, nil } From 638d500cca0181ea5b5283907c18326be30494bf Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 16 May 2025 23:18:20 +0800 Subject: [PATCH 02/12] feat: update GPU allocation to use pod annotations and add test for multiple GPU allocation --- .../tensorfusionworkload_controller.go | 10 ++-- 
.../tensorfusionworkload_controller_test.go | 46 +++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/internal/controller/tensorfusionworkload_controller.go b/internal/controller/tensorfusionworkload_controller.go index 1512f38..cf7e782 100644 --- a/internal/controller/tensorfusionworkload_controller.go +++ b/internal/controller/tensorfusionworkload_controller.go @@ -234,13 +234,17 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker( pod.Labels = make(map[string]string) } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + gpuNames := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string { return gpu.Name }) pod.Labels[constants.WorkloadKey] = workload.Name - pod.Labels[constants.GpuKey] = strings.Join(gpuNames, ",") pod.Labels[constants.LabelKeyPodTemplateHash] = hash + pod.Annotations[constants.GpuKey] = strings.Join(gpuNames, ",") // Add finalizer for GPU resource cleanup pod.Finalizers = append(pod.Finalizers, constants.Finalizer) @@ -287,8 +291,8 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context log.Info("Processing pod with GPU resource cleanup finalizer", "pod", pod.Name) - // Get GPU names from pod label - gpuNamesStr, ok := pod.Labels[constants.GpuKey] + // read the GPU names from the pod annotations + gpuNamesStr, ok := pod.Annotations[constants.GpuKey] if !ok { log.Info("Pod has finalizer but no GPU label", "pod", pod.Name) return true, nil diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go index a36a00e..b22e0f9 100644 --- a/internal/controller/tensorfusionworkload_controller_test.go +++ b/internal/controller/tensorfusionworkload_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package controller import ( + "strings" "time" "github.com/aws/smithy-go/ptr" @@ -57,6 +58,51 @@ var _ = Describe("TensorFusionWorkload Controller", func() { checkWorkerPodCount(workload) checkWorkloadStatus(workload) }) + + It("Should allocate multiple GPUs per workload when GPUCount > 1", func() { + pool := tfEnv.GetGPUPool(0) + By("creating a workload that requests 2 GPUs") + workload := &tfv1.TensorFusionWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + Labels: map[string]string{ + constants.LabelKeyOwner: pool.Name, + }, + }, + Spec: tfv1.TensorFusionWorkloadSpec{ + Replicas: ptr.Int32(1), + PoolName: pool.Name, + GPUCount: 2, + Resources: tfv1.Resources{ + Requests: tfv1.Resource{ + Tflops: resource.MustParse("10"), + Vram: resource.MustParse("8Gi"), + }, + Limits: tfv1.Resource{ + Tflops: resource.MustParse("20"), + Vram: resource.MustParse("16Gi"), + }, + }, + }, + } + + Expect(k8sClient.Create(ctx, workload)).To(Succeed()) + + // Check that pod is created with 2 GPUs + podList := &corev1.PodList{} + Eventually(func(g Gomega) { + g.Expect(k8sClient.List(ctx, podList, + client.InNamespace(key.Namespace), + client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed()) + g.Expect(podList.Items).Should(HaveLen(1)) + + gpuNames := strings.Split(podList.Items[0].Annotations[constants.GpuKey], ",") + g.Expect(gpuNames).Should(HaveLen(2)) + }, timeout, interval).Should(Succeed()) + + checkWorkloadStatus(workload) + }) }) Context("When scaling up a workload", func() { From a55ca0ecc4de93050bca52be7c75284311853777 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Wed, 21 May 2025 14:31:42 +0800 Subject: [PATCH 03/12] chore: replace TensorFusionWorkloadSpec to WorkloadProfileSpec in test file --- internal/controller/tensorfusionworkload_controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go index b22e0f9..c5d9708 100644 --- a/internal/controller/tensorfusionworkload_controller_test.go +++ b/internal/controller/tensorfusionworkload_controller_test.go @@ -70,7 +70,7 @@ var _ = Describe("TensorFusionWorkload Controller", func() { constants.LabelKeyOwner: pool.Name, }, }, - Spec: tfv1.TensorFusionWorkloadSpec{ + Spec: tfv1.WorkloadProfileSpec{ Replicas: ptr.Int32(1), PoolName: pool.Name, GPUCount: 2, From c41129d3091235dbe6ea9f5da0792a991d428685 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 23 May 2025 16:23:11 +0800 Subject: [PATCH 04/12] feat: convert GPU resource limits from scalar values to per-device JSON maps --- internal/worker/worker.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 75ce011..bc827a5 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -109,14 +109,35 @@ func (wg *WorkerGenerator) GenerateWorkerPod( Value: strconv.Itoa(port), }, corev1.EnvVar{ Name: constants.WorkerCudaUpLimitTflopsEnv, - Value: strconv.FormatInt(limits.Tflops.Value(), 10), + Value: func() string { + tflopsMap := make(map[string]int64) + for _, gpu := range gpus { + tflopsMap[gpu.Status.UUID] = limits.Tflops.Value() + } + jsonBytes, _ := json.Marshal(tflopsMap) + return string(jsonBytes) + }(), }, corev1.EnvVar{ Name: constants.WorkerCudaUpLimitEnv, - Value: 
strconv.FormatInt(int64(math.Ceil(float64(limits.Tflops.Value())/float64(info.Fp16TFlops.Value())*100)), 10), + Value: func() string { + upLimitMap := make(map[string]int64) + for _, gpu := range gpus { + upLimitMap[gpu.Status.UUID] = int64(math.Ceil(float64(limits.Tflops.Value())/float64(info.Fp16TFlops.Value())*100)) + } + jsonBytes, _ := json.Marshal(upLimitMap) + return string(jsonBytes) + }(), }, corev1.EnvVar{ Name: constants.WorkerCudaMemLimitEnv, // bytesize - Value: strconv.FormatInt(limits.Vram.Value(), 10), + Value: func() string { + memLimitMap := make(map[string]int64) + for _, gpu := range gpus { + memLimitMap[gpu.Status.UUID] = limits.Vram.Value() + } + jsonBytes, _ := json.Marshal(memLimitMap) + return string(jsonBytes) + }(), }, corev1.EnvVar{ Name: constants.WorkerPodNameEnv, ValueFrom: &corev1.EnvVarSource{ From 5b67496813ccac4fe29f181f80669b6396772c15 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 23 May 2025 16:56:59 +0800 Subject: [PATCH 05/12] go fmt --- internal/worker/worker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index bc827a5..1d44717 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -108,7 +108,7 @@ func (wg *WorkerGenerator) GenerateWorkerPod( Name: constants.WorkerPortEnv, Value: strconv.Itoa(port), }, corev1.EnvVar{ - Name: constants.WorkerCudaUpLimitTflopsEnv, + Name: constants.WorkerCudaUpLimitTflopsEnv, Value: func() string { tflopsMap := make(map[string]int64) for _, gpu := range gpus { @@ -118,11 +118,11 @@ func (wg *WorkerGenerator) GenerateWorkerPod( return string(jsonBytes) }(), }, corev1.EnvVar{ - Name: constants.WorkerCudaUpLimitEnv, + Name: constants.WorkerCudaUpLimitEnv, Value: func() string { upLimitMap := make(map[string]int64) for _, gpu := range gpus { - upLimitMap[gpu.Status.UUID] = int64(math.Ceil(float64(limits.Tflops.Value())/float64(info.Fp16TFlops.Value())*100)) + upLimitMap[gpu.Status.UUID] = int64(math.Ceil(float64(limits.Tflops.Value()) / float64(info.Fp16TFlops.Value()) * 100)) } jsonBytes, _ := json.Marshal(upLimitMap) return string(jsonBytes) From a7bab2cfa76b83a7f87bbfaae7e265acc9be69f9 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Thu, 29 May 2025 10:29:59 +0800 Subject: [PATCH 06/12] fix: add missing GPUModel parameter to GPU allocation test case --- internal/controller/tensorfusionworkload_controller.go | 2 +- internal/gpuallocator/gpuallocator_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/tensorfusionworkload_controller.go b/internal/controller/tensorfusionworkload_controller.go index 5fa603e..1823312 100644 --- a/internal/controller/tensorfusionworkload_controller.go +++ b/internal/controller/tensorfusionworkload_controller.go @@ -347,7 +347,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor // Create worker pods for range count { // Schedule GPU for the worker - gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, workload.Spec.GPUCount, workload.Spec.GPUModel) + gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, workload.Spec.GPUCount, workload.Spec.GPUModel) if err != nil { r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err) return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil diff --git 
a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go index 38c1c75..9e15076 100644 --- a/internal/gpuallocator/gpuallocator_test.go +++ b/internal/gpuallocator/gpuallocator_test.go @@ -180,7 +180,7 @@ var _ = Describe("GPU Allocator", func() { } // Allocate 2 GPUs - allocatedGPUs, err := allocator.Alloc(ctx, "test-pool", request, 2) + allocatedGPUs, err := allocator.Alloc(ctx, "test-pool", request, 2, "") Expect(err).NotTo(HaveOccurred()) Expect(allocatedGPUs).To(HaveLen(2)) From 8fc3cc42fa2dc75b2e37357b984fedb1eb7a4ca0 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Thu, 29 May 2025 11:24:41 +0800 Subject: [PATCH 07/12] test: filter out deleted pods when checking workload pod count --- internal/controller/tensorfusionworkload_controller_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go index f637b38..765fdfe 100644 --- a/internal/controller/tensorfusionworkload_controller_test.go +++ b/internal/controller/tensorfusionworkload_controller_test.go @@ -260,6 +260,10 @@ var _ = Describe("TensorFusionWorkload Controller", func() { g.Expect(k8sClient.List(ctx, podList, client.InNamespace(key.Namespace), client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed()) + // Filter out pods that are being deleted + podList.Items = lo.Filter(podList.Items, func(pod corev1.Pod, _ int) bool { + return pod.DeletionTimestamp == nil + }) g.Expect(podList.Items).Should(HaveLen(1)) }, timeout, interval).Should(Succeed()) From 9d00e61873fb0bc0bd0cf7223bd2f28829676db1 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Thu, 29 May 2025 15:11:42 +0800 Subject: [PATCH 08/12] fix: update GPU lookup to handle comma-separated GPU names in pod annotations --- internal/controller/tensorfusionworkload_controller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go index 765fdfe..053bccb 100644 --- a/internal/controller/tensorfusionworkload_controller_test.go +++ b/internal/controller/tensorfusionworkload_controller_test.go @@ -275,10 +275,10 @@ var _ = Describe("TensorFusionWorkload Controller", func() { Namespace: podList.Items[0].Namespace, Name: podList.Items[0].Name, }, pod)).Should(Succeed()) - gpuName := pod.Labels[constants.GpuKey] + gpuNames := strings.Split(pod.Annotations[constants.GpuKey], ",") gpuList := tfEnv.GetPoolGpuList(0) gpu, ok := lo.Find(gpuList.Items, func(gpu tfv1.GPU) bool { - return gpu.Name == gpuName + return gpu.Name == gpuNames[0] }) g.Expect(ok).To(BeTrue()) g.Expect(gpu.Status.GPUModel).To(Equal("mock")) From dedbc25df12b108d4922ca341f6a298a4a894964 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 30 May 2025 18:19:39 +0800 Subject: [PATCH 09/12] fix: add GPUCount to workload spec in createOrUpdateWorkload function --- internal/webhook/v1/pod_webhook.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 4fe8741..2097d3a 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -211,6 +211,7 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(ctx context.Context, pod Resources: tfInfo.Profile.Resources, Qos: tfInfo.Profile.Qos, IsLocalGPU: tfInfo.Profile.IsLocalGPU, + GPUCount: 
tfInfo.Profile.GPUCount, } // Compare the entire spec at once From 756755c4c4d716738cef6ab18fc17b3ed524ae79 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 30 May 2025 18:38:33 +0800 Subject: [PATCH 10/12] fix: add GPUModel to workload spec in createOrUpdateWorkload function --- internal/webhook/v1/pod_webhook.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 2097d3a..2e4e839 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -183,6 +183,7 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(ctx context.Context, pod Resources: tfInfo.Profile.Resources, GPUCount: tfInfo.Profile.GPUCount, Qos: tfInfo.Profile.Qos, + GPUModel: tfInfo.Profile.GPUModel, IsLocalGPU: tfInfo.Profile.IsLocalGPU, }, } From 95f12fa509ce1fe3423f130116a35b26c26c68ad Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 30 May 2025 18:45:43 +0800 Subject: [PATCH 11/12] fix: add gpuModel field to workload and workload profile specifications --- .../crds/tensor-fusion.ai_tensorfusionworkloads.yaml | 4 ++++ .../tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml | 4 ++++ config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml | 4 ++++ config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml b/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml index a2e3892..5eb8cb9 100644 --- a/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml +++ b/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml @@ -175,6 +175,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false diff --git a/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml b/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml index 2b82a37..0afd69d 100644 --- a/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml +++ b/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml @@ -174,6 +174,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false diff --git a/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml b/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml index a2e3892..5eb8cb9 100644 --- a/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml +++ b/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml @@ -175,6 +175,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false diff --git a/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml b/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml index 
2b82a37..0afd69d 100644 --- a/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml +++ b/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml @@ -174,6 +174,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false From 33a4f7d3c2f3594b3117517f0de4f1c0a4d8e785 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Fri, 30 May 2025 18:50:05 +0800 Subject: [PATCH 12/12] fix: add GPUModel to workload spec in createOrUpdateWorkload function --- internal/webhook/v1/pod_webhook.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 2e4e839..cd46022 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -213,6 +213,7 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(ctx context.Context, pod Qos: tfInfo.Profile.Qos, IsLocalGPU: tfInfo.Profile.IsLocalGPU, GPUCount: tfInfo.Profile.GPUCount, + GPUModel: tfInfo.Profile.GPUModel, } // Compare the entire spec at once
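
The sketches that follow are editorial illustrations of the mechanisms this series introduces. They use simplified stand-in types rather than the repository's tfv1 and constants packages, so any name not visible in the diffs above is an assumption.

Patch 01 changes GpuAllocator.Dealloc from taking a single *tfv1.GPU to a slice of types.NamespacedName keys and, as the new test exercises, skips keys that are missing from the in-memory store instead of failing the whole batch. A minimal standalone version of that behaviour, with a local store and resources struct standing in for the allocator's gpuStore and tfv1.Resource:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
)

// resources is a simplified stand-in for tfv1.Resource (tflops + vram).
type resources struct {
	Tflops resource.Quantity
	Vram   resource.Quantity
}

// store is a simplified stand-in for the allocator's gpuStore map.
type store map[types.NamespacedName]*resources

// dealloc mirrors the patched Dealloc loop: walk every requested key,
// skip GPUs that are no longer in the store, and add the request back
// to the ones that exist. The batch as a whole never fails.
func (s store) dealloc(request resources, gpus []types.NamespacedName) {
	for _, key := range gpus {
		gpu, ok := s[key]
		if !ok {
			fmt.Printf("GPU %s not in store, skipping\n", key)
			continue
		}
		gpu.Tflops.Add(request.Tflops)
		gpu.Vram.Add(request.Vram)
	}
}

func main() {
	s := store{
		{Name: "gpu-a"}: {Tflops: resource.MustParse("60"), Vram: resource.MustParse("12Gi")},
	}
	s.dealloc(
		resources{Tflops: resource.MustParse("20"), Vram: resource.MustParse("4Gi")},
		[]types.NamespacedName{{Name: "gpu-a"}, {Name: "non-existent-gpu"}},
	)
	fmt.Println(s[types.NamespacedName{Name: "gpu-a"}].Tflops.String()) // 80
}

This is what lets the finalizer path retry safely: a GPU object that has already disappeared no longer blocks releasing the remaining devices.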
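
Patch 01 also rewrites SelectWorker around samber/lo: failed workers are filtered out, the minimum connection count among the rest is found, workers within maxSkew of that minimum stay eligible, and the least-used eligible worker wins. A compressed version of the same policy over a plain struct, with connection counts passed in directly instead of being derived from TensorFusionConnection objects:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type workerStatus struct {
	Name   string
	Failed bool
}

// selectWorker condenses the patched SelectWorker: drop failed workers,
// find the minimum usage, keep workers within maxSkew of it, then return
// the least-used eligible worker.
func selectWorker(workers []workerStatus, usage map[string]int, maxSkew int) (workerStatus, error) {
	active := lo.Filter(workers, func(w workerStatus, _ int) bool { return !w.Failed })
	if len(active) == 0 {
		return workerStatus{}, fmt.Errorf("no available worker")
	}

	least := lo.MinBy(active, func(a, b workerStatus) bool { return usage[a.Name] < usage[b.Name] })
	eligible := lo.Filter(active, func(w workerStatus, _ int) bool {
		return usage[w.Name] <= usage[least.Name]+maxSkew
	})

	return lo.MinBy(eligible, func(a, b workerStatus) bool { return usage[a.Name] < usage[b.Name] }), nil
}

func main() {
	workers := []workerStatus{{Name: "w1"}, {Name: "w2"}, {Name: "w3", Failed: true}}
	usage := map[string]int{"w1": 3, "w2": 1, "w3": 0}
	w, _ := selectWorker(workers, usage, 1)
	fmt.Println(w.Name) // w2
}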
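
Patch 02 moves the comma-joined GPU names from a pod label to a pod annotation. Kubernetes label values are limited to 63 alphanumeric, dash, underscore and dot characters and cannot contain commas, so an annotation is presumably the safer home for a joined list. The round trip the controller relies on is strings.Join on scale-up and strings.Split in the finalizer; the annotation key below is illustrative, not the value of constants.GpuKey:

package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/types"
)

// gpuAnnotation stands in for constants.GpuKey, which is defined in the repo.
const gpuAnnotation = "tensor-fusion.ai/gpu-names"

func main() {
	// Scale-up path: record every allocated GPU on the worker pod.
	annotations := map[string]string{
		gpuAnnotation: strings.Join([]string{"gpu-a", "gpu-b"}, ","),
	}

	// Finalizer path: recover the keys that Dealloc expects.
	names := strings.Split(annotations[gpuAnnotation], ",")
	keys := make([]types.NamespacedName, 0, len(names))
	for _, n := range names {
		keys = append(keys, types.NamespacedName{Name: n})
	}
	fmt.Println(keys) // [/gpu-a /gpu-b]
}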
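
Patch 04 turns the worker's TFLOPS and VRAM limit env vars from single scalars into JSON objects keyed by GPU UUID, one entry per allocated device, each carrying the same per-workload limit. A standalone sketch of building such a value; the UUIDs are made up and the env var name is omitted:

package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// perDeviceLimit builds the kind of JSON map patch 04 places into the worker
// env vars: every allocated GPU UUID maps to the same per-workload limit.
func perDeviceLimit(uuids []string, limit int64) (string, error) {
	m := make(map[string]int64, len(uuids))
	for _, uuid := range uuids {
		m[uuid] = limit
	}
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	uuids := []string{"GPU-1111", "GPU-2222"} // illustrative UUIDs

	// TFLOPS cap as a percentage of the card's FP16 peak, rounded up,
	// mirroring the math.Ceil(limit/fp16*100) expression in the patch.
	limitTflops, fp16Tflops := 60.0, 312.0
	percent := int64(math.Ceil(limitTflops / fp16Tflops * 100))

	v, _ := perDeviceLimit(uuids, percent)
	fmt.Println(v) // {"GPU-1111":20,"GPU-2222":20}
}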
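
The new gpu_count gauge follows the existing GaugeVec pattern in internal/metrics/worker.go: set per worker pod after a successful scale-up and deleted with the rest of the series on scale-down. A minimal registration example; the label names are assumptions, since the shared labels slice is not part of this diff:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// gpuCount mirrors the gauge added in internal/metrics/worker.go; the label
// names here are placeholders for the repository's shared labels slice.
var gpuCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "gpu_count",
		Help: "Number of GPUs allocated to the workload",
	},
	[]string{"worker", "namespace"},
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(gpuCount)

	// Set on scale-up, delete on scale-down, as the controller does.
	labels := prometheus.Labels{"worker": "demo-tf-worker-abc", "namespace": "default"}
	gpuCount.With(labels).Set(2)
	gpuCount.Delete(labels)
	fmt.Println("registered and updated gpu_count")
}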