Skip to content

Commit 3d8c3ef

Browse files
committed
feat: enhance TensorFusionConnection lifecycle and worker configuration
- Add "Starting" phase to TensorFusionConnection states
- Implement worker pod template configuration in config package
- Update controller to use WorkerGenerator for pod management
- Enhance main.go with worker template initialization
1 parent da891eb commit 3d8c3ef

File tree

6 files changed

+99
-11
lines changed

6 files changed

+99
-11
lines changed

api/v1/tensorfusionconnection_types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ type TensorFusionConnectionPhase string
4040

4141
// These are the valid phases of a TensorFusionConnection.
4242
const (
43-
TensorFusionConnectionPending TensorFusionConnectionPhase = "Pending"
44-
TensorFusionConnectionRunning TensorFusionConnectionPhase = "Running"
43+
TensorFusionConnectionPending TensorFusionConnectionPhase = "Pending"
44+
TensorFusionConnectionStarting TensorFusionConnectionPhase = "Starting"
45+
TensorFusionConnectionRunning TensorFusionConnectionPhase = "Running"
4546
)
4647

4748
// TensorFusionConnectionStatus defines the observed state of TensorFusionConnection.

cmd/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/NexusGPU/tensor-fusion-operator/internal/server"
4545
"github.com/NexusGPU/tensor-fusion-operator/internal/server/router"
4646
webhookcorev1 "github.com/NexusGPU/tensor-fusion-operator/internal/webhook/v1"
47+
"github.com/NexusGPU/tensor-fusion-operator/internal/worker"
4748
// +kubebuilder:scaffold:imports
4849
)
4950

@@ -157,6 +158,9 @@ func main() {
157158
Client: mgr.GetClient(),
158159
Scheme: mgr.GetScheme(),
159160
Scheduler: scheduler,
161+
WorkerGenerator: &worker.WorkerGenerator{
162+
PodTemplate: &config.WorkerTemplate,
163+
},
160164
}).SetupWithManager(mgr); err != nil {
161165
setupLog.Error(err, "unable to create controller", "controller", "TensorFusionConnection")
162166
os.Exit(1)

internal/config/config.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package config
22

3-
import corev1 "k8s.io/api/core/v1"
3+
import (
4+
corev1 "k8s.io/api/core/v1"
5+
"k8s.io/utils/ptr"
6+
)
47

58
type Config struct {
6-
PodMutator PodMutator `json:"podMutator"`
9+
WorkerTemplate corev1.PodTemplate `json:"workerTemplate"`
10+
PodMutator PodMutator `json:"podMutator"`
711
}
812

913
type PodMutator struct {
@@ -12,5 +16,20 @@ type PodMutator struct {
1216
}
1317

1418
func NewDefaultConfig() Config {
15-
return Config{}
19+
return Config{
20+
WorkerTemplate: corev1.PodTemplate{
21+
Template: corev1.PodTemplateSpec{
22+
Spec: corev1.PodSpec{
23+
TerminationGracePeriodSeconds: ptr.To[int64](0),
24+
Containers: []corev1.Container{
25+
{
26+
Name: "tensorfusion-worker",
27+
Image: "busybox:stable-glibc",
28+
Command: []string{"sleep", "infinity"},
29+
},
30+
},
31+
},
32+
},
33+
},
34+
}
1635
}

internal/controller/tensorfusionconnection_controller.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122

2223
"k8s.io/apimachinery/pkg/api/errors"
2324
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/types"
2426
"k8s.io/client-go/util/retry"
2527
ctrl "sigs.k8s.io/controller-runtime"
2628
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -30,13 +32,15 @@ import (
3032
"github.com/NexusGPU/tensor-fusion-operator/internal/constants"
3133
scheduler "github.com/NexusGPU/tensor-fusion-operator/internal/scheduler"
3234
"github.com/NexusGPU/tensor-fusion-operator/internal/worker"
35+
corev1 "k8s.io/api/core/v1"
3336
)
3437

3538
// TensorFusionConnectionReconciler reconciles a TensorFusionConnection object
3639
type TensorFusionConnectionReconciler struct {
3740
client.Client
38-
Scheme *runtime.Scheme
39-
Scheduler scheduler.Scheduler
41+
Scheme *runtime.Scheme
42+
Scheduler scheduler.Scheduler
43+
WorkerGenerator *worker.WorkerGenerator
4044
}
4145

4246
var (
@@ -102,25 +106,59 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
102106
log.Info(err.Error())
103107
connection.Status.Phase = tfv1.TensorFusionConnectionPending
104108
} else if gpu != nil {
105-
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
106-
connection.Status.ConnectionURL = worker.GenerateConnectionURL(gpu, connection)
109+
connection.Status.Phase = tfv1.TensorFusionConnectionStarting
107110
// Store the gpu name for cleanup
108111
connection.Status.GPU = gpu.Name
109112
} else {
113+
// Init status
110114
connection.Status.Phase = tfv1.TensorFusionConnectionPending
111115
}
112116
}
113117

118+
// Start worker job
119+
phase, err := r.TryStartWorker(ctx, connection, types.NamespacedName{Name: connection.Name, Namespace: connection.Namespace})
120+
if err != nil {
121+
log.Error(err, "Failed to start worker pod")
122+
return ctrl.Result{}, err
123+
}
124+
125+
if phase == corev1.PodRunning {
126+
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
127+
connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(gpu, connection)
128+
}
129+
// TODO: Handle PodFailure
130+
114131
if err := r.MustUpdateStatus(ctx, connection, gpu); err != nil {
115132
return ctrl.Result{}, err
116133
}
117134

118135
if connection.Status.Phase == tfv1.TensorFusionConnectionPending {
136+
// requeue
119137
return ctrl.Result{RequeueAfter: constants.PendingRequeueDuration}, nil
120138
}
139+
121140
return ctrl.Result{}, nil
122141
}
123142

143+
func (r *TensorFusionConnectionReconciler) TryStartWorker(ctx context.Context, connection *tfv1.TensorFusionConnection, namespacedName types.NamespacedName) (corev1.PodPhase, error) {
144+
// Try to get the Pod
145+
pod := &corev1.Pod{}
146+
if err := r.Get(ctx, namespacedName, pod); err != nil {
147+
if errors.IsNotFound(err) {
148+
// Pod doesn't exist, create a new one
149+
pod = r.WorkerGenerator.GenerateWorkerPod(connection, namespacedName)
150+
if err := ctrl.SetControllerReference(connection, pod, r.Scheme); err != nil {
151+
return "", fmt.Errorf("set owner reference %w", err)
152+
}
153+
if err := r.Create(ctx, pod); err != nil {
154+
return "", fmt.Errorf("create pod %w", err)
155+
}
156+
return corev1.PodPending, nil
157+
}
158+
}
159+
return pod.Status.Phase, nil
160+
}
161+
124162
// handleDeletion handles cleanup of external dependencies
125163
func (r *TensorFusionConnectionReconciler) handleDeletion(ctx context.Context, connection *tfv1.TensorFusionConnection) error {
126164
if connection.Status.GPU == "" {
@@ -209,6 +247,7 @@ func (r *TensorFusionConnectionReconciler) MustUpdateStatus(ctx context.Context,
209247
func (r *TensorFusionConnectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
210248
return ctrl.NewControllerManagedBy(mgr).
211249
For(&tfv1.TensorFusionConnection{}).
250+
Owns(&corev1.Pod{}).
212251
Named("tensorfusionconnection").
213252
Complete(r)
214253
}

internal/controller/tensorfusionconnection_controller_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929

3030
tensorfusionaiv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
31+
"github.com/NexusGPU/tensor-fusion-operator/internal/config"
32+
"github.com/NexusGPU/tensor-fusion-operator/internal/worker"
3133
)
3234

3335
var _ = Describe("TensorFusionConnection Controller", func() {
@@ -68,11 +70,14 @@ var _ = Describe("TensorFusionConnection Controller", func() {
6870
})
6971
It("should successfully reconcile the resource", func() {
7072
By("Reconciling the created resource")
73+
config := config.NewDefaultConfig()
7174
controllerReconciler := &TensorFusionConnectionReconciler{
7275
Client: k8sClient,
7376
Scheme: k8sClient.Scheme(),
77+
WorkerGenerator: &worker.WorkerGenerator{
78+
PodTemplate: &config.WorkerTemplate,
79+
},
7480
}
75-
7681
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
7782
NamespacedName: typeNamespacedName,
7883
})

internal/worker/worker.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,28 @@ package worker
22

33
import (
44
tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
5+
corev1 "k8s.io/api/core/v1"
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
"k8s.io/apimachinery/pkg/types"
58
)
69

7-
func GenerateConnectionURL(_gpu *tfv1.GPU, _connection *tfv1.TensorFusionConnection) string {
10+
type WorkerGenerator struct {
11+
PodTemplate *corev1.PodTemplate
12+
}
13+
14+
// GenerateConnectionURL returns the connection URL for a connection.
//
// It is currently a stub: both arguments are ignored and the placeholder
// "TODO://" is returned.
func (wg *WorkerGenerator) GenerateConnectionURL(_gpu *tfv1.GPU, _connection *tfv1.TensorFusionConnection) string {
	const placeholderURL = "TODO://"
	return placeholderURL
}
17+
18+
func (wg *WorkerGenerator) GenerateWorkerPod(
19+
connection *tfv1.TensorFusionConnection,
20+
namespacedName types.NamespacedName,
21+
) *corev1.Pod {
22+
return &corev1.Pod{
23+
ObjectMeta: metav1.ObjectMeta{
24+
Name: namespacedName.Name,
25+
Namespace: namespacedName.Namespace,
26+
},
27+
Spec: wg.PodTemplate.Template.Spec,
28+
}
29+
}

0 commit comments

Comments
 (0)