Commit 80e65c8

Revert "feat: add GPUCount field to TensorFusionWorkload and WorkloadProfile …" (#197)

This reverts commit f58be3c.

Parent: f58be3c

9 files changed: +111, -252 lines

api/v1/workloadprofile_types.go

Lines changed: 3 additions & 2 deletions

@@ -39,8 +39,8 @@ type WorkloadProfileSpec struct {
 	PoolName string `json:"poolName,omitempty"`
 
 	// +optional
-
 	Resources Resources `json:"resources,omitempty"`
+
 	// +optional
 	// Qos defines the quality of service level for the client.
 	Qos QoSLevel `json:"qos,omitempty"`
@@ -50,8 +50,9 @@ type WorkloadProfileSpec struct {
 	IsLocalGPU bool `json:"isLocalGPU,omitempty"`
 
 	// +optional
+	// TODO, not implemented
 	// The number of GPUs to be used by the workload, default to 1
-	GPUCount uint `json:"gpuCount,omitempty"`
+	GPUCount int `json:"gpuCount,omitempty"`
 
 	// +optional
 	// TODO, not implemented
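Taken together, the two hunks above leave the touched part of WorkloadProfileSpec looking roughly like this (a condensed view assembled from the post-revert side of the diff; elided surrounding fields are marked with // ...):

    type WorkloadProfileSpec struct {
    	// ...
    	// +optional
    	Resources Resources `json:"resources,omitempty"`

    	// +optional
    	// Qos defines the quality of service level for the client.
    	Qos QoSLevel `json:"qos,omitempty"`

    	// ...
    	IsLocalGPU bool `json:"isLocalGPU,omitempty"`

    	// +optional
    	// TODO, not implemented
    	// The number of GPUs to be used by the workload, default to 1
    	GPUCount int `json:"gpuCount,omitempty"`

    	// +optional
    	// TODO, not implemented
    	// ...
    }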

internal/controller/tensorfusionworkload_controller.go

Lines changed: 32 additions & 36 deletions

@@ -20,13 +20,11 @@ import (
 	"context"
 	"fmt"
 	"sort"
-	"strings"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -219,12 +217,12 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
 func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 	ctx context.Context,
 	workerGenerator *worker.WorkerGenerator,
-	gpus []*tfv1.GPU,
+	gpu *tfv1.GPU,
 	workload *tfv1.TensorFusionWorkload,
 	hash string,
 ) (*corev1.Pod, error) {
 	port := workerGenerator.AllocPort()
-	pod, hash, err := workerGenerator.GenerateWorkerPod(gpus, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits, hash)
+	pod, hash, err := workerGenerator.GenerateWorkerPod(gpu, fmt.Sprintf("%s-tf-worker-", workload.Name), workload.Namespace, port, workload.Spec.Resources.Limits, hash)
 	if err != nil {
 		return nil, fmt.Errorf("generate worker pod %w", err)
 	}
@@ -233,18 +231,9 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker(
 	if pod.Labels == nil {
 		pod.Labels = make(map[string]string)
 	}
-
-	if pod.Annotations == nil {
-		pod.Annotations = make(map[string]string)
-	}
-
-	gpuNames := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) string {
-		return gpu.Name
-	})
-
 	pod.Labels[constants.WorkloadKey] = workload.Name
+	pod.Labels[constants.GpuKey] = gpu.Name
 	pod.Labels[constants.LabelKeyPodTemplateHash] = hash
-	pod.Annotations[constants.GpuKey] = strings.Join(gpuNames, ",")
 
 	// Add finalizer for GPU resource cleanup
 	pod.Finalizers = append(pod.Finalizers, constants.Finalizer)
@@ -280,7 +269,6 @@ func (r *TensorFusionWorkloadReconciler) scaleDownWorkers(ctx context.Context, w
 		metrics.GpuTflopsLimit.Delete(labels)
 		metrics.VramBytesRequest.Delete(labels)
 		metrics.VramBytesLimit.Delete(labels)
-		metrics.GpuCount.Delete(labels)
 	}
 	return nil
 }
@@ -291,24 +279,26 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context
 
 	log.Info("Processing pod with GPU resource cleanup finalizer", "pod", pod.Name)
 
-	// read the GPU names from the pod annotations
-	gpuNamesStr, ok := pod.Annotations[constants.GpuKey]
+	// Get GPU name from pod label
+	gpuName, ok := pod.Labels[constants.GpuKey]
 	if !ok {
 		log.Info("Pod has finalizer but no GPU label", "pod", pod.Name)
 		return true, nil
 	}
 
-	// Split GPU names by comma
-	gpuNames := strings.Split(gpuNamesStr, ",")
-	gpus := lo.Map(gpuNames, func(gpuName string, _ int) types.NamespacedName {
-		return types.NamespacedName{Name: gpuName}
-	})
-	// Release GPU resources
-	if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpus); err != nil {
-		log.Error(err, "Failed to release GPU resources, will retry", "gpus", gpus, "pod", pod.Name)
+	// Get the GPU
+	gpu := &tfv1.GPU{}
+	if err := r.Get(ctx, client.ObjectKey{Name: gpuName}, gpu); err != nil {
+		if errors.IsNotFound(err) {
+			// GPU not found, just continue
+			log.Info("GPU not found", "gpu", gpuName, "pod", pod.Name)
+			return true, nil
+		}
+		// Error getting GPU, retry later
+		log.Error(err, "Failed to get GPU", "gpu", gpuName, "pod", pod.Name)
 		return false, err
 	}
-	log.Info("Released GPU resources via finalizer", "gpus", gpus, "pod", pod.Name)
+
 	if pod.Annotations == nil {
 		pod.Annotations = make(map[string]string)
 	}
@@ -320,10 +310,17 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context
 	// not yet reflecting the finalizer's removal), Then this r.Update pod will fail.
 	// Will not cause duplicate releases
 	if err := r.Update(ctx, pod); err != nil {
-		log.Error(err, "Failed to mark that GPU cleanup of pod")
+		log.Error(err, "Failed to mark that GPU cleanup of pod", "gpu", gpuName, "pod", pod.Name)
+		return false, err
+	}
+
+	// Release GPU resources
+	if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu); err != nil {
+		log.Error(err, "Failed to release GPU resources, will retry", "gpu", gpuName, "pod", pod.Name)
		return false, err
	}
 
+	log.Info("Released GPU resources via finalizer", "gpu", gpuName, "pod", pod.Name)
 	return true, nil
 }
 
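Condensed from the added lines above, the reverted cleanup path in handlePodGPUCleanup reads roughly as follows; this is a sketch only, with the annotation bookkeeping and log calls trimmed:

    // Single GPU per worker again: the GPU is recorded as a pod label.
    gpuName, ok := pod.Labels[constants.GpuKey]
    if !ok {
    	return true, nil // nothing to release
    }
    gpu := &tfv1.GPU{}
    if err := r.Get(ctx, client.ObjectKey{Name: gpuName}, gpu); err != nil {
    	if errors.IsNotFound(err) {
    		return true, nil // GPU object already gone
    	}
    	return false, err // transient error, retry later
    }
    // Mark the pod via r.Update first; if that update fails (e.g. a stale
    // cache still shows the finalizer), Dealloc is never reached, so the
    // GPU resources cannot be released twice.
    if err := r.Update(ctx, pod); err != nil {
    	return false, err
    }
    if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu); err != nil {
    	return false, err
    }
    return true, nil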
@@ -347,21 +344,21 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 	// Create worker pods
 	for range count {
 		// Schedule GPU for the worker
-		gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, workload.Spec.GPUCount)
+		gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1)
 		if err != nil {
 			r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err)
 			return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
 		}
 
-		pod, err := r.tryStartWorker(ctx, workerGenerator, gpus, workload, hash)
+		// Use the first GPU from the allocated array
+		gpu := gpus[0]
+
+		pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload, hash)
 		if err != nil {
-			// Try to release all allocated GPUs if pod creation fails
-			gpus := lo.Map(gpus, func(gpu *tfv1.GPU, _ int) types.NamespacedName {
-				return client.ObjectKeyFromObject(gpu)
-			})
-			releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpus)
+			// Try to release the GPU resource if pod creation fails
+			releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu)
 			if releaseErr != nil {
-				log.Error(releaseErr, "Failed to release GPU after pod creation failure", "gpus", gpus)
+				log.Error(releaseErr, "Failed to release GPU after pod creation failure")
 			}
 			return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err)
 		}
@@ -375,7 +372,6 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
 		metrics.GpuTflopsLimit.With(labels).Set(workload.Spec.Resources.Limits.Tflops.AsApproximateFloat64())
 		metrics.VramBytesRequest.With(labels).Set(workload.Spec.Resources.Requests.Vram.AsApproximateFloat64())
 		metrics.VramBytesLimit.With(labels).Set(workload.Spec.Resources.Limits.Vram.AsApproximateFloat64())
-		metrics.GpuCount.With(labels).Set(float64(workload.Spec.GPUCount))
 	}
 
 	return ctrl.Result{}, nil
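Read together with the tryStartWorker changes earlier in this file, the per-replica scale-up path after the revert boils down to the following sketch (event recording trimmed; names as in the diff):

    // Always request exactly one GPU per worker; workload.Spec.GPUCount is no longer consulted.
    gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1)
    if err != nil {
    	return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
    }
    gpu := gpus[0] // Alloc still returns a slice; only the first element is used

    pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload, hash)
    if err != nil {
    	// Hand the single GPU back if the pod could not be created.
    	if releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu); releaseErr != nil {
    		log.Error(releaseErr, "Failed to release GPU after pod creation failure")
    	}
    	return ctrl.Result{}, fmt.Errorf("create worker pod: %w", err)
    }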

internal/controller/tensorfusionworkload_controller_test.go

Lines changed: 0 additions & 46 deletions

@@ -17,7 +17,6 @@ limitations under the License.
 package controller
 
 import (
-	"strings"
 	"time"
 
 	"github.com/aws/smithy-go/ptr"
@@ -58,51 +57,6 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
 			checkWorkerPodCount(workload)
 			checkWorkloadStatus(workload)
 		})
-
-		It("Should allocate multiple GPUs per workload when GPUCount > 1", func() {
-			pool := tfEnv.GetGPUPool(0)
-			By("creating a workload that requests 2 GPUs")
-			workload := &tfv1.TensorFusionWorkload{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:      key.Name,
-					Namespace: key.Namespace,
-					Labels: map[string]string{
-						constants.LabelKeyOwner: pool.Name,
-					},
-				},
-				Spec: tfv1.WorkloadProfileSpec{
-					Replicas: ptr.Int32(1),
-					PoolName: pool.Name,
-					GPUCount: 2,
-					Resources: tfv1.Resources{
-						Requests: tfv1.Resource{
-							Tflops: resource.MustParse("10"),
-							Vram:   resource.MustParse("8Gi"),
-						},
-						Limits: tfv1.Resource{
-							Tflops: resource.MustParse("20"),
-							Vram:   resource.MustParse("16Gi"),
-						},
-					},
-				},
-			}
-
-			Expect(k8sClient.Create(ctx, workload)).To(Succeed())
-
-			// Check that pod is created with 2 GPUs
-			podList := &corev1.PodList{}
-			Eventually(func(g Gomega) {
-				g.Expect(k8sClient.List(ctx, podList,
-					client.InNamespace(key.Namespace),
-					client.MatchingLabels{constants.WorkloadKey: key.Name})).Should(Succeed())
-				g.Expect(podList.Items).Should(HaveLen(1))
-
-				gpuNames := strings.Split(podList.Items[0].Annotations[constants.GpuKey], ",")
-				g.Expect(gpuNames).Should(HaveLen(2))
-			}, timeout, interval).Should(Succeed())
-
-			checkWorkloadStatus(workload)
-		})
 	})
 
 	Context("When scaling up a workload", func() {

internal/gpuallocator/gpuallocator.go

Lines changed: 13 additions & 14 deletions

@@ -121,26 +121,25 @@ func (s *GpuAllocator) Alloc(
 	return result, nil
 }
 
-// Dealloc deallocates a request from one or multiple gpus.
-func (s *GpuAllocator) Dealloc(ctx context.Context, request tfv1.Resource, gpus []types.NamespacedName) error {
+// Dealloc deallocates a request from a gpu.
+func (s *GpuAllocator) Dealloc(ctx context.Context, request tfv1.Resource, gpu *tfv1.GPU) error {
 	log := log.FromContext(ctx)
 	s.storeMutex.Lock()
 	defer s.storeMutex.Unlock()
 
-	for _, gpu := range gpus {
-		// Get the GPU from the store
-		storeGPU, exists := s.gpuStore[gpu]
-		if !exists {
-			log.Error(fmt.Errorf("GPU not found in store"), "Failed to deallocate GPU", "name", gpu.String())
-			continue
-		}
+	// Get the GPU from the store
+	key := types.NamespacedName{Name: gpu.Name, Namespace: gpu.Namespace}
+	storeGPU, exists := s.gpuStore[key]
+	if !exists {
+		log.Info("GPU not found in store during deallocation", "name", key.String())
+		return fmt.Errorf("GPU %s not found in store", key.String())
+	}
 
-		// Add resources back to the GPU
-		storeGPU.Status.Available.Tflops.Add(request.Tflops)
-		storeGPU.Status.Available.Vram.Add(request.Vram)
+	// Add resources back to the GPU
+	storeGPU.Status.Available.Tflops.Add(request.Tflops)
+	storeGPU.Status.Available.Vram.Add(request.Vram)
 
-		s.markGPUDirty(gpu)
-	}
+	s.markGPUDirty(key)
 
 	return nil
 }
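With the single-GPU signature restored, callers hand the allocated *tfv1.GPU object straight back to the allocator. A minimal usage sketch, assuming a *GpuAllocator named allocator, a pool name poolName and a tfv1.Resource request (these names are illustrative, not from the diff):

    gpus, err := allocator.Alloc(ctx, poolName, request, 1) // allocate exactly one GPU
    if err != nil {
    	return err
    }
    gpu := gpus[0]

    // ... run the worker ...

    // Dealloc resolves the GPU in its in-memory store by name/namespace,
    // adds the requested tflops/vram back and marks the entry dirty.
    if err := allocator.Dealloc(ctx, request, gpu); err != nil {
    	return err
    }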

internal/gpuallocator/gpuallocator_test.go

Lines changed: 2 additions & 64 deletions

@@ -23,11 +23,9 @@ import (
 	"github.com/NexusGPU/tensor-fusion/internal/constants"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
-	"github.com/samber/lo"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 var _ = Describe("GPU Allocator", func() {
@@ -140,7 +138,7 @@
 		allocatedVram := allocatedGPU.Status.Available.Vram.DeepCopy()
 
 		// Now deallocate
-		err = allocator.Dealloc(ctx, request, []types.NamespacedName{client.ObjectKeyFromObject(gpus[0])})
+		err = allocator.Dealloc(ctx, request, allocatedGPU)
 		Expect(err).NotTo(HaveOccurred())
 
 		// Verify resources were restored
@@ -150,69 +148,9 @@
 		expectedTflops.Add(request.Tflops)
 		expectedVram.Add(request.Vram)
 
-		Expect(deallocatedGPU.Status.Available.Tflops.Cmp(expectedTflops)).To(Equal(0))
-		Expect(deallocatedGPU.Status.Available.Vram.Cmp(expectedVram)).To(Equal(0))
+		Expect(deallocatedGPU.Status.Available.Tflops.Cmp(allocatedTflops)).To(Equal(1))
 		Expect(deallocatedGPU.Status.Available.Vram.Cmp(allocatedVram)).To(Equal(1))
 	})
-
-	It("should continue deallocating when some GPUs don't exist", func() {
-		// First allocate resources to multiple GPUs
-		request := tfv1.Resource{
-			Tflops: resource.MustParse("20"),
-			Vram:   resource.MustParse("4Gi"),
-		}
-
-		// Allocate 2 GPUs
-		allocatedGPUs, err := allocator.Alloc(ctx, "test-pool", request, 2)
-		Expect(err).NotTo(HaveOccurred())
-		Expect(allocatedGPUs).To(HaveLen(2))
-
-		// Create a non-existent GPU
-		nonExistentGPU := &tfv1.GPU{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      "non-existent-gpu",
-				Namespace: "default",
-			},
-		}
-
-		// Add the non-existent GPU to the list
-		gpusToDealloc := append(allocatedGPUs, nonExistentGPU)
-
-		// Store the allocated values for existing GPUs
-		initialStates := make(map[string]struct {
-			tflops resource.Quantity
-			vram   resource.Quantity
-		})
-		for _, gpu := range allocatedGPUs {
-			initialStates[gpu.Name] = struct {
-				tflops resource.Quantity
-				vram   resource.Quantity
-			}{
-				tflops: gpu.Status.Available.Tflops.DeepCopy(),
-				vram:   gpu.Status.Available.Vram.DeepCopy(),
-			}
-		}
-		gpusToDeallocKeys := lo.Map(gpusToDealloc, func(gpu *tfv1.GPU, _ int) types.NamespacedName {
-			return client.ObjectKeyFromObject(gpu)
-		})
-		// Now deallocate all GPUs including the non-existent one
-		err = allocator.Dealloc(ctx, request, gpusToDeallocKeys)
-		Expect(err).NotTo(HaveOccurred())
-
-		// Verify resources were restored for existing GPUs
-		for _, allocatedGPU := range allocatedGPUs {
-			deallocatedGPU := getGPU(allocatedGPU.Name, allocatedGPU.Namespace)
-			initialState := initialStates[allocatedGPU.Name]
-
-			expectedTflops := initialState.tflops.DeepCopy()
-			expectedVram := initialState.vram.DeepCopy()
-			expectedTflops.Add(request.Tflops)
-			expectedVram.Add(request.Vram)
-
-			Expect(deallocatedGPU.Status.Available.Tflops.Cmp(initialState.tflops)).To(Equal(1))
-			Expect(deallocatedGPU.Status.Available.Vram.Cmp(initialState.vram)).To(Equal(1))
-		}
-	})
 })
 
 Context("Event Handling", func() {

internal/metrics/worker.go

Lines changed: 1 addition & 17 deletions

@@ -42,14 +42,6 @@
 		labels,
 	)
 
-	GpuCount = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "gpu_count",
-			Help: "Number of GPUs allocated to the workload",
-		},
-		labels,
-	)
-
 	AllocatedTflopsPercent = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "allocated_compute_percentage",
@@ -66,13 +58,5 @@
 )
 
 func init() {
-	metrics.Registry.MustRegister(
-		GpuTflopsRequest,
-		GpuTflopsLimit,
-		VramBytesRequest,
-		VramBytesLimit,
-		AllocatedTflopsPercent,
-		AllocatedVramBytes,
-		GpuCount,
-	)
+	metrics.Registry.MustRegister(GpuTflopsRequest, GpuTflopsLimit, VramBytesRequest, VramBytesLimit)
 }
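The gauges that stay registered here are the ones the workload controller still drives. Pieced together from the controller hunks above, their lifecycle after the revert looks roughly like this (the GpuTflopsRequest calls sit just above the shown diff context and are assumed to follow the same pattern):

    // On scale-up, one set of series per worker pod:
    metrics.GpuTflopsLimit.With(labels).Set(workload.Spec.Resources.Limits.Tflops.AsApproximateFloat64())
    metrics.VramBytesRequest.With(labels).Set(workload.Spec.Resources.Requests.Vram.AsApproximateFloat64())
    metrics.VramBytesLimit.With(labels).Set(workload.Spec.Resources.Limits.Vram.AsApproximateFloat64())
    // The gpu_count gauge (metrics.GpuCount) no longer exists after this revert.

    // On scale-down the same series are deleted:
    metrics.GpuTflopsLimit.Delete(labels)
    metrics.VramBytesRequest.Delete(labels)
    metrics.VramBytesLimit.Delete(labels)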
