From a19ae795b45f552ce9ea9dfe67a9b05ad2e81892 Mon Sep 17 00:00:00 2001 From: 0x5457 <0x5457@protonmail.com> Date: Tue, 10 Dec 2024 09:49:15 +0000 Subject: [PATCH] feat: enhance TensorFusionConnection lifecycle and derive worker pod - Add "Starting" phase to TensorFusionConnection states - derive worker pod --- api/v1/tensorfusionconnection_types.go | 5 +- cmd/main.go | 4 ++ internal/config/config.go | 25 ++++++++-- .../tensorfusionconnection_controller.go | 47 +++++++++++++++++-- .../tensorfusionconnection_controller_test.go | 7 ++- internal/worker/worker.go | 22 ++++++++- 6 files changed, 99 insertions(+), 11 deletions(-) diff --git a/api/v1/tensorfusionconnection_types.go b/api/v1/tensorfusionconnection_types.go index c14eb66..c9585bb 100644 --- a/api/v1/tensorfusionconnection_types.go +++ b/api/v1/tensorfusionconnection_types.go @@ -40,8 +40,9 @@ type TensorFusionConnectionPhase string // These are the valid phases of a GpuConnection. const ( - TensorFusionConnectionPending TensorFusionConnectionPhase = "Pending" - TensorFusionConnectionRunning TensorFusionConnectionPhase = "Running" + TensorFusionConnectionPending TensorFusionConnectionPhase = "Pending" + TensorFusionConnectionStarting TensorFusionConnectionPhase = "Starting" + TensorFusionConnectionRunning TensorFusionConnectionPhase = "Running" ) // TensorFusionConnectionStatus defines the observed state of TensorFusionConnection. diff --git a/cmd/main.go b/cmd/main.go index 47055fe..c5e29a7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -44,6 +44,7 @@ import ( "github.com/NexusGPU/tensor-fusion-operator/internal/server" "github.com/NexusGPU/tensor-fusion-operator/internal/server/router" webhookcorev1 "github.com/NexusGPU/tensor-fusion-operator/internal/webhook/v1" + "github.com/NexusGPU/tensor-fusion-operator/internal/worker" // +kubebuilder:scaffold:imports ) @@ -157,6 +158,9 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Scheduler: scheduler, + WorkerGenerator: &worker.WorkerGenerator{ + PodTemplate: &config.WorkerTemplate, + }, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TensorFusionConnection") os.Exit(1) diff --git a/internal/config/config.go b/internal/config/config.go index 1146c2b..5cd899e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,9 +1,13 @@ package config -import corev1 "k8s.io/api/core/v1" +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/ptr" +) type Config struct { - PodMutator PodMutator `json:"podMutator"` + WorkerTemplate corev1.PodTemplate `json:"workerTemplate"` + PodMutator PodMutator `json:"podMutator"` } type PodMutator struct { @@ -12,5 +16,20 @@ type PodMutator struct { } func NewDefaultConfig() Config { - return Config{} + return Config{ + WorkerTemplate: corev1.PodTemplate{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: ptr.To[int64](0), + Containers: []corev1.Container{ + { + Name: "tensorfusion-worker", + Image: "busybox:stable-glibc", + Command: []string{"sleep", "infinity"}, + }, + }, + }, + }, + }, + } } diff --git a/internal/controller/tensorfusionconnection_controller.go b/internal/controller/tensorfusionconnection_controller.go index 6280a86..7d024a4 100644 --- a/internal/controller/tensorfusionconnection_controller.go +++ b/internal/controller/tensorfusionconnection_controller.go @@ -18,9 +18,11 @@ package controller import ( "context" + "fmt" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,13 +32,15 @@ import ( "github.com/NexusGPU/tensor-fusion-operator/internal/constants" scheduler "github.com/NexusGPU/tensor-fusion-operator/internal/scheduler" "github.com/NexusGPU/tensor-fusion-operator/internal/worker" + corev1 "k8s.io/api/core/v1" ) // TensorFusionConnectionReconciler reconciles a TensorFusionConnection object type TensorFusionConnectionReconciler struct { client.Client - Scheme *runtime.Scheme - Scheduler scheduler.Scheduler + Scheme *runtime.Scheme + Scheduler scheduler.Scheduler + WorkerGenerator *worker.WorkerGenerator } var ( @@ -102,25 +106,59 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct log.Info(err.Error()) connection.Status.Phase = tfv1.TensorFusionConnectionPending } else if gpu != nil { - connection.Status.Phase = tfv1.TensorFusionConnectionRunning - connection.Status.ConnectionURL = worker.GenerateConnectionURL(gpu, connection) + connection.Status.Phase = tfv1.TensorFusionConnectionStarting // Store the gpu name for cleanup connection.Status.GPU = gpu.Name } else { + // Init status connection.Status.Phase = tfv1.TensorFusionConnectionPending } } + // Start worker job + phase, err := r.TryStartWorker(ctx, connection, types.NamespacedName{Name: connection.Name, Namespace: connection.Namespace}) + if err != nil { + log.Error(err, "Failed to start worker pod") + return ctrl.Result{}, err + } + + if phase == corev1.PodRunning { + connection.Status.Phase = tfv1.TensorFusionConnectionRunning + connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(gpu, connection) + } + // TODO: Handle PodFailure + if err := r.MustUpdateStatus(ctx, connection, gpu); err != nil { return ctrl.Result{}, err } if connection.Status.Phase == tfv1.TensorFusionConnectionPending { + // requeue return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil } + return ctrl.Result{}, nil } +func (r *TensorFusionConnectionReconciler) TryStartWorker(ctx context.Context, connection *tfv1.TensorFusionConnection, namespacedName types.NamespacedName) (corev1.PodPhase, error) { + // Try to get the Pod + pod := &corev1.Pod{} + if err := r.Get(ctx, namespacedName, pod); err != nil { + if errors.IsNotFound(err) { + // Pod doesn't exist, create a new one + pod = r.WorkerGenerator.GenerateWorkerPod(connection, namespacedName) + if err := ctrl.SetControllerReference(connection, pod, r.Scheme); err != nil { + return "", fmt.Errorf("set owner reference %w", err) + } + if err := r.Create(ctx, pod); err != nil { + return "", fmt.Errorf("create pod %w", err) + } + return corev1.PodPending, nil + } + } + return pod.Status.Phase, nil +} + // handleDeletion handles cleanup of external dependencies func (r *TensorFusionConnectionReconciler) handleDeletion(ctx context.Context, connection *tfv1.TensorFusionConnection) error { if connection.Status.GPU == "" { @@ -209,6 +247,7 @@ func (r *TensorFusionConnectionReconciler) MustUpdateStatus(ctx context.Context, func (r *TensorFusionConnectionReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&tfv1.TensorFusionConnection{}). + Owns(&corev1.Pod{}). Named("tensorfusionconnection"). Complete(r) } diff --git a/internal/controller/tensorfusionconnection_controller_test.go b/internal/controller/tensorfusionconnection_controller_test.go index 6ad2872..7a8fb4b 100644 --- a/internal/controller/tensorfusionconnection_controller_test.go +++ b/internal/controller/tensorfusionconnection_controller_test.go @@ -28,6 +28,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" tensorfusionaiv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1" + "github.com/NexusGPU/tensor-fusion-operator/internal/config" + "github.com/NexusGPU/tensor-fusion-operator/internal/worker" ) var _ = Describe("TensorFusionConnection Controller", func() { @@ -68,11 +70,14 @@ var _ = Describe("TensorFusionConnection Controller", func() { }) It("should successfully reconcile the resource", func() { By("Reconciling the created resource") + config := config.NewDefaultConfig() controllerReconciler := &TensorFusionConnectionReconciler{ Client: k8sClient, Scheme: k8sClient.Scheme(), + WorkerGenerator: &worker.WorkerGenerator{ + PodTemplate: &config.WorkerTemplate, + }, } - _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: typeNamespacedName, }) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index d3509c5..32c4500 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -2,8 +2,28 @@ package worker import ( tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) -func GenerateConnectionURL(_gpu *tfv1.GPU, _connection *tfv1.TensorFusionConnection) string { +type WorkerGenerator struct { + PodTemplate *corev1.PodTemplate +} + +func (wg *WorkerGenerator) GenerateConnectionURL(_gpu *tfv1.GPU, _connection *tfv1.TensorFusionConnection) string { return "TODO://" } + +func (wg *WorkerGenerator) GenerateWorkerPod( + connection *tfv1.TensorFusionConnection, + namespacedName types.NamespacedName, +) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespacedName.Name, + Namespace: namespacedName.Namespace, + }, + Spec: wg.PodTemplate.Template.Spec, + } +}