Skip to content

Commit 00a57cc

Browse files
committed
fix: unit test issues, workload count bug
1 parent d9751e6 commit 00a57cc

File tree

5 files changed

+44
-10
lines changed

5 files changed

+44
-10
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ vet: ## Run go vet against code.
6262

6363
.PHONY: test
6464
test: manifests generate fmt vet envtest ## Run tests.
65-
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" GO_TESTING=true ginkgo --p -timeout 0 -cover -coverprofile cover.out -r --skip-file ./test/e2e
65+
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" GO_TESTING=true go run github.com/onsi/ginkgo/v2/ginkgo -p -timeout 0 -cover -coverprofile cover.out -r --skip-file ./test/e2e
6666

6767
.PHONY: test-e2e
6868
test-e2e: manifests generate fmt vet ## Run the e2e tests. Expected an isolated environment using Kind.

internal/controller/suite_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3636
"k8s.io/client-go/kubernetes/scheme"
3737
"k8s.io/client-go/rest"
38+
"k8s.io/client-go/util/retry"
3839
"sigs.k8s.io/controller-runtime/pkg/client"
3940
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
4041
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -253,7 +254,15 @@ func (c *TensorFusionEnv) GetCluster() *tfv1.TensorFusionCluster {
253254

254255
// UpdateCluster writes tfc's Spec to the stored TensorFusionCluster and fails
// the calling spec if the write does not succeed. This hunk shows both sides
// of the change: the "-" line is the old direct Update of tfc, the "+" lines
// are its replacement.
func (c *TensorFusionEnv) UpdateCluster(tfc *tfv1.TensorFusionCluster) {
255256
GinkgoHelper()
256-
Expect(k8sClient.Update(ctx, tfc)).Should(Succeed())
257+
// Run the write under retry.RetryOnConflict with the default backoff, so a
// conflict error re-runs the closure below.
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
258+
latest := &tfv1.TensorFusionCluster{}
259+
// Fetch the stored object by tfc's key on every attempt rather than
// reusing the caller's copy.
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(tfc), latest); err != nil {
260+
return err
261+
}
262+
// Only Spec is carried over from the caller's object; everything else in
// latest is what Get returned.
latest.Spec = tfc.Spec
263+
return k8sClient.Update(ctx, latest)
264+
})
265+
Expect(err).Should(Succeed())
257266
}
258267

259268
func (c *TensorFusionEnv) Cleanup() {

internal/controller/tensorfusioncluster_controller.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,16 +432,16 @@ func (r *TensorFusionClusterReconciler) updateMetricsRecorder(ctx context.Contex
432432
}
433433
pricingDetail := r.MetricsRecorder.WorkerUnitPriceMap[pool.Name]
434434
for _, pricing := range qosConfig.Pricing {
435-
tflopsPerSecond, _ := strconv.ParseFloat(pricing.Requests.PerFP16TFlopsPerHour, 64)
436-
vramPerSecond, _ := strconv.ParseFloat(pricing.Requests.PerGBOfVRAMPerHour, 64)
435+
tflopsPerHour, _ := strconv.ParseFloat(pricing.Requests.PerFP16TFlopsPerHour, 64)
436+
vramPerHour, _ := strconv.ParseFloat(pricing.Requests.PerGBOfVRAMPerHour, 64)
437437
limitOverRequestChargingRatio, _ := strconv.ParseFloat(pricing.LimitsOverRequestsChargingRatio, 64)
438438

439439
pricingDetail[string(pricing.Qos)] = metrics.RawBillingPricing{
440-
TflopsPerSecond: tflopsPerSecond / 3600,
441-
VramPerSecond: vramPerSecond / 3600,
440+
TflopsPerSecond: tflopsPerHour / float64(3600),
441+
VramPerSecond: vramPerHour / float64(3600),
442442

443-
TflopsOverRequestPerSecond: tflopsPerSecond / 3600 * limitOverRequestChargingRatio,
444-
VramOverRequestPerSecond: vramPerSecond / 3600 * limitOverRequestChargingRatio,
443+
TflopsOverRequestPerSecond: tflopsPerHour / float64(3600) * limitOverRequestChargingRatio,
444+
VramOverRequestPerSecond: vramPerHour / float64(3600) * limitOverRequestChargingRatio,
445445
}
446446
}
447447

internal/controller/tensorfusionworkload_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ func (r *TensorFusionWorkloadReconciler) tryStartWorker(
269269
workload *tfv1.TensorFusionWorkload,
270270
hash string,
271271
) (*corev1.Pod, error) {
272+
if len(gpus) == 0 || gpus[0].Labels == nil {
273+
return nil, fmt.Errorf("no gpus or no labels, can not assign host port for worker")
274+
}
272275
port, err := r.PortAllocator.AssignHostPort(gpus[0].Status.NodeSelector[constants.KubernetesHostNameLabel])
273276
if err != nil {
274277
return nil, fmt.Errorf("get host port %w", err)

internal/gpuallocator/gpuallocator.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ func (s *GpuAllocator) Alloc(
108108
s.storeMutex.Lock()
109109
defer s.storeMutex.Unlock()
110110

111+
appAdded := false
111112
for _, selectedGPU := range selectedGPUs {
113+
112114
// Get the GPU from the store
113115
key := types.NamespacedName{Name: selectedGPU.Name, Namespace: selectedGPU.Namespace}
114116
gpu, exists := s.gpuStore[key]
@@ -122,7 +124,10 @@ func (s *GpuAllocator) Alloc(
122124
gpu.Status.Available.Tflops.Sub(request.Tflops)
123125
gpu.Status.Available.Vram.Sub(request.Vram)
124126

125-
addRunningApp(gpu, workloadNameNamespace)
127+
if !appAdded {
128+
addRunningApp(ctx, gpu, workloadNameNamespace)
129+
appAdded = true
130+
}
126131

127132
s.markGPUDirty(key)
128133
}
@@ -143,6 +148,7 @@ func (s *GpuAllocator) Dealloc(ctx context.Context, workloadNameNamespace tfv1.N
143148
s.storeMutex.Lock()
144149
defer s.storeMutex.Unlock()
145150

151+
appRemoved := false
146152
for _, gpu := range gpus {
147153
// Get the GPU from the store
148154
storeGPU, exists := s.gpuStore[gpu]
@@ -154,6 +160,10 @@ func (s *GpuAllocator) Dealloc(ctx context.Context, workloadNameNamespace tfv1.N
154160
// Add resources back to the GPU
155161
storeGPU.Status.Available.Tflops.Add(request.Tflops)
156162
storeGPU.Status.Available.Vram.Add(request.Vram)
163+
if !appRemoved {
164+
removeRunningApp(ctx, storeGPU, workloadNameNamespace)
165+
appRemoved = true
166+
}
157167

158168
s.markGPUDirty(gpu)
159169
}
@@ -468,11 +478,14 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
468478

469479
tflopsCapacityMap := make(map[types.NamespacedName]resource.Quantity)
470480
vramCapacityMap := make(map[types.NamespacedName]resource.Quantity)
481+
gpuMap := make(map[types.NamespacedName]*tfv1.GPU)
471482

472483
for gpuKey, gpu := range s.gpuStore {
473484
if gpu.Status.Capacity != nil {
474485
tflopsCapacityMap[gpuKey] = gpu.Status.Capacity.Tflops
475486
vramCapacityMap[gpuKey] = gpu.Status.Capacity.Vram
487+
gpu.Status.RunningApps = []*tfv1.RunningAppDetail{}
488+
gpuMap[gpuKey] = gpu
476489
}
477490
}
478491

@@ -481,6 +494,7 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
481494
vramRequest, _ := resource.ParseQuantity(worker.Annotations[constants.VRAMRequestAnnotation])
482495
gpuIds := worker.Annotations[constants.GpuKey]
483496
gpuIdsList := strings.Split(gpuIds, ",")
497+
appAdded := false
484498
for _, gpuId := range gpuIdsList {
485499
gpuKey := types.NamespacedName{Name: gpuId}
486500
gpuCapacity, ok := tflopsCapacityMap[gpuKey]
@@ -491,6 +505,10 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
491505
if ok {
492506
gpuCapacity.Sub(vramRequest)
493507
}
508+
if !appAdded {
509+
addRunningApp(ctx, gpuMap[gpuKey], tfv1.NameNamespace{Namespace: worker.Namespace, Name: worker.Labels[constants.WorkloadKey]})
510+
appAdded = true
511+
}
494512
}
495513
}
496514

@@ -510,7 +528,11 @@ func (s *GpuAllocator) reconcileAllocationState(ctx context.Context) {
510528
}
511529
}
512530

513-
func addRunningApp(gpu *tfv1.GPU, workloadNameNamespace tfv1.NameNamespace) {
531+
func addRunningApp(ctx context.Context, gpu *tfv1.GPU, workloadNameNamespace tfv1.NameNamespace) {
532+
if gpu == nil {
533+
log.FromContext(ctx).Info("[Warning] GPU is nil, skip adding running app", "workload", workloadNameNamespace.Name, "namespace", workloadNameNamespace.Namespace)
534+
return
535+
}
514536
if gpu.Status.RunningApps == nil {
515537
gpu.Status.RunningApps = []*tfv1.RunningAppDetail{}
516538
}

0 commit comments

Comments
 (0)