Commit c3c1f5e
feat: maintain all GPUs in memory instead of accessing the apiserver every time (#186)
* feat: maintain all GPUs in memory instead of accessing the apiserver every time
* go mod tidy
* fix test
* fix lint
* feat: add sync interval parameter to GPU allocator
* fix test
* feat: update GPU allocator setup to return readiness channel
* fix: disable metricsserver for mgr in suite test
* feat: add allocator cleanup and initialization in main and suite tests
* fix: change readyCh channel to buffered in GPU allocator setup
* feat: implement dirty queue for tracking modified GPUs in GpuAllocator
* fix: if the gpu update fails, put it back in the queue
1 parent c25eca9 commit c3c1f5e

21 files changed (+1807 lines, -695 lines)
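Before the per-file diffs, here is a rough sketch of the pattern this commit introduces: the allocator keeps every GPU in memory, marks modified GPUs in a dirty queue, and a background loop flushes them to the apiserver on the configured sync interval, putting a GPU back into the queue if its update fails. This is a minimal illustration of the idea only, not the code in internal/gpuallocator; apart from the GpuAllocator name and the sync-interval parameter visible in the diffs, the field and helper names below are assumptions.

```go
// Toy sketch of the in-memory allocator + dirty-queue pattern (assumed names).
package gpuallocator

import (
	"context"
	"sync"
	"time"

	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type GpuAllocator struct {
	client.Client
	syncInterval time.Duration

	mu    sync.Mutex
	gpus  map[string]*tfv1.GPU // in-memory view of all GPUs in the cluster
	dirty map[string]struct{}  // GPUs modified in memory, pending write-back
}

// markDirty records that a GPU changed in memory and must be persisted
// on the next sync tick.
func (a *GpuAllocator) markDirty(name string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.dirty[name] = struct{}{}
}

// syncLoop periodically flushes dirty GPUs to the apiserver. If an update
// fails, the GPU goes back into the dirty queue and is retried next tick.
func (a *GpuAllocator) syncLoop(ctx context.Context) {
	ticker := time.NewTicker(a.syncInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			a.mu.Lock()
			pending := a.dirty
			a.dirty = make(map[string]struct{})
			gpus := make([]*tfv1.GPU, 0, len(pending))
			for name := range pending {
				if gpu, ok := a.gpus[name]; ok {
					gpus = append(gpus, gpu)
				}
			}
			a.mu.Unlock()

			for _, gpu := range gpus {
				if err := a.Status().Update(ctx, gpu); err != nil {
					a.markDirty(gpu.Name) // requeue so the change is not lost
				}
			}
		}
	}
}
```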

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
@@ -20,8 +20,8 @@
     "CUDA",
     "cycjimmy",
     "dylib",
-    "essd",
     "envtest",
+    "essd",
     "Eventf",
     "finalizer",
     "Finalizers",
@@ -30,6 +30,7 @@
     "Gomega",
     "gopsutil",
     "gosec",
+    "gpuallocator",
     "gpunode",
     "gpunodeclasses",
     "gpunodes",

cmd/main.go

Lines changed: 24 additions & 3 deletions
@@ -36,14 +36,15 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     "sigs.k8s.io/controller-runtime/pkg/log/zap"
+    "sigs.k8s.io/controller-runtime/pkg/manager"
     "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
     "sigs.k8s.io/controller-runtime/pkg/webhook"

     tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
     "github.com/NexusGPU/tensor-fusion/internal/config"
     "github.com/NexusGPU/tensor-fusion/internal/controller"
-    "github.com/NexusGPU/tensor-fusion/internal/scheduler"
+    "github.com/NexusGPU/tensor-fusion/internal/gpuallocator"
     "github.com/NexusGPU/tensor-fusion/internal/server"
     "github.com/NexusGPU/tensor-fusion/internal/server/router"
     webhookcorev1 "github.com/NexusGPU/tensor-fusion/internal/webhook/v1"
@@ -169,7 +170,12 @@ func main() {

     ctx := context.Background()

-    scheduler := scheduler.NewScheduler(mgr.GetClient())
+    // Initialize GPU allocator and set up watches
+    allocator := gpuallocator.NewGpuAllocator(ctx, mgr.GetClient(), 10*time.Second)
+    if _, err = allocator.SetupWithManager(ctx, mgr); err != nil {
+        setupLog.Error(err, "unable to set up GPU allocator watches")
+        os.Exit(1)
+    }
     if err = (&controller.TensorFusionConnectionReconciler{
         Client: mgr.GetClient(),
         Scheme: mgr.GetScheme(),
@@ -270,7 +276,7 @@ func main() {
     if err = (&controller.TensorFusionWorkloadReconciler{
         Client: mgr.GetClient(),
         Scheme: mgr.GetScheme(),
-        Scheduler: scheduler,
+        Allocator: allocator,
         Recorder: mgr.GetEventRecorderFor("tensorfusionworkload"),
         GpuInfos: &gpuInfos,
     }).SetupWithManager(mgr); err != nil {
@@ -307,6 +313,21 @@ func main() {
         }
     }()

+    // cleanup function to stop the allocator
+    err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
+        // wait for the context to be done
+        <-ctx.Done()
+        setupLog.Info("stopping allocator")
+        if allocator != nil {
+            allocator.Stop()
+        }
+        return nil
+    }))
+    if err != nil {
+        setupLog.Error(err, "unable to add allocator cleanup to manager")
+        os.Exit(1)
+    }
+
     setupLog.Info("starting manager")
     if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
         setupLog.Error(err, "problem running manager")
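One detail worth noting in main.go: SetupWithManager also returns a readiness channel (the commit message mentions making readyCh buffered), which main() discards with `_`. A caller that wants to wait for the allocator's initial load from the apiserver before doing GPU-dependent work could use it roughly as below; this is hypothetical usage inside main(), and the assumption that the channel signals after the first full sync comes from the commit message, not from the diff.

```go
// Hypothetical: wait for the allocator's initial sync (inside main(), after
// the allocator is constructed). main() in this commit discards the channel.
readyCh, err := allocator.SetupWithManager(ctx, mgr)
if err != nil {
	setupLog.Error(err, "unable to set up GPU allocator watches")
	os.Exit(1)
}
go func() {
	<-readyCh // assumed to fire once the in-memory GPU state is populated
	setupLog.Info("GPU allocator ready")
}()
```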

internal/controller/suite_test.go

Lines changed: 8 additions & 3 deletions
@@ -46,7 +46,7 @@ import (
     tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
     "github.com/NexusGPU/tensor-fusion/internal/config"
     "github.com/NexusGPU/tensor-fusion/internal/constants"
-    scheduler "github.com/NexusGPU/tensor-fusion/internal/scheduler"
+    "github.com/NexusGPU/tensor-fusion/internal/gpuallocator"
     "github.com/NexusGPU/tensor-fusion/internal/utils"
     // +kubebuilder:scaffold:imports
 )
@@ -59,6 +59,7 @@ var k8sClient client.Client
 var testEnv *envtest.Environment
 var ctx context.Context
 var cancel context.CancelFunc
+var allocator *gpuallocator.GpuAllocator

 const (
     timeout = time.Second * 10
@@ -178,7 +179,10 @@ var _ = BeforeSuite(func() {
     }).SetupWithManager(mgr)
     Expect(err).ToNot(HaveOccurred())

-    scheduler := scheduler.NewScheduler(mgr.GetClient())
+    allocator = gpuallocator.NewGpuAllocator(ctx, mgr.GetClient(), 3*time.Second)
+    _, err = allocator.SetupWithManager(ctx, mgr)
+    Expect(err).ToNot(HaveOccurred())
+
     err = (&TensorFusionConnectionReconciler{
         Client: mgr.GetClient(),
         Scheme: mgr.GetScheme(),
@@ -195,7 +199,7 @@ var _ = BeforeSuite(func() {
     err = (&TensorFusionWorkloadReconciler{
         Client: mgr.GetClient(),
         Scheme: mgr.GetScheme(),
-        Scheduler: scheduler,
+        Allocator: allocator,
         Recorder: mgr.GetEventRecorderFor("TensorFusionWorkload"),
         GpuInfos: config.MockGpuInfo(),
     }).SetupWithManager(mgr)
@@ -211,6 +215,7 @@ var _ = BeforeSuite(func() {

 var _ = AfterSuite(func() {
     By("tearing down the test environment")
+    allocator.Stop()
     cancel()
     err := testEnv.Stop()
     Expect(err).NotTo(HaveOccurred())

internal/controller/tensorfusionworkload_controller.go

Lines changed: 8 additions & 5 deletions
@@ -33,8 +33,8 @@ import (
     tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
     "github.com/NexusGPU/tensor-fusion/internal/config"
     "github.com/NexusGPU/tensor-fusion/internal/constants"
+    "github.com/NexusGPU/tensor-fusion/internal/gpuallocator"
     "github.com/NexusGPU/tensor-fusion/internal/metrics"
-    scheduler "github.com/NexusGPU/tensor-fusion/internal/scheduler"
     "github.com/NexusGPU/tensor-fusion/internal/utils"
     "github.com/NexusGPU/tensor-fusion/internal/worker"
     "github.com/lithammer/shortuuid/v4"
@@ -46,7 +46,7 @@ import (
 type TensorFusionWorkloadReconciler struct {
     client.Client
     Scheme *runtime.Scheme
-    Scheduler scheduler.Scheduler
+    Allocator *gpuallocator.GpuAllocator
     Recorder record.EventRecorder
     GpuInfos *[]config.GpuInfo
 }
@@ -315,7 +315,7 @@ func (r *TensorFusionWorkloadReconciler) handlePodGPUCleanup(ctx context.Context
     }

     // Release GPU resources
-    if err := r.Scheduler.Release(ctx, workload.Spec.Resources.Requests, gpu); err != nil {
+    if err := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu); err != nil {
         log.Error(err, "Failed to release GPU resources, will retry", "gpu", gpuName, "pod", pod.Name)
         return false, err
     }
@@ -344,16 +344,19 @@ func (r *TensorFusionWorkloadReconciler) scaleUpWorkers(ctx context.Context, wor
     // Create worker pods
     for range count {
         // Schedule GPU for the worker
-        gpu, err := r.Scheduler.Schedule(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests)
+        gpus, err := r.Allocator.Alloc(ctx, workload.Spec.PoolName, workload.Spec.Resources.Requests, 1)
         if err != nil {
             r.Recorder.Eventf(workload, corev1.EventTypeWarning, "ScheduleGPUFailed", "Failed to schedule GPU: %v", err)
             return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
         }

+        // Use the first GPU from the allocated array
+        gpu := gpus[0]
+
         pod, err := r.tryStartWorker(ctx, workerGenerator, gpu, workload, hash)
         if err != nil {
             // Try to release the GPU resource if pod creation fails
-            releaseErr := r.Scheduler.Release(ctx, workload.Spec.Resources.Requests, gpu)
+            releaseErr := r.Allocator.Dealloc(ctx, workload.Spec.Resources.Requests, gpu)
             if releaseErr != nil {
                 log.Error(releaseErr, "Failed to release GPU after pod creation failure")
             }
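Taken together, the call sites above imply roughly the following allocator surface. This is a reconstruction for orientation only, not the actual definitions in internal/gpuallocator; types that are not visible in the diff (the resource-request type, the element type of the slice returned by Alloc, the count parameter's type, and the readiness channel) are assumptions.

```go
// Reconstructed from call sites in this commit; tfv1.Resource, *tfv1.GPU,
// the int count and the channel type are assumed, not confirmed by the diff.
package gpuallocator

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"

	tfv1 "github.com/NexusGPU/tensor-fusion/api/v1"
)

type Allocator interface {
	// Registers GPU watches and starts the sync loop; the returned channel
	// signals once the initial load from the apiserver is done (assumed).
	SetupWithManager(ctx context.Context, mgr ctrl.Manager) (<-chan struct{}, error)
	// Reserves count GPUs from the pool for the requested resources.
	Alloc(ctx context.Context, poolName string, req tfv1.Resource, count int) ([]*tfv1.GPU, error)
	// Returns the requested resources to a GPU; the change is applied to the
	// in-memory copy and flushed later by the sync loop.
	Dealloc(ctx context.Context, req tfv1.Resource, gpu *tfv1.GPU) error
	// Stops the sync loop; wired to manager shutdown in main and to
	// AfterSuite in the tests.
	Stop()
}
```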

internal/controller/tensorfusionworkload_controller_test.go

Lines changed: 9 additions & 5 deletions
@@ -158,11 +158,15 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
     checkWorkerPodCount(workload)
     checkWorkloadStatus(workload)

-    gpuList := tfEnv.GetPoolGpuList(0)
-    updatedGPU, ok := lo.Find(gpuList.Items, func(gpu tfv1.GPU) bool {
-        return gpu.Status.Available.Tflops.Equal(resource.MustParse("1990")) && gpu.Status.Available.Vram.Equal(resource.MustParse("1992Gi"))
-    })
-    Expect(ok).Should(BeTrue())
+    var updatedGPU tfv1.GPU
+    Eventually(func(g Gomega) bool {
+        gpuList := tfEnv.GetPoolGpuList(0)
+        ok := false
+        updatedGPU, ok = lo.Find(gpuList.Items, func(gpu tfv1.GPU) bool {
+            return gpu.Status.Available.Tflops.Equal(resource.MustParse("1990")) && gpu.Status.Available.Vram.Equal(resource.MustParse("1992Gi"))
+        })
+        return ok
+    }, timeout, interval).Should(BeTrue())

     Expect(k8sClient.Get(ctx, key, workload)).Should(Succeed())
     workloadCopy := workload.DeepCopy()

internal/scheduler/filter.go renamed to internal/gpuallocator/filter/filter.go

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package scheduler
+package filter

 import (
     "context"
