Skip to content

misc: go routines wrapped into panic safe function #6589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions api/restHandler/BatchOperationRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"net/http"

"github.com/devtron-labs/common-lib/async"
"github.com/devtron-labs/devtron/api/restHandler/common"
"github.com/devtron-labs/devtron/pkg/apis/devtron/v1"
"github.com/devtron-labs/devtron/pkg/apis/devtron/v1/validation"
Expand All @@ -44,17 +45,19 @@ type BatchOperationRestHandlerImpl struct {
teamService team.TeamService
logger *zap.SugaredLogger
enforcerUtil rbac.EnforcerUtil
asyncRunnable *async.Runnable
}

func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer casbin.Enforcer, workflowAction batch.WorkflowAction,
teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil) *BatchOperationRestHandlerImpl {
teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil, asyncRunnable *async.Runnable) *BatchOperationRestHandlerImpl {
return &BatchOperationRestHandlerImpl{
userAuthService: userAuthService,
enforcer: enforcer,
workflowAction: workflowAction,
teamService: teamService,
logger: logger,
enforcerUtil: enforcerUtil,
asyncRunnable: asyncRunnable,
}
}

Expand Down Expand Up @@ -104,13 +107,14 @@ func (handler BatchOperationRestHandlerImpl) Operate(w http.ResponseWriter, r *h

ctx, cancel := context.WithCancel(r.Context())
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
runnableFunc := func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
handler.asyncRunnable.Execute(func() { runnableFunc(ctx.Done(), cn.CloseNotify()) })
}
err = handler.workflowAction.Execute(&workflow, emptyProps, r.Context())
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions client/argocdServer/connection/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/account"
"github.com/argoproj/argo-cd/v2/util/settings"
"github.com/devtron-labs/common-lib/async"
"github.com/devtron-labs/common-lib/utils/k8s"
"github.com/devtron-labs/devtron/client/argocdServer/bean"
config2 "github.com/devtron-labs/devtron/client/argocdServer/config"
Expand Down Expand Up @@ -81,6 +82,7 @@ type ArgoCDConnectionManagerImpl struct {
gitOpsConfigReadService config.GitOpsConfigReadService
runTimeConfig *k8s.RuntimeConfig
argoCDConfigGetter config2.ArgoCDConfigGetter
asyncRunnable *async.Runnable
}

func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger,
Expand All @@ -92,7 +94,8 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger,
versionService version.VersionService,
gitOpsConfigReadService config.GitOpsConfigReadService,
runTimeConfig *k8s.RuntimeConfig,
argoCDConfigGetter config2.ArgoCDConfigGetter) (*ArgoCDConnectionManagerImpl, error) {
argoCDConfigGetter config2.ArgoCDConfigGetter,
asyncRunnable *async.Runnable) (*ArgoCDConnectionManagerImpl, error) {
argoUserServiceImpl := &ArgoCDConnectionManagerImpl{
logger: Logger,
settingsManager: settingsManager,
Expand All @@ -105,13 +108,17 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger,
gitOpsConfigReadService: gitOpsConfigReadService,
runTimeConfig: runTimeConfig,
argoCDConfigGetter: argoCDConfigGetter,
asyncRunnable: asyncRunnable,
}
if !runTimeConfig.LocalDevMode {
grpcConfig, err := argoCDConfigGetter.GetGRPCConfig()
if err != nil {
Logger.Errorw("error in GetAllGRPCConfigs", "error", err)
}
go argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig)
runnableFunc := func() {
argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig)
}
asyncRunnable.Execute(runnableFunc)
}
return argoUserServiceImpl, nil
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/external-app/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions pkg/app/AppService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"context"
"errors"
"fmt"
"net/url"
"strconv"
"time"

