Skip to content

Commit 32f56c3

Browse files
authored
[Cherry-pick][Refactor][RayCluster] RayClusterHeadPodsAssociationOptions and RayClusterWorkerPodsAssociationOptions (#2023) (#2035)
1 parent 7440579 commit 32f56c3

File tree

5 files changed

+121
-68
lines changed

5 files changed

+121
-68
lines changed

ray-operator/controllers/ray/common/association.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,66 @@ func RayClusterHeadlessServiceListOptions(instance *rayv1.RayCluster) []client.L
3333
}
3434
}
3535

36+
type AssociationOption interface {
37+
client.ListOption
38+
client.DeleteAllOfOption
39+
}
40+
41+
type AssociationOptions []AssociationOption
42+
43+
func (list AssociationOptions) ToListOptions() (options []client.ListOption) {
44+
for _, option := range list {
45+
options = append(options, option.(client.ListOption))
46+
}
47+
return options
48+
}
49+
50+
func (list AssociationOptions) ToDeleteOptions() (options []client.DeleteAllOfOption) {
51+
for _, option := range list {
52+
options = append(options, option.(client.DeleteAllOfOption))
53+
}
54+
return options
55+
}
56+
57+
func RayClusterHeadPodsAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
58+
return AssociationOptions{
59+
client.InNamespace(instance.Namespace),
60+
client.MatchingLabels{
61+
utils.RayClusterLabelKey: instance.Name,
62+
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
63+
},
64+
}
65+
}
66+
67+
func RayClusterWorkerPodsAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
68+
return AssociationOptions{
69+
client.InNamespace(instance.Namespace),
70+
client.MatchingLabels{
71+
utils.RayClusterLabelKey: instance.Name,
72+
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
73+
},
74+
}
75+
}
76+
77+
func RayClusterGroupPodsAssociationOptions(instance *rayv1.RayCluster, group string) AssociationOptions {
78+
return AssociationOptions{
79+
client.InNamespace(instance.Namespace),
80+
client.MatchingLabels{
81+
utils.RayClusterLabelKey: instance.Name,
82+
utils.RayNodeGroupLabelKey: group,
83+
},
84+
}
85+
}
86+
87+
func RayClusterAllPodsAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
88+
return AssociationOptions{
89+
client.InNamespace(instance.Namespace),
90+
client.MatchingLabels{
91+
utils.RayClusterLabelKey: instance.Name,
92+
},
93+
}
94+
}
95+
3696
func RayServiceServeServiceNamespacedName(rayService *rayv1.RayService) types.NamespacedName {
3797
if rayService.Spec.ServeService != nil && rayService.Spec.ServeService.Name != "" {
3898
return types.NamespacedName{

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,9 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
179179
return ctrl.Result{}, client.IgnoreNotFound(err)
180180
}
181181

182-
func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace string, filterLabels client.MatchingLabels) (pods corev1.PodList, err error) {
182+
func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common.AssociationOptions) (pods corev1.PodList, err error) {
183183
logger := ctrl.LoggerFrom(ctx)
184-
if err = r.List(ctx, &pods, client.InNamespace(namespace), filterLabels); err != nil {
184+
if err = r.List(ctx, &pods, filters.ToListOptions()...); err != nil {
185185
return pods, err
186186
}
187187
active := 0
@@ -191,8 +191,8 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace stri
191191
}
192192
}
193193
if active > 0 {
194-
logger.Info("Deleting all Pods with labels", "filterLabels", filterLabels, "Number of active Pods", active)
195-
return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(namespace), filterLabels)
194+
logger.Info("Deleting all Pods with labels", "filters", filters, "Number of active Pods", active)
195+
return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, filters.ToDeleteOptions()...)
196196
}
197197
return pods, nil
198198
}
@@ -230,18 +230,12 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
230230
"DeletionTimestamp", instance.ObjectMeta.DeletionTimestamp)
231231

232232
// Delete the head Pod if it exists.
233-
headPods, err := r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
234-
utils.RayClusterLabelKey: instance.Name,
235-
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
236-
})
233+
headPods, err := r.deleteAllPods(ctx, common.RayClusterHeadPodsAssociationOptions(instance))
237234
if err != nil {
238235
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
239236
}
240237
// Delete all worker Pods if they exist.
241-
if _, err = r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
242-
utils.RayClusterLabelKey: instance.Name,
243-
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
244-
}); err != nil {
238+
if _, err = r.deleteAllPods(ctx, common.RayClusterWorkerPodsAssociationOptions(instance)); err != nil {
245239
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
246240
}
247241
if len(headPods.Items) > 0 {
@@ -631,8 +625,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
631625

632626
// if RayCluster is suspended, delete all pods and skip reconcile
633627
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
634-
clusterLabel := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
635-
if _, err := r.deleteAllPods(ctx, instance.Namespace, clusterLabel); err != nil {
628+
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
636629
return err
637630
}
638631

@@ -644,8 +637,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
644637

645638
// check if all the pods exist
646639
headPods := corev1.PodList{}
647-
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
648-
if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
640+
if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
649641
return err
650642
}
651643
if EnableBatchScheduler {
@@ -721,8 +713,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
721713
// check if WorkerGroupSpecs has been changed and we need to kill worker pods
722714
for _, worker := range instance.Spec.WorkerGroupSpecs {
723715
workerPods := corev1.PodList{}
724-
filterLabels = client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeGroupLabelKey: worker.GroupName}
725-
if err := r.List(ctx, &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
716+
if err := r.List(ctx, &workerPods, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName).ToListOptions()...); err != nil {
726717
return err
727718
}
728719
updatedWorkerPods := false
@@ -749,8 +740,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
749740
logger.Info("reconcilePods", "desired workerReplicas (always adhering to minReplicas/maxReplica)", workerReplicas, "worker group", worker.GroupName, "maxReplicas", worker.MaxReplicas, "minReplicas", worker.MinReplicas, "replicas", worker.Replicas)
750741

751742
workerPods := corev1.PodList{}
752-
filterLabels = client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeGroupLabelKey: worker.GroupName}
753-
if err := r.List(ctx, &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
743+
if err := r.List(ctx, &workerPods, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName).ToListOptions()...); err != nil {
754744
return err
755745
}
756746

ray-operator/controllers/ray/raycluster_controller_fake_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2654,12 +2654,12 @@ func TestDeleteAllPods(t *testing.T) {
26542654
}
26552655
ctx := context.Background()
26562656
// The first `deleteAllPods` function call should delete the "alive" Pod.
2657-
pods, err := testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
2657+
pods, err := testRayClusterReconciler.deleteAllPods(ctx, common.AssociationOptions{client.InNamespace(ns), client.MatchingLabels(filter)})
26582658
assert.Nil(t, err)
26592659
assert.Equal(t, 2, len(pods.Items))
26602660
assert.Subset(t, []string{"alive", "deleted"}, []string{pods.Items[0].Name, pods.Items[1].Name})
26612661
// The second `deleteAllPods` function call should delete no Pods because none are active.
2662-
pods, err = testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
2662+
pods, err = testRayClusterReconciler.deleteAllPods(ctx, common.AssociationOptions{client.InNamespace(ns), client.MatchingLabels(filter)})
26632663
assert.Nil(t, err)
26642664
assert.Equal(t, 1, len(pods.Items))
26652665
assert.Equal(t, "deleted", pods.Items[0].Name)

0 commit comments

Comments
 (0)