Skip to content

Commit 43ba232

Browse files
Ash-expnishant-d
andauthored
fix: cron status update refactoring (#5790)
Co-authored-by: Nishant <58689354+nishant-d@users.noreply.github.com>
1 parent d4bd272 commit 43ba232

File tree

3 files changed

+43
-23
lines changed

3 files changed

+43
-23
lines changed

api/cluster/EnvironmentRestHandler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/http"
2424
"strconv"
2525
"strings"
26+
"sync"
2627
"time"
2728

2829
k8s2 "github.com/devtron-labs/common-lib/utils/k8s"
@@ -513,9 +514,8 @@ func (impl EnvironmentRestHandlerImpl) GetEnvironmentConnection(w http.ResponseW
513514
responseObj.ClusterReachable = false
514515
}
515516
//updating the cluster connection error to db
516-
mapObj := map[int]error{
517-
clusterBean.Id: err,
518-
}
517+
mapObj := &sync.Map{}
518+
mapObj.Store(clusterBean.Id, err)
519519
impl.environmentClusterMappingsService.HandleErrorInClusterConnections([]*request.ClusterBean{clusterBean}, mapObj, true)
520520
common.WriteJsonResp(w, nil, responseObj, http.StatusOK)
521521
}

pkg/cluster/ClusterService.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ type ClusterService interface {
181181
FindAllNamespacesByUserIdAndClusterId(userId int32, clusterId int, isActionUserSuperAdmin bool) ([]string, error)
182182
FindAllForClusterByUserId(userId int32, isActionUserSuperAdmin bool) ([]ClusterBean, error)
183183
FetchRolesFromGroup(userId int32) ([]*repository3.RoleModel, error)
184-
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool)
184+
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool)
185185
ConnectClustersInBatch(clusters []*ClusterBean, clusterExistInDb bool)
186186
ConvertClusterBeanToCluster(clusterBean *ClusterBean, userId int32) *repository.Cluster
187187
ConvertClusterBeanObjectToCluster(bean *ClusterBean) *v1alpha1.Cluster
@@ -259,11 +259,14 @@ func (impl *ClusterServiceImpl) ConvertClusterBeanToCluster(clusterBean *Cluster
259259

260260
// getAndUpdateClusterConnectionStatus is a cron function to update the connection status of all clusters
261261
func (impl *ClusterServiceImpl) getAndUpdateClusterConnectionStatus() {
262-
impl.logger.Debug("starting cluster connection status fetch thread")
263-
defer impl.logger.Debug("stopped cluster connection status fetch thread")
262+
impl.logger.Info("starting cluster connection status fetch thread")
263+
startTime := time.Now()
264+
defer func() {
265+
impl.logger.Debugw("cluster connection status fetch thread completed", "timeTaken", time.Since(startTime))
266+
}()
264267

265268
//getting all clusters
266-
clusters, err := impl.FindAllExceptVirtual()
269+
clusters, err := impl.FindAll()
267270
if err != nil {
268271
impl.logger.Errorw("error in getting all clusters", "err", err)
269272
return
@@ -845,38 +848,54 @@ func (impl *ClusterServiceImpl) FetchRolesFromGroup(userId int32) ([]*repository
845848
return roles, nil
846849
}
847850

851+
func (impl *ClusterServiceImpl) updateConnectionStatusForVirtualCluster(respMap *sync.Map, clusterId int, clusterName string) {
852+
connErr := fmt.Errorf("Get virtual cluster '%s' error: connection not setup for isolated clusters", clusterName)
853+
respMap.Store(clusterId, connErr)
854+
}
855+
848856
func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*ClusterBean, clusterExistInDb bool) {
849857
var wg sync.WaitGroup
850-
respMap := make(map[int]error)
851-
mutex := &sync.Mutex{}
852-
858+
respMap := &sync.Map{}
853859
for idx, cluster := range clusters {
860+
if cluster.IsVirtualCluster {
861+
impl.updateConnectionStatusForVirtualCluster(respMap, cluster.Id, cluster.ClusterName)
862+
continue
863+
}
854864
wg.Add(1)
855865
go func(idx int, cluster *ClusterBean) {
856866
defer wg.Done()
857867
clusterConfig := cluster.GetClusterConfig()
858868
_, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig)
859869
if err != nil {
860-
mutex.Lock()
861-
respMap[cluster.Id] = err
862-
mutex.Unlock()
870+
respMap.Store(cluster.Id, err)
863871
return
864872
}
865873

866874
id := cluster.Id
867875
if !clusterExistInDb {
868876
id = idx
869877
}
870-
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap, mutex)
878+
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap)
871879
}(idx, cluster)
872880
}
873881

874882
wg.Wait()
875883
impl.HandleErrorInClusterConnections(clusters, respMap, clusterExistInDb)
876884
}
877885

878-
func (impl *ClusterServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool) {
879-
for id, err := range respMap {
886+
func (impl *ClusterServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool) {
887+
respMap.Range(func(key, value any) bool {
888+
defer func() {
889+
// defer to handle panic on type assertion
890+
if r := recover(); r != nil {
891+
impl.logger.Errorw("error in handling error in cluster connections", "key", key, "value", value, "err", r)
892+
}
893+
}()
894+
id := key.(int)
895+
var err error
896+
if connectionError, ok := value.(error); ok {
897+
err = connectionError
898+
}
880899
errorInConnecting := ""
881900
if err != nil {
882901
errorInConnecting = err.Error()
@@ -896,7 +915,8 @@ func (impl *ClusterServiceImpl) HandleErrorInClusterConnections(clusters []*Clus
896915
//id is index of the cluster in clusters array
897916
clusters[id].ErrorInConnecting = errorInConnecting
898917
}
899-
}
918+
return true
919+
})
900920
}
901921

902922
func (impl *ClusterServiceImpl) ValidateKubeconfig(kubeConfig string) (map[string]*ValidateClusterBean, error) {
@@ -1066,7 +1086,7 @@ func (impl *ClusterServiceImpl) ValidateKubeconfig(kubeConfig string) (map[strin
10661086

10671087
}
10681088

1069-
func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sClientSet *kubernetes.Clientset, clusterId int, respMap map[int]error, mutex *sync.Mutex) {
1089+
func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sClientSet *kubernetes.Clientset, clusterId int, respMap *sync.Map) {
10701090
response, err := impl.K8sUtil.GetLiveZCall(k8s.LiveZ, k8sClientSet)
10711091
log.Println("received response for cluster livez status", "response", string(response), "err", err, "clusterId", clusterId)
10721092

@@ -1092,9 +1112,8 @@ func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sCli
10921112
} else if err == nil && string(response) != "ok" {
10931113
err = fmt.Errorf("Validation failed with response : %s", string(response))
10941114
}
1095-
mutex.Lock()
1096-
respMap[clusterId] = err
1097-
mutex.Unlock()
1115+
1116+
respMap.Store(clusterId, err)
10981117
}
10991118

11001119
func (impl *ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBean) *v1alpha1.Cluster {

pkg/cluster/EnvironmentService.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
bean2 "github.com/devtron-labs/devtron/pkg/cluster/repository/bean"
2525
"strconv"
2626
"strings"
27+
"sync"
2728
"time"
2829

2930
util2 "github.com/devtron-labs/common-lib/utils/k8s"
@@ -63,7 +64,7 @@ type EnvironmentService interface {
6364
GetByClusterId(id int) ([]*bean2.EnvironmentBean, error)
6465
GetCombinedEnvironmentListForDropDown(token string, isActionUserSuperAdmin bool, auth func(email string, object []string) map[string]bool) ([]*bean2.ClusterEnvDto, error)
6566
GetCombinedEnvironmentListForDropDownByClusterIds(token string, clusterIds []int, auth func(token string, object string) bool) ([]*bean2.ClusterEnvDto, error)
66-
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool)
67+
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool)
6768
GetDetailsById(envId int) (*repository.Environment, error)
6869
}
6970

@@ -734,7 +735,7 @@ func (impl EnvironmentServiceImpl) Delete(deleteReq *bean2.EnvironmentBean, user
734735
return nil
735736
}
736737

737-
func (impl EnvironmentServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool) {
738+
func (impl EnvironmentServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool) {
738739
impl.clusterService.HandleErrorInClusterConnections(clusters, respMap, clusterExistInDb)
739740
}
740741

0 commit comments

Comments
 (0)