From 47a78666a7f799376fc9e7d6635f2343c061d975 Mon Sep 17 00:00:00 2001 From: Matthieu Patou Date: Wed, 18 Dec 2024 01:20:10 +0000 Subject: [PATCH] Fix the coordinator selection to try harder and pick more nodes in each zone after a first pass --- controllers/generate_initial_cluster_file.go | 30 ++++++++++++++++++-- internal/coordinator/coordinator.go | 27 ++++++++++++++++-- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/controllers/generate_initial_cluster_file.go b/controllers/generate_initial_cluster_file.go index 7aa0929b5..0b88c67a7 100644 --- a/controllers/generate_initial_cluster_file.go +++ b/controllers/generate_initial_cluster_file.go @@ -132,8 +132,34 @@ func (g generateInitialClusterFile) reconcile(ctx context.Context, r *Foundation coordinators, err := locality.ChooseDistributedProcesses(cluster, processLocality, count, locality.ProcessSelectionConstraint{ HardLimits: locality.GetHardLimits(cluster), }) - if err != nil { - return &requeue{curError: err} + logger.Info("Current coordinators", "coordinators", coordinators, "error", err) + if len(coordinators) != count { + // Try one more time to get more coordinators + // Let's do a map with the ID of the coordinators + coordinatorsIDMap := make(map[string]bool, len(coordinators)) + for _, coordinator := range coordinators { + coordinatorsIDMap[coordinator.ID] = true + } + // Filter out the coordinators already found from the candidates list + filteredCandidates := make([]locality.Info, 0, len(processLocality)) + for _, candidate := range processLocality { + if _, exists := coordinatorsIDMap[candidate.ID]; !exists { + filteredCandidates = append(filteredCandidates, candidate) + } + } + newcoordinators, err := locality.ChooseDistributedProcesses(cluster, filteredCandidates, count-len(coordinators), locality.ProcessSelectionConstraint{ + HardLimits: locality.GetHardLimits(cluster), + }) + // append the new coordinators to the existing coordinators + logger.Info("Current coordinators", 
"coordinators", coordinators, "additional coordinators", newcoordinators, "error", err) + if err != nil { + return &requeue{curError: err} + } + coordinators = append(coordinators, newcoordinators...) + } else { + if err != nil { + return &requeue{curError: err} + } } for _, currentLocality := range coordinators { diff --git a/internal/coordinator/coordinator.go b/internal/coordinator/coordinator.go index 821845822..27e858fa6 100644 --- a/internal/coordinator/coordinator.go +++ b/internal/coordinator/coordinator.go @@ -119,8 +119,31 @@ func selectCoordinatorsLocalities(logger logr.Logger, cluster *fdbv1beta2.Founda }) logger.Info("Current coordinators", "coordinators", coordinators, "error", err) - if err != nil { - return nil, err + if len(coordinators) != coordinatorCount { + // Try one more time to get more coordinators + coordinatorsIDMap := make(map[string]bool, len(coordinators)) + for _, coordinator := range coordinators { + coordinatorsIDMap[coordinator.ID] = true + } + filteredCandidates := make([]locality.Info, 0, len(candidates)) + for _, candidate := range candidates { + if _, exists := coordinatorsIDMap[candidate.ID]; !exists { + filteredCandidates = append(filteredCandidates, candidate) + } + } + newcoordinators, err := locality.ChooseDistributedProcesses(cluster, filteredCandidates, coordinatorCount-len(coordinators), locality.ProcessSelectionConstraint{ + HardLimits: locality.GetHardLimits(cluster), + }) + // append the new coordinators to the existing coordinators + logger.Info("Current coordinators", "coordinators", coordinators, "additional coordinators", newcoordinators, "error", err) + if err != nil { + return nil, err + } + coordinators = append(coordinators, newcoordinators...) + } else { + if err != nil { + return nil, err + } } coordinatorStatus := make(map[string]bool, len(status.Client.Coordinators.Coordinators))