diff --git a/api/restHandler/BatchOperationRestHandler.go b/api/restHandler/BatchOperationRestHandler.go index fd598e5725..f9df83eeed 100644 --- a/api/restHandler/BatchOperationRestHandler.go +++ b/api/restHandler/BatchOperationRestHandler.go @@ -22,6 +22,7 @@ import ( "errors" "net/http" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/devtron/api/restHandler/common" "github.com/devtron-labs/devtron/pkg/apis/devtron/v1" "github.com/devtron-labs/devtron/pkg/apis/devtron/v1/validation" @@ -44,10 +45,11 @@ type BatchOperationRestHandlerImpl struct { teamService team.TeamService logger *zap.SugaredLogger enforcerUtil rbac.EnforcerUtil + asyncRunnable *async.Runnable } func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer casbin.Enforcer, workflowAction batch.WorkflowAction, - teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil) *BatchOperationRestHandlerImpl { + teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil, asyncRunnable *async.Runnable) *BatchOperationRestHandlerImpl { return &BatchOperationRestHandlerImpl{ userAuthService: userAuthService, enforcer: enforcer, @@ -55,6 +57,7 @@ func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer teamService: teamService, logger: logger, enforcerUtil: enforcerUtil, + asyncRunnable: asyncRunnable, } } @@ -104,13 +107,14 @@ func (handler BatchOperationRestHandlerImpl) Operate(w http.ResponseWriter, r *h ctx, cancel := context.WithCancel(r.Context()) if cn, ok := w.(http.CloseNotifier); ok { - go func(done <-chan struct{}, closed <-chan bool) { + runnableFunc := func(done <-chan struct{}, closed <-chan bool) { select { case <-done: case <-closed: cancel() } - }(ctx.Done(), cn.CloseNotify()) + } + handler.asyncRunnable.Execute(func() { runnableFunc(ctx.Done(), cn.CloseNotify()) }) } err = handler.workflowAction.Execute(&workflow, emptyProps, r.Context()) if err != nil { diff --git a/client/argocdServer/connection/Connection.go b/client/argocdServer/connection/Connection.go index 7e690864d5..24f8141442 100644 --- a/client/argocdServer/connection/Connection.go +++ b/client/argocdServer/connection/Connection.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/argoproj/argo-cd/v2/pkg/apiclient/account" "github.com/argoproj/argo-cd/v2/util/settings" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils/k8s" "github.com/devtron-labs/devtron/client/argocdServer/bean" config2 "github.com/devtron-labs/devtron/client/argocdServer/config" @@ -81,6 +82,7 @@ type ArgoCDConnectionManagerImpl struct { gitOpsConfigReadService config.GitOpsConfigReadService runTimeConfig *k8s.RuntimeConfig argoCDConfigGetter config2.ArgoCDConfigGetter + asyncRunnable *async.Runnable } func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger, @@ -92,7 +94,8 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger, versionService version.VersionService, gitOpsConfigReadService config.GitOpsConfigReadService, runTimeConfig *k8s.RuntimeConfig, - argoCDConfigGetter config2.ArgoCDConfigGetter) (*ArgoCDConnectionManagerImpl, error) { + argoCDConfigGetter config2.ArgoCDConfigGetter, + asyncRunnable *async.Runnable) (*ArgoCDConnectionManagerImpl, error) { argoUserServiceImpl := &ArgoCDConnectionManagerImpl{ logger: Logger, settingsManager: settingsManager, @@ -105,13 +108,17 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger, gitOpsConfigReadService: gitOpsConfigReadService, runTimeConfig: runTimeConfig, argoCDConfigGetter: argoCDConfigGetter, + asyncRunnable: asyncRunnable, } if !runTimeConfig.LocalDevMode { grpcConfig, err := argoCDConfigGetter.GetGRPCConfig() if err != nil { Logger.Errorw("error in GetAllGRPCConfigs", "error", err) } - go argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig) + runnableFunc := func() { + argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig) + } + asyncRunnable.Execute(runnableFunc) } return argoUserServiceImpl, nil } diff --git a/cmd/external-app/wire_gen.go b/cmd/external-app/wire_gen.go index 43cd02248a..acc5496427 100644 --- a/cmd/external-app/wire_gen.go +++ b/cmd/external-app/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run github.com/google/wire/cmd/wire +//go:generate go run -mod=mod github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject @@ -451,7 +451,7 @@ func InitializeApp() (*App, error) { apiTokenRestHandlerImpl := apiToken2.NewApiTokenRestHandlerImpl(sugaredLogger, apiTokenServiceImpl, userServiceImpl, enforcerImpl, validate) apiTokenRouterImpl := apiToken2.NewApiTokenRouterImpl(apiTokenRestHandlerImpl) k8sCapacityServiceImpl := capacity.NewK8sCapacityServiceImpl(sugaredLogger, k8sApplicationServiceImpl, k8sServiceImpl, k8sCommonServiceImpl) - k8sCapacityRestHandlerImpl := capacity2.NewK8sCapacityRestHandlerImpl(sugaredLogger, k8sCapacityServiceImpl, userServiceImpl, enforcerImpl, clusterServiceImpl, environmentServiceImpl, clusterRbacServiceImpl, clusterReadServiceImpl) + k8sCapacityRestHandlerImpl := capacity2.NewK8sCapacityRestHandlerImpl(sugaredLogger, k8sCapacityServiceImpl, userServiceImpl, enforcerImpl, clusterServiceImpl, environmentServiceImpl, clusterRbacServiceImpl, clusterReadServiceImpl, validate) k8sCapacityRouterImpl := capacity2.NewK8sCapacityRouterImpl(k8sCapacityRestHandlerImpl) webhookHelmServiceImpl := webhookHelm.NewWebhookHelmServiceImpl(sugaredLogger, helmAppServiceImpl, clusterServiceImpl, chartRepositoryServiceImpl, attributesServiceImpl) webhookHelmRestHandlerImpl := webhookHelm2.NewWebhookHelmRestHandlerImpl(sugaredLogger, webhookHelmServiceImpl, userServiceImpl, enforcerImpl, validate) diff --git a/pkg/app/AppService.go b/pkg/app/AppService.go index c6df69e9a4..9bf09ab199 100644 --- a/pkg/app/AppService.go +++ b/pkg/app/AppService.go @@ -20,6 +20,10 @@ import ( "context" "errors" "fmt" + "net/url" + "strconv" + "time" + health2 "github.com/argoproj/gitops-engine/pkg/health" argoApplication "github.com/devtron-labs/devtron/client/argocdServer/bean" "github.com/devtron-labs/devtron/internal/sql/models" @@ -40,12 +44,10 @@ import ( "github.com/devtron-labs/devtron/pkg/deployment/manifest/deploymentTemplate/read" bean4 "github.com/devtron-labs/devtron/pkg/deployment/trigger/devtronApps/bean" "github.com/devtron-labs/devtron/pkg/workflow/cd" - "net/url" - "strconv" - "time" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/common-lib/utils/k8s/health" "github.com/devtron-labs/devtron/api/bean" @@ -124,6 +126,7 @@ type AppServiceImpl struct { deploymentConfigService common2.DeploymentConfigService envConfigOverrideReadService read.EnvConfigOverrideService cdWorkflowRunnerService cd.CdWorkflowRunnerService + asyncRunnable *async.Runnable } type AppService interface { @@ -164,7 +167,8 @@ func NewAppService( appListingService AppListingService, deploymentConfigService common2.DeploymentConfigService, envConfigOverrideReadService read.EnvConfigOverrideService, - cdWorkflowRunnerService cd.CdWorkflowRunnerService) *AppServiceImpl { + cdWorkflowRunnerService cd.CdWorkflowRunnerService, + asyncRunnable *async.Runnable) *AppServiceImpl { appServiceImpl := &AppServiceImpl{ mergeUtil: mergeUtil, pipelineOverrideRepository: pipelineOverrideRepository, @@ -195,6 +199,7 @@ func NewAppService( deploymentConfigService: deploymentConfigService, envConfigOverrideReadService: envConfigOverrideReadService, cdWorkflowRunnerService: cdWorkflowRunnerService, + asyncRunnable: asyncRunnable, } return appServiceImpl } @@ -320,7 +325,7 @@ func (impl *AppServiceImpl) UpdateDeploymentStatusForGitOpsPipelines(app *v1alph } if isSucceeded { impl.logger.Infow("writing cd success event", "gitHash", gitHash, "pipelineOverride", pipelineOverride) - go impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) + impl.asyncRunnable.Execute(func() { impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) }) } } else { impl.logger.Debugw("event received for older triggered revision", "gitHash", gitHash) diff --git a/pkg/argoApplication/ArgoApplicationServiceExtended.go b/pkg/argoApplication/ArgoApplicationServiceExtended.go index 0cfb81b2d3..0fce636655 100644 --- a/pkg/argoApplication/ArgoApplicationServiceExtended.go +++ b/pkg/argoApplication/ArgoApplicationServiceExtended.go @@ -22,6 +22,7 @@ import ( "fmt" application2 "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils/k8s" openapi "github.com/devtron-labs/devtron/api/helm-app/openapiClient" "github.com/devtron-labs/devtron/client/argocdServer" @@ -46,6 +47,7 @@ type ArgoApplicationServiceExtendedImpl struct { argoApplicationReadService read.ArgoApplicationReadService clusterService cluster.ClusterService acdClientWrapper argocdServer.ArgoClientWrapperService + asyncRunnable *async.Runnable } func NewArgoApplicationServiceExtendedServiceImpl( @@ -54,6 +56,7 @@ func NewArgoApplicationServiceExtendedServiceImpl( acdClientWrapper argocdServer.ArgoClientWrapperService, argoApplicationReadService read.ArgoApplicationReadService, clusterService cluster.ClusterService, + asyncRunnable *async.Runnable, ) *ArgoApplicationServiceExtendedImpl { return &ArgoApplicationServiceExtendedImpl{ aCDAuthConfig: aCDAuthConfig, @@ -61,6 +64,7 @@ func NewArgoApplicationServiceExtendedServiceImpl( argoApplicationReadService: argoApplicationReadService, acdClientWrapper: acdClientWrapper, ArgoApplicationServiceImpl: argoApplicationServiceImpl, + asyncRunnable: asyncRunnable, } } @@ -328,7 +332,7 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat for _, node := range queryNodes { rQuery := transform(node, query.ApplicationName) qCount++ - go func(request application2.ApplicationResourceRequest) { + runnableFunc := func(request application2.ApplicationResourceRequest) { ctx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() startTime := time.Now() @@ -343,7 +347,8 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat } else { response <- argoApplication.Result{Response: nil, Error: fmt.Errorf("connection closed by client"), Request: &request} } - }(*rQuery) + } + c.asyncRunnable.Execute(func() { runnableFunc(*rQuery) }) } if qCount == 0 { diff --git a/pkg/build/trigger/HandlerService.go b/pkg/build/trigger/HandlerService.go index 8df68c4375..faf1d54647 100644 --- a/pkg/build/trigger/HandlerService.go +++ b/pkg/build/trigger/HandlerService.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" blob_storage "github.com/devtron-labs/common-lib/blob-storage" "github.com/devtron-labs/common-lib/utils" bean4 "github.com/devtron-labs/common-lib/utils/bean" @@ -116,6 +117,7 @@ type HandlerServiceImpl struct { clusterService cluster.ClusterService envService environment.EnvironmentService K8sUtil *k8s.K8sServiceImpl + asyncRunnable *async.Runnable } func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.WorkflowService, @@ -141,6 +143,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W clusterService cluster.ClusterService, envService environment.EnvironmentService, K8sUtil *k8s.K8sServiceImpl, + asyncRunnable *async.Runnable, ) *HandlerServiceImpl { buildxCacheFlags := &BuildxCacheFlags{} err := env.Parse(buildxCacheFlags) @@ -174,6 +177,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W clusterService: clusterService, envService: envService, K8sUtil: K8sUtil, + asyncRunnable: asyncRunnable, } config, err := types.GetCiConfig() if err != nil { @@ -628,7 +632,12 @@ func (impl *HandlerServiceImpl) triggerCiPipeline(trigger types.Trigger) (int, e } middleware.CiTriggerCounter.WithLabelValues(pipeline.App.AppName, pipeline.Name).Inc() - go impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest) + + runnableFunc := func() { + impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest) + } + impl.asyncRunnable.Execute(runnableFunc) + return savedCiWf.Id, err } diff --git a/pkg/cluster/ClusterService.go b/pkg/cluster/ClusterService.go index 1b00c2f84f..615af7c1b8 100644 --- a/pkg/cluster/ClusterService.go +++ b/pkg/cluster/ClusterService.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/devtron-labs/common-lib/async" informerBean "github.com/devtron-labs/common-lib/informer" "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/devtron/pkg/cluster/adapter" @@ -97,6 +98,7 @@ type ClusterServiceImpl struct { userRepository repository3.UserRepository roleGroupRepository repository3.RoleGroupRepository clusterReadService read.ClusterReadService + asyncRunnable *async.Runnable } func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.SugaredLogger, @@ -105,7 +107,8 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap. roleGroupRepository repository3.RoleGroupRepository, envVariables *globalUtil.EnvironmentVariables, cronLogger *cronUtil.CronLoggerImpl, - clusterReadService read.ClusterReadService) (*ClusterServiceImpl, error) { + clusterReadService read.ClusterReadService, + asyncRunnable *async.Runnable) (*ClusterServiceImpl, error) { clusterService := &ClusterServiceImpl{ clusterRepository: repository, logger: logger, @@ -115,6 +118,7 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap. userRepository: userRepository, roleGroupRepository: roleGroupRepository, clusterReadService: clusterReadService, + asyncRunnable: asyncRunnable, } // initialise cron newCron := cron.New(cron.WithChain(cron.Recover(cronLogger))) @@ -782,7 +786,7 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB continue } wg.Add(1) - go func(idx int, cluster *bean.ClusterBean) { + runnableFunc := func(idx int, cluster *bean.ClusterBean) { defer wg.Done() clusterConfig := cluster.GetClusterConfig() _, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig) @@ -796,7 +800,8 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB id = idx } impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap) - }(idx, cluster) + } + impl.asyncRunnable.Execute(func() { runnableFunc(idx, cluster) }) } wg.Wait() diff --git a/pkg/clusterTerminalAccess/UserTerminalAccessService.go b/pkg/clusterTerminalAccess/UserTerminalAccessService.go index b6ab4a6b09..299cd30b6f 100644 --- a/pkg/clusterTerminalAccess/UserTerminalAccessService.go +++ b/pkg/clusterTerminalAccess/UserTerminalAccessService.go @@ -21,7 +21,13 @@ import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" + "sync" + "time" + "github.com/caarlos0/env/v6" + "github.com/devtron-labs/common-lib/async" k8s2 "github.com/devtron-labs/common-lib/utils/k8s" "github.com/devtron-labs/devtron/api/helm-app/service/bean" "github.com/devtron-labs/devtron/internal/sql/models" @@ -43,10 +49,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "strconv" - "strings" - "sync" - "time" ) type UserTerminalAccessService interface { @@ -75,6 +77,7 @@ type UserTerminalAccessServiceImpl struct { terminalSessionHandler terminal.TerminalSessionHandler K8sCapacityService capacity.K8sCapacityService k8sUtil *k8s2.K8sServiceImpl + asyncRunnable *async.Runnable } type UserTerminalAccessSessionData struct { @@ -98,7 +101,12 @@ func GetTerminalAccessConfig() (*models.UserTerminalSessionConfig, error) { return config, err } -func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessRepository repository.TerminalAccessRepository, config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, terminalSessionHandler terminal.TerminalSessionHandler, K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, cronLogger *cron3.CronLoggerImpl) (*UserTerminalAccessServiceImpl, error) { +func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, + terminalAccessRepository repository.TerminalAccessRepository, + config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, + terminalSessionHandler terminal.TerminalSessionHandler, + K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, + cronLogger *cron3.CronLoggerImpl, asyncRunnable *async.Runnable) (*UserTerminalAccessServiceImpl, error) { //fetches all running and starting entities from db and start SyncStatus podStatusSyncCron := cron.New(cron.WithChain(cron.Recover(cronLogger))) terminalAccessDataArrayMutex := &sync.RWMutex{} @@ -114,6 +122,7 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR terminalSessionHandler: terminalSessionHandler, K8sCapacityService: K8sCapacityService, k8sUtil: k8sUtil, + asyncRunnable: asyncRunnable, } podStatusSyncCron.Start() _, err := podStatusSyncCron.AddFunc(fmt.Sprintf("@every %ds", config.TerminalPodStatusSyncTimeInSecs), accessServiceImpl.SyncPodStatus) @@ -121,7 +130,7 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR logger.Errorw("error occurred while starting cron job", "time in secs", config.TerminalPodStatusSyncTimeInSecs) return nil, err } - go accessServiceImpl.SyncRunningInstances() + accessServiceImpl.asyncRunnable.Execute(func() { accessServiceImpl.SyncRunningInstances() }) return accessServiceImpl, err } func (impl *UserTerminalAccessServiceImpl) ValidateShell(podName, namespace, shellName, containerName string, clusterId int) (bool, string, error) { diff --git a/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go b/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go index b6fd6b5f69..66a6756574 100644 --- a/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go +++ b/pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go @@ -19,6 +19,10 @@ package resourceTree import ( "context" "fmt" + "github.com/devtron-labs/common-lib/async" + "strconv" + "time" + "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" "github.com/argoproj/gitops-engine/pkg/health" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" @@ -42,8 +46,6 @@ import ( application2 "github.com/devtron-labs/devtron/pkg/k8s/application" util2 "github.com/devtron-labs/devtron/util" "go.uber.org/zap" - "strconv" - "time" ) type Service interface { @@ -66,6 +68,7 @@ type ServiceImpl struct { k8sApplicationService application2.K8sApplicationService k8sCommonService k8s.K8sCommonService environmentReadService read2.EnvironmentReadService + asyncRunnable *async.Runnable } func NewServiceImpl(logger *zap.SugaredLogger, @@ -78,6 +81,7 @@ func NewServiceImpl(logger *zap.SugaredLogger, k8sApplicationService application2.K8sApplicationService, k8sCommonService k8s.K8sCommonService, environmentReadService read2.EnvironmentReadService, + asyncRunnable *async.Runnable, ) *ServiceImpl { serviceImpl := &ServiceImpl{ logger: logger, @@ -90,6 +94,7 @@ func NewServiceImpl(logger *zap.SugaredLogger, k8sApplicationService: k8sApplicationService, k8sCommonService: k8sCommonService, environmentReadService: environmentReadService, + asyncRunnable: asyncRunnable, } return serviceImpl } @@ -164,7 +169,7 @@ func (impl *ServiceImpl) FetchResourceTree(ctx context.Context, appId int, envId } } resourceTree = util2.InterfaceToMapAdapter(resp) - go func() { + impl.asyncRunnable.Execute(func() { if resp.Status == string(health.HealthStatusHealthy) { err = impl.cdApplicationStatusUpdateHandler.SyncPipelineStatusForResourceTreeCall(cdPipeline) if err != nil { @@ -176,7 +181,7 @@ func (impl *ServiceImpl) FetchResourceTree(ctx context.Context, appId int, envId if err != nil { impl.logger.Warnw("error in updating app status", "err", err, "appId", cdPipeline.AppId, "envId", cdPipeline.EnvironmentId) } - }() + }) k8sAppDetail := AppView.AppDetailContainer{ DeploymentDetailContainer: AppView.DeploymentDetailContainer{ ClusterId: cdPipeline.Environment.ClusterId, diff --git a/pkg/deployment/trigger/devtronApps/HandlerService.go b/pkg/deployment/trigger/devtronApps/HandlerService.go index 65d067ba07..6194a0d093 100644 --- a/pkg/deployment/trigger/devtronApps/HandlerService.go +++ b/pkg/deployment/trigger/devtronApps/HandlerService.go @@ -19,6 +19,10 @@ package devtronApps import ( "bufio" "context" + "os" + "time" + + "github.com/devtron-labs/common-lib/async" pubsub "github.com/devtron-labs/common-lib/pubsub-lib" util5 "github.com/devtron-labs/common-lib/utils/k8s" bean3 "github.com/devtron-labs/devtron/api/bean" @@ -70,8 +74,6 @@ import ( util2 "github.com/devtron-labs/devtron/util/event" "github.com/devtron-labs/devtron/util/rbac" "go.uber.org/zap" - "os" - "time" ) /* @@ -166,6 +168,7 @@ type HandlerServiceImpl struct { ciLogService pipeline.CiLogService workflowService executor.WorkflowService blobConfigStorageService pipeline.BlobStorageConfigService + asyncRunnable *async.Runnable } func NewHandlerServiceImpl(logger *zap.SugaredLogger, @@ -227,7 +230,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, ciLogService pipeline.CiLogService, workflowService executor.WorkflowService, blobConfigStorageService pipeline.BlobStorageConfigService, -) (*HandlerServiceImpl, error) { + asyncRunnable *async.Runnable) (*HandlerServiceImpl, error) { impl := &HandlerServiceImpl{ logger: logger, cdWorkflowCommonService: cdWorkflowCommonService, @@ -293,6 +296,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, ciLogService: ciLogService, workflowService: workflowService, blobConfigStorageService: blobConfigStorageService, + asyncRunnable: asyncRunnable, } config, err := types.GetCdConfig() if err != nil { diff --git a/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go b/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go index 70f7620216..8a1ee736a7 100644 --- a/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go +++ b/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go @@ -20,6 +20,13 @@ import ( "context" "errors" "fmt" + "net/http" + "path" + "regexp" + "strconv" + "strings" + "time" + bean3 "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/api/bean/gitOps" bean6 "github.com/devtron-labs/devtron/api/helm-app/bean" @@ -57,12 +64,6 @@ import ( "google.golang.org/grpc/codes" status2 "google.golang.org/grpc/status" "helm.sh/helm/v3/pkg/chart" - "net/http" - "path" - "regexp" - "strconv" - "strings" - "time" ) func (impl *HandlerServiceImpl) TriggerStageForBulk(triggerRequest bean.TriggerRequest) error { @@ -741,7 +742,9 @@ func (impl *HandlerServiceImpl) triggerPipeline(overrideRequest *bean3.ValuesOve } } - go impl.writeCDTriggerEvent(overrideRequest, valuesOverrideResponse.Artifact, valuesOverrideResponse.PipelineOverride.PipelineReleaseCounter, valuesOverrideResponse.PipelineOverride.Id, overrideRequest.WfrId) + impl.asyncRunnable.Execute(func() { + impl.writeCDTriggerEvent(overrideRequest, valuesOverrideResponse.Artifact, valuesOverrideResponse.PipelineOverride.PipelineReleaseCounter, valuesOverrideResponse.PipelineOverride.Id, overrideRequest.WfrId) + }) _ = impl.markImageScanDeployed(newCtx, overrideRequest.AppId, overrideRequest.EnvId, overrideRequest.ClusterId, valuesOverrideResponse.Artifact.ImageDigest, valuesOverrideResponse.Artifact.ScanEnabled, valuesOverrideResponse.Artifact.Image) diff --git a/pkg/eventProcessor/in/WorkflowEventProcessorService.go b/pkg/eventProcessor/in/WorkflowEventProcessorService.go index 7012b20adf..aa47da205a 100644 --- a/pkg/eventProcessor/in/WorkflowEventProcessorService.go +++ b/pkg/eventProcessor/in/WorkflowEventProcessorService.go @@ -21,7 +21,13 @@ import ( "encoding/json" "errors" "fmt" + "slices" + "strconv" + "sync" + "time" + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/devtron-labs/common-lib/async" pubsub "github.com/devtron-labs/common-lib/pubsub-lib" "github.com/devtron-labs/common-lib/pubsub-lib/model" "github.com/devtron-labs/common-lib/utils/registry" @@ -63,10 +69,6 @@ import ( "go.uber.org/zap" "gopkg.in/go-playground/validator.v9" "k8s.io/utils/pointer" - "slices" - "strconv" - "sync" - "time" ) type WorkflowEventProcessorImpl struct { @@ -90,6 +92,7 @@ type WorkflowEventProcessorImpl struct { cdPipelineConfigService pipeline.CdPipelineConfigService userDeploymentRequestService service.UserDeploymentRequestService ucid ucid.Service + asyncRunnable *async.Runnable devtronAppReleaseContextMap map[int]bean.DevtronAppReleaseContextType devtronAppReleaseContextMapLock *sync.Mutex @@ -127,7 +130,8 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, ciArtifactRepository repository.CiArtifactRepository, cdWorkflowRepository pipelineConfig.CdWorkflowRepository, deploymentConfigService common.DeploymentConfigService, - ciHandlerService trigger.HandlerService) (*WorkflowEventProcessorImpl, error) { + ciHandlerService trigger.HandlerService, + asyncRunnable *async.Runnable) (*WorkflowEventProcessorImpl, error) { impl := &WorkflowEventProcessorImpl{ logger: logger, pubSubClient: pubSubClient, @@ -156,6 +160,7 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, cdWorkflowRepository: cdWorkflowRepository, deploymentConfigService: deploymentConfigService, ciHandlerService: ciHandlerService, + asyncRunnable: asyncRunnable, } appServiceConfig, err := app.GetAppServiceConfig() if err != nil { @@ -163,7 +168,7 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger, } impl.appServiceConfig = appServiceConfig // handle incomplete deployment requests after restart - go impl.ProcessIncompleteDeploymentReq() + impl.asyncRunnable.Execute(func() { impl.ProcessIncompleteDeploymentReq() }) return impl, nil } diff --git a/pkg/k8s/K8sCommonService.go b/pkg/k8s/K8sCommonService.go index 360b8ab53d..39cae5dd0f 100644 --- a/pkg/k8s/K8sCommonService.go +++ b/pkg/k8s/K8sCommonService.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils/k8s" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/devtron/api/bean/AppView" @@ -75,6 +76,7 @@ type K8sCommonServiceImpl struct { K8sApplicationServiceConfig *K8sApplicationServiceConfig argoApplicationConfigService config.ArgoApplicationConfigService ClusterReadService read.ClusterReadService + asyncRunnable *async.Runnable } type K8sApplicationServiceConfig struct { BatchSize int `env:"BATCH_SIZE" envDefault:"5" description:"there is feature to get URL's of services/ingresses. so to extract those, we need to parse all the servcie and ingress objects of the application. this BATCH_SIZE flag controls the no of these objects get parsed in one go."` @@ -83,7 +85,7 @@ type K8sApplicationServiceConfig struct { func NewK8sCommonServiceImpl(Logger *zap.SugaredLogger, k8sUtils *k8s.K8sServiceImpl, argoApplicationConfigService config.ArgoApplicationConfigService, - ClusterReadService read.ClusterReadService) *K8sCommonServiceImpl { + ClusterReadService read.ClusterReadService, asyncRunnable *async.Runnable) *K8sCommonServiceImpl { cfg := &K8sApplicationServiceConfig{} err := env.Parse(cfg) if err != nil { @@ -95,6 +97,7 @@ func NewK8sCommonServiceImpl(Logger *zap.SugaredLogger, k8sUtils *k8s.K8sService K8sApplicationServiceConfig: cfg, argoApplicationConfigService: argoApplicationConfigService, ClusterReadService: ClusterReadService, + asyncRunnable: asyncRunnable, } } @@ -295,7 +298,8 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque var res []bean5.BatchResourceResponse ctx, cancel := context.WithTimeout(ctx, time.Duration(impl.K8sApplicationServiceConfig.TimeOutInSeconds)*time.Second) defer cancel() - go func() { + + runnableFunc := func() { ans := impl.getManifestsByBatch(ctx, requests) select { case <-ctx.Done(): @@ -303,7 +307,9 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque default: ch <- ans } - }() + } + impl.asyncRunnable.Execute(runnableFunc) + select { case ans := <-ch: res = ans @@ -398,7 +404,7 @@ func (impl *K8sCommonServiceImpl) getManifestsByBatch(ctx context.Context, reque var wg sync.WaitGroup for j := 0; j < batchSize; j++ { wg.Add(1) - go func(j int) { + runnableFunc := func(j int) { resp := bean5.BatchResourceResponse{} response, err := impl.GetResource(ctx, &requests[i+j]) if response != nil { @@ -407,7 +413,8 @@ func (impl *K8sCommonServiceImpl) getManifestsByBatch(ctx context.Context, reque resp.Err = err res[i+j] = resp wg.Done() - }(j) + } + impl.asyncRunnable.Execute(func() { runnableFunc(j) }) } wg.Wait() i += batchSize diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index 87232666fc..4e50fc0b93 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -24,6 +24,15 @@ import ( "encoding/json" errors2 "errors" "fmt" + "github.com/devtron-labs/common-lib/async" + "io" + "log" + "net/http" + "strconv" + "strings" + "sync" + "time" + "github.com/caarlos0/env" "github.com/devtron-labs/common-lib/utils/k8s" "github.com/devtron-labs/devtron/internal/middleware" @@ -36,14 +45,7 @@ import ( "github.com/devtron-labs/devtron/pkg/cluster/repository" errors1 "github.com/juju/errors" "go.uber.org/zap" - "io" "k8s.io/apimachinery/pkg/api/errors" - "log" - "net/http" - "strconv" - "strings" - "sync" - "time" "gopkg.in/igm/sockjs-go.v3/sockjs" v1 "k8s.io/api/core/v1" @@ -458,12 +460,13 @@ type TerminalSessionHandlerImpl struct { ephemeralContainerService cluster.EphemeralContainerService argoApplicationConfigService config.ArgoApplicationConfigService ClusterReadService read.ClusterReadService + asyncRunnable *async.Runnable } func NewTerminalSessionHandlerImpl(environmentService environment.EnvironmentService, logger *zap.SugaredLogger, k8sUtil *k8s.K8sServiceImpl, ephemeralContainerService cluster.EphemeralContainerService, argoApplicationConfigService config.ArgoApplicationConfigService, - ClusterReadService read.ClusterReadService) *TerminalSessionHandlerImpl { + ClusterReadService read.ClusterReadService, asyncRunnable *async.Runnable) *TerminalSessionHandlerImpl { return &TerminalSessionHandlerImpl{ environmentService: environmentService, logger: logger, @@ -471,6 +474,7 @@ func NewTerminalSessionHandlerImpl(environmentService environment.EnvironmentSer ephemeralContainerService: ephemeralContainerService, argoApplicationConfigService: argoApplicationConfigService, ClusterReadService: ClusterReadService, + asyncRunnable: asyncRunnable, } } @@ -516,18 +520,18 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR }) config, client, err := impl.getClientSetAndRestConfigForTerminalConn(req) - go func() { + impl.asyncRunnable.Execute(func() { err := impl.saveEphemeralContainerTerminalAccessAudit(req) if err != nil { impl.logger.Errorw("error in saving ephemeral container terminal access audit,so skipping auditing", "err", err) } - }() + }) if err != nil { impl.logger.Errorw("error in fetching config", "err", err) return http.StatusInternalServerError, nil, err } - go WaitForTerminal(client, config, req) + impl.asyncRunnable.Execute(func() { WaitForTerminal(client, config, req) }) return http.StatusOK, &TerminalMessage{SessionID: sessionID}, nil } diff --git a/pkg/variables/ScopedVariableService.go b/pkg/variables/ScopedVariableService.go index bf9e628e84..bfde4840b9 100644 --- a/pkg/variables/ScopedVariableService.go +++ b/pkg/variables/ScopedVariableService.go @@ -18,8 +18,13 @@ package variables import ( "fmt" + "regexp" + "strings" + "sync" + "github.com/argoproj/argo-workflows/v3/errors" "github.com/caarlos0/env" + "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/devtron/internal/sql/repository/app" repository3 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository" "github.com/devtron-labs/devtron/pkg/cluster/repository" @@ -34,9 +39,6 @@ import ( "github.com/go-pg/pg" "go.uber.org/zap" "golang.org/x/exp/slices" - "regexp" - "strings" - "sync" ) type ScopedVariableService interface { @@ -55,15 +57,17 @@ type ScopedVariableServiceImpl struct { qualifierMappingService resourceQualifiers.QualifierMappingService VariableNameConfig *VariableConfig VariableCache *cache.VariableCacheObj + asyncRunnable *async.Runnable } func NewScopedVariableServiceImpl(logger *zap.SugaredLogger, scopedVariableRepository repository2.ScopedVariableRepository, appRepository app.AppRepository, environmentRepository repository3.EnvironmentRepository, devtronResourceSearchableKeyService read.DevtronResourceSearchableKeyService, clusterRepository repository.ClusterRepository, - qualifierMappingService resourceQualifiers.QualifierMappingService) (*ScopedVariableServiceImpl, error) { + qualifierMappingService resourceQualifiers.QualifierMappingService, asyncRunnable *async.Runnable) (*ScopedVariableServiceImpl, error) { scopedVariableService := &ScopedVariableServiceImpl{ logger: logger, scopedVariableRepository: scopedVariableRepository, qualifierMappingService: qualifierMappingService, VariableCache: &cache.VariableCacheObj{CacheLock: &sync.Mutex{}}, + asyncRunnable: asyncRunnable, } cfg, err := GetVariableNameConfig() if err != nil { @@ -83,7 +87,7 @@ type VariableConfig struct { func loadVariableCache(cfg *VariableConfig, service *ScopedVariableServiceImpl) { if cfg.VariableCacheEnabled { - go service.loadVarCache() + service.asyncRunnable.Execute(func() { service.loadVarCache() }) } } func GetVariableNameConfig() (*VariableConfig, error) { diff --git a/pkg/workflow/dag/WorkflowDagExecutor.go b/pkg/workflow/dag/WorkflowDagExecutor.go index a53e90dbd6..aed15b9c0e 100644 --- a/pkg/workflow/dag/WorkflowDagExecutor.go +++ b/pkg/workflow/dag/WorkflowDagExecutor.go @@ -1050,7 +1050,10 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger } else { ciArtifactArr = append(ciArtifactArr, pluginArtifacts[0]) } - go impl.WriteCiSuccessEvent(request, pipelineModal, buildArtifact) + runnableFunc := func() { + impl.WriteCiSuccessEvent(request, pipelineModal, buildArtifact) + } + impl.asyncRunnable.Execute(runnableFunc) async := false // execute auto trigger in batch on CI success event @@ -1072,7 +1075,7 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger for j := 0; j < batchSize; j++ { wg.Add(1) index := i + j - go func(index int) { + runnableFunc := func(index int) { defer wg.Done() ciArtifact := ciArtifactArr[index] // handle individual CiArtifact success event @@ -1080,7 +1083,8 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger if err != nil { impl.logger.Errorw("error on handle ci success event", "ciArtifactId", ciArtifact.Id, "err", err) } - }(index) + } + impl.asyncRunnable.Execute(func() { runnableFunc(index) }) } wg.Wait() i += batchSize diff --git a/pkg/workflow/status/WorkflowStatusService.go b/pkg/workflow/status/WorkflowStatusService.go index 78d962f884..286c405766 100644 --- a/pkg/workflow/status/WorkflowStatusService.go +++ b/pkg/workflow/status/WorkflowStatusService.go @@ -19,6 +19,9 @@ package status import ( "context" "fmt" + "github.com/devtron-labs/common-lib/async" + "time" + bean2 "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/api/helm-app/service/bean" "github.com/devtron-labs/devtron/client/argocdServer" @@ -46,7 +49,6 @@ import ( "github.com/go-pg/pg" "go.uber.org/zap" "k8s.io/utils/strings/slices" - "time" ) type WorkflowStatusService interface { @@ -88,6 +90,7 @@ type WorkflowStatusServiceImpl struct { appListingService app.AppListingService deploymentConfigService common2.DeploymentConfigService cdWorkflowRunnerService cd.CdWorkflowRunnerService + asyncRunnable *async.Runnable } func NewWorkflowStatusServiceImpl(logger *zap.SugaredLogger, @@ -110,6 +113,7 @@ func NewWorkflowStatusServiceImpl(logger *zap.SugaredLogger, appListingService app.AppListingService, deploymentConfigService common2.DeploymentConfigService, cdWorkflowRunnerService cd.CdWorkflowRunnerService, + asyncRunnable *async.Runnable, ) (*WorkflowStatusServiceImpl, error) { impl := &WorkflowStatusServiceImpl{ logger: logger, @@ -134,6 +138,7 @@ func NewWorkflowStatusServiceImpl(logger *zap.SugaredLogger, appListingService: appListingService, deploymentConfigService: deploymentConfigService, cdWorkflowRunnerService: cdWorkflowRunnerService, + asyncRunnable: asyncRunnable, } config, err := types.GetCdConfig() if err != nil { @@ -196,7 +201,9 @@ func (impl *WorkflowStatusServiceImpl) CheckHelmAppStatusPeriodicallyAndUpdateIn impl.logger.Errorw("error in getting latest pipeline override by cdWorkflowId", "err", err, "cdWorkflowId", wfr.CdWorkflowId) return err } - go impl.appService.WriteCDSuccessEvent(pipelineOverride.Pipeline.AppId, pipelineOverride.Pipeline.EnvironmentId, pipelineOverride) + impl.asyncRunnable.Execute(func() { + impl.appService.WriteCDSuccessEvent(pipelineOverride.Pipeline.AppId, pipelineOverride.Pipeline.EnvironmentId, pipelineOverride) + }) err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(bean3.TriggerContext{}, pipelineOverride) if err != nil { impl.logger.Errorw("error on handling deployment success event", "wfr", wfr, "err", err) diff --git a/wire_gen.go b/wire_gen.go index ba06c486ed..286d488059 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run github.com/google/wire/cmd/wire +//go:generate go run -mod=mod github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject @@ -365,7 +365,8 @@ func InitializeApp() (*App, error) { k8sInformerFactoryImpl := informer.NewK8sInformerFactoryImpl(sugaredLogger, syncMap, k8sServiceImpl) cronLoggerImpl := cron.NewCronLoggerImpl(sugaredLogger) clusterReadServiceImpl := read2.NewClusterReadServiceImpl(sugaredLogger, clusterRepositoryImpl) - clusterServiceImpl, err := cluster.NewClusterServiceImpl(clusterRepositoryImpl, sugaredLogger, k8sServiceImpl, k8sInformerFactoryImpl, userAuthRepositoryImpl, userRepositoryImpl, roleGroupRepositoryImpl, environmentVariables, cronLoggerImpl, clusterReadServiceImpl) + runnable := asyncProvider.NewAsyncRunnable(sugaredLogger) + clusterServiceImpl, err := cluster.NewClusterServiceImpl(clusterRepositoryImpl, sugaredLogger, k8sServiceImpl, k8sInformerFactoryImpl, userAuthRepositoryImpl, userRepositoryImpl, roleGroupRepositoryImpl, environmentVariables, cronLoggerImpl, clusterReadServiceImpl, runnable) if err != nil { return nil, err } @@ -378,14 +379,14 @@ func InitializeApp() (*App, error) { return nil, err } argoApplicationConfigServiceImpl := config2.NewArgoApplicationConfigServiceImpl(sugaredLogger, k8sServiceImpl, clusterRepositoryImpl) - k8sCommonServiceImpl := k8s2.NewK8sCommonServiceImpl(sugaredLogger, k8sServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl) + k8sCommonServiceImpl := k8s2.NewK8sCommonServiceImpl(sugaredLogger, k8sServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl, runnable) versionServiceImpl := version.NewVersionServiceImpl(sugaredLogger) acdAuthConfig, err := util3.GetACDAuthConfig() if err != nil { return nil, err } argoCDConfigGetterImpl := config3.NewArgoCDConfigGetter(beanConfig, environmentVariables, acdAuthConfig, clusterReadServiceImpl, sugaredLogger, k8sServiceImpl) - argoCDConnectionManagerImpl, err := connection.NewArgoCDConnectionManagerImpl(sugaredLogger, settingsManager, moduleRepositoryImpl, environmentVariables, k8sServiceImpl, k8sCommonServiceImpl, versionServiceImpl, gitOpsConfigReadServiceImpl, k8sRuntimeConfig, argoCDConfigGetterImpl) + argoCDConnectionManagerImpl, err := connection.NewArgoCDConnectionManagerImpl(sugaredLogger, settingsManager, moduleRepositoryImpl, environmentVariables, k8sServiceImpl, k8sCommonServiceImpl, versionServiceImpl, gitOpsConfigReadServiceImpl, k8sRuntimeConfig, argoCDConfigGetterImpl, runnable) if err != nil { return nil, err } @@ -404,7 +405,6 @@ func InitializeApp() (*App, error) { } chartTemplateServiceImpl := util.NewChartTemplateServiceImpl(sugaredLogger) gitOperationServiceImpl := git.NewGitOperationServiceImpl(sugaredLogger, gitFactory, gitOpsConfigReadServiceImpl, chartTemplateServiceImpl, environmentVariables) - runnable := asyncProvider.NewAsyncRunnable(sugaredLogger) repositoryCredsK8sClientImpl := repoCredsK8sClient.NewRepositoryCredsK8sClientImpl(sugaredLogger, k8sServiceImpl) argoClientWrapperServiceEAImpl := argocdServer.NewArgoClientWrapperServiceEAImpl(sugaredLogger, repositoryCredsK8sClientImpl, argoCDConfigGetterImpl) argoK8sClientImpl := argocdServer.NewArgoK8sClientImpl(sugaredLogger, k8sServiceImpl) @@ -518,7 +518,7 @@ func InitializeApp() (*App, error) { if err != nil { return nil, err } - scopedVariableServiceImpl, err := variables.NewScopedVariableServiceImpl(sugaredLogger, scopedVariableRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, devtronResourceSearchableKeyServiceImpl, clusterRepositoryImpl, qualifierMappingServiceImpl) + scopedVariableServiceImpl, err := variables.NewScopedVariableServiceImpl(sugaredLogger, scopedVariableRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, devtronResourceSearchableKeyServiceImpl, clusterRepositoryImpl, qualifierMappingServiceImpl, runnable) if err != nil { return nil, err } @@ -620,7 +620,7 @@ func InitializeApp() (*App, error) { workflowStageRepositoryImpl := repository18.NewWorkflowStageRepositoryImpl(sugaredLogger, db) workFlowStageStatusServiceImpl := workflowStatus.NewWorkflowStageFlowStatusServiceImpl(sugaredLogger, workflowStageRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowRepositoryImpl, transactionUtilImpl) cdWorkflowRunnerServiceImpl := cd.NewCdWorkflowRunnerServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl, workFlowStageStatusServiceImpl, transactionUtilImpl) - appServiceImpl := app2.NewAppService(pipelineOverrideRepositoryImpl, utilMergeUtil, sugaredLogger, pipelineRepositoryImpl, eventRESTClientImpl, eventSimpleFactoryImpl, appRepositoryImpl, configMapRepositoryImpl, chartRepositoryImpl, cdWorkflowRepositoryImpl, commonServiceImpl, chartTemplateServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineStatusTimelineResourcesServiceImpl, pipelineStatusSyncDetailServiceImpl, pipelineStatusTimelineServiceImpl, appServiceConfig, appStatusServiceImpl, installedAppReadServiceImpl, installedAppVersionHistoryRepositoryImpl, scopedVariableCMCSManagerImpl, acdConfig, gitOpsConfigReadServiceImpl, gitOperationServiceImpl, deploymentTemplateServiceImpl, appListingServiceImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl, cdWorkflowRunnerServiceImpl) + appServiceImpl := app2.NewAppService(pipelineOverrideRepositoryImpl, utilMergeUtil, sugaredLogger, pipelineRepositoryImpl, eventRESTClientImpl, eventSimpleFactoryImpl, appRepositoryImpl, configMapRepositoryImpl, chartRepositoryImpl, cdWorkflowRepositoryImpl, commonServiceImpl, chartTemplateServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineStatusTimelineResourcesServiceImpl, pipelineStatusSyncDetailServiceImpl, pipelineStatusTimelineServiceImpl, appServiceConfig, appStatusServiceImpl, installedAppReadServiceImpl, installedAppVersionHistoryRepositoryImpl, scopedVariableCMCSManagerImpl, acdConfig, gitOpsConfigReadServiceImpl, gitOperationServiceImpl, deploymentTemplateServiceImpl, appListingServiceImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl, cdWorkflowRunnerServiceImpl, runnable) scopedVariableManagerImpl, err := variables.NewScopedVariableManagerImpl(sugaredLogger, scopedVariableServiceImpl, variableEntityMappingServiceImpl, variableSnapshotHistoryServiceImpl, variableTemplateParserImpl) if err != nil { return nil, err @@ -686,7 +686,7 @@ func InitializeApp() (*App, error) { return nil, err } blobStorageConfigServiceImpl := pipeline.NewBlobStorageConfigServiceImpl(sugaredLogger, k8sServiceImpl, ciCdConfig) - handlerServiceImpl := trigger.NewHandlerServiceImpl(sugaredLogger, workflowServiceImpl, ciPipelineMaterialRepositoryImpl, ciPipelineRepositoryImpl, ciArtifactRepositoryImpl, pipelineStageServiceImpl, userServiceImpl, ciTemplateReadServiceImpl, appCrudOperationServiceImpl, environmentRepositoryImpl, appRepositoryImpl, scopedVariableManagerImpl, customTagServiceImpl, ciCdPipelineOrchestratorImpl, attributesServiceImpl, pluginInputVariableParserImpl, globalPluginServiceImpl, ciServiceImpl, ciWorkflowRepositoryImpl, clientImpl, ciLogServiceImpl, blobStorageConfigServiceImpl, clusterServiceImplExtended, environmentServiceImpl, k8sServiceImpl) + handlerServiceImpl := trigger.NewHandlerServiceImpl(sugaredLogger, workflowServiceImpl, ciPipelineMaterialRepositoryImpl, ciPipelineRepositoryImpl, ciArtifactRepositoryImpl, pipelineStageServiceImpl, userServiceImpl, ciTemplateReadServiceImpl, appCrudOperationServiceImpl, environmentRepositoryImpl, appRepositoryImpl, scopedVariableManagerImpl, customTagServiceImpl, ciCdPipelineOrchestratorImpl, attributesServiceImpl, pluginInputVariableParserImpl, globalPluginServiceImpl, ciServiceImpl, ciWorkflowRepositoryImpl, clientImpl, ciLogServiceImpl, blobStorageConfigServiceImpl, clusterServiceImplExtended, environmentServiceImpl, k8sServiceImpl, runnable) gitWebhookServiceImpl := gitWebhook.NewGitWebhookServiceImpl(sugaredLogger, gitWebhookRepositoryImpl, handlerServiceImpl) gitWebhookRestHandlerImpl := restHandler.NewGitWebhookRestHandlerImpl(sugaredLogger, gitWebhookServiceImpl) ecrConfig, err := pipeline.GetEcrConfig() @@ -771,7 +771,7 @@ func InitializeApp() (*App, error) { scanToolExecutionHistoryMappingRepositoryImpl := repository24.NewScanToolExecutionHistoryMappingRepositoryImpl(db, sugaredLogger) cdWorkflowReadServiceImpl := read20.NewCdWorkflowReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) imageScanServiceImpl := imageScanning.NewImageScanServiceImpl(sugaredLogger, imageScanHistoryRepositoryImpl, imageScanResultRepositoryImpl, imageScanObjectMetaRepositoryImpl, cveStoreRepositoryImpl, imageScanDeployInfoRepositoryImpl, userServiceImpl, appRepositoryImpl, environmentServiceImpl, ciArtifactRepositoryImpl, policyServiceImpl, pipelineRepositoryImpl, ciPipelineRepositoryImpl, scanToolMetadataRepositoryImpl, scanToolExecutionHistoryMappingRepositoryImpl, cvePolicyRepositoryImpl, cdWorkflowReadServiceImpl) - devtronAppsHandlerServiceImpl, err := devtronApps.NewHandlerServiceImpl(sugaredLogger, cdWorkflowCommonServiceImpl, gitOpsManifestPushServiceImpl, gitOpsConfigReadServiceImpl, argoK8sClientImpl, acdConfig, argoClientWrapperServiceImpl, pipelineStatusTimelineServiceImpl, chartTemplateServiceImpl, workflowEventPublishServiceImpl, manifestCreationServiceImpl, deployedConfigurationHistoryServiceImpl, pipelineStageServiceImpl, globalPluginServiceImpl, customTagServiceImpl, pluginInputVariableParserImpl, prePostCdScriptHistoryServiceImpl, scopedVariableCMCSManagerImpl, imageDigestPolicyServiceImpl, userServiceImpl, helmAppServiceImpl, enforcerUtilImpl, userDeploymentRequestServiceImpl, helmAppClientImpl, eventSimpleFactoryImpl, eventRESTClientImpl, environmentVariables, appRepositoryImpl, ciPipelineMaterialRepositoryImpl, imageScanHistoryReadServiceImpl, imageScanDeployInfoReadServiceImpl, imageScanDeployInfoServiceImpl, pipelineRepositoryImpl, pipelineOverrideRepositoryImpl, manifestPushConfigRepositoryImpl, chartRepositoryImpl, environmentRepositoryImpl, cdWorkflowRepositoryImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciTemplateReadServiceImpl, gitMaterialReadServiceImpl, appLabelRepositoryImpl, ciPipelineRepositoryImpl, appWorkflowRepositoryImpl, dockerArtifactStoreRepositoryImpl, imageScanServiceImpl, k8sServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, ciCdPipelineOrchestratorImpl, gitOperationServiceImpl, attributesServiceImpl, clusterRepositoryImpl, cdWorkflowRunnerServiceImpl, clusterServiceImplExtended, ciLogServiceImpl, workflowServiceImpl, blobStorageConfigServiceImpl) + devtronAppsHandlerServiceImpl, err := devtronApps.NewHandlerServiceImpl(sugaredLogger, cdWorkflowCommonServiceImpl, gitOpsManifestPushServiceImpl, gitOpsConfigReadServiceImpl, argoK8sClientImpl, acdConfig, argoClientWrapperServiceImpl, pipelineStatusTimelineServiceImpl, chartTemplateServiceImpl, workflowEventPublishServiceImpl, manifestCreationServiceImpl, deployedConfigurationHistoryServiceImpl, pipelineStageServiceImpl, globalPluginServiceImpl, customTagServiceImpl, pluginInputVariableParserImpl, prePostCdScriptHistoryServiceImpl, scopedVariableCMCSManagerImpl, imageDigestPolicyServiceImpl, userServiceImpl, helmAppServiceImpl, enforcerUtilImpl, userDeploymentRequestServiceImpl, helmAppClientImpl, eventSimpleFactoryImpl, eventRESTClientImpl, environmentVariables, appRepositoryImpl, ciPipelineMaterialRepositoryImpl, imageScanHistoryReadServiceImpl, imageScanDeployInfoReadServiceImpl, imageScanDeployInfoServiceImpl, pipelineRepositoryImpl, pipelineOverrideRepositoryImpl, manifestPushConfigRepositoryImpl, chartRepositoryImpl, environmentRepositoryImpl, cdWorkflowRepositoryImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciTemplateReadServiceImpl, gitMaterialReadServiceImpl, appLabelRepositoryImpl, ciPipelineRepositoryImpl, appWorkflowRepositoryImpl, dockerArtifactStoreRepositoryImpl, imageScanServiceImpl, k8sServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, ciCdPipelineOrchestratorImpl, gitOperationServiceImpl, attributesServiceImpl, clusterRepositoryImpl, cdWorkflowRunnerServiceImpl, clusterServiceImplExtended, ciLogServiceImpl, workflowServiceImpl, blobStorageConfigServiceImpl, runnable) if err != nil { return nil, err } @@ -825,7 +825,7 @@ func InitializeApp() (*App, error) { k8sResourceHistoryServiceImpl := kubernetesResourceAuditLogs.Newk8sResourceHistoryServiceImpl(k8sResourceHistoryRepositoryImpl, sugaredLogger, appRepositoryImpl, environmentRepositoryImpl) ephemeralContainersRepositoryImpl := repository5.NewEphemeralContainersRepositoryImpl(db, transactionUtilImpl) ephemeralContainerServiceImpl := cluster.NewEphemeralContainerServiceImpl(ephemeralContainersRepositoryImpl, sugaredLogger) - terminalSessionHandlerImpl := terminal.NewTerminalSessionHandlerImpl(environmentServiceImpl, sugaredLogger, k8sServiceImpl, ephemeralContainerServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl) + terminalSessionHandlerImpl := terminal.NewTerminalSessionHandlerImpl(environmentServiceImpl, sugaredLogger, k8sServiceImpl, ephemeralContainerServiceImpl, argoApplicationConfigServiceImpl, clusterReadServiceImpl, runnable) fluxApplicationServiceImpl := fluxApplication.NewFluxApplicationServiceImpl(sugaredLogger, helmAppReadServiceImpl, clusterServiceImplExtended, helmAppClientImpl, pumpImpl) k8sApplicationServiceImpl, err := application2.NewK8sApplicationServiceImpl(sugaredLogger, clusterServiceImplExtended, pumpImpl, helmAppServiceImpl, k8sServiceImpl, acdAuthConfig, k8sResourceHistoryServiceImpl, k8sCommonServiceImpl, terminalSessionHandlerImpl, ephemeralContainerServiceImpl, ephemeralContainersRepositoryImpl, fluxApplicationServiceImpl, clusterReadServiceImpl) if err != nil { @@ -833,7 +833,7 @@ func InitializeApp() (*App, error) { } argoApplicationServiceImpl := argoApplication.NewArgoApplicationServiceImpl(sugaredLogger, clusterRepositoryImpl, k8sServiceImpl, helmAppClientImpl, helmAppServiceImpl, k8sApplicationServiceImpl, argoApplicationConfigServiceImpl, deploymentConfigServiceImpl) argoApplicationReadServiceImpl := read22.NewArgoApplicationReadServiceImpl(sugaredLogger, clusterRepositoryImpl, k8sServiceImpl, helmAppClientImpl, helmAppServiceImpl) - argoApplicationServiceExtendedImpl := argoApplication.NewArgoApplicationServiceExtendedServiceImpl(acdAuthConfig, argoApplicationServiceImpl, argoClientWrapperServiceImpl, argoApplicationReadServiceImpl, clusterServiceImplExtended) + argoApplicationServiceExtendedImpl := argoApplication.NewArgoApplicationServiceExtendedServiceImpl(acdAuthConfig, argoApplicationServiceImpl, argoClientWrapperServiceImpl, argoApplicationReadServiceImpl, clusterServiceImplExtended, runnable) installedAppResourceServiceImpl := resource.NewInstalledAppResourceServiceImpl(sugaredLogger, installedAppRepositoryImpl, appStoreApplicationVersionRepositoryImpl, argoClientWrapperServiceImpl, acdAuthConfig, installedAppVersionHistoryRepositoryImpl, helmAppServiceImpl, helmAppReadServiceImpl, appStatusServiceImpl, k8sCommonServiceImpl, k8sApplicationServiceImpl, k8sServiceImpl, deploymentConfigServiceImpl, ociRegistryConfigRepositoryImpl, argoApplicationServiceExtendedImpl) chartGroupEntriesRepositoryImpl := repository28.NewChartGroupEntriesRepositoryImpl(db, sugaredLogger) chartGroupReposotoryImpl := repository28.NewChartGroupReposotoryImpl(db, sugaredLogger) @@ -855,7 +855,7 @@ func InitializeApp() (*App, error) { return nil, err } cdPipelineEventPublishServiceImpl := out.NewCDPipelineEventPublishServiceImpl(sugaredLogger, pubSubClientServiceImpl) - workflowStatusServiceImpl, err := status2.NewWorkflowStatusServiceImpl(sugaredLogger, workflowDagExecutorImpl, pipelineStatusTimelineServiceImpl, appServiceImpl, appStatusServiceImpl, acdConfig, appServiceConfig, pipelineStatusSyncDetailServiceImpl, argoClientWrapperServiceImpl, cdPipelineEventPublishServiceImpl, cdWorkflowRepositoryImpl, pipelineOverrideRepositoryImpl, installedAppVersionHistoryRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, installedAppRepositoryImpl, installedAppReadServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineRepositoryImpl, appListingServiceImpl, deploymentConfigServiceImpl, cdWorkflowRunnerServiceImpl) + workflowStatusServiceImpl, err := status2.NewWorkflowStatusServiceImpl(sugaredLogger, workflowDagExecutorImpl, pipelineStatusTimelineServiceImpl, appServiceImpl, appStatusServiceImpl, acdConfig, appServiceConfig, pipelineStatusSyncDetailServiceImpl, argoClientWrapperServiceImpl, cdPipelineEventPublishServiceImpl, cdWorkflowRepositoryImpl, pipelineOverrideRepositoryImpl, installedAppVersionHistoryRepositoryImpl, appRepositoryImpl, environmentRepositoryImpl, installedAppRepositoryImpl, installedAppReadServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineRepositoryImpl, appListingServiceImpl, deploymentConfigServiceImpl, cdWorkflowRunnerServiceImpl, runnable) if err != nil { return nil, err } @@ -895,7 +895,7 @@ func InitializeApp() (*App, error) { deploymentTemplateActionImpl := batch.NewDeploymentTemplateActionImpl(sugaredLogger, appRepositoryImpl, chartServiceImpl) deploymentActionImpl := batch.NewDeploymentActionImpl(pipelineBuilderImpl, sugaredLogger, appRepositoryImpl, environmentServiceImpl, appWorkflowRepositoryImpl, ciPipelineRepositoryImpl, pipelineRepositoryImpl, dataHolderActionImpl, deploymentTemplateActionImpl) workflowActionImpl := batch.NewWorkflowActionImpl(sugaredLogger, appRepositoryImpl, appWorkflowServiceImpl, buildActionImpl, deploymentActionImpl) - batchOperationRestHandlerImpl := restHandler.NewBatchOperationRestHandlerImpl(userServiceImpl, enforcerImpl, workflowActionImpl, teamServiceImpl, sugaredLogger, enforcerUtilImpl) + batchOperationRestHandlerImpl := restHandler.NewBatchOperationRestHandlerImpl(userServiceImpl, enforcerImpl, workflowActionImpl, teamServiceImpl, sugaredLogger, enforcerUtilImpl, runnable) batchOperationRouterImpl := router.NewBatchOperationRouterImpl(batchOperationRestHandlerImpl, sugaredLogger) chartGroupRestHandlerImpl := chartGroup2.NewChartGroupRestHandlerImpl(chartGroupServiceImpl, sugaredLogger, userServiceImpl, enforcerImpl, validate) chartGroupRouterImpl := chartGroup2.NewChartGroupRouterImpl(chartGroupRestHandlerImpl) @@ -958,7 +958,7 @@ func InitializeApp() (*App, error) { webhookListenerRouterImpl := router.NewWebhookListenerRouterImpl(webhookEventHandlerImpl) appFilteringRestHandlerImpl := appList.NewAppFilteringRestHandlerImpl(sugaredLogger, teamServiceImpl, enforcerImpl, userServiceImpl, clusterServiceImplExtended, environmentServiceImpl, teamReadServiceImpl) appFilteringRouterImpl := appList2.NewAppFilteringRouterImpl(appFilteringRestHandlerImpl) - resourceTreeServiceImpl := resourceTree.NewServiceImpl(sugaredLogger, appListingServiceImpl, appStatusServiceImpl, argoApplicationServiceExtendedImpl, cdApplicationStatusUpdateHandlerImpl, helmAppReadServiceImpl, helmAppServiceImpl, k8sApplicationServiceImpl, k8sCommonServiceImpl, environmentReadServiceImpl) + resourceTreeServiceImpl := resourceTree.NewServiceImpl(sugaredLogger, appListingServiceImpl, appStatusServiceImpl, argoApplicationServiceExtendedImpl, cdApplicationStatusUpdateHandlerImpl, helmAppReadServiceImpl, helmAppServiceImpl, k8sApplicationServiceImpl, k8sCommonServiceImpl, environmentReadServiceImpl, runnable) appListingRestHandlerImpl := appList.NewAppListingRestHandlerImpl(appListingServiceImpl, enforcerImpl, pipelineBuilderImpl, sugaredLogger, enforcerUtilImpl, deploymentGroupServiceImpl, userServiceImpl, k8sCommonServiceImpl, installedAppDBExtendedServiceImpl, installedAppResourceServiceImpl, pipelineRepositoryImpl, k8sApplicationServiceImpl, deploymentConfigServiceImpl, resourceTreeServiceImpl) appListingRouterImpl := appList2.NewAppListingRouterImpl(appListingRestHandlerImpl) appInfoRestHandlerImpl := appInfo.NewAppInfoRestHandlerImpl(sugaredLogger, appCrudOperationServiceImpl, userServiceImpl, validate, enforcerUtilImpl, enforcerImpl, helmAppServiceImpl, enforcerUtilHelmImpl, genericNoteServiceImpl, commonEnforcementUtilImpl) @@ -1088,7 +1088,7 @@ func InitializeApp() (*App, error) { cdWorkflowServiceImpl := cd.NewCdWorkflowServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) cdWorkflowRunnerReadServiceImpl := read20.NewCdWorkflowRunnerReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) webhookServiceImpl := pipeline.NewWebhookServiceImpl(ciArtifactRepositoryImpl, sugaredLogger, ciPipelineRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowCommonServiceImpl, workFlowStageStatusServiceImpl, ciServiceImpl) - workflowEventProcessorImpl, err := in.NewWorkflowEventProcessorImpl(sugaredLogger, pubSubClientServiceImpl, cdWorkflowServiceImpl, cdWorkflowReadServiceImpl, cdWorkflowRunnerServiceImpl, cdWorkflowRunnerReadServiceImpl, workflowDagExecutorImpl, ciHandlerImpl, cdHandlerImpl, eventSimpleFactoryImpl, eventRESTClientImpl, devtronAppsHandlerServiceImpl, deployedAppServiceImpl, webhookServiceImpl, validate, environmentVariables, cdWorkflowCommonServiceImpl, cdPipelineConfigServiceImpl, userDeploymentRequestServiceImpl, serviceImpl, pipelineRepositoryImpl, ciArtifactRepositoryImpl, cdWorkflowRepositoryImpl, deploymentConfigServiceImpl, handlerServiceImpl) + workflowEventProcessorImpl, err := in.NewWorkflowEventProcessorImpl(sugaredLogger, pubSubClientServiceImpl, cdWorkflowServiceImpl, cdWorkflowReadServiceImpl, cdWorkflowRunnerServiceImpl, cdWorkflowRunnerReadServiceImpl, workflowDagExecutorImpl, ciHandlerImpl, cdHandlerImpl, eventSimpleFactoryImpl, eventRESTClientImpl, devtronAppsHandlerServiceImpl, deployedAppServiceImpl, webhookServiceImpl, validate, environmentVariables, cdWorkflowCommonServiceImpl, cdPipelineConfigServiceImpl, userDeploymentRequestServiceImpl, serviceImpl, pipelineRepositoryImpl, ciArtifactRepositoryImpl, cdWorkflowRepositoryImpl, deploymentConfigServiceImpl, handlerServiceImpl, runnable) if err != nil { return nil, err }