diff --git a/api/v1/workloadprofile_types.go b/api/v1/workloadprofile_types.go index f3d4b17..548f871 100644 --- a/api/v1/workloadprofile_types.go +++ b/api/v1/workloadprofile_types.go @@ -39,8 +39,8 @@ type WorkloadProfileSpec struct { PoolName string `json:"poolName,omitempty"` // +optional - Resources Resources `json:"resources,omitempty"` + Resources Resources `json:"resources,omitempty"` // +optional // Qos defines the quality of service level for the client. Qos QoSLevel `json:"qos,omitempty"` @@ -53,10 +53,8 @@ type WorkloadProfileSpec struct { // GPUModel specifies the required GPU model (e.g., "A100", "H100") GPUModel string `json:"gpuModel,omitempty"` - // +optional - // TODO, not implemented // The number of GPUs to be used by the workload, default to 1 - GPUCount int `json:"gpuCount,omitempty"` + GPUCount uint `json:"gpuCount,omitempty"` // +optional // TODO, not implemented diff --git a/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml b/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml index a2e3892..5eb8cb9 100644 --- a/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml +++ b/charts/tensor-fusion/crds/tensor-fusion.ai_tensorfusionworkloads.yaml @@ -175,6 +175,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false diff --git a/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml b/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml index 2b82a37..0afd69d 100644 --- a/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml +++ b/charts/tensor-fusion/crds/tensor-fusion.ai_workloadprofiles.yaml @@ -174,6 +174,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false diff --git a/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml b/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml index a2e3892..5eb8cb9 100644 --- a/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml +++ b/config/crd/bases/tensor-fusion.ai_tensorfusionworkloads.yaml @@ -175,6 +175,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false diff --git a/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml b/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml index 2b82a37..0afd69d 100644 --- a/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml +++ b/config/crd/bases/tensor-fusion.ai_workloadprofiles.yaml @@ -174,6 +174,10 @@ spec: description: The number of GPUs to be used by the workload, default to 1 type: integer + gpuModel: + description: GPUModel specifies the required GPU model (e.g., "A100", + "H100") + type: string isLocalGPU: description: Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false 
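The schema changes above surface two scheduling knobs on the profile: gpuModel and a gpuCount that is now an unsigned integer. A minimal usage sketch of a spec requesting two GPUs of one model; field names and types follow the diff, while the concrete pool name, model, and quantities are illustrative:

```go
import (
	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// exampleProfile is illustrative only: field names/types come from the diff,
// the concrete values (pool name, model, quantities) are made up.
func exampleProfile() tfv1.WorkloadProfileSpec {
	return tfv1.WorkloadProfileSpec{
		PoolName: "example-pool",
		GPUModel: "H100", // matched against the GPU's reported model at allocation time
		GPUCount: 2,      // GPUs per worker; treated as 1 when omitted
		Resources: tfv1.Resources{
			Requests: tfv1.Resource{Tflops: resource.MustParse("10"), Vram: resource.MustParse("8Gi")},
			Limits:   tfv1.Resource{Tflops: resource.MustParse("20"), Vram: resource.MustParse("16Gi")},
		},
	}
}
```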
diff --git a/internal/controller/tensorfusionworkload_controller.go b/internal/controller/tensorfusionworkload_controller.go index d109614..1823312 100644 --- a/internal/controller/tensorfusionworkload_controller.go +++ b/internal/controller/tensorfusionworkload_controller.go @@ -20,11 +20,13 @@ import ( "context" "fmt" "sort" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -217,12 +219,12 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl func (r *TensorFusionWorkloadReconciler) tryStartWorker( ctx context.Context, workerGenerator *worker.WorkerGenerator, - gpu *tfv1.GPU, + gpus []*tfv1.GPU, workload *tfv1.TensorFusionWorkload, hash string, ) (*corev1.Pod, error) { port := workerGenerator.AllocPort() - pod, hash, err := workerGenerator.GenerateWorkerPod(gpu, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits, hash) + pod, hash, err := workerGenerator.GenerateWorkerPod(gpus, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits, hash) if err != nil { return nil, fmt.Errorf("generate worker pod %w", err) } @@ -231,9 +233,18 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker( if pod.Labels == nil { pod.Labels = make(map[string]string) } + + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + + gpuNames := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string { + return gpu.Name + }) + pod.Labels[constants.WorkloadKey] = workload.Name - pod.Labels[constants.GpuKey] = gpu.Name pod.Labels[constants.LabelKeyPodTemplateHash] = hash + pod.Annotations[constants.GpuKey] = strings.Join(gpuNames, ",") // Add finalizer for GPU resource cleanup pod.Finalizers = append(pod.Finalizers, constants.Finalizer) @@ -269,6 +280,7 @@ func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, w metrics.GpuTflopsLimit.Delete(labels) metrics.VramBytesRequest.Delete(labels) metrics.VramBytesLimit.Delete(labels) + metrics.GpuCount.Delete(labels) } return nil } @@ -279,26 +291,24 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context log.Info("Processing pod with GPU resource cleanup finalizer", "pod", pod.Name) - // Get GPU name from pod label - gpuName, ok := pod.Labels[constants.GpuKey] + // read the GPU names from the pod annotations + gpuNamesStr, ok := pod.Annotations[constants.GpuKey] if !ok { log.Info("Pod has finalizer but no GPU label", "pod", pod.Name) return true, nil } - // Get the GPU - gpu := &tfv1.GPU{} - if err := r.Get(ctx, client.ObjectKey{Name: gpuName}, gpu); err != nil { - if errors.IsNotFound(err) { - // GPU not found, just continue - log.Info("GPU not found", "gpu", gpuName, "pod", pod.Name) - return true, nil - } - // Error getting GPU, retry later - log.Error(err, "Failed to get GPU", "gpu", gpuName, "pod", pod.Name) + // Split GPU names by comma + gpuNames := strings.Split(gpuNamesStr, ",") + gpus := lo.Map(gpuNames, func(gpuName string, _ int) types.NamespacedName { + return types.NamespacedName{Name: gpuName} + }) + // Release GPU resources + if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpus); err != nil { + log.Error(err, "Failed to release GPU resources, will retry", "gpus", gpus, "pod", pod.Name) return false, 
err } - + log.Info("Released GPU resources via finalizer", "gpus", gpus, "pod", pod.Name) if pod.Annotations == nil { pod.Annotations = make(map[string]string) } @@ -310,17 +320,10 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context // not yet reflecting the finalizer's removal), Then this r.Update pod will fail. // Will not cause duplicate releases if err := r.Update(ctx, pod); err != nil { - log.Error(err, "Failed to mark that GPU cleanup of pod", "gpu", gpuName, "pod", pod.Name) - return false, err - } - - // Release GPU resources - if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu); err != nil { - log.Error(err, "Failed to release GPU resources, will retry", "gpu", gpuName, "pod", pod.Name) + log.Error(err, "Failed to mark that GPU cleanup of pod") return false, err } - log.Info("Released GPU resources via finalizer", "gpu", gpuName, "pod", pod.Name) return true, nil } @@ -344,21 +347,21 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor // Create worker pods for range count { // Schedule GPU for the worker - gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1, workload.Spec.GPUModel) + gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, workload.Spec.GPUCount, workload.Spec.GPUModel) if err != nil { r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err) return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil } - // Use the first GPU from the allocated array - gpu := gpus[0] - - pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload, hash) + pod, err := r.tryStartWorker(ctx, workerGenerator, gpus, workload, hash) if err != nil { - // Try to release the GPU resource if pod creation fails - releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu) + // Try to release all allocated GPUs if pod creation fails + gpus := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName { + return client.ObjectKeyFromObject(gpu) + }) + releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpus) if releaseErr != nil { - log.Error(releaseErr, "Failed to release GPU after pod creation failure") + log.Error(releaseErr, "Failed to release GPU after pod creation failure", "gpus", gpus) } return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err) } @@ -372,6 +375,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor metrics.GpuTflopsLimit.With(labels).Set(workload.Spec.Resources.Limits.Tflops.AsApproximateFloat64()) metrics.VramBytesRequest.With(labels).Set(workload.Spec.Resources.Requests.Vram.AsApproximateFloat64()) metrics.VramBytesLimit.With(labels).Set(workload.Spec.Resources.Limits.Vram.AsApproximateFloat64()) + metrics.GpuCount.With(labels).Set(float64(workload.Spec.GPUCount)) } return ctrl.Result{}, nil diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go index 56d8f1e..053bccb 100644 --- a/internal/controller/tensorfusionworkload_controller_test.go +++ b/internal/controller/tensorfusionworkload_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. 
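Before the controller tests below: the hunks above replace the single-GPU pod label with a comma-separated list of GPU names stored as an annotation under constants.GpuKey, and the finalizer path turns that string back into object keys for Dealloc. A minimal sketch of that round-trip, assuming the same annotation key and comma separator as in the diff (the helper names are hypothetical):

```go
import (
	"strings"

	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"

	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
	"github.com/NexusGPU/tensor-fusion/internal/constants"
)

// annotateGPUs records which GPUs back a worker pod (write side, tryStartWorker).
func annotateGPUs(pod *corev1.Pod, gpus []*tfv1.GPU) {
	if pod.Annotations == nil {
		pod.Annotations = make(map[string]string)
	}
	names := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string { return gpu.Name })
	pod.Annotations[constants.GpuKey] = strings.Join(names, ",")
}

// gpuKeysFromPod recovers the object keys for deallocation (read side, the
// finalizer cleanup). Only Name is set, matching the cleanup path in the diff.
func gpuKeysFromPod(pod *corev1.Pod) []types.NamespacedName {
	raw, ok := pod.Annotations[constants.GpuKey]
	if !ok || raw == "" {
		return nil
	}
	return lo.Map(strings.Split(raw, ","), func(name string, _ int) types.NamespacedName {
		return types.NamespacedName{Name: name}
	})
}
```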
package controller import ( + "strings" "time" "github.com/aws/smithy-go/ptr" @@ -57,6 +58,51 @@ var _ = Describe("TensorFusionWorkload Controller", func() { checkWorkerPodCount(workload) checkWorkloadStatus(workload) }) + + It("Should allocate multiple GPUs per workload when GPUCount > 1", func() { + pool := tfEnv.GetGPUPool(0) + By("creating a workload that requests 2 GPUs") + workload := &tfv1.TensorFusionWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + Labels: map[string]string{ + constants.LabelKeyOwner: pool.Name, + }, + }, + Spec: tfv1.WorkloadProfileSpec{ + Replicas: ptr.Int32(1), + PoolName: pool.Name, + GPUCount: 2, + Resources: tfv1.Resources{ + Requests: tfv1.Resource{ + Tflops: resource.MustParse("10"), + Vram: resource.MustParse("8Gi"), + }, + Limits: tfv1.Resource{ + Tflops: resource.MustParse("20"), + Vram: resource.MustParse("16Gi"), + }, + }, + }, + } + + Expect(k8sClient.Create(ctx, workload)).To(Succeed()) + + // Check that pod is created with 2 GPUs + podList := &corev1.PodList{} + Eventually(func(g Gomega) { + g.Expect(k8sClient.List(ctx, podList, + client.InNamespace(key.Namespace), + client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed()) + g.Expect(podList.Items).Should(HaveLen(1)) + + gpuNames := strings.Split(podList.Items[0].Annotations[constants.GpuKey], ",") + g.Expect(gpuNames).Should(HaveLen(2)) + }, timeout, interval).Should(Succeed()) + + checkWorkloadStatus(workload) + }) }) Context("When scaling up a workload", func() { @@ -214,6 +260,10 @@ var _ = Describe("TensorFusionWorkload Controller", func() { g.Expect(k8sClient.List(ctx, podList, client.InNamespace(key.Namespace), client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed()) + // Filter out pods that are being deleted + podList.Items = lo.Filter(podList.Items, func(pod corev1.Pod, _ int) bool { + return pod.DeletionTimestamp == nil + }) g.Expect(podList.Items).Should(HaveLen(1)) }, timeout, interval).Should(Succeed()) @@ -225,10 +275,10 @@ var _ = Describe("TensorFusionWorkload Controller", func() { Namespace: podList.Items[0].Namespace, Name: podList.Items[0].Name, }, pod)).Should(Succeed()) - gpuName := pod.Labels[constants.GpuKey] + gpuNames := strings.Split(pod.Annotations[constants.GpuKey], ",") gpuList := tfEnv.GetPoolGpuList(0) gpu, ok := lo.Find(gpuList.Items, func(gpu tfv1.GPU) bool { - return gpu.Name == gpuName + return gpu.Name == gpuNames[0] }) g.Expect(ok).To(BeTrue()) g.Expect(gpu.Status.GPUModel).To(Equal("mock")) diff --git a/internal/gpuallocator/gpuallocator.go b/internal/gpuallocator/gpuallocator.go index 81db551..c9ddbd7 100644 --- a/internal/gpuallocator/gpuallocator.go +++ b/internal/gpuallocator/gpuallocator.go @@ -128,25 +128,26 @@ func (s *GpuAllocator) Alloc( return result, nil } -// Dealloc deallocates a request from a gpu. -func (s *GpuAllocator) Dealloc(ctx context.Context, request tfv1.Resource, gpu *tfv1.GPU) error { +// Dealloc deallocates a request from one or multiple gpus. 
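The allocator side of the same change continues just below: Dealloc now accepts a slice of object keys rather than a single *tfv1.GPU, so callers convert allocated GPUs with client.ObjectKeyFromObject and release them in one call. A sketch of the caller pattern used in scaleUpWorkers, with the reconcile plumbing elided; the interface and helper names here are assumptions, only the two method signatures follow the calls in the diff:

```go
import (
	"context"
	"fmt"

	"github.com/samber/lo"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
)

// gpuAllocator captures the two GpuAllocator methods used below; the real type
// lives in internal/gpuallocator.
type gpuAllocator interface {
	Alloc(ctx context.Context, pool string, req tfv1.Resource, count uint, gpuModel string) ([]*tfv1.GPU, error)
	Dealloc(ctx context.Context, req tfv1.Resource, gpus []types.NamespacedName) error
}

// startWorkerOnce sketches the scale-up error path: if the pod cannot be
// created, every GPU that was just allocated is released again.
func startWorkerOnce(ctx context.Context, a gpuAllocator, spec tfv1.WorkloadProfileSpec, createPod func([]*tfv1.GPU) error) error {
	gpus, err := a.Alloc(ctx, spec.PoolName, spec.Resources.Requests, spec.GPUCount, spec.GPUModel)
	if err != nil {
		return fmt.Errorf("schedule GPU: %w", err)
	}
	if err := createPod(gpus); err != nil {
		keys := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName {
			return client.ObjectKeyFromObject(gpu)
		})
		// Best-effort rollback; the new Dealloc tolerates keys no longer in the store.
		_ = a.Dealloc(ctx, spec.Resources.Requests, keys)
		return fmt.Errorf("create worker pod: %w", err)
	}
	return nil
}
```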
+func (s *GpuAllocator) Dealloc(ctx context.Context, request tfv1.Resource, gpus []types.NamespacedName) error { log := log.FromContext(ctx) s.storeMutex.Lock() defer s.storeMutex.Unlock() - // Get the GPU from the store - key := types.NamespacedName{Name: gpu.Name, Namespace: gpu.Namespace} - storeGPU, exists := s.gpuStore[key] - if !exists { - log.Info("GPU not found in store during deallocation", "name", key.String()) - return fmt.Errorf("GPU %s not found in store", key.String()) - } + for _, gpu := range gpus { + // Get the GPU from the store + storeGPU, exists := s.gpuStore[gpu] + if !exists { + log.Error(fmt.Errorf("GPU not found in store"), "Failed to deallocate GPU", "name", gpu.String()) + continue + } - // Add resources back to the GPU - storeGPU.Status.Available.Tflops.Add(request.Tflops) - storeGPU.Status.Available.Vram.Add(request.Vram) + // Add resources back to the GPU + storeGPU.Status.Available.Tflops.Add(request.Tflops) + storeGPU.Status.Available.Vram.Add(request.Vram) - s.markGPUDirty(key) + s.markGPUDirty(gpu) + } return nil } diff --git a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go index cd4c235..9e15076 100644 --- a/internal/gpuallocator/gpuallocator_test.go +++ b/internal/gpuallocator/gpuallocator_test.go @@ -23,9 +23,11 @@ import ( "github.com/NexusGPU/tensor-fusion/internal/constants" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/samber/lo" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = Describe("GPU Allocator", func() { @@ -155,7 +157,7 @@ var _ = Describe("GPU Allocator", func() { allocatedVram := allocatedGPU.Status.Available.Vram.DeepCopy() // Now deallocate - err = allocator.Dealloc(ctx, request, allocatedGPU) + err = allocator.Dealloc(ctx, request, []types.NamespacedName{client.ObjectKeyFromObject(gpus[0])}) Expect(err).NotTo(HaveOccurred()) // Verify resources were restored @@ -165,9 +167,69 @@ var _ = Describe("GPU Allocator", func() { expectedTflops.Add(request.Tflops) expectedVram.Add(request.Vram) - Expect(deallocatedGPU.Status.Available.Tflops.Cmp(allocatedTflops)).To(Equal(1)) + Expect(deallocatedGPU.Status.Available.Tflops.Cmp(expectedTflops)).To(Equal(0)) + Expect(deallocatedGPU.Status.Available.Vram.Cmp(expectedVram)).To(Equal(0)) Expect(deallocatedGPU.Status.Available.Vram.Cmp(allocatedVram)).To(Equal(1)) }) + + It("should continue deallocating when some GPUs don't exist", func() { + // First allocate resources to multiple GPUs + request := tfv1.Resource{ + Tflops: resource.MustParse("20"), + Vram: resource.MustParse("4Gi"), + } + + // Allocate 2 GPUs + allocatedGPUs, err := allocator.Alloc(ctx, "test-pool", request, 2, "") + Expect(err).NotTo(HaveOccurred()) + Expect(allocatedGPUs).To(HaveLen(2)) + + // Create a non-existent GPU + nonExistentGPU := &tfv1.GPU{ + ObjectMeta: metav1.ObjectMeta{ + Name: "non-existent-gpu", + Namespace: "default", + }, + } + + // Add the non-existent GPU to the list + gpusToDealloc := append(allocatedGPUs, nonExistentGPU) + + // Store the allocated values for existing GPUs + initialStates := make(map[string]struct { + tflops resource.Quantity + vram resource.Quantity + }) + for _, gpu := range allocatedGPUs { + initialStates[gpu.Name] = struct { + tflops resource.Quantity + vram resource.Quantity + }{ + tflops: gpu.Status.Available.Tflops.DeepCopy(), + vram: gpu.Status.Available.Vram.DeepCopy(), + } + } + gpusToDeallocKeys := 
lo.Map(gpusToDealloc, func(gpu *tfv1.GPU, _ int) types.NamespacedName { + return client.ObjectKeyFromObject(gpu) + }) + // Now deallocate all GPUs including the non-existent one + err = allocator.Dealloc(ctx, request, gpusToDeallocKeys) + Expect(err).NotTo(HaveOccurred()) + + // Verify resources were restored for existing GPUs + for _, allocatedGPU := range allocatedGPUs { + deallocatedGPU := getGPU(allocatedGPU.Name, allocatedGPU.Namespace) + initialState := initialStates[allocatedGPU.Name] + + expectedTflops := initialState.tflops.DeepCopy() + expectedVram := initialState.vram.DeepCopy() + expectedTflops.Add(request.Tflops) + expectedVram.Add(request.Vram) + + Expect(deallocatedGPU.Status.Available.Tflops.Cmp(initialState.tflops)).To(Equal(1)) + Expect(deallocatedGPU.Status.Available.Vram.Cmp(initialState.vram)).To(Equal(1)) + } + }) }) Context("Event Handling", func() { diff --git a/internal/metrics/worker.go b/internal/metrics/worker.go index f28d7d8..3e5bf84 100644 --- a/internal/metrics/worker.go +++ b/internal/metrics/worker.go @@ -42,6 +42,14 @@ var ( labels, ) + GpuCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gpu_count", + Help: "Number of GPUs allocated to the workload", + }, + labels, + ) + AllocatedTflopsPercent = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "allocated_compute_percentage", @@ -58,5 +66,13 @@ var ( ) func init() { - metrics.Registry.MustRegister(GpuTflopsRequest, GpuTflopsLimit, VramBytesRequest, VramBytesLimit) + metrics.Registry.MustRegister( + GpuTflopsRequest, + GpuTflopsLimit, + VramBytesRequest, + VramBytesLimit, + AllocatedTflopsPercent, + AllocatedVramBytes, + GpuCount, + ) } diff --git a/internal/webhook/v1/pod_webhook.go b/internal/webhook/v1/pod_webhook.go index 5fa3a09..cd46022 100644 --- a/internal/webhook/v1/pod_webhook.go +++ b/internal/webhook/v1/pod_webhook.go @@ -181,7 +181,9 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(ctx context.Context, pod Replicas: &replicas, PoolName: tfInfo.Profile.PoolName, Resources: tfInfo.Profile.Resources, + GPUCount: tfInfo.Profile.GPUCount, Qos: tfInfo.Profile.Qos, + GPUModel: tfInfo.Profile.GPUModel, IsLocalGPU: tfInfo.Profile.IsLocalGPU, }, } @@ -210,6 +212,8 @@ func (m *TensorFusionPodMutator) createOrUpdateWorkload(ctx context.Context, pod Resources: tfInfo.Profile.Resources, Qos: tfInfo.Profile.Qos, IsLocalGPU: tfInfo.Profile.IsLocalGPU, + GPUCount: tfInfo.Profile.GPUCount, + GPUModel: tfInfo.Profile.GPUModel, } // Compare the entire spec at once diff --git a/internal/webhook/v1/tf_parser.go b/internal/webhook/v1/tf_parser.go index 47d7e4b..860c183 100644 --- a/internal/webhook/v1/tf_parser.go +++ b/internal/webhook/v1/tf_parser.go @@ -107,7 +107,7 @@ func ParseTensorFusionInfo(ctx context.Context, k8sClient client.Client, pod *co if err != nil { return info, fmt.Errorf("invalid gpuCount value: %w", err) } - workloadProfile.Spec.GPUCount = int(val) + workloadProfile.Spec.GPUCount = uint(val) } localGPU, ok := pod.Annotations[constants.IsLocalGPUAnnotation] diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 08a27d1..1d44717 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "strconv" + "strings" "time" tfv1 "github.com/NexusGPU/tensor-fusion/api/v1" @@ -56,7 +57,7 @@ func (wg *WorkerGenerator) PodTemplateHash(workloadSpec any) (string, error) { } func (wg *WorkerGenerator) GenerateWorkerPod( - gpu *tfv1.GPU, + gpus []*tfv1.GPU, generateName string, namespace string, port int, @@ -69,10 
+70,10 @@ func (wg *WorkerGenerator) GenerateWorkerPod( return nil, "", fmt.Errorf("failed to unmarshal pod template: %w", err) } spec := podTmpl.Template.Spec - if spec.NodeSelector == nil { - spec.NodeSelector = make(map[string]string) - } - spec.NodeSelector = gpu.Status.NodeSelector + + // all the gpus are on the same node + spec.NodeSelector = gpus[0].Status.NodeSelector + spec.Volumes = append(spec.Volumes, corev1.Volume{ Name: constants.DataVolumeName, VolumeSource: corev1.VolumeSource{ @@ -81,35 +82,62 @@ func (wg *WorkerGenerator) GenerateWorkerPod( }, }, }) + spec.Containers[0].VolumeMounts = append(spec.Containers[0].VolumeMounts, corev1.VolumeMount{ Name: constants.DataVolumeName, MountPath: constants.TFDataPath, SubPathExpr: fmt.Sprintf("${%s}", constants.WorkerPodNameEnv), }) + firstGPU := gpus[0] info, ok := lo.Find(*wg.GpuInfos, func(info config.GpuInfo) bool { - return info.FullModelName == gpu.Status.GPUModel + return info.FullModelName == firstGPU.Status.GPUModel }) if !ok { - return nil, "", fmt.Errorf("gpu info(%s) not found", gpu.Status.GPUModel) + return nil, "", fmt.Errorf("gpu info(%s) not found", firstGPU.Status.GPUModel) } + gpuUUIDs := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string { + return gpu.Status.UUID + }) + spec.Containers[0].Env = append(spec.Containers[0].Env, corev1.EnvVar{ Name: "NVIDIA_VISIBLE_DEVICES", - Value: gpu.Status.UUID, + Value: strings.Join(gpuUUIDs, ","), }, corev1.EnvVar{ Name: constants.WorkerPortEnv, Value: strconv.Itoa(port), }, corev1.EnvVar{ - Name: constants.WorkerCudaUpLimitTflopsEnv, - Value: strconv.FormatInt(limits.Tflops.Value(), 10), + Name: constants.WorkerCudaUpLimitTflopsEnv, + Value: func() string { + tflopsMap := make(map[string]int64) + for _, gpu := range gpus { + tflopsMap[gpu.Status.UUID] = limits.Tflops.Value() + } + jsonBytes, _ := json.Marshal(tflopsMap) + return string(jsonBytes) + }(), }, corev1.EnvVar{ - Name: constants.WorkerCudaUpLimitEnv, - Value: strconv.FormatInt(int64(math.Ceil(float64(limits.Tflops.Value())/float64(info.Fp16TFlops.Value())*100)), 10), + Name: constants.WorkerCudaUpLimitEnv, + Value: func() string { + upLimitMap := make(map[string]int64) + for _, gpu := range gpus { + upLimitMap[gpu.Status.UUID] = int64(math.Ceil(float64(limits.Tflops.Value()) / float64(info.Fp16TFlops.Value()) * 100)) + } + jsonBytes, _ := json.Marshal(upLimitMap) + return string(jsonBytes) + }(), }, corev1.EnvVar{ Name: constants.WorkerCudaMemLimitEnv, // bytesize - Value: strconv.FormatInt(limits.Vram.Value(), 10), + Value: func() string { + memLimitMap := make(map[string]int64) + for _, gpu := range gpus { + memLimitMap[gpu.Status.UUID] = limits.Vram.Value() + } + jsonBytes, _ := json.Marshal(memLimitMap) + return string(jsonBytes) + }(), }, corev1.EnvVar{ Name: constants.WorkerPodNameEnv, ValueFrom: &corev1.EnvVarSource{ @@ -118,6 +146,7 @@ func (wg *WorkerGenerator) GenerateWorkerPod( }, }, }) + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ GenerateName: generateName, @@ -136,63 +165,46 @@ func SelectWorker( if len(workload.Status.WorkerStatuses) == 0 { return nil, fmt.Errorf("no available worker") } - usageMapping := make(map[string]int, len(workload.Status.WorkerStatuses)) - for _, workerStatus := range workload.Status.WorkerStatuses { - usageMapping[workerStatus.WorkerName] = 0 - } + + usageMapping := lo.SliceToMap(workload.Status.WorkerStatuses, func(status tfv1.WorkerStatus) (string, int) { + return status.WorkerName, 0 + }) connectionList := tfv1.TensorFusionConnectionList{} if err := k8sClient.List(ctx, 
&connectionList, client.MatchingLabels{constants.WorkloadKey: workload.Name}); err != nil { return nil, fmt.Errorf("list TensorFusionConnection: %w", err) } - for _, connection := range connectionList.Items { - if connection.Status.WorkerName != "" { - usageMapping[connection.Status.WorkerName]++ - } - } - - // First find the minimum usage - minUsage := int(^uint(0) >> 1) - // Initialize with max int value - for _, workerStatus := range workload.Status.WorkerStatuses { - if workerStatus.WorkerPhase == tfv1.WorkerFailed { - continue - } - usage := usageMapping[workerStatus.WorkerName] - if usage < minUsage { - minUsage = usage + lo.ForEach(connectionList.Items, func(conn tfv1.TensorFusionConnection, _ int) { + if conn.Status.WorkerName != "" { + usageMapping[conn.Status.WorkerName]++ } - } + }) - // Collect all eligible workers that are within maxSkew of the minimum usage - var eligibleWorkers []*tfv1.WorkerStatus - for _, workerStatus := range workload.Status.WorkerStatuses { - if workerStatus.WorkerPhase == tfv1.WorkerFailed { - continue - } - usage := usageMapping[workerStatus.WorkerName] - // Worker is eligible if its usage is within maxSkew of the minimum usage - if usage <= minUsage+int(maxSkew) { - eligibleWorkers = append(eligibleWorkers, &workerStatus) - } - } + // filter out failed workers and get the usage of available workers + activeWorkers := lo.Filter(workload.Status.WorkerStatuses, func(status tfv1.WorkerStatus, _ int) bool { + return status.WorkerPhase != tfv1.WorkerFailed + }) - if len(eligibleWorkers) == 0 { + if len(activeWorkers) == 0 { return nil, fmt.Errorf("no available worker") } - // Choose the worker with the minimum usage among eligible workers - selectedWorker := eligibleWorkers[0] - selectedUsage := usageMapping[selectedWorker.WorkerName] - for i := 1; i < len(eligibleWorkers); i++ { - worker := eligibleWorkers[i] - usage := usageMapping[worker.WorkerName] - if usage < selectedUsage { - selectedWorker = worker - selectedUsage = usage - } - } + // find the worker with the minimum usage + minUsage := lo.MinBy(activeWorkers, func(a, b tfv1.WorkerStatus) bool { + return usageMapping[a.WorkerName] < usageMapping[b.WorkerName] + }) + minUsageValue := usageMapping[minUsage.WorkerName] + + // collect all workers within the minimum usage plus maxSkew range + eligibleWorkers := lo.Filter(activeWorkers, func(status tfv1.WorkerStatus, _ int) bool { + return usageMapping[status.WorkerName] <= minUsageValue+int(maxSkew) + }) + + // select the worker with the minimum usage among eligible workers + selectedWorker := lo.MinBy(eligibleWorkers, func(a, b tfv1.WorkerStatus) bool { + return usageMapping[a.WorkerName] < usageMapping[b.WorkerName] + }) - return selectedWorker, nil + return &selectedWorker, nil }
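Two quick sketches for the worker-side changes above. First, GenerateWorkerPod now emits the per-GPU limit environment variables as JSON objects keyed by GPU UUID instead of single scalar values; a condensed sketch of that encoding, with the limit value left as a parameter since only the shape matters (the helper name is hypothetical):

```go
import (
	"encoding/json"

	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
)

// perGPULimitJSON renders {"<uuid>": limit, ...} for every allocated GPU,
// the value shape used for the TFLOPS, percentage, and VRAM limit env vars.
func perGPULimitJSON(gpus []*tfv1.GPU, limit int64) (string, error) {
	m := make(map[string]int64, len(gpus))
	for _, gpu := range gpus {
		m[gpu.Status.UUID] = limit // the same limit is applied to each GPU, as in the PR
	}
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```

Second, SelectWorker is rewritten around samber/lo: count connections per worker, drop failed workers, then pick the least-used worker among those within maxSkew of the minimum. A standalone sketch of the selection over plain maps, with worker names standing in for WorkerStatus entries:

```go
import (
	"fmt"

	"github.com/samber/lo"
)

// pickWorker returns the least-loaded active worker whose usage is within
// maxSkew of the global minimum.
func pickWorker(usage map[string]int, active []string, maxSkew int32) (string, error) {
	if len(active) == 0 {
		return "", fmt.Errorf("no available worker")
	}
	least := lo.MinBy(active, func(a, b string) bool { return usage[a] < usage[b] })
	eligible := lo.Filter(active, func(name string, _ int) bool {
		return usage[name] <= usage[least]+int(maxSkew)
	})
	// Taking the minimum again over the eligible set yields the same worker as
	// the global minimum; the skew window would only matter if the final pick
	// were made differently (e.g., at random), which matches the pre-PR logic.
	return lo.MinBy(eligible, func(a, b string) bool { return usage[a] < usage[b] }), nil
}
```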