health2 "github.com/argoproj/gitops-engine/pkg/health"
argoApplication "github.com/devtron-labs/devtron/client/argocdServer/bean"
"github.com/devtron-labs/devtron/internal/sql/models"
Expand All @@ -40,12 +44,10 @@ import (
"github.com/devtron-labs/devtron/pkg/deployment/manifest/deploymentTemplate/read"
bean4 "github.com/devtron-labs/devtron/pkg/deployment/trigger/devtronApps/bean"
"github.com/devtron-labs/devtron/pkg/workflow/cd"
"net/url"
"strconv"
"time"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/caarlos0/env"
"github.com/devtron-labs/common-lib/async"
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
"github.com/devtron-labs/common-lib/utils/k8s/health"
"github.com/devtron-labs/devtron/api/bean"
Expand Down Expand Up @@ -124,6 +126,7 @@ type AppServiceImpl struct {
deploymentConfigService common2.DeploymentConfigService
envConfigOverrideReadService read.EnvConfigOverrideService
cdWorkflowRunnerService cd.CdWorkflowRunnerService
asyncRunnable *async.Runnable
}

type AppService interface {
Expand Down Expand Up @@ -164,7 +167,8 @@ func NewAppService(
appListingService AppListingService,
deploymentConfigService common2.DeploymentConfigService,
envConfigOverrideReadService read.EnvConfigOverrideService,
cdWorkflowRunnerService cd.CdWorkflowRunnerService) *AppServiceImpl {
cdWorkflowRunnerService cd.CdWorkflowRunnerService,
asyncRunnable *async.Runnable) *AppServiceImpl {
appServiceImpl := &AppServiceImpl{
mergeUtil: mergeUtil,
pipelineOverrideRepository: pipelineOverrideRepository,
Expand Down Expand Up @@ -195,6 +199,7 @@ func NewAppService(
deploymentConfigService: deploymentConfigService,
envConfigOverrideReadService: envConfigOverrideReadService,
cdWorkflowRunnerService: cdWorkflowRunnerService,
asyncRunnable: asyncRunnable,
}
return appServiceImpl
}
Expand Down Expand Up @@ -320,7 +325,7 @@ func (impl *AppServiceImpl) UpdateDeploymentStatusForGitOpsPipelines(app *v1alph
}
if isSucceeded {
impl.logger.Infow("writing cd success event", "gitHash", gitHash, "pipelineOverride", pipelineOverride)
go impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride)
impl.asyncRunnable.Execute(func() { impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) })
}
} else {
impl.logger.Debugw("event received for older triggered revision", "gitHash", gitHash)
Expand Down
9 changes: 7 additions & 2 deletions pkg/argoApplication/ArgoApplicationServiceExtended.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
application2 "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/devtron-labs/common-lib/async"
"github.com/devtron-labs/common-lib/utils/k8s"
openapi "github.com/devtron-labs/devtron/api/helm-app/openapiClient"
"github.com/devtron-labs/devtron/client/argocdServer"
Expand All @@ -46,6 +47,7 @@ type ArgoApplicationServiceExtendedImpl struct {
argoApplicationReadService read.ArgoApplicationReadService
clusterService cluster.ClusterService
acdClientWrapper argocdServer.ArgoClientWrapperService
asyncRunnable *async.Runnable
}

func NewArgoApplicationServiceExtendedServiceImpl(
Expand All @@ -54,13 +56,15 @@ func NewArgoApplicationServiceExtendedServiceImpl(
acdClientWrapper argocdServer.ArgoClientWrapperService,
argoApplicationReadService read.ArgoApplicationReadService,
clusterService cluster.ClusterService,
asyncRunnable *async.Runnable,
) *ArgoApplicationServiceExtendedImpl {
return &ArgoApplicationServiceExtendedImpl{
aCDAuthConfig: aCDAuthConfig,
clusterService: clusterService,
argoApplicationReadService: argoApplicationReadService,
acdClientWrapper: acdClientWrapper,
ArgoApplicationServiceImpl: argoApplicationServiceImpl,
asyncRunnable: asyncRunnable,
}
}

Expand Down Expand Up @@ -328,7 +332,7 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat
for _, node := range queryNodes {
rQuery := transform(node, query.ApplicationName)
qCount++
go func(request application2.ApplicationResourceRequest) {
runnableFunc := func(request application2.ApplicationResourceRequest) {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
startTime := time.Now()
Expand All @@ -343,7 +347,8 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat
} else {
response <- argoApplication.Result{Response: nil, Error: fmt.Errorf("connection closed by client"), Request: &request}
}
}(*rQuery)
}
c.asyncRunnable.Execute(func() { runnableFunc(*rQuery) })
}

if qCount == 0 {
Expand Down
11 changes: 10 additions & 1 deletion pkg/build/trigger/HandlerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/caarlos0/env"
"github.com/devtron-labs/common-lib/async"
blob_storage "github.com/devtron-labs/common-lib/blob-storage"
"github.com/devtron-labs/common-lib/utils"
bean4 "github.com/devtron-labs/common-lib/utils/bean"
Expand Down Expand Up @@ -116,6 +117,7 @@ type HandlerServiceImpl struct {
clusterService cluster.ClusterService
envService environment.EnvironmentService
K8sUtil *k8s.K8sServiceImpl
asyncRunnable *async.Runnable
}

func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.WorkflowService,
Expand All @@ -141,6 +143,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W
clusterService cluster.ClusterService,
envService environment.EnvironmentService,
K8sUtil *k8s.K8sServiceImpl,
asyncRunnable *async.Runnable,
) *HandlerServiceImpl {
buildxCacheFlags := &BuildxCacheFlags{}
err := env.Parse(buildxCacheFlags)
Expand Down Expand Up @@ -174,6 +177,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W
clusterService: clusterService,
envService: envService,
K8sUtil: K8sUtil,
asyncRunnable: asyncRunnable,
}
config, err := types.GetCiConfig()
if err != nil {
Expand Down Expand Up @@ -628,7 +632,12 @@ func (impl *HandlerServiceImpl) triggerCiPipeline(trigger types.Trigger) (int, e
}

middleware.CiTriggerCounter.WithLabelValues(pipeline.App.AppName, pipeline.Name).Inc()
go impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest)

runnableFunc := func() {
impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest)
}
impl.asyncRunnable.Execute(runnableFunc)

return savedCiWf.Id, err
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/cluster/ClusterService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/devtron-labs/common-lib/async"
informerBean "github.com/devtron-labs/common-lib/informer"
"github.com/devtron-labs/common-lib/utils/k8s/commonBean"
"github.com/devtron-labs/devtron/pkg/cluster/adapter"
Expand Down Expand Up @@ -97,6 +98,7 @@ type ClusterServiceImpl struct {
userRepository repository3.UserRepository
roleGroupRepository repository3.RoleGroupRepository
clusterReadService read.ClusterReadService
asyncRunnable *async.Runnable
}

func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.SugaredLogger,
Expand All @@ -105,7 +107,8 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
roleGroupRepository repository3.RoleGroupRepository,
envVariables *globalUtil.EnvironmentVariables,
cronLogger *cronUtil.CronLoggerImpl,
clusterReadService read.ClusterReadService) (*ClusterServiceImpl, error) {
clusterReadService read.ClusterReadService,
asyncRunnable *async.Runnable) (*ClusterServiceImpl, error) {
clusterService := &ClusterServiceImpl{
clusterRepository: repository,
logger: logger,
Expand All @@ -115,6 +118,7 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
userRepository: userRepository,
roleGroupRepository: roleGroupRepository,
clusterReadService: clusterReadService,
asyncRunnable: asyncRunnable,
}
// initialise cron
newCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
Expand Down Expand Up @@ -782,7 +786,7 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB
continue
}
wg.Add(1)
go func(idx int, cluster *bean.ClusterBean) {
runnableFunc := func(idx int, cluster *bean.ClusterBean) {
defer wg.Done()
clusterConfig := cluster.GetClusterConfig()
_, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig)
Expand All @@ -796,7 +800,8 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB
id = idx
}
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap)
}(idx, cluster)
}
impl.asyncRunnable.Execute(func() { runnableFunc(idx, cluster) })
}

wg.Wait()
Expand Down
21 changes: 15 additions & 6 deletions pkg/clusterTerminalAccess/UserTerminalAccessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/caarlos0/env/v6"
"github.com/devtron-labs/common-lib/async"
k8s2 "github.com/devtron-labs/common-lib/utils/k8s"
"github.com/devtron-labs/devtron/api/helm-app/service/bean"
"github.com/devtron-labs/devtron/internal/sql/models"
Expand All @@ -43,10 +49,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"strconv"
"strings"
"sync"
"time"
)

type UserTerminalAccessService interface {
Expand Down Expand Up @@ -75,6 +77,7 @@ type UserTerminalAccessServiceImpl struct {
terminalSessionHandler terminal.TerminalSessionHandler
K8sCapacityService capacity.K8sCapacityService
k8sUtil *k8s2.K8sServiceImpl
asyncRunnable *async.Runnable
}

type UserTerminalAccessSessionData struct {
Expand All @@ -98,7 +101,12 @@ func GetTerminalAccessConfig() (*models.UserTerminalSessionConfig, error) {
return config, err
}

func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessRepository repository.TerminalAccessRepository, config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, terminalSessionHandler terminal.TerminalSessionHandler, K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, cronLogger *cron3.CronLoggerImpl) (*UserTerminalAccessServiceImpl, error) {
func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger,
terminalAccessRepository repository.TerminalAccessRepository,
config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService,
terminalSessionHandler terminal.TerminalSessionHandler,
K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl,
cronLogger *cron3.CronLoggerImpl, asyncRunnable *async.Runnable) (*UserTerminalAccessServiceImpl, error) {
//fetches all running and starting entities from db and start SyncStatus
podStatusSyncCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
terminalAccessDataArrayMutex := &sync.RWMutex{}
Expand All @@ -114,14 +122,15 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR
terminalSessionHandler: terminalSessionHandler,
K8sCapacityService: K8sCapacityService,
k8sUtil: k8sUtil,
asyncRunnable: asyncRunnable,
}
podStatusSyncCron.Start()
_, err := podStatusSyncCron.AddFunc(fmt.Sprintf("@every %ds", config.TerminalPodStatusSyncTimeInSecs), accessServiceImpl.SyncPodStatus)
if err != nil {
logger.Errorw("error occurred while starting cron job", "time in secs", config.TerminalPodStatusSyncTimeInSecs)
return nil, err
}
go accessServiceImpl.SyncRunningInstances()
accessServiceImpl.asyncRunnable.Execute(func() { accessServiceImpl.SyncRunningInstances() })
return accessServiceImpl, err
}
func (impl *UserTerminalAccessServiceImpl) ValidateShell(podName, namespace, shellName, containerName string, clusterId int) (bool, string, error) {
Expand Down
Loading