From 8dc2621341f1884213534864317095cbff816606 Mon Sep 17 00:00:00 2001
From: haiping
Date: Sun, 25 May 2025 15:55:32 +0800
Subject: [PATCH 1/3] feat: issue-187: support gpu model filter

---
 api/v1/workloadprofile_types.go             |   4 +
 internal/constants/constants.go             |   3 +
 .../tensorfusionworkload_controller.go      |   2 +-
 .../tensorfusionworkload_controller_test.go |  27 +++++
 .../gpuallocator/filter/gpu_model_filter.go |  34 ++++++
 .../filter/gpu_model_filter_test.go         | 102 ++++++++++++++++++
 internal/gpuallocator/gpuallocator.go       |   7 ++
 internal/gpuallocator/gpuallocator_test.go  |  29 +++--
 internal/webhook/v1/tf_parser.go            |   6 ++
 9 files changed, 207 insertions(+), 7 deletions(-)
 create mode 100644 internal/gpuallocator/filter/gpu_model_filter.go
 create mode 100644 internal/gpuallocator/filter/gpu_model_filter_test.go

diff --git a/api/v1/workloadprofile_types.go b/api/v1/workloadprofile_types.go
index 9e20610..f3d4b17 100644
--- a/api/v1/workloadprofile_types.go
+++ b/api/v1/workloadprofile_types.go
@@ -49,6 +49,10 @@ type WorkloadProfileSpec struct {
 	// Schedule the workload to the same GPU server that runs vGPU worker for best performance, default to false
 	IsLocalGPU bool `json:"isLocalGPU,omitempty"`
 
+	// +optional
+	// GPUModel specifies the required GPU model (e.g., "A100", "H100")
+	GPUModel string `json:"gpuModel,omitempty"`
+
 	// +optional
 	// TODO, not implemented
 	// The number of GPUs to be used by the workload, default to 1
diff --git a/internal/constants/constants.go b/internal/constants/constants.go
index 275d9ac..ed60ae7 100644
--- a/internal/constants/constants.go
+++ b/internal/constants/constants.go
@@ -49,6 +49,9 @@ const (
 	AutoScaleRequestsAnnotation = Domain + "/auto-requests"
 	AutoScaleReplicasAnnotation = Domain + "/auto-replicas"
 
+	// GPUModelAnnotation specifies the required GPU model (e.g., "A100", "H100")
+	GPUModelAnnotation = Domain + "/gpu-model"
+
 	GpuReleasedAnnotation = Domain + "/gpu-released"
 
 	TensorFusionPodCounterKeyAnnotation = Domain + "/pod-counter-key"
diff --git a/internal/controller/tensorfusionworkload_controller.go b/internal/controller/tensorfusionworkload_controller.go
index 472d12b..d109614 100644
--- a/internal/controller/tensorfusionworkload_controller.go
+++ b/internal/controller/tensorfusionworkload_controller.go
@@ -344,7 +344,7 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 	// Create worker pods
 	for range count {
 		// Schedule GPU for the worker
-		gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1)
+		gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1, workload.Spec.GPUModel)
 		if err != nil {
 			r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err)
 			return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go
index a36a00e..f7b3e53 100644
--- a/internal/controller/tensorfusionworkload_controller_test.go
+++ b/internal/controller/tensorfusionworkload_controller_test.go
@@ -189,6 +189,33 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
 		})
 	})
 
+	Context("When specifying GPU model in workload", func() {
+		It("Should allocate GPUs of the specified model", func() {
+			pool := tfEnv.GetGPUPool(0)
+
+			// Create a workload requesting specific GPU model
+			workload := createTensorFusionWorkload(pool.Name, key, 1)
+			workload.Spec.GPUModel = "A100"
+			Expect(k8sClient.Update(ctx, workload)).To(Succeed())
+
+			checkWorkerPodCount(workload)
+			checkWorkloadStatus(workload)
+
+			// Verify pods got GPUs of the correct model
+			podList := &corev1.PodList{}
+			Eventually(func(g Gomega) {
+				g.Expect(k8sClient.List(ctx, podList,
+					client.InNamespace(key.Namespace),
+					client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed())
+				g.Expect(podList.Items).Should(HaveLen(1))
+
+				// Check if pod has the correct GPU model annotation
+				pod := podList.Items[0]
+				g.Expect(pod.Annotations[constants.GPUModelAnnotation]).To(Equal("A100"))
+			}, timeout, interval).Should(Succeed())
+		})
+	})
+
 	Context("When deleting workload directly", func() {
 		It("Should delete all pods and the workload itself", func() {
 			pool := tfEnv.GetGPUPool(0)
diff --git a/internal/gpuallocator/filter/gpu_model_filter.go b/internal/gpuallocator/filter/gpu_model_filter.go
new file mode 100644
index 0000000..8d286fc
--- /dev/null
+++ b/internal/gpuallocator/filter/gpu_model_filter.go
@@ -0,0 +1,34 @@
+package filter
+
+import (
+	"context"
+
+	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
+)
+
+// GPUModelFilter filters GPUs based on their model (e.g., A100, H100)
+type GPUModelFilter struct {
+	requiredModel string
+}
+
+// NewGPUModelFilter creates a new filter that matches GPUs with the specified model
+func NewGPUModelFilter(model string) *GPUModelFilter {
+	return &GPUModelFilter{
+		requiredModel: model,
+	}
+}
+
+// Filter implements GPUFilter interface
+func (f *GPUModelFilter) Filter(ctx context.Context, gpus []tfv1.GPU) ([]tfv1.GPU, error) {
+	if f.requiredModel == "" {
+		return gpus, nil
+	}
+
+	var filtered []tfv1.GPU
+	for _, gpu := range gpus {
+		if gpu.Status.GPUModel == f.requiredModel {
+			filtered = append(filtered, gpu)
+		}
+	}
+	return filtered, nil
+}
diff --git a/internal/gpuallocator/filter/gpu_model_filter_test.go b/internal/gpuallocator/filter/gpu_model_filter_test.go
new file mode 100644
index 0000000..45fdae4
--- /dev/null
+++ b/internal/gpuallocator/filter/gpu_model_filter_test.go
@@ -0,0 +1,102 @@
+package filter
+
+import (
+	"context"
+	"testing"
+
+	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+func TestGPUModelFilter(t *testing.T) {
+	tests := []struct {
+		name          string
+		requiredModel string
+		gpus          []tfv1.GPU
+		want          int
+		wantErr       bool
+	}{
+		{
+			name:          "filter A100 GPUs",
+			requiredModel: "A100",
+			gpus: []tfv1.GPU{
+				{
+					Status: tfv1.GPUStatus{
+						GPUModel: "A100",
+						Available: &tfv1.Resource{
+							Tflops: resource.MustParse("100"),
+							Vram:   resource.MustParse("40Gi"),
+						},
+					},
+				},
+				{
+					Status: tfv1.GPUStatus{
+						GPUModel: "H100",
+						Available: &tfv1.Resource{
+							Tflops: resource.MustParse("200"),
+							Vram:   resource.MustParse("80Gi"),
+						},
+					},
+				},
+			},
+			want:    1,
+			wantErr: false,
+		},
+		{
+			name:          "no model specified",
+			requiredModel: "",
+			gpus: []tfv1.GPU{
+				{
+					Status: tfv1.GPUStatus{
+						GPUModel: "A100",
+						Available: &tfv1.Resource{
+							Tflops: resource.MustParse("100"),
+							Vram:   resource.MustParse("40Gi"),
+						},
+					},
+				},
+			},
+			want:    1,
+			wantErr: false,
+		},
+		{
+			name:          "non-existent model",
+			requiredModel: "NonExistentModel",
+			gpus: []tfv1.GPU{
+				{
+					Status: tfv1.GPUStatus{
+						GPUModel: "A100",
+						Available: &tfv1.Resource{
+							Tflops: resource.MustParse("100"),
+							Vram:   resource.MustParse("40Gi"),
+						},
+					},
+				},
+			},
+			want:    0,
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			filter := NewGPUModelFilter(tt.requiredModel)
+			got, err := filter.Filter(context.Background(), tt.gpus)
+
+			if tt.wantErr {
+				assert.Error(t, err)
+				return
+			}
+
+			assert.NoError(t, err)
+			assert.Len(t, got, tt.want)
+
+			if tt.want > 0 && tt.requiredModel != "" {
+				for _, gpu := range got {
+					assert.Equal(t, tt.requiredModel, gpu.Status.GPUModel)
+				}
+			}
+		})
+	}
+}
diff --git a/internal/gpuallocator/gpuallocator.go b/internal/gpuallocator/gpuallocator.go
index 9aa7980..81db551 100644
--- a/internal/gpuallocator/gpuallocator.go
+++ b/internal/gpuallocator/gpuallocator.go
@@ -53,12 +53,19 @@ func (s *GpuAllocator) Alloc(
 	poolName string,
 	request tfv1.Resource,
 	count uint,
+	gpuModel string,
 ) ([]*tfv1.GPU, error) {
 	// Get GPUs from the pool using the in-memory store
 	poolGPUs := s.listGPUsFromPool(poolName)
 
 	// Add SameNodeFilter if count > 1 to ensure GPUs are from the same node
 	filterRegistry := s.filterRegistry.With(filter.NewResourceFilter(request))
+
+	// Add GPU model filter if specified
+	if gpuModel != "" {
+		filterRegistry = filterRegistry.With(filter.NewGPUModelFilter(gpuModel))
+	}
+
 	if count > 1 {
 		filterRegistry = filterRegistry.With(filter.NewSameNodeFilter(count))
 	}
diff --git a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go
index 5f7d472..3ca95a3 100644
--- a/internal/gpuallocator/gpuallocator_test.go
+++ b/internal/gpuallocator/gpuallocator_test.go
@@ -59,7 +59,7 @@ var _ = Describe("GPU Allocator", func() {
 				Vram: resource.MustParse("8Gi"),
 			}
 
-			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1)
+			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1, "")
 			Expect(err).NotTo(HaveOccurred())
 			Expect(gpus).To(HaveLen(1))
 
@@ -78,7 +78,7 @@ var _ = Describe("GPU Allocator", func() {
 				Vram: resource.MustParse("4Gi"),
 			}
 
-			gpus, err := allocator.Alloc(ctx, "test-pool", request, 2)
+			gpus, err := allocator.Alloc(ctx, "test-pool", request, 2, "")
 			Expect(err).NotTo(HaveOccurred())
 			Expect(gpus).To(HaveLen(2))
 
@@ -95,7 +95,7 @@ var _ = Describe("GPU Allocator", func() {
 				Vram: resource.MustParse("2Gi"),
 			}
 
-			_, err := allocator.Alloc(ctx, "test-pool", request, 10)
+			_, err := allocator.Alloc(ctx, "test-pool", request, 10, "")
 			Expect(err).To(HaveOccurred())
 		})
 
@@ -105,7 +105,7 @@ var _ = Describe("GPU Allocator", func() {
 				Vram: resource.MustParse("64Gi"),
 			}
 
-			_, err := allocator.Alloc(ctx, "test-pool", request, 1)
+			_, err := allocator.Alloc(ctx, "test-pool", request, 1, "")
 			Expect(err).To(HaveOccurred())
 		})
 
@@ -115,7 +115,24 @@ var _ = Describe("GPU Allocator", func() {
 				Vram: resource.MustParse("2Gi"),
 			}
 
-			_, err := allocator.Alloc(ctx, "nonexistent-pool", request, 1)
+			_, err := allocator.Alloc(ctx, "nonexistent-pool", request, 1, "")
 			Expect(err).To(HaveOccurred())
 		})
+
+		It("should filter GPUs by model", func() {
+			request := tfv1.Resource{
+				Tflops: resource.MustParse("50"),
+				Vram:   resource.MustParse("8Gi"),
+			}
+
+			// Try allocating with a specific GPU model
+			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1, "A100")
+			Expect(err).NotTo(HaveOccurred())
+			Expect(gpus).To(HaveLen(1))
+			Expect(gpus[0].Status.GPUModel).To(Equal("A100"))
+
+			// Try allocating with a non-existent GPU model
+			_, err = allocator.Alloc(ctx, "test-pool", request, 1, "NonExistentModel")
+			Expect(err).To(HaveOccurred())
+		})
 	})
@@ -128,7 +145,7 @@ var _ = Describe("GPU Allocator", func() {
 				Vram: resource.MustParse("6Gi"),
 			}
 
-			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1)
+			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1, "")
 			Expect(err).NotTo(HaveOccurred())
 			Expect(gpus).To(HaveLen(1))
 
diff --git a/internal/webhook/v1/tf_parser.go b/internal/webhook/v1/tf_parser.go
index 0b50992..47d7e4b 100644
--- a/internal/webhook/v1/tf_parser.go
+++ b/internal/webhook/v1/tf_parser.go
@@ -21,6 +21,7 @@ type TFResource struct {
 	VramRequest   resource.Quantity
 	TflopsLimit   resource.Quantity
 	VramLimit     resource.Quantity
+	GPUModel      string // Required GPU model (e.g., A100, H100)
 }
 
 type TensorFusionInfo struct {
@@ -138,6 +139,11 @@ func ParseTensorFusionInfo(ctx context.Context, k8sClient client.Client, pod *co
 		return info, fmt.Errorf("inject container not found")
 	}
 
+	gpuModel, ok := pod.Annotations[constants.GPUModelAnnotation]
+	if ok {
+		workloadProfile.Spec.GPUModel = gpuModel
+	}
+
 	info.Profile = &workloadProfile.Spec
 	info.ContainerNames = containerNames
 	return info, nil

From 09a5f730bc2113a92f7b16db67e9465960686475 Mon Sep 17 00:00:00 2001
From: haiping
Date: Tue, 27 May 2025 00:18:42 +0800
Subject: [PATCH 2/3] fix lint & test

---
 internal/controller/tensorfusionworkload_controller_test.go | 6 +++---
 internal/gpuallocator/filter/gpu_model_filter_test.go       | 3 ---
 internal/gpuallocator/gpuallocator_test.go                  | 4 ++--
 3 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go
index 79a0f6e..43ee38a 100644
--- a/internal/controller/tensorfusionworkload_controller_test.go
+++ b/internal/controller/tensorfusionworkload_controller_test.go
@@ -192,10 +192,10 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
 	Context("When specifying GPU model in workload", func() {
 		It("Should allocate GPUs of the specified model", func() {
 			pool := tfEnv.GetGPUPool(0)
-			
+
 			// Create a workload requesting specific GPU model
 			workload := createTensorFusionWorkload(pool.Name, key, 1)
-			workload.Spec.GPUModel = "A100"
+			workload.Spec.GPUModel = "NVIDIA A100"
 			Expect(k8sClient.Update(ctx, workload)).To(Succeed())
 
 			checkWorkerPodCount(workload)
 			checkWorkloadStatus(workload)
@@ -211,7 +211,7 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
 
 				// Check if pod has the correct GPU model annotation
 				pod := podList.Items[0]
-				g.Expect(pod.Annotations[constants.GPUModelAnnotation]).To(Equal("A100"))
+				g.Expect(pod.Annotations[constants.GPUModelAnnotation]).To(Equal("NVIDIA A100"))
 			}, timeout, interval).Should(Succeed())
 		})
 	})
diff --git a/internal/gpuallocator/filter/gpu_model_filter_test.go b/internal/gpuallocator/filter/gpu_model_filter_test.go
index 45fdae4..ba4bc3c 100644
--- a/internal/gpuallocator/filter/gpu_model_filter_test.go
+++ b/internal/gpuallocator/filter/gpu_model_filter_test.go
@@ -83,15 +83,12 @@ func TestGPUModelFilter(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			filter := NewGPUModelFilter(tt.requiredModel)
 			got, err := filter.Filter(context.Background(), tt.gpus)
-
 			if tt.wantErr {
 				assert.Error(t, err)
 				return
 			}
-
 			assert.NoError(t, err)
 			assert.Len(t, got, tt.want)
-
 			if tt.want > 0 && tt.requiredModel != "" {
 				for _, gpu := range got {
 					assert.Equal(t, tt.requiredModel, gpu.Status.GPUModel)
diff --git a/internal/gpuallocator/gpuallocator_test.go b/internal/gpuallocator/gpuallocator_test.go
index 3ca95a3..cd4c235 100644
--- a/internal/gpuallocator/gpuallocator_test.go
+++ b/internal/gpuallocator/gpuallocator_test.go
@@ -126,10 +126,10 @@ var _ = Describe("GPU Allocator", func() {
 			}
 
 			// Try allocating with a specific GPU model
-			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1, "A100")
+			gpus, err := allocator.Alloc(ctx, "test-pool", request, 1, "NVIDIA A100")
 			Expect(err).NotTo(HaveOccurred())
 			Expect(gpus).To(HaveLen(1))
-			Expect(gpus[0].Status.GPUModel).To(Equal("A100"))
+			Expect(gpus[0].Status.GPUModel).To(Equal("NVIDIA A100"))
 
 			// Try allocating with a non-existent GPU model
 			_, err = allocator.Alloc(ctx, "test-pool", request, 1, "NonExistentModel")

From db06f49148199b94aedb1aa0715eca439f71b50f Mon Sep 17 00:00:00 2001
From: 0x5457 <0x5457@protonmail.com>
Date: Tue, 27 May 2025 09:43:18 +0800
Subject: [PATCH 3/3] test: update GPU model verification to use actual GPU resource instead of annotation

---
 .../tensorfusionworkload_controller_test.go | 30 +++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/internal/controller/tensorfusionworkload_controller_test.go b/internal/controller/tensorfusionworkload_controller_test.go
index 43ee38a..56d8f1e 100644
--- a/internal/controller/tensorfusionworkload_controller_test.go
+++ b/internal/controller/tensorfusionworkload_controller_test.go
@@ -195,23 +195,43 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
 
 			// Create a workload requesting specific GPU model
 			workload := createTensorFusionWorkload(pool.Name, key, 1)
-			workload.Spec.GPUModel = "NVIDIA A100"
-			Expect(k8sClient.Update(ctx, workload)).To(Succeed())
+			Eventually(func(g Gomega) {
+				// Get the latest version of the workload
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload)).To(Succeed())
+				// Set the GPU model
+				workload.Spec.GPUModel = "mock"
+				// Update the workload
+				g.Expect(k8sClient.Update(ctx, workload)).To(Succeed())
+			}, timeout, interval).Should(Succeed())
 
 			checkWorkerPodCount(workload)
 			checkWorkloadStatus(workload)
 
 			// Verify pods got GPUs of the correct model
 			podList := &corev1.PodList{}
+			// First make sure the pod exists
 			Eventually(func(g Gomega) {
 				g.Expect(k8sClient.List(ctx, podList,
 					client.InNamespace(key.Namespace),
 					client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed())
 				g.Expect(podList.Items).Should(HaveLen(1))
+			}, timeout, interval).Should(Succeed())
 
-				// Check if pod has the correct GPU model annotation
-				pod := podList.Items[0]
-				g.Expect(pod.Annotations[constants.GPUModelAnnotation]).To(Equal("NVIDIA A100"))
+			// Now check if the pod has the correct GPU
+			Eventually(func(g Gomega) {
+				// Get the latest version of the pod
+				pod := &corev1.Pod{}
+				g.Expect(k8sClient.Get(ctx, client.ObjectKey{
+					Namespace: podList.Items[0].Namespace,
+					Name:      podList.Items[0].Name,
+				}, pod)).Should(Succeed())
+				gpuName := pod.Labels[constants.GpuKey]
+				gpuList := tfEnv.GetPoolGpuList(0)
+				gpu, ok := lo.Find(gpuList.Items, func(gpu tfv1.GPU) bool {
+					return gpu.Name == gpuName
+				})
+				g.Expect(ok).To(BeTrue())
+				g.Expect(gpu.Status.GPUModel).To(Equal("mock"))
 			}, timeout, interval).Should(Succeed())
 		})
 	})