From 2335f1fecb91467a89c71ce59d58bb177f64c4bd Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Tue, 20 May 2025 17:47:10 +0530 Subject: [PATCH 1/6] panic recovered go-routines --- client/argocdServer/connection/Connection.go | 11 +++++++++-- pkg/build/trigger/HandlerService.go | 11 ++++++++++- pkg/workflow/dag/WorkflowDagExecutor.go | 5 ++++- wire_gen.go | 6 +++--- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/client/argocdServer/connection/Connection.go b/client/argocdServer/connection/Connection.go index 7e690864d5..24f8141442 100644 --- a/client/argocdServer/connection/Connection.go +++ b/client/argocdServer/connection/Connection.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/argoproj/argo-cd/v2/pkg/apiclient/account" "github.com/argoproj/argo-cd/v2/util/settings" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils/k8s" "github.com/devtron-labs/devtron/client/argocdServer/bean" config2 "github.com/devtron-labs/devtron/client/argocdServer/config" @@ -81,6 +82,7 @@ type ArgoCDConnectionManagerImpl struct { gitOpsConfigReadService config.GitOpsConfigReadService runTimeConfig *k8s.RuntimeConfig argoCDConfigGetter config2.ArgoCDConfigGetter + asyncRunnable *async.Runnable } func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger, @@ -92,7 +94,8 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger, versionService version.VersionService, gitOpsConfigReadService config.GitOpsConfigReadService, runTimeConfig *k8s.RuntimeConfig, - argoCDConfigGetter config2.ArgoCDConfigGetter) (*ArgoCDConnectionManagerImpl, error) { + argoCDConfigGetter config2.ArgoCDConfigGetter, + asyncRunnable *async.Runnable) (*ArgoCDConnectionManagerImpl, error) { argoUserServiceImpl := &ArgoCDConnectionManagerImpl{ logger: Logger, settingsManager: settingsManager, @@ -105,13 +108,17 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger, gitOpsConfigReadService: gitOpsConfigReadService, runTimeConfig: runTimeConfig, argoCDConfigGetter: argoCDConfigGetter, + asyncRunnable: asyncRunnable, } if !runTimeConfig.LocalDevMode { grpcConfig, err := argoCDConfigGetter.GetGRPCConfig() if err != nil { Logger.Errorw("error in GetAllGRPCConfigs", "error", err) } - go argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig) + runnableFunc := func() { + argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig) + } + asyncRunnable.Execute(runnableFunc) } return argoUserServiceImpl, nil } diff --git a/pkg/build/trigger/HandlerService.go b/pkg/build/trigger/HandlerService.go index 413edd3964..b32479513f 100644 --- a/pkg/build/trigger/HandlerService.go +++ b/pkg/build/trigger/HandlerService.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" blob_storage "github.com/devtron-labs/common-lib/blob-storage" "github.com/devtron-labs/common-lib/utils" bean4 "github.com/devtron-labs/common-lib/utils/bean" @@ -116,6 +117,7 @@ type HandlerServiceImpl struct { clusterService cluster.ClusterService envService environment.EnvironmentService K8sUtil *k8s.K8sServiceImpl + asyncRunnable *async.Runnable } func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.WorkflowService, @@ -141,6 +143,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W clusterService cluster.ClusterService, envService environment.EnvironmentService, K8sUtil *k8s.K8sServiceImpl, + asyncRunnable *async.Runnable, ) *HandlerServiceImpl { buildxCacheFlags := &BuildxCacheFlags{} err := env.Parse(buildxCacheFlags) @@ -174,6 +177,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W clusterService: clusterService, envService: envService, K8sUtil: K8sUtil, + asyncRunnable: asyncRunnable, } config, err := types.GetCiConfig() if err != nil { @@ -628,7 +632,12 @@ func (impl *HandlerServiceImpl) triggerCiPipeline(trigger types.Trigger) (int, e } middleware.CiTriggerCounter.WithLabelValues(pipeline.App.AppName, pipeline.Name).Inc() - go impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest) + + runnableFunc := func() { + impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest) + } + impl.asyncRunnable.Execute(runnableFunc) + return savedCiWf.Id, err } diff --git a/pkg/workflow/dag/WorkflowDagExecutor.go b/pkg/workflow/dag/WorkflowDagExecutor.go index a53e90dbd6..3c18181fec 100644 --- a/pkg/workflow/dag/WorkflowDagExecutor.go +++ b/pkg/workflow/dag/WorkflowDagExecutor.go @@ -1050,7 +1050,10 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger } else { ciArtifactArr = append(ciArtifactArr, pluginArtifacts[0]) } - go impl.WriteCiSuccessEvent(request, pipelineModal, buildArtifact) + runnableFunc := func() { + impl.WriteCiSuccessEvent(request, pipelineModal, buildArtifact) + } + impl.asyncRunnable.Execute(runnableFunc) async := false // execute auto trigger in batch on CI success event diff --git a/wire_gen.go b/wire_gen.go index c8b8946d92..d67e9179eb 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -385,7 +385,8 @@ func InitializeApp() (*App, error) { return nil, err } argoCDConfigGetterImpl := config3.NewArgoCDConfigGetter(beanConfig, environmentVariables, acdAuthConfig, clusterReadServiceImpl, sugaredLogger, k8sServiceImpl) - argoCDConnectionManagerImpl, err := connection.NewArgoCDConnectionManagerImpl(sugaredLogger, settingsManager, moduleRepositoryImpl, environmentVariables, k8sServiceImpl, k8sCommonServiceImpl, versionServiceImpl, gitOpsConfigReadServiceImpl, k8sRuntimeConfig, argoCDConfigGetterImpl) + runnable := asyncProvider.NewAsyncRunnable(sugaredLogger) + argoCDConnectionManagerImpl, err := connection.NewArgoCDConnectionManagerImpl(sugaredLogger, settingsManager, moduleRepositoryImpl, environmentVariables, k8sServiceImpl, k8sCommonServiceImpl, versionServiceImpl, gitOpsConfigReadServiceImpl, k8sRuntimeConfig, argoCDConfigGetterImpl, runnable) if err != nil { return nil, err } @@ -404,7 +405,6 @@ func InitializeApp() (*App, error) { } chartTemplateServiceImpl := util.NewChartTemplateServiceImpl(sugaredLogger) gitOperationServiceImpl := git.NewGitOperationServiceImpl(sugaredLogger, gitFactory, gitOpsConfigReadServiceImpl, chartTemplateServiceImpl, environmentVariables) - runnable := asyncProvider.NewAsyncRunnable(sugaredLogger) repositoryCredsK8sClientImpl := repoCredsK8sClient.NewRepositoryCredsK8sClientImpl(sugaredLogger, k8sServiceImpl) argoClientWrapperServiceEAImpl := argocdServer.NewArgoClientWrapperServiceEAImpl(sugaredLogger, repositoryCredsK8sClientImpl, argoCDConfigGetterImpl) argoK8sClientImpl := argocdServer.NewArgoK8sClientImpl(sugaredLogger, k8sServiceImpl) @@ -685,7 +685,7 @@ func InitializeApp() (*App, error) { return nil, err } blobStorageConfigServiceImpl := pipeline.NewBlobStorageConfigServiceImpl(sugaredLogger, k8sServiceImpl, ciCdConfig) - handlerServiceImpl := trigger.NewHandlerServiceImpl(sugaredLogger, workflowServiceImpl, ciPipelineMaterialRepositoryImpl, ciPipelineRepositoryImpl, ciArtifactRepositoryImpl, pipelineStageServiceImpl, userServiceImpl, ciTemplateReadServiceImpl, appCrudOperationServiceImpl, environmentRepositoryImpl, appRepositoryImpl, scopedVariableManagerImpl, customTagServiceImpl, ciCdPipelineOrchestratorImpl, attributesServiceImpl, pluginInputVariableParserImpl, globalPluginServiceImpl, ciServiceImpl, ciWorkflowRepositoryImpl, clientImpl, ciLogServiceImpl, blobStorageConfigServiceImpl, clusterServiceImplExtended, environmentServiceImpl, k8sServiceImpl) + handlerServiceImpl := trigger.NewHandlerServiceImpl(sugaredLogger, workflowServiceImpl, ciPipelineMaterialRepositoryImpl, ciPipelineRepositoryImpl, ciArtifactRepositoryImpl, pipelineStageServiceImpl, userServiceImpl, ciTemplateReadServiceImpl, appCrudOperationServiceImpl, environmentRepositoryImpl, appRepositoryImpl, scopedVariableManagerImpl, customTagServiceImpl, ciCdPipelineOrchestratorImpl, attributesServiceImpl, pluginInputVariableParserImpl, globalPluginServiceImpl, ciServiceImpl, ciWorkflowRepositoryImpl, clientImpl, ciLogServiceImpl, blobStorageConfigServiceImpl, clusterServiceImplExtended, environmentServiceImpl, k8sServiceImpl, runnable) gitWebhookServiceImpl := gitWebhook.NewGitWebhookServiceImpl(sugaredLogger, gitWebhookRepositoryImpl, handlerServiceImpl) gitWebhookRestHandlerImpl := restHandler.NewGitWebhookRestHandlerImpl(sugaredLogger, gitWebhookServiceImpl) ecrConfig, err := pipeline.GetEcrConfig() From 0a29c22c199dd898dc4dea51287f6c6146217645 Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Tue, 20 May 2025 18:47:01 +0530 Subject: [PATCH 2/6] panic recovered go-routines --- api/restHandler/BatchOperationRestHandler.go | 10 +++++++--- .../ArgoApplicationServiceExtended.go | 9 +++++++-- pkg/cluster/ClusterService.go | 11 ++++++++--- pkg/k8s/K8sCommonService.go | 17 ++++++++++++----- pkg/workflow/dag/WorkflowDagExecutor.go | 5 +++-- 5 files changed, 37 insertions(+), 15 deletions(-) diff --git a/api/restHandler/BatchOperationRestHandler.go b/api/restHandler/BatchOperationRestHandler.go index fd598e5725..f9df83eeed 100644 --- a/api/restHandler/BatchOperationRestHandler.go +++ b/api/restHandler/BatchOperationRestHandler.go @@ -22,6 +22,7 @@ import ( "errors" "net/http" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/devtron/api/restHandler/common" "github.com/devtron-labs/devtron/pkg/apis/devtron/v1" "github.com/devtron-labs/devtron/pkg/apis/devtron/v1/validation" @@ -44,10 +45,11 @@ type BatchOperationRestHandlerImpl struct { teamService team.TeamService logger *zap.SugaredLogger enforcerUtil rbac.EnforcerUtil + asyncRunnable *async.Runnable } func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer casbin.Enforcer, workflowAction batch.WorkflowAction, - teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil) *BatchOperationRestHandlerImpl { + teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil, asyncRunnable *async.Runnable) *BatchOperationRestHandlerImpl { return &BatchOperationRestHandlerImpl{ userAuthService: userAuthService, enforcer: enforcer, @@ -55,6 +57,7 @@ func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer teamService: teamService, logger: logger, enforcerUtil: enforcerUtil, + asyncRunnable: asyncRunnable, } } @@ -104,13 +107,14 @@ func (handler BatchOperationRestHandlerImpl) Operate(w http.ResponseWriter, r *h ctx, cancel := context.WithCancel(r.Context()) if cn, ok := w.(http.CloseNotifier); ok { - go func(done <-chan struct{}, closed <-chan bool) { + runnableFunc := func(done <-chan struct{}, closed <-chan bool) { select { case <-done: case <-closed: cancel() } - }(ctx.Done(), cn.CloseNotify()) + } + handler.asyncRunnable.Execute(func() { runnableFunc(ctx.Done(), cn.CloseNotify()) }) } err = handler.workflowAction.Execute(&workflow, emptyProps, r.Context()) if err != nil { diff --git a/pkg/argoApplication/ArgoApplicationServiceExtended.go b/pkg/argoApplication/ArgoApplicationServiceExtended.go index 0cfb81b2d3..0fce636655 100644 --- a/pkg/argoApplication/ArgoApplicationServiceExtended.go +++ b/pkg/argoApplication/ArgoApplicationServiceExtended.go @@ -22,6 +22,7 @@ import ( "fmt" application2 "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils/k8s" openapi "github.com/devtron-labs/devtron/api/helm-app/openapiClient" "github.com/devtron-labs/devtron/client/argocdServer" @@ -46,6 +47,7 @@ type ArgoApplicationServiceExtendedImpl struct { argoApplicationReadService read.ArgoApplicationReadService clusterService cluster.ClusterService acdClientWrapper argocdServer.ArgoClientWrapperService + asyncRunnable *async.Runnable } func NewArgoApplicationServiceExtendedServiceImpl( @@ -54,6 +56,7 @@ func NewArgoApplicationServiceExtendedServiceImpl( acdClientWrapper argocdServer.ArgoClientWrapperService, argoApplicationReadService read.ArgoApplicationReadService, clusterService cluster.ClusterService, + asyncRunnable *async.Runnable, ) *ArgoApplicationServiceExtendedImpl { return &ArgoApplicationServiceExtendedImpl{ aCDAuthConfig: aCDAuthConfig, @@ -61,6 +64,7 @@ func NewArgoApplicationServiceExtendedServiceImpl( argoApplicationReadService: argoApplicationReadService, acdClientWrapper: acdClientWrapper, ArgoApplicationServiceImpl: argoApplicationServiceImpl, + asyncRunnable: asyncRunnable, } } @@ -328,7 +332,7 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat for _, node := range queryNodes { rQuery := transform(node, query.ApplicationName) qCount++ - go func(request application2.ApplicationResourceRequest) { + runnableFunc := func(request application2.ApplicationResourceRequest) { ctx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() startTime := time.Now() @@ -343,7 +347,8 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat } else { response <- argoApplication.Result{Response: nil, Error: fmt.Errorf("connection closed by client"), Request: &request} } - }(*rQuery) + } + c.asyncRunnable.Execute(func() { runnableFunc(*rQuery) }) } if qCount == 0 { diff --git a/pkg/cluster/ClusterService.go b/pkg/cluster/ClusterService.go index 1b00c2f84f..615af7c1b8 100644 --- a/pkg/cluster/ClusterService.go +++ b/pkg/cluster/ClusterService.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/devtron-labs/common-lib/async" informerBean "github.com/devtron-labs/common-lib/informer" "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/devtron/pkg/cluster/adapter" @@ -97,6 +98,7 @@ type ClusterServiceImpl struct { userRepository repository3.UserRepository roleGroupRepository repository3.RoleGroupRepository clusterReadService read.ClusterReadService + asyncRunnable *async.Runnable } func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.SugaredLogger, @@ -105,7 +107,8 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap. roleGroupRepository repository3.RoleGroupRepository, envVariables *globalUtil.EnvironmentVariables, cronLogger *cronUtil.CronLoggerImpl, - clusterReadService read.ClusterReadService) (*ClusterServiceImpl, error) { + clusterReadService read.ClusterReadService, + asyncRunnable *async.Runnable) (*ClusterServiceImpl, error) { clusterService := &ClusterServiceImpl{ clusterRepository: repository, logger: logger, @@ -115,6 +118,7 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap. userRepository: userRepository, roleGroupRepository: roleGroupRepository, clusterReadService: clusterReadService, + asyncRunnable: asyncRunnable, } // initialise cron newCron := cron.New(cron.WithChain(cron.Recover(cronLogger))) @@ -782,7 +786,7 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB continue } wg.Add(1) - go func(idx int, cluster *bean.ClusterBean) { + runnableFunc := func(idx int, cluster *bean.ClusterBean) { defer wg.Done() clusterConfig := cluster.GetClusterConfig() _, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig) @@ -796,7 +800,8 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB id = idx } impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap) - }(idx, cluster) + } + impl.asyncRunnable.Execute(func() { runnableFunc(idx, cluster) }) } wg.Wait() diff --git a/pkg/k8s/K8sCommonService.go b/pkg/k8s/K8sCommonService.go index 360b8ab53d..39cae5dd0f 100644 --- a/pkg/k8s/K8sCommonService.go +++ b/pkg/k8s/K8sCommonService.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils/k8s" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/devtron/api/bean/AppView" @@ -75,6 +76,7 @@ type K8sCommonServiceImpl struct { K8sApplicationServiceConfig *K8sApplicationServiceConfig argoApplicationConfigService config.ArgoApplicationConfigService ClusterReadService read.ClusterReadService + asyncRunnable *async.Runnable } type K8sApplicationServiceConfig struct { BatchSize int `env:"BATCH_SIZE" envDefault:"5" description:"there is feature to get URL's of services/ingresses. so to extract those, we need to parse all the servcie and ingress objects of the application. this BATCH_SIZE flag controls the no of these objects get parsed in one go."` @@ -83,7 +85,7 @@ type K8sApplicationServiceConfig struct { func NewK8sCommonServiceImpl(Logger *zap.SugaredLogger, k8sUtils *k8s.K8sServiceImpl, argoApplicationConfigService config.ArgoApplicationConfigService, - ClusterReadService read.ClusterReadService) *K8sCommonServiceImpl { + ClusterReadService read.ClusterReadService, asyncRunnable *async.Runnable) *K8sCommonServiceImpl { cfg := &K8sApplicationServiceConfig{} err := env.Parse(cfg) if err != nil { @@ -95,6 +97,7 @@ func NewK8sCommonServiceImpl(Logger *zap.SugaredLogger, k8sUtils *k8s.K8sService K8sApplicationServiceConfig: cfg, argoApplicationConfigService: argoApplicationConfigService, ClusterReadService: ClusterReadService, + asyncRunnable: asyncRunnable, } } @@ -295,7 +298,8 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque var res []bean5.BatchResourceResponse ctx, cancel := context.WithTimeout(ctx, time.Duration(impl.K8sApplicationServiceConfig.TimeOutInSeconds)*time.Second) defer cancel() - go func() { + + runnableFunc := func() { ans := impl.getManifestsByBatch(ctx, requests) select { case <-ctx.Done(): @@ -303,7 +307,9 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque default: ch <- ans } - }() + } + impl.asyncRunnable.Execute(runnableFunc) + select { case ans := <-ch: res = ans @@ -398,7 +404,7 @@ func (impl *K8sCommonServiceImpl) getManifestsByBatch(ctx context.Context, reque var wg sync.WaitGroup for j := 0; j < batchSize; j++ { wg.Add(1) - go func(j int) { + runnableFunc := func(j int) { resp := bean5.BatchResourceResponse{} response, err := impl.GetResource(ctx, &requests[i+j]) if response != nil { @@ -407,7 +413,8 @@ func (impl *K8sCommonServiceImpl) getManifestsByBatch(ctx context.Context, reque resp.Err = err res[i+j] = resp wg.Done() - }(j) + } + impl.asyncRunnable.Execute(func() { runnableFunc(j) }) } wg.Wait() i += batchSize diff --git a/pkg/workflow/dag/WorkflowDagExecutor.go b/pkg/workflow/dag/WorkflowDagExecutor.go index 3c18181fec..aed15b9c0e 100644 --- a/pkg/workflow/dag/WorkflowDagExecutor.go +++ b/pkg/workflow/dag/WorkflowDagExecutor.go @@ -1075,7 +1075,7 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger for j := 0; j < batchSize; j++ { wg.Add(1) index := i + j - go func(index int) { + runnableFunc := func(index int) { defer wg.Done() ciArtifact := ciArtifactArr[index] // handle individual CiArtifact success event @@ -1083,7 +1083,8 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger if err != nil { impl.logger.Errorw("error on handle ci success event", "ciArtifactId", ciArtifact.Id, "err", err) } - }(index) + } + impl.asyncRunnable.Execute(func() { runnableFunc(index) }) } wg.Wait() i += batchSize From da1bcf1a202dd1bfff444337da7f599d9cd29392 Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Tue, 20 May 2025 22:28:12 +0530 Subject: [PATCH 3/6] panic recovered go-routines --- pkg/app/AppService.go | 19 +++++++++----- .../UserTerminalAccessService.go | 15 +++++++---- .../resourceTree/ResourceTreeService.go | 13 +++++++--- .../trigger/devtronApps/HandlerService.go | 9 +++++-- .../devtronApps/deployStageHandlerCode.go | 17 +++++++----- .../in/WorkflowEventProcessorService.go | 15 +++++++---- pkg/terminal/terminalSesion.go | 26 +++++++++++-------- pkg/variables/ScopedVariableService.go | 13 +++++++--- pkg/workflow/status/WorkflowStatusService.go | 11 ++++++-- wire_gen.go | 18 ++++++------- 10 files changed, 100 insertions(+), 56 deletions(-) diff --git a/pkg/app/AppService.go b/pkg/app/AppService.go index 589f272bb3..d1963c9f9c 100644 --- a/pkg/app/AppService.go +++ b/pkg/app/AppService.go @@ -20,6 +20,10 @@ import ( "context" "errors" "fmt" + "net/url" + "strconv" + "time" + health2 "github.com/argoproj/gitops-engine/pkg/health" argoApplication "github.com/devtron-labs/devtron/client/argocdServer/bean" "github.com/devtron-labs/devtron/internal/sql/models" @@ -40,12 +44,10 @@ import ( "github.com/devtron-labs/devtron/pkg/deployment/manifest/deploymentTemplate/read" bean4 "github.com/devtron-labs/devtron/pkg/deployment/trigger/devtronApps/bean" "github.com/devtron-labs/devtron/pkg/workflow/cd" - "net/url" - "strconv" - "time" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/common-lib/utils/k8s/health" "github.com/devtron-labs/devtron/api/bean" @@ -124,6 +126,7 @@ type AppServiceImpl struct { deploymentConfigService common2.DeploymentConfigService envConfigOverrideReadService read.EnvConfigOverrideService cdWorkflowRunnerService cd.CdWorkflowRunnerService + asyncRunnable *async.Runnable } type AppService interface { @@ -151,7 +154,7 @@ func NewAppService( cdWorkflowRepository pipelineConfig.CdWorkflowRepository, commonService commonService.CommonService, chartTemplateService internalUtil.ChartTemplateService, - cdPipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository, + pipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository, pipelineStatusTimelineResourcesService status2.PipelineStatusTimelineResourcesService, pipelineStatusSyncDetailService status2.PipelineStatusSyncDetailService, pipelineStatusTimelineService status2.PipelineStatusTimelineService, @@ -164,7 +167,8 @@ func NewAppService( appListingService AppListingService, deploymentConfigService common2.DeploymentConfigService, envConfigOverrideReadService read.EnvConfigOverrideService, - cdWorkflowRunnerService cd.CdWorkflowRunnerService) *AppServiceImpl { + cdWorkflowRunnerService cd.CdWorkflowRunnerService, + asyncRunnable *async.Runnable) *AppServiceImpl { appServiceImpl := &AppServiceImpl{ mergeUtil: mergeUtil, pipelineOverrideRepository: pipelineOverrideRepository, @@ -178,7 +182,7 @@ func NewAppService( cdWorkflowRepository: cdWorkflowRepository, commonService: commonService, chartTemplateService: chartTemplateService, - pipelineStatusTimelineRepository: cdPipelineStatusTimelineRepo, + pipelineStatusTimelineRepository: pipelineStatusTimelineRepo, pipelineStatusTimelineResourcesService: pipelineStatusTimelineResourcesService, pipelineStatusSyncDetailService: pipelineStatusSyncDetailService, pipelineStatusTimelineService: pipelineStatusTimelineService, @@ -195,6 +199,7 @@ func NewAppService( deploymentConfigService: deploymentConfigService, envConfigOverrideReadService: envConfigOverrideReadService, cdWorkflowRunnerService: cdWorkflowRunnerService, + asyncRunnable: asyncRunnable, } return appServiceImpl } @@ -320,7 +325,7 @@ func (impl *AppServiceImpl) UpdateDeploymentStatusForGitOpsPipelines(app *v1alph } if isSucceeded { impl.logger.Infow("writing cd success event", "gitHash", gitHash, "pipelineOverride", pipelineOverride) - go impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) + impl.asyncRunnable.Execute(func() { impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) }) } } else { impl.logger.Debugw("event received for older triggered revision", "gitHash", gitHash) diff --git a/pkg/clusterTerminalAccess/UserTerminalAccessService.go b/pkg/clusterTerminalAccess/UserTerminalAccessService.go index b6ab4a6b09..220655b82b 100644 --- a/pkg/clusterTerminalAccess/UserTerminalAccessService.go +++ b/pkg/clusterTerminalAccess/UserTerminalAccessService.go @@ -21,11 +21,18 @@ import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" + "sync" + "time" + "github.com/caarlos0/env/v6" + "github.com/devtron-labs/common-lib/async" k8s2 "github.com/devtron-labs/common-lib/utils/k8s" "github.com/devtron-labs/devtron/api/helm-app/service/bean" "github.com/devtron-labs/devtron/internal/sql/models" "github.com/devtron-labs/devtron/internal/sql/repository" + "github.com/devtron-labs/devtron/pkg/asyncProvider" utils1 "github.com/devtron-labs/devtron/pkg/clusterTerminalAccess/clusterTerminalUtils" "github.com/devtron-labs/devtron/pkg/k8s" bean2 "github.com/devtron-labs/devtron/pkg/k8s/bean" @@ -43,10 +50,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "strconv" - "strings" - "sync" - "time" ) type UserTerminalAccessService interface { @@ -75,6 +78,7 @@ type UserTerminalAccessServiceImpl struct { terminalSessionHandler terminal.TerminalSessionHandler K8sCapacityService capacity.K8sCapacityService k8sUtil *k8s2.K8sServiceImpl + asyncRunnable *async.Runnable } type UserTerminalAccessSessionData struct { @@ -114,6 +118,7 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR terminalSessionHandler: terminalSessionHandler, K8sCapacityService: K8sCapacityService, k8sUtil: k8sUtil, + asyncRunnable: asyncProvider.NewAsyncRunnable(logger), } podStatusSyncCron.Start() _, err := podStatusSyncCron.AddFunc(fmt.Sprintf("@every %ds", config.TerminalPodStatusSyncTimeInSecs), accessServiceImpl.SyncPodStatus) @@ -121,7 +126,7 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR logger.Errorw("error occurred while starting cron job", "time in secs", config.TerminalPodStatusSyncTimeInSecs) return nil, err } - go accessServiceImpl.SyncRunningInstances() + accessServiceImpl.asyncRunnable.Execute(func() { accessServiceImpl.SyncRunningInstances() }) return accessServiceImpl, err } func (impl *UserTerminalAccessServiceImpl) ValidateShell(podName, namespace, shellName, containerName string, clusterId int) (bool, string, error) { diff --git a/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go b/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go index b6fd6b5f69..66a6756574 100644 --- a/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go +++ b/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go @@ -19,6 +19,10 @@ package resourceTree import ( "context" "fmt" + "github.com/devtron-labs/common-lib/async" + "strconv" + "time" + "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" "github.com/argoproj/gitops-engine/pkg/health" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" @@ -42,8 +46,6 @@ import ( application2 "github.com/devtron-labs/devtron/pkg/k8s/application" util2 "github.com/devtron-labs/devtron/util" "go.uber.org/zap" - "strconv" - "time" ) type Service interface { @@ -66,6 +68,7 @@ type ServiceImpl struct { k8sApplicationService application2.K8sApplicationService k8sCommonService k8s.K8sCommonService environmentReadService read2.EnvironmentReadService + asyncRunnable *async.Runnable } func NewServiceImpl(logger *zap.SugaredLogger, @@ -78,6 +81,7 @@ func NewServiceImpl(logger *zap.SugaredLogger, k8sApplicationService application2.K8sApplicationService, k8sCommonService k8s.K8sCommonService, environmentReadService read2.EnvironmentReadService, + asyncRunnable *async.Runnable, ) *ServiceImpl { serviceImpl := &ServiceImpl{ logger: logger, @@ -90,6 +94,7 @@ func NewServiceImpl(logger *zap.SugaredLogger, k8sApplicationService: k8sApplicationService, k8sCommonService: k8sCommonService, environmentReadService: environmentReadService, + asyncRunnable: asyncRunnable, } return serviceImpl } @@ -164,7 +169,7 @@ func (impl *ServiceImpl) FetchResourceTree(ctx context.Context, appId int, envId } } resourceTree = util2.InterfaceToMapAdapter(resp) - go func() { + impl.asyncRunnable.Execute(func() { if resp.Status == string(health.HealthStatusHealthy) { err = impl.cdApplicationStatusUpdateHandler.SyncPipelineStatusForResourceTreeCall(cdPipeline) if err != nil { @@ -176,7 +181,7 @@ func (impl *ServiceImpl) FetchResourceTree(ctx context.Context, appId int, envId if err != nil { impl.logger.Warnw("error in updating app status", "err", err, "appId", cdPipeline.AppId, "envId", cdPipeline.EnvironmentId) } - }() + }) k8sAppDetail := AppView.AppDetailContainer{ DeploymentDetailContainer: AppView.DeploymentDetailContainer{ ClusterId: cdPipeline.Environment.ClusterId, diff --git a/pkg/deployment/trigger/devtronApps/HandlerService.go b/pkg/deployment/trigger/devtronApps/HandlerService.go index 4382f14ff0..1247ed2f9f 100644 --- a/pkg/deployment/trigger/devtronApps/HandlerService.go +++ b/pkg/deployment/trigger/devtronApps/HandlerService.go @@ -19,6 +19,10 @@ package devtronApps import ( "bufio" "context" + "os" + "time" + + "github.com/devtron-labs/common-lib/async" pubsub "github.com/devtron-labs/common-lib/pubsub-lib" util5 "github.com/devtron-labs/common-lib/utils/k8s" bean3 "github.com/devtron-labs/devtron/api/bean" @@ -36,6 +40,7 @@ import ( "github.com/devtron-labs/devtron/pkg/app" bean4 "github.com/devtron-labs/devtron/pkg/app/bean" "github.com/devtron-labs/devtron/pkg/app/status" + "github.com/devtron-labs/devtron/pkg/asyncProvider" "github.com/devtron-labs/devtron/pkg/attributes" "github.com/devtron-labs/devtron/pkg/auth/user" userBean "github.com/devtron-labs/devtron/pkg/auth/user/bean" @@ -70,8 +75,6 @@ import ( util2 "github.com/devtron-labs/devtron/util/event" "github.com/devtron-labs/devtron/util/rbac" "go.uber.org/zap" - "os" - "time" ) /* @@ -166,6 +169,7 @@ type HandlerServiceImpl struct { ciLogService pipeline.CiLogService workflowService executor.WorkflowService blobConfigStorageService pipeline.BlobStorageConfigService + asyncRunnable *async.Runnable } func NewHandlerServiceImpl(logger *zap.SugaredLogger, @@ -293,6 +297,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, ciLogService: ciLogService, workflowService: workflowService, blobConfigStorageService: blobConfigStorageService, + asyncRunnable: asyncProvider.NewAsyncRunnable(logger), } config, err := types.GetCdConfig() if err != nil { diff --git a/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go b/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go index 70f7620216..8a1ee736a7 100644 --- a/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go +++ b/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go @@ -20,6 +20,13 @@ import ( "context" "errors" "fmt" + "net/http" + "path" + "regexp" + "strconv" + "strings" + "time" + bean3 "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/api/bean/gitOps" bean6 "github.com/devtron-labs/devtron/api/helm-app/bean" @@ -57,12 +64,6 @@ import ( "google.golang.org/grpc/codes" status2 "google.golang.org/grpc/status" "helm.sh/helm/v3/pkg/chart" - "net/http" - "path" - "regexp" - "strconv" - "strings" - "time" ) func (impl *HandlerServiceImpl) TriggerStageForBulk(triggerRequest bean.TriggerRequest) error { @@ -741,7 +742,9 @@ func (impl *HandlerServiceImpl) triggerPipeline(overrideRequest *bean3.ValuesOve } } - go impl.writeCDTriggerEvent(overrideRequest, valuesOverrideResponse.Artifact, valuesOverrideResponse.PipelineOverride.PipelineReleaseCounter, valuesOverrideResponse.PipelineOverride.Id, overrideRequest.WfrId) + impl.asyncRunnable.Execute(func() { + impl.writeCDTriggerEvent(overrideRequest, valuesOverrideResponse.Artifact, valuesOverrideResponse.PipelineOverride.PipelineReleaseCounter, valuesOverrideResponse.PipelineOverride.Id, overrideRequest.WfrId) + }) _ = impl.markImageScanDeployed(newCtx, overrideRequest.AppId, overrideRequest.EnvId, overrideRequest.ClusterId, valuesOverrideResponse.Artifact.ImageDigest, valuesOverrideResponse.Artifact.ScanEnabled, valuesOverrideResponse.Artifact.Image) diff --git a/pkg/eventProcessor/in/WorkflowEventProcessorService.go b/pkg/eventProcessor/in/WorkflowEventProcessorService.go index 7012b20adf..4211fe8b9a 100644 --- a/pkg/eventProcessor/in/WorkflowEventProcessorService.go +++ b/pkg/eventProcessor/in/WorkflowEventProcessorService.go @@ -21,7 +21,14 @@ import ( "encoding/json" "errors" "fmt" + "slices" + "strconv" + "sync" + "time" + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/devtron-labs/common-lib/async" + commonConstants "github.com/devtron-labs/common-lib/constants" pubsub "github.com/devtron-labs/common-lib/pubsub-lib" "github.com/devtron-labs/common-lib/pubsub-lib/model" "github.com/devtron-labs/common-lib/utils/registry" @@ -63,10 +70,6 @@ import ( "go.uber.org/zap" "gopkg.in/go-playground/validator.v9" "k8s.io/utils/pointer" - "slices" - "strconv" - "sync" - "time" ) type WorkflowEventProcessorImpl struct { @@ -90,6 +93,7 @@ type WorkflowEventProcessorImpl struct { cdPipelineConfigService pipeline.CdPipelineConfigService userDeploymentRequestService service.UserDeploymentRequestService ucid ucid.Service + asyncRunnable *async.Runnable devtronAppReleaseContextMap map[int]bean.DevtronAppReleaseContextType devtronAppReleaseContextMapLock *sync.Mutex @@ -156,6 +160,7 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, cdWorkflowRepository: cdWorkflowRepository, deploymentConfigService: deploymentConfigService, ciHandlerService: ciHandlerService, + asyncRunnable: async.NewAsyncRunnable(logger, commonConstants.Orchestrator), } appServiceConfig, err := app.GetAppServiceConfig() if err != nil { @@ -163,7 +168,7 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, } impl.appServiceConfig = appServiceConfig // handle incomplete deployment requests after restart - go impl.ProcessIncompleteDeploymentReq() + impl.asyncRunnable.Execute(func() { impl.ProcessIncompleteDeploymentReq() }) return impl, nil } diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index bc889b542c..b4f6f498aa 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -23,6 +23,15 @@ import ( "encoding/json" errors2 "errors" "fmt" + "github.com/devtron-labs/common-lib/async" + "io" + "log" + "net/http" + "strconv" + "strings" + "sync" + "time" + "github.com/caarlos0/env" "github.com/devtron-labs/common-lib/utils/k8s" "github.com/devtron-labs/devtron/internal/middleware" @@ -35,14 +44,7 @@ import ( "github.com/devtron-labs/devtron/pkg/cluster/repository" errors1 "github.com/juju/errors" "go.uber.org/zap" - "io" "k8s.io/apimachinery/pkg/api/errors" - "log" - "net/http" - "strconv" - "strings" - "sync" - "time" "gopkg.in/igm/sockjs-go.v3/sockjs" v1 "k8s.io/api/core/v1" @@ -457,12 +459,13 @@ type TerminalSessionHandlerImpl struct { ephemeralContainerService cluster.EphemeralContainerService argoApplicationConfigService config.ArgoApplicationConfigService ClusterReadService read.ClusterReadService + asyncRunnable *async.Runnable } func NewTerminalSessionHandlerImpl(environmentService environment.EnvironmentService, logger *zap.SugaredLogger, k8sUtil *k8s.K8sServiceImpl, ephemeralContainerService cluster.EphemeralContainerService, argoApplicationConfigService config.ArgoApplicationConfigService, - ClusterReadService read.ClusterReadService) *TerminalSessionHandlerImpl { + ClusterReadService read.ClusterReadService, asyncRunnable *async.Runnable) *TerminalSessionHandlerImpl { return &TerminalSessionHandlerImpl{ environmentService: environmentService, logger: logger, @@ -470,6 +473,7 @@ func NewTerminalSessionHandlerImpl(environmentService environment.EnvironmentSer ephemeralContainerService: ephemeralContainerService, argoApplicationConfigService: argoApplicationConfigService, ClusterReadService: ClusterReadService, + asyncRunnable: asyncRunnable, } } @@ -515,18 +519,18 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR }) config, client, err := impl.getClientSetAndRestConfigForTerminalConn(req) - go func() { + impl.asyncRunnable.Execute(func() { err := impl.saveEphemeralContainerTerminalAccessAudit(req) if err != nil { impl.logger.Errorw("error in saving ephemeral container terminal access audit,so skipping auditing", "err", err) } - }() + }) if err != nil { impl.logger.Errorw("error in fetching config", "err", err) return http.StatusInternalServerError, nil, err } - go WaitForTerminal(client, config, req) + impl.asyncRunnable.Execute(func() { WaitForTerminal(client, config, req) }) return http.StatusOK, &TerminalMessage{SessionID: sessionID}, nil } diff --git a/pkg/variables/ScopedVariableService.go b/pkg/variables/ScopedVariableService.go index bf9e628e84..dd15c43fe8 100644 --- a/pkg/variables/ScopedVariableService.go +++ b/pkg/variables/ScopedVariableService.go @@ -18,9 +18,15 @@ package variables import ( "fmt" + "regexp" + "strings" + "sync" + "github.com/argoproj/argo-workflows/v3/errors" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/devtron/internal/sql/repository/app" + "github.com/devtron-labs/devtron/pkg/asyncProvider" repository3 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository" "github.com/devtron-labs/devtron/pkg/cluster/repository" "github.com/devtron-labs/devtron/pkg/devtronResource/read" @@ -34,9 +40,6 @@ import ( "github.com/go-pg/pg" "go.uber.org/zap" "golang.org/x/exp/slices" - "regexp" - "strings" - "sync" ) type ScopedVariableService interface { @@ -55,6 +58,7 @@ type ScopedVariableServiceImpl struct { qualifierMappingService resourceQualifiers.QualifierMappingService VariableNameConfig *VariableConfig VariableCache *cache.VariableCacheObj + asyncRunnable *async.Runnable } func NewScopedVariableServiceImpl(logger *zap.SugaredLogger, scopedVariableRepository repository2.ScopedVariableRepository, appRepository app.AppRepository, environmentRepository repository3.EnvironmentRepository, devtronResourceSearchableKeyService read.DevtronResourceSearchableKeyService, clusterRepository repository.ClusterRepository, @@ -64,6 +68,7 @@ func NewScopedVariableServiceImpl(logger *zap.SugaredLogger, scopedVariableRepos scopedVariableRepository: scopedVariableRepository, qualifierMappingService: qualifierMappingService, VariableCache: &cache.VariableCacheObj{CacheLock: &sync.Mutex{}}, + asyncRunnable: asyncProvider.NewAsyncRunnable(logger), } cfg, err := GetVariableNameConfig() if err != nil { @@ -83,7 +88,7 @@ type VariableConfig struct { func loadVariableCache(cfg *VariableConfig, service *ScopedVariableServiceImpl) { if cfg.VariableCacheEnabled { - go service.loadVarCache() + service.asyncRunnable.Execute(func() { service.loadVarCache() }) } } func GetVariableNameConfig() (*VariableConfig, error) { diff --git a/pkg/workflow/status/WorkflowStatusService.go b/pkg/workflow/status/WorkflowStatusService.go index 78d962f884..286c405766 100644 --- a/pkg/workflow/status/WorkflowStatusService.go +++ b/pkg/workflow/status/WorkflowStatusService.go @@ -19,6 +19,9 @@ package status import ( "context" "fmt" + "github.com/devtron-labs/common-lib/async" + "time" + bean2 "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/api/helm-app/service/bean" "github.com/devtron-labs/devtron/client/argocdServer" @@ -46,7 +49,6 @@ import ( "github.com/go-pg/pg" "go.uber.org/zap" "k8s.io/utils/strings/slices" - "time" ) type WorkflowStatusService interface { @@ -88,6 +90,7 @@ type WorkflowStatusServiceImpl struct { appListingService app.AppListingService deploymentConfigService common2.DeploymentConfigService cdWorkflowRunnerService cd.CdWorkflowRunnerService + asyncRunnable *async.Runnable } func NewWorkflowStatusServiceImpl(logger *zap.SugaredLogger, @@ -110,6 +113,7 @@ func NewWorkflowStatusServiceImpl(logger *zap.SugaredLogger, appListingService app.AppListingService, deploymentConfigService common2.DeploymentConfigService, cdWorkflowRunnerService cd.CdWorkflowRunnerService, + asyncRunnable *async.Runnable, ) (*WorkflowStatusServiceImpl, error) { impl := &WorkflowStatusServiceImpl{ logger: logger, @@ -134,6 +138,7 @@ func NewWorkflowStatusServiceImpl(logger *zap.SugaredLogger, appListingService: appListingService, deploymentConfigService: deploymentConfigService, cdWorkflowRunnerService: cdWorkflowRunnerService, + asyncRunnable: asyncRunnable, } config, err := types.GetCdConfig() if err != nil { @@ -196,7 +201,9 @@ func (impl *WorkflowStatusServiceImpl) CheckHelmAppStatusPeriodicallyAndUpdateIn impl.logger.Errorw("error in getting latest pipeline override by cdWorkflowId", "err", err, "cdWorkflowId", wfr.CdWorkflowId) return err } - go impl.appService.WriteCDSuccessEvent(pipelineOverride.Pipeline.AppId, pipelineOverride.Pipeline.EnvironmentId, pipelineOverride) + impl.asyncRunnable.Execute(func() { + impl.appService.WriteCDSuccessEvent(pipelineOverride.Pipeline.AppId, pipelineOverride.Pipeline.EnvironmentId, pipelineOverride) + }) err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(bean3.TriggerContext{}, pipelineOverride) if err != nil { impl.logger.Errorw("error on handling deployment success event", "wfr", wfr, "err", err) diff --git a/wire_gen.go b/wire_gen.go index d67e9179eb..bc1b686e07 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -365,7 +365,8 @@ func InitializeApp() (*App, error) { k8sInformerFactoryImpl := informer.NewK8sInformerFactoryImpl(sugaredLogger, syncMap, k8sServiceImpl) cronLoggerImpl := cron.NewCronLoggerImpl(sugaredLogger) clusterReadServiceImpl := read2.NewClusterReadServiceImpl(sugaredLogger, clusterRepositoryImpl) - clusterServiceImpl, err := cluster.NewClusterServiceImpl(clusterRepositoryImpl, sugaredLogger, k8sServiceImpl, k8sInformerFactoryImpl, userAuthRepositoryImpl, userRepositoryImpl, roleGroupRepositoryImpl, environmentVariables, cronLoggerImpl, clusterReadServiceImpl) + runnable := asyncProvider.NewAsyncRunnable(sugaredLogger) + clusterServiceImpl, err := cluster.NewClusterServiceImpl(clusterRepositoryImpl, sugaredLogger, k8sServiceImpl, k8sInformerFactoryImpl, userAuthRepositoryImpl, userRepositoryImpl, roleGroupRepositoryImpl, environmentVariables, cronLoggerImpl, clusterReadServiceImpl, runnable) if err != nil { return nil, err } @@ -378,14 +379,13 @@ func InitializeApp() (*App, error) { return nil, err } argoApplicationConfigServiceImpl := config2.NewArgoApplicationConfigServiceImpl(sugaredLogger, k8sServiceImpl, clusterRepositoryImpl) - k8sCommonServiceImpl := k8s2.NewK8sCommonServiceImpl(sugaredLogger, k8sServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl) + k8sCommonServiceImpl := k8s2.NewK8sCommonServiceImpl(sugaredLogger, k8sServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl, runnable) versionServiceImpl := version.NewVersionServiceImpl(sugaredLogger) acdAuthConfig, err := util3.GetACDAuthConfig() if err != nil { return nil, err } argoCDConfigGetterImpl := config3.NewArgoCDConfigGetter(beanConfig, environmentVariables, acdAuthConfig, clusterReadServiceImpl, sugaredLogger, k8sServiceImpl) - runnable := asyncProvider.NewAsyncRunnable(sugaredLogger) argoCDConnectionManagerImpl, err := connection.NewArgoCDConnectionManagerImpl(sugaredLogger, settingsManager, moduleRepositoryImpl, environmentVariables, k8sServiceImpl, k8sCommonServiceImpl, versionServiceImpl, gitOpsConfigReadServiceImpl, k8sRuntimeConfig, argoCDConfigGetterImpl, runnable) if err != nil { return nil, err @@ -619,7 +619,7 @@ func InitializeApp() (*App, error) { workflowStageRepositoryImpl := repository18.NewWorkflowStageRepositoryImpl(sugaredLogger, db) workFlowStageStatusServiceImpl := workflowStatus.NewWorkflowStageFlowStatusServiceImpl(sugaredLogger, workflowStageRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowRepositoryImpl, transactionUtilImpl) cdWorkflowRunnerServiceImpl := cd.NewCdWorkflowRunnerServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl, workFlowStageStatusServiceImpl, transactionUtilImpl) - appServiceImpl := app2.NewAppService(pipelineOverrideRepositoryImpl, utilMergeUtil, sugaredLogger, pipelineRepositoryImpl, eventRESTClientImpl, eventSimpleFactoryImpl, appRepositoryImpl, configMapRepositoryImpl, chartRepositoryImpl, cdWorkflowRepositoryImpl, commonServiceImpl, chartTemplateServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineStatusTimelineResourcesServiceImpl, pipelineStatusSyncDetailServiceImpl, pipelineStatusTimelineServiceImpl, appServiceConfig, appStatusServiceImpl, installedAppReadServiceImpl, installedAppVersionHistoryRepositoryImpl, scopedVariableCMCSManagerImpl, acdConfig, gitOpsConfigReadServiceImpl, gitOperationServiceImpl, deploymentTemplateServiceImpl, appListingServiceImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl, cdWorkflowRunnerServiceImpl) + appServiceImpl := app2.NewAppService(pipelineOverrideRepositoryImpl, utilMergeUtil, sugaredLogger, pipelineRepositoryImpl, eventRESTClientImpl, eventSimpleFactoryImpl, appRepositoryImpl, configMapRepositoryImpl, chartRepositoryImpl, cdWorkflowRepositoryImpl, commonServiceImpl, chartTemplateServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineStatusTimelineResourcesServiceImpl, pipelineStatusSyncDetailServiceImpl, pipelineStatusTimelineServiceImpl, appServiceConfig, appStatusServiceImpl, installedAppReadServiceImpl, installedAppVersionHistoryRepositoryImpl, scopedVariableCMCSManagerImpl, acdConfig, gitOpsConfigReadServiceImpl, gitOperationServiceImpl, deploymentTemplateServiceImpl, appListingServiceImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl, cdWorkflowRunnerServiceImpl, runnable) scopedVariableManagerImpl, err := variables.NewScopedVariableManagerImpl(sugaredLogger, scopedVariableServiceImpl, variableEntityMappingServiceImpl, variableSnapshotHistoryServiceImpl, variableTemplateParserImpl) if err != nil { return nil, err @@ -825,7 +825,7 @@ func InitializeApp() (*App, error) { k8sResourceHistoryServiceImpl := kubernetesResourceAuditLogs.Newk8sResourceHistoryServiceImpl(k8sResourceHistoryRepositoryImpl, sugaredLogger, appRepositoryImpl, environmentRepositoryImpl) ephemeralContainersRepositoryImpl := repository5.NewEphemeralContainersRepositoryImpl(db, transactionUtilImpl) ephemeralContainerServiceImpl := cluster.NewEphemeralContainerServiceImpl(ephemeralContainersRepositoryImpl, sugaredLogger) - terminalSessionHandlerImpl := terminal.NewTerminalSessionHandlerImpl(environmentServiceImpl, sugaredLogger, k8sServiceImpl, ephemeralContainerServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl) + terminalSessionHandlerImpl := terminal.NewTerminalSessionHandlerImpl(environmentServiceImpl, sugaredLogger, k8sServiceImpl, ephemeralContainerServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl, runnable) fluxApplicationServiceImpl := fluxApplication.NewFluxApplicationServiceImpl(sugaredLogger, helmAppReadServiceImpl, clusterServiceImplExtended, helmAppClientImpl, pumpImpl) k8sApplicationServiceImpl, err := application2.NewK8sApplicationServiceImpl(sugaredLogger, clusterServiceImplExtended, pumpImpl, helmAppServiceImpl, k8sServiceImpl, acdAuthConfig, k8sResourceHistoryServiceImpl, k8sCommonServiceImpl, terminalSessionHandlerImpl, ephemeralContainerServiceImpl, ephemeralContainersRepositoryImpl, fluxApplicationServiceImpl, clusterReadServiceImpl) if err != nil { @@ -833,7 +833,7 @@ func InitializeApp() (*App, error) { } argoApplicationServiceImpl := argoApplication.NewArgoApplicationServiceImpl(sugaredLogger, clusterRepositoryImpl, k8sServiceImpl, helmAppClientImpl, helmAppServiceImpl, k8sApplicationServiceImpl, argoApplicationConfigServiceImpl, deploymentConfigServiceImpl) argoApplicationReadServiceImpl := read22.NewArgoApplicationReadServiceImpl(sugaredLogger, clusterRepositoryImpl, k8sServiceImpl, helmAppClientImpl, helmAppServiceImpl) - argoApplicationServiceExtendedImpl := argoApplication.NewArgoApplicationServiceExtendedServiceImpl(acdAuthConfig, argoApplicationServiceImpl, argoClientWrapperServiceImpl, argoApplicationReadServiceImpl, clusterServiceImplExtended) + argoApplicationServiceExtendedImpl := argoApplication.NewArgoApplicationServiceExtendedServiceImpl(acdAuthConfig, argoApplicationServiceImpl, argoClientWrapperServiceImpl, argoApplicationReadServiceImpl, clusterServiceImplExtended, runnable) installedAppResourceServiceImpl := resource.NewInstalledAppResourceServiceImpl(sugaredLogger, installedAppRepositoryImpl, appStoreApplicationVersionRepositoryImpl, argoClientWrapperServiceImpl, acdAuthConfig, installedAppVersionHistoryRepositoryImpl, helmAppServiceImpl, helmAppReadServiceImpl, appStatusServiceImpl, k8sCommonServiceImpl, k8sApplicationServiceImpl, k8sServiceImpl, deploymentConfigServiceImpl, ociRegistryConfigRepositoryImpl, argoApplicationServiceExtendedImpl) chartGroupEntriesRepositoryImpl := repository28.NewChartGroupEntriesRepositoryImpl(db, sugaredLogger) chartGroupReposotoryImpl := repository28.NewChartGroupReposotoryImpl(db, sugaredLogger) @@ -855,7 +855,7 @@ func InitializeApp() (*App, error) { return nil, err } cdPipelineEventPublishServiceImpl := out.NewCDPipelineEventPublishServiceImpl(sugaredLogger, pubSubClientServiceImpl) - workflowStatusServiceImpl, err := status2.NewWorkflowStatusServiceImpl(sugaredLogger, workflowDagExecutorImpl, pipelineStatusTimelineServiceImpl, appServiceImpl, appStatusServiceImpl, acdConfig, appServiceConfig, pipelineStatusSyncDetailServiceImpl, argoClientWrapperServiceImpl, cdPipelineEventPublishServiceImpl, cdWorkflowRepositoryImpl, pipelineOverrideRepositoryImpl, installedAppVersionHistoryRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, installedAppRepositoryImpl, installedAppReadServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineRepositoryImpl, appListingServiceImpl, deploymentConfigServiceImpl, cdWorkflowRunnerServiceImpl) + workflowStatusServiceImpl, err := status2.NewWorkflowStatusServiceImpl(sugaredLogger, workflowDagExecutorImpl, pipelineStatusTimelineServiceImpl, appServiceImpl, appStatusServiceImpl, acdConfig, appServiceConfig, pipelineStatusSyncDetailServiceImpl, argoClientWrapperServiceImpl, cdPipelineEventPublishServiceImpl, cdWorkflowRepositoryImpl, pipelineOverrideRepositoryImpl, installedAppVersionHistoryRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, installedAppRepositoryImpl, installedAppReadServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineRepositoryImpl, appListingServiceImpl, deploymentConfigServiceImpl, cdWorkflowRunnerServiceImpl, runnable) if err != nil { return nil, err } @@ -895,7 +895,7 @@ func InitializeApp() (*App, error) { deploymentTemplateActionImpl := batch.NewDeploymentTemplateActionImpl(sugaredLogger, appRepositoryImpl, chartServiceImpl) deploymentActionImpl := batch.NewDeploymentActionImpl(pipelineBuilderImpl, sugaredLogger, appRepositoryImpl, environmentServiceImpl, appWorkflowRepositoryImpl, ciPipelineRepositoryImpl, pipelineRepositoryImpl, dataHolderActionImpl, deploymentTemplateActionImpl) workflowActionImpl := batch.NewWorkflowActionImpl(sugaredLogger, appRepositoryImpl, appWorkflowServiceImpl, buildActionImpl, deploymentActionImpl) - batchOperationRestHandlerImpl := restHandler.NewBatchOperationRestHandlerImpl(userServiceImpl, enforcerImpl, workflowActionImpl, teamServiceImpl, sugaredLogger, enforcerUtilImpl) + batchOperationRestHandlerImpl := restHandler.NewBatchOperationRestHandlerImpl(userServiceImpl, enforcerImpl, workflowActionImpl, teamServiceImpl, sugaredLogger, enforcerUtilImpl, runnable) batchOperationRouterImpl := router.NewBatchOperationRouterImpl(batchOperationRestHandlerImpl, sugaredLogger) chartGroupRestHandlerImpl := chartGroup2.NewChartGroupRestHandlerImpl(chartGroupServiceImpl, sugaredLogger, userServiceImpl, enforcerImpl, validate) chartGroupRouterImpl := chartGroup2.NewChartGroupRouterImpl(chartGroupRestHandlerImpl) @@ -958,7 +958,7 @@ func InitializeApp() (*App, error) { webhookListenerRouterImpl := router.NewWebhookListenerRouterImpl(webhookEventHandlerImpl) appFilteringRestHandlerImpl := appList.NewAppFilteringRestHandlerImpl(sugaredLogger, teamServiceImpl, enforcerImpl, userServiceImpl, clusterServiceImplExtended, environmentServiceImpl, teamReadServiceImpl) appFilteringRouterImpl := appList2.NewAppFilteringRouterImpl(appFilteringRestHandlerImpl) - resourceTreeServiceImpl := resourceTree.NewServiceImpl(sugaredLogger, appListingServiceImpl, appStatusServiceImpl, argoApplicationServiceExtendedImpl, cdApplicationStatusUpdateHandlerImpl, helmAppReadServiceImpl, helmAppServiceImpl, k8sApplicationServiceImpl, k8sCommonServiceImpl, environmentReadServiceImpl) + resourceTreeServiceImpl := resourceTree.NewServiceImpl(sugaredLogger, appListingServiceImpl, appStatusServiceImpl, argoApplicationServiceExtendedImpl, cdApplicationStatusUpdateHandlerImpl, helmAppReadServiceImpl, helmAppServiceImpl, k8sApplicationServiceImpl, k8sCommonServiceImpl, environmentReadServiceImpl, runnable) appListingRestHandlerImpl := appList.NewAppListingRestHandlerImpl(appListingServiceImpl, enforcerImpl, pipelineBuilderImpl, sugaredLogger, enforcerUtilImpl, deploymentGroupServiceImpl, userServiceImpl, k8sCommonServiceImpl, installedAppDBExtendedServiceImpl, installedAppResourceServiceImpl, pipelineRepositoryImpl, k8sApplicationServiceImpl, deploymentConfigServiceImpl, resourceTreeServiceImpl) appListingRouterImpl := appList2.NewAppListingRouterImpl(appListingRestHandlerImpl) appInfoRestHandlerImpl := appInfo.NewAppInfoRestHandlerImpl(sugaredLogger, appCrudOperationServiceImpl, userServiceImpl, validate, enforcerUtilImpl, enforcerImpl, helmAppServiceImpl, enforcerUtilHelmImpl, genericNoteServiceImpl, commonEnforcementUtilImpl) From b892c05718c7ca25fe22c570d2ce268035756ca8 Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Fri, 23 May 2025 16:58:14 +0530 Subject: [PATCH 4/6] Refactor async runnable initialization for dependency injection Replaced direct initialization of async runnable with dependency injection across multiple services. This change improves testability and aligns with DI principles, ensuring better modularity and maintainability. --- cmd/external-app/wire_gen.go | 4 ++-- pkg/deployment/trigger/devtronApps/HandlerService.go | 5 ++--- pkg/eventProcessor/in/WorkflowEventProcessorService.go | 6 +++--- pkg/variables/ScopedVariableService.go | 5 ++--- wire_gen.go | 8 ++++---- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/cmd/external-app/wire_gen.go b/cmd/external-app/wire_gen.go index 43cd02248a..acc5496427 100644 --- a/cmd/external-app/wire_gen.go +++ b/cmd/external-app/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run github.com/google/wire/cmd/wire +//go:generate go run -mod=mod github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject @@ -451,7 +451,7 @@ func InitializeApp() (*App, error) { apiTokenRestHandlerImpl := apiToken2.NewApiTokenRestHandlerImpl(sugaredLogger, apiTokenServiceImpl, userServiceImpl, enforcerImpl, validate) apiTokenRouterImpl := apiToken2.NewApiTokenRouterImpl(apiTokenRestHandlerImpl) k8sCapacityServiceImpl := capacity.NewK8sCapacityServiceImpl(sugaredLogger, k8sApplicationServiceImpl, k8sServiceImpl, k8sCommonServiceImpl) - k8sCapacityRestHandlerImpl := capacity2.NewK8sCapacityRestHandlerImpl(sugaredLogger, k8sCapacityServiceImpl, userServiceImpl, enforcerImpl, clusterServiceImpl, environmentServiceImpl, clusterRbacServiceImpl, clusterReadServiceImpl) + k8sCapacityRestHandlerImpl := capacity2.NewK8sCapacityRestHandlerImpl(sugaredLogger, k8sCapacityServiceImpl, userServiceImpl, enforcerImpl, clusterServiceImpl, environmentServiceImpl, clusterRbacServiceImpl, clusterReadServiceImpl, validate) k8sCapacityRouterImpl := capacity2.NewK8sCapacityRouterImpl(k8sCapacityRestHandlerImpl) webhookHelmServiceImpl := webhookHelm.NewWebhookHelmServiceImpl(sugaredLogger, helmAppServiceImpl, clusterServiceImpl, chartRepositoryServiceImpl, attributesServiceImpl) webhookHelmRestHandlerImpl := webhookHelm2.NewWebhookHelmRestHandlerImpl(sugaredLogger, webhookHelmServiceImpl, userServiceImpl, enforcerImpl, validate) diff --git a/pkg/deployment/trigger/devtronApps/HandlerService.go b/pkg/deployment/trigger/devtronApps/HandlerService.go index 1247ed2f9f..d56d4c023e 100644 --- a/pkg/deployment/trigger/devtronApps/HandlerService.go +++ b/pkg/deployment/trigger/devtronApps/HandlerService.go @@ -40,7 +40,6 @@ import ( "github.com/devtron-labs/devtron/pkg/app" bean4 "github.com/devtron-labs/devtron/pkg/app/bean" "github.com/devtron-labs/devtron/pkg/app/status" - "github.com/devtron-labs/devtron/pkg/asyncProvider" "github.com/devtron-labs/devtron/pkg/attributes" "github.com/devtron-labs/devtron/pkg/auth/user" userBean "github.com/devtron-labs/devtron/pkg/auth/user/bean" @@ -231,7 +230,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, ciLogService pipeline.CiLogService, workflowService executor.WorkflowService, blobConfigStorageService pipeline.BlobStorageConfigService, -) (*HandlerServiceImpl, error) { + asyncRunnable *async.Runnable) (*HandlerServiceImpl, error) { impl := &HandlerServiceImpl{ logger: logger, cdWorkflowCommonService: cdWorkflowCommonService, @@ -297,7 +296,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, ciLogService: ciLogService, workflowService: workflowService, blobConfigStorageService: blobConfigStorageService, - asyncRunnable: asyncProvider.NewAsyncRunnable(logger), + asyncRunnable: asyncRunnable, } config, err := types.GetCdConfig() if err != nil { diff --git a/pkg/eventProcessor/in/WorkflowEventProcessorService.go b/pkg/eventProcessor/in/WorkflowEventProcessorService.go index 4211fe8b9a..aa47da205a 100644 --- a/pkg/eventProcessor/in/WorkflowEventProcessorService.go +++ b/pkg/eventProcessor/in/WorkflowEventProcessorService.go @@ -28,7 +28,6 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/devtron-labs/common-lib/async" - commonConstants "github.com/devtron-labs/common-lib/constants" pubsub "github.com/devtron-labs/common-lib/pubsub-lib" "github.com/devtron-labs/common-lib/pubsub-lib/model" "github.com/devtron-labs/common-lib/utils/registry" @@ -131,7 +130,8 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, ciArtifactRepository repository.CiArtifactRepository, cdWorkflowRepository pipelineConfig.CdWorkflowRepository, deploymentConfigService common.DeploymentConfigService, - ciHandlerService trigger.HandlerService) (*WorkflowEventProcessorImpl, error) { + ciHandlerService trigger.HandlerService, + asyncRunnable *async.Runnable) (*WorkflowEventProcessorImpl, error) { impl := &WorkflowEventProcessorImpl{ logger: logger, pubSubClient: pubSubClient, @@ -160,7 +160,7 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, cdWorkflowRepository: cdWorkflowRepository, deploymentConfigService: deploymentConfigService, ciHandlerService: ciHandlerService, - asyncRunnable: async.NewAsyncRunnable(logger, commonConstants.Orchestrator), + asyncRunnable: asyncRunnable, } appServiceConfig, err := app.GetAppServiceConfig() if err != nil { diff --git a/pkg/variables/ScopedVariableService.go b/pkg/variables/ScopedVariableService.go index dd15c43fe8..bfde4840b9 100644 --- a/pkg/variables/ScopedVariableService.go +++ b/pkg/variables/ScopedVariableService.go @@ -26,7 +26,6 @@ import ( "github.com/caarlos0/env" "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/devtron/internal/sql/repository/app" - "github.com/devtron-labs/devtron/pkg/asyncProvider" repository3 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository" "github.com/devtron-labs/devtron/pkg/cluster/repository" "github.com/devtron-labs/devtron/pkg/devtronResource/read" @@ -62,13 +61,13 @@ type ScopedVariableServiceImpl struct { } func NewScopedVariableServiceImpl(logger *zap.SugaredLogger, scopedVariableRepository repository2.ScopedVariableRepository, appRepository app.AppRepository, environmentRepository repository3.EnvironmentRepository, devtronResourceSearchableKeyService read.DevtronResourceSearchableKeyService, clusterRepository repository.ClusterRepository, - qualifierMappingService resourceQualifiers.QualifierMappingService) (*ScopedVariableServiceImpl, error) { + qualifierMappingService resourceQualifiers.QualifierMappingService, asyncRunnable *async.Runnable) (*ScopedVariableServiceImpl, error) { scopedVariableService := &ScopedVariableServiceImpl{ logger: logger, scopedVariableRepository: scopedVariableRepository, qualifierMappingService: qualifierMappingService, VariableCache: &cache.VariableCacheObj{CacheLock: &sync.Mutex{}}, - asyncRunnable: asyncProvider.NewAsyncRunnable(logger), + asyncRunnable: asyncRunnable, } cfg, err := GetVariableNameConfig() if err != nil { diff --git a/wire_gen.go b/wire_gen.go index 8a2d5fe520..286d488059 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run github.com/google/wire/cmd/wire +//go:generate go run -mod=mod github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject @@ -518,7 +518,7 @@ func InitializeApp() (*App, error) { if err != nil { return nil, err } - scopedVariableServiceImpl, err := variables.NewScopedVariableServiceImpl(sugaredLogger, scopedVariableRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, devtronResourceSearchableKeyServiceImpl, clusterRepositoryImpl, qualifierMappingServiceImpl) + scopedVariableServiceImpl, err := variables.NewScopedVariableServiceImpl(sugaredLogger, scopedVariableRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, devtronResourceSearchableKeyServiceImpl, clusterRepositoryImpl, qualifierMappingServiceImpl, runnable) if err != nil { return nil, err } @@ -771,7 +771,7 @@ func InitializeApp() (*App, error) { scanToolExecutionHistoryMappingRepositoryImpl := repository24.NewScanToolExecutionHistoryMappingRepositoryImpl(db, sugaredLogger) cdWorkflowReadServiceImpl := read20.NewCdWorkflowReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) imageScanServiceImpl := imageScanning.NewImageScanServiceImpl(sugaredLogger, imageScanHistoryRepositoryImpl, imageScanResultRepositoryImpl, imageScanObjectMetaRepositoryImpl, cveStoreRepositoryImpl, imageScanDeployInfoRepositoryImpl, userServiceImpl, appRepositoryImpl, environmentServiceImpl, ciArtifactRepositoryImpl, policyServiceImpl, pipelineRepositoryImpl, ciPipelineRepositoryImpl, scanToolMetadataRepositoryImpl, scanToolExecutionHistoryMappingRepositoryImpl, cvePolicyRepositoryImpl, cdWorkflowReadServiceImpl) - devtronAppsHandlerServiceImpl, err := devtronApps.NewHandlerServiceImpl(sugaredLogger, cdWorkflowCommonServiceImpl, gitOpsManifestPushServiceImpl, gitOpsConfigReadServiceImpl, argoK8sClientImpl, acdConfig, argoClientWrapperServiceImpl, pipelineStatusTimelineServiceImpl, chartTemplateServiceImpl, workflowEventPublishServiceImpl, manifestCreationServiceImpl, deployedConfigurationHistoryServiceImpl, pipelineStageServiceImpl, globalPluginServiceImpl, customTagServiceImpl, pluginInputVariableParserImpl, prePostCdScriptHistoryServiceImpl, scopedVariableCMCSManagerImpl, imageDigestPolicyServiceImpl, userServiceImpl, helmAppServiceImpl, enforcerUtilImpl, userDeploymentRequestServiceImpl, helmAppClientImpl, eventSimpleFactoryImpl, eventRESTClientImpl, environmentVariables, appRepositoryImpl, ciPipelineMaterialRepositoryImpl, imageScanHistoryReadServiceImpl, imageScanDeployInfoReadServiceImpl, imageScanDeployInfoServiceImpl, pipelineRepositoryImpl, pipelineOverrideRepositoryImpl, manifestPushConfigRepositoryImpl, chartRepositoryImpl, environmentRepositoryImpl, cdWorkflowRepositoryImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciTemplateReadServiceImpl, gitMaterialReadServiceImpl, appLabelRepositoryImpl, ciPipelineRepositoryImpl, appWorkflowRepositoryImpl, dockerArtifactStoreRepositoryImpl, imageScanServiceImpl, k8sServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, ciCdPipelineOrchestratorImpl, gitOperationServiceImpl, attributesServiceImpl, clusterRepositoryImpl, cdWorkflowRunnerServiceImpl, clusterServiceImplExtended, ciLogServiceImpl, workflowServiceImpl, blobStorageConfigServiceImpl) + devtronAppsHandlerServiceImpl, err := devtronApps.NewHandlerServiceImpl(sugaredLogger, cdWorkflowCommonServiceImpl, gitOpsManifestPushServiceImpl, gitOpsConfigReadServiceImpl, argoK8sClientImpl, acdConfig, argoClientWrapperServiceImpl, pipelineStatusTimelineServiceImpl, chartTemplateServiceImpl, workflowEventPublishServiceImpl, manifestCreationServiceImpl, deployedConfigurationHistoryServiceImpl, pipelineStageServiceImpl, globalPluginServiceImpl, customTagServiceImpl, pluginInputVariableParserImpl, prePostCdScriptHistoryServiceImpl, scopedVariableCMCSManagerImpl, imageDigestPolicyServiceImpl, userServiceImpl, helmAppServiceImpl, enforcerUtilImpl, userDeploymentRequestServiceImpl, helmAppClientImpl, eventSimpleFactoryImpl, eventRESTClientImpl, environmentVariables, appRepositoryImpl, ciPipelineMaterialRepositoryImpl, imageScanHistoryReadServiceImpl, imageScanDeployInfoReadServiceImpl, imageScanDeployInfoServiceImpl, pipelineRepositoryImpl, pipelineOverrideRepositoryImpl, manifestPushConfigRepositoryImpl, chartRepositoryImpl, environmentRepositoryImpl, cdWorkflowRepositoryImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciTemplateReadServiceImpl, gitMaterialReadServiceImpl, appLabelRepositoryImpl, ciPipelineRepositoryImpl, appWorkflowRepositoryImpl, dockerArtifactStoreRepositoryImpl, imageScanServiceImpl, k8sServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, ciCdPipelineOrchestratorImpl, gitOperationServiceImpl, attributesServiceImpl, clusterRepositoryImpl, cdWorkflowRunnerServiceImpl, clusterServiceImplExtended, ciLogServiceImpl, workflowServiceImpl, blobStorageConfigServiceImpl, runnable) if err != nil { return nil, err } @@ -1088,7 +1088,7 @@ func InitializeApp() (*App, error) { cdWorkflowServiceImpl := cd.NewCdWorkflowServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) cdWorkflowRunnerReadServiceImpl := read20.NewCdWorkflowRunnerReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) webhookServiceImpl := pipeline.NewWebhookServiceImpl(ciArtifactRepositoryImpl, sugaredLogger, ciPipelineRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowCommonServiceImpl, workFlowStageStatusServiceImpl, ciServiceImpl) - workflowEventProcessorImpl, err := in.NewWorkflowEventProcessorImpl(sugaredLogger, pubSubClientServiceImpl, cdWorkflowServiceImpl, cdWorkflowReadServiceImpl, cdWorkflowRunnerServiceImpl, cdWorkflowRunnerReadServiceImpl, workflowDagExecutorImpl, ciHandlerImpl, cdHandlerImpl, eventSimpleFactoryImpl, eventRESTClientImpl, devtronAppsHandlerServiceImpl, deployedAppServiceImpl, webhookServiceImpl, validate, environmentVariables, cdWorkflowCommonServiceImpl, cdPipelineConfigServiceImpl, userDeploymentRequestServiceImpl, serviceImpl, pipelineRepositoryImpl, ciArtifactRepositoryImpl, cdWorkflowRepositoryImpl, deploymentConfigServiceImpl, handlerServiceImpl) + workflowEventProcessorImpl, err := in.NewWorkflowEventProcessorImpl(sugaredLogger, pubSubClientServiceImpl, cdWorkflowServiceImpl, cdWorkflowReadServiceImpl, cdWorkflowRunnerServiceImpl, cdWorkflowRunnerReadServiceImpl, workflowDagExecutorImpl, ciHandlerImpl, cdHandlerImpl, eventSimpleFactoryImpl, eventRESTClientImpl, devtronAppsHandlerServiceImpl, deployedAppServiceImpl, webhookServiceImpl, validate, environmentVariables, cdWorkflowCommonServiceImpl, cdPipelineConfigServiceImpl, userDeploymentRequestServiceImpl, serviceImpl, pipelineRepositoryImpl, ciArtifactRepositoryImpl, cdWorkflowRepositoryImpl, deploymentConfigServiceImpl, handlerServiceImpl, runnable) if err != nil { return nil, err } From 7e9854eec5becc8053fbe3bbb400f378191a68db Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Fri, 23 May 2025 17:00:29 +0530 Subject: [PATCH 5/6] fix --- pkg/app/AppService.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/app/AppService.go b/pkg/app/AppService.go index b439422f9b..9bf09ab199 100644 --- a/pkg/app/AppService.go +++ b/pkg/app/AppService.go @@ -154,7 +154,7 @@ func NewAppService( cdWorkflowRepository pipelineConfig.CdWorkflowRepository, commonService commonService.CommonService, chartTemplateService internalUtil.ChartTemplateService, - pipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository, + cdPipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository, pipelineStatusTimelineResourcesService status2.PipelineStatusTimelineResourcesService, pipelineStatusSyncDetailService status2.PipelineStatusSyncDetailService, pipelineStatusTimelineService status2.PipelineStatusTimelineService, @@ -182,7 +182,7 @@ func NewAppService( cdWorkflowRepository: cdWorkflowRepository, commonService: commonService, chartTemplateService: chartTemplateService, - pipelineStatusTimelineRepository: pipelineStatusTimelineRepo, + pipelineStatusTimelineRepository: cdPipelineStatusTimelineRepo, pipelineStatusTimelineResourcesService: pipelineStatusTimelineResourcesService, pipelineStatusSyncDetailService: pipelineStatusSyncDetailService, pipelineStatusTimelineService: pipelineStatusTimelineService, From d67ed550f13009362ae2962e3c085ac2bfc630db Mon Sep 17 00:00:00 2001 From: prakhar katiyar Date: Fri, 23 May 2025 17:02:26 +0530 Subject: [PATCH 6/6] fix --- pkg/clusterTerminalAccess/UserTerminalAccessService.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/clusterTerminalAccess/UserTerminalAccessService.go b/pkg/clusterTerminalAccess/UserTerminalAccessService.go index 220655b82b..299cd30b6f 100644 --- a/pkg/clusterTerminalAccess/UserTerminalAccessService.go +++ b/pkg/clusterTerminalAccess/UserTerminalAccessService.go @@ -32,7 +32,6 @@ import ( "github.com/devtron-labs/devtron/api/helm-app/service/bean" "github.com/devtron-labs/devtron/internal/sql/models" "github.com/devtron-labs/devtron/internal/sql/repository" - "github.com/devtron-labs/devtron/pkg/asyncProvider" utils1 "github.com/devtron-labs/devtron/pkg/clusterTerminalAccess/clusterTerminalUtils" "github.com/devtron-labs/devtron/pkg/k8s" bean2 "github.com/devtron-labs/devtron/pkg/k8s/bean" @@ -102,7 +101,12 @@ func GetTerminalAccessConfig() (*models.UserTerminalSessionConfig, error) { return config, err } -func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessRepository repository.TerminalAccessRepository, config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, terminalSessionHandler terminal.TerminalSessionHandler, K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, cronLogger *cron3.CronLoggerImpl) (*UserTerminalAccessServiceImpl, error) { +func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, + terminalAccessRepository repository.TerminalAccessRepository, + config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, + terminalSessionHandler terminal.TerminalSessionHandler, + K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, + cronLogger *cron3.CronLoggerImpl, asyncRunnable *async.Runnable) (*UserTerminalAccessServiceImpl, error) { //fetches all running and starting entities from db and start SyncStatus podStatusSyncCron := cron.New(cron.WithChain(cron.Recover(cronLogger))) terminalAccessDataArrayMutex := &sync.RWMutex{} @@ -118,7 +122,7 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR terminalSessionHandler: terminalSessionHandler, K8sCapacityService: K8sCapacityService, k8sUtil: k8sUtil, - asyncRunnable: asyncProvider.NewAsyncRunnable(logger), + asyncRunnable: asyncRunnable, } podStatusSyncCron.Start() _, err := podStatusSyncCron.AddFunc(fmt.Sprintf("@every %ds", config.TerminalPodStatusSyncTimeInSecs), accessServiceImpl.SyncPodStatus)