From 0f679f5c527e537220c6138ff4f5e8676102da8b Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Mon, 12 Aug 2024 16:07:48 +0530 Subject: [PATCH] fix for concurrent read and write and in buildInformerAndNamespaceList --- pkg/k8s/informer/K8sInformerFactory.go | 72 ++++++++++---------------- wire_gen.go | 4 +- 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/pkg/k8s/informer/K8sInformerFactory.go b/pkg/k8s/informer/K8sInformerFactory.go index 7ddc92f08c..bc9739e9df 100644 --- a/pkg/k8s/informer/K8sInformerFactory.go +++ b/pkg/k8s/informer/K8sInformerFactory.go @@ -18,25 +18,23 @@ package informer import ( "github.com/devtron-labs/common-lib/utils/k8s" - "sync" - "time" - "github.com/devtron-labs/devtron/api/bean" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" + "sync" + "time" ) -func NewGlobalMapClusterNamespace() map[string]map[string]bool { - globalMapClusterNamespace := make(map[string]map[string]bool) +func NewGlobalMapClusterNamespace() sync.Map { + var globalMapClusterNamespace sync.Map return globalMapClusterNamespace } type K8sInformerFactoryImpl struct { logger *zap.SugaredLogger - globalMapClusterNamespace map[string]map[string]bool // {"cluster1":{"ns1":true","ns2":true"}} - mutex sync.Mutex + globalMapClusterNamespace sync.Map // {"cluster1":{"ns1":true","ns2":true"}} informerStopper map[string]chan struct{} k8sUtil *k8s.K8sServiceImpl } @@ -47,7 +45,7 @@ type K8sInformerFactory interface { CleanNamespaceInformer(clusterName string) } -func NewK8sInformerFactoryImpl(logger *zap.SugaredLogger, globalMapClusterNamespace map[string]map[string]bool, k8sUtil *k8s.K8sServiceImpl) *K8sInformerFactoryImpl { +func NewK8sInformerFactoryImpl(logger *zap.SugaredLogger, globalMapClusterNamespace sync.Map, k8sUtil *k8s.K8sServiceImpl) *K8sInformerFactoryImpl { informerFactory := &K8sInformerFactoryImpl{ logger: logger, globalMapClusterNamespace: globalMapClusterNamespace, @@ -59,19 +57,17 @@ func NewK8sInformerFactoryImpl(logger *zap.SugaredLogger, globalMapClusterNamesp func (impl *K8sInformerFactoryImpl) GetLatestNamespaceListGroupByCLuster() map[string]map[string]bool { copiedClusterNamespaces := make(map[string]map[string]bool) - for key, value := range impl.globalMapClusterNamespace { - for namespace, v := range value { - if _, ok := copiedClusterNamespaces[key]; !ok { - allNamespaces := make(map[string]bool) - allNamespaces[namespace] = v - copiedClusterNamespaces[key] = allNamespaces - } else { - allNamespaces := copiedClusterNamespaces[key] - allNamespaces[namespace] = v - copiedClusterNamespaces[key] = allNamespaces - } - } - } + impl.globalMapClusterNamespace.Range(func(key, value interface{}) bool { + clusterName := key.(string) + allNamespaces := value.(*sync.Map) + namespaceMap := make(map[string]bool) + allNamespaces.Range(func(nsKey, nsValue interface{}) bool { + namespaceMap[nsKey.(string)] = nsValue.(bool) + return true + }) + copiedClusterNamespaces[clusterName] = namespaceMap + return true + }) return copiedClusterNamespaces } @@ -86,14 +82,14 @@ func (impl *K8sInformerFactoryImpl) BuildInformer(clusterInfo []*bean.ClusterInf CertData: info.CertData, CAData: info.CAData, } - impl.buildInformerAndNamespaceList(info.ClusterName, clusterConfig, &impl.mutex) + impl.buildInformerAndNamespaceList(info.ClusterName, clusterConfig) } return } -func (impl *K8sInformerFactoryImpl) buildInformerAndNamespaceList(clusterName string, clusterConfig *k8s.ClusterConfig, mutex *sync.Mutex) map[string]map[string]bool { - allNamespaces := make(map[string]bool) - impl.globalMapClusterNamespace[clusterName] = allNamespaces +func (impl *K8sInformerFactoryImpl) buildInformerAndNamespaceList(clusterName string, clusterConfig *k8s.ClusterConfig) sync.Map { + allNamespaces := sync.Map{} + impl.globalMapClusterNamespace.Store(clusterName, &allNamespaces) _, _, clusterClient, err := impl.k8sUtil.GetK8sConfigAndClients(clusterConfig) if err != nil { impl.logger.Errorw("error in getting k8s clientset", "err", err, "clusterName", clusterConfig.ClusterName) @@ -105,30 +101,16 @@ func (impl *K8sInformerFactoryImpl) buildInformerAndNamespaceList(clusterName st nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { if mobject, ok := obj.(metav1.Object); ok { - mutex.Lock() - defer mutex.Unlock() - if _, ok := impl.globalMapClusterNamespace[clusterName]; !ok { - allNamespaces := make(map[string]bool) - allNamespaces[mobject.GetName()] = true - impl.globalMapClusterNamespace[clusterName] = allNamespaces - } else { - allNamespaces := impl.globalMapClusterNamespace[clusterName] - allNamespaces[mobject.GetName()] = true - impl.globalMapClusterNamespace[clusterName] = allNamespaces - } - //mutex.Unlock() + value, _ := impl.globalMapClusterNamespace.Load(clusterName) + allNamespaces := value.(*sync.Map) + allNamespaces.Store(mobject.GetName(), true) } }, DeleteFunc: func(obj interface{}) { if object, ok := obj.(metav1.Object); ok { - mutex.Lock() - defer mutex.Unlock() - if _, ok := impl.globalMapClusterNamespace[clusterName]; ok { - allNamespaces := impl.globalMapClusterNamespace[clusterName] - delete(allNamespaces, object.GetName()) - impl.globalMapClusterNamespace[clusterName] = allNamespaces - //mutex.Unlock() - } + value, _ := impl.globalMapClusterNamespace.Load(clusterName) + allNamespaces := value.(*sync.Map) + allNamespaces.Delete(object.GetName()) } }, }) diff --git a/wire_gen.go b/wire_gen.go index c349364e73..0d69bd654d 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -267,8 +267,8 @@ func InitializeApp() (*App, error) { return nil, err } serviceClientImpl := cluster.NewServiceClientImpl(sugaredLogger, argoCDConnectionManagerImpl) - v := informer.NewGlobalMapClusterNamespace() - k8sInformerFactoryImpl := informer.NewK8sInformerFactoryImpl(sugaredLogger, v, k8sServiceImpl) + syncMap := informer.NewGlobalMapClusterNamespace() + k8sInformerFactoryImpl := informer.NewK8sInformerFactoryImpl(sugaredLogger, syncMap, k8sServiceImpl) defaultAuthPolicyRepositoryImpl := repository4.NewDefaultAuthPolicyRepositoryImpl(db, sugaredLogger) defaultAuthRoleRepositoryImpl := repository4.NewDefaultAuthRoleRepositoryImpl(db, sugaredLogger) userAuthRepositoryImpl := repository4.NewUserAuthRepositoryImpl(db, sugaredLogger, defaultAuthPolicyRepositoryImpl, defaultAuthRoleRepositoryImpl)