Skip to content

Commit 0a29c22

Browse files
committed
panic recovered go-routines
1 parent 2335f1f commit 0a29c22

File tree

5 files changed

+37
-15
lines changed

5 files changed

+37
-15
lines changed

api/restHandler/BatchOperationRestHandler.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"net/http"
2424

25+
"github.com/devtron-labs/common-lib/async"
2526
"github.com/devtron-labs/devtron/api/restHandler/common"
2627
"github.com/devtron-labs/devtron/pkg/apis/devtron/v1"
2728
"github.com/devtron-labs/devtron/pkg/apis/devtron/v1/validation"
@@ -44,17 +45,19 @@ type BatchOperationRestHandlerImpl struct {
4445
teamService team.TeamService
4546
logger *zap.SugaredLogger
4647
enforcerUtil rbac.EnforcerUtil
48+
asyncRunnable *async.Runnable
4749
}
4850

4951
func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer casbin.Enforcer, workflowAction batch.WorkflowAction,
50-
teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil) *BatchOperationRestHandlerImpl {
52+
teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil, asyncRunnable *async.Runnable) *BatchOperationRestHandlerImpl {
5153
return &BatchOperationRestHandlerImpl{
5254
userAuthService: userAuthService,
5355
enforcer: enforcer,
5456
workflowAction: workflowAction,
5557
teamService: teamService,
5658
logger: logger,
5759
enforcerUtil: enforcerUtil,
60+
asyncRunnable: asyncRunnable,
5861
}
5962
}
6063

@@ -104,13 +107,14 @@ func (handler BatchOperationRestHandlerImpl) Operate(w http.ResponseWriter, r *h
104107

105108
ctx, cancel := context.WithCancel(r.Context())
106109
if cn, ok := w.(http.CloseNotifier); ok {
107-
go func(done <-chan struct{}, closed <-chan bool) {
110+
runnableFunc := func(done <-chan struct{}, closed <-chan bool) {
108111
select {
109112
case <-done:
110113
case <-closed:
111114
cancel()
112115
}
113-
}(ctx.Done(), cn.CloseNotify())
116+
}
117+
handler.asyncRunnable.Execute(func() { runnableFunc(ctx.Done(), cn.CloseNotify()) })
114118
}
115119
err = handler.workflowAction.Execute(&workflow, emptyProps, r.Context())
116120
if err != nil {

pkg/argoApplication/ArgoApplicationServiceExtended.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
application2 "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
2424
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
25+
"github.com/devtron-labs/common-lib/async"
2526
"github.com/devtron-labs/common-lib/utils/k8s"
2627
openapi "github.com/devtron-labs/devtron/api/helm-app/openapiClient"
2728
"github.com/devtron-labs/devtron/client/argocdServer"
@@ -46,6 +47,7 @@ type ArgoApplicationServiceExtendedImpl struct {
4647
argoApplicationReadService read.ArgoApplicationReadService
4748
clusterService cluster.ClusterService
4849
acdClientWrapper argocdServer.ArgoClientWrapperService
50+
asyncRunnable *async.Runnable
4951
}
5052

5153
func NewArgoApplicationServiceExtendedServiceImpl(
@@ -54,13 +56,15 @@ func NewArgoApplicationServiceExtendedServiceImpl(
5456
acdClientWrapper argocdServer.ArgoClientWrapperService,
5557
argoApplicationReadService read.ArgoApplicationReadService,
5658
clusterService cluster.ClusterService,
59+
asyncRunnable *async.Runnable,
5760
) *ArgoApplicationServiceExtendedImpl {
5861
return &ArgoApplicationServiceExtendedImpl{
5962
aCDAuthConfig: aCDAuthConfig,
6063
clusterService: clusterService,
6164
argoApplicationReadService: argoApplicationReadService,
6265
acdClientWrapper: acdClientWrapper,
6366
ArgoApplicationServiceImpl: argoApplicationServiceImpl,
67+
asyncRunnable: asyncRunnable,
6468
}
6569
}
6670

@@ -328,7 +332,7 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat
328332
for _, node := range queryNodes {
329333
rQuery := transform(node, query.ApplicationName)
330334
qCount++
331-
go func(request application2.ApplicationResourceRequest) {
335+
runnableFunc := func(request application2.ApplicationResourceRequest) {
332336
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
333337
defer cancel()
334338
startTime := time.Now()
@@ -343,7 +347,8 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat
343347
} else {
344348
response <- argoApplication.Result{Response: nil, Error: fmt.Errorf("connection closed by client"), Request: &request}
345349
}
346-
}(*rQuery)
350+
}
351+
c.asyncRunnable.Execute(func() { runnableFunc(*rQuery) })
347352
}
348353

349354
if qCount == 0 {

pkg/cluster/ClusterService.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"github.com/devtron-labs/common-lib/async"
2324
informerBean "github.com/devtron-labs/common-lib/informer"
2425
"github.com/devtron-labs/common-lib/utils/k8s/commonBean"
2526
"github.com/devtron-labs/devtron/pkg/cluster/adapter"
@@ -97,6 +98,7 @@ type ClusterServiceImpl struct {
9798
userRepository repository3.UserRepository
9899
roleGroupRepository repository3.RoleGroupRepository
99100
clusterReadService read.ClusterReadService
101+
asyncRunnable *async.Runnable
100102
}
101103

102104
func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.SugaredLogger,
@@ -105,7 +107,8 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
105107
roleGroupRepository repository3.RoleGroupRepository,
106108
envVariables *globalUtil.EnvironmentVariables,
107109
cronLogger *cronUtil.CronLoggerImpl,
108-
clusterReadService read.ClusterReadService) (*ClusterServiceImpl, error) {
110+
clusterReadService read.ClusterReadService,
111+
asyncRunnable *async.Runnable) (*ClusterServiceImpl, error) {
109112
clusterService := &ClusterServiceImpl{
110113
clusterRepository: repository,
111114
logger: logger,
@@ -115,6 +118,7 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
115118
userRepository: userRepository,
116119
roleGroupRepository: roleGroupRepository,
117120
clusterReadService: clusterReadService,
121+
asyncRunnable: asyncRunnable,
118122
}
119123
// initialise cron
120124
newCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
@@ -782,7 +786,7 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB
782786
continue
783787
}
784788
wg.Add(1)
785-
go func(idx int, cluster *bean.ClusterBean) {
789+
runnableFunc := func(idx int, cluster *bean.ClusterBean) {
786790
defer wg.Done()
787791
clusterConfig := cluster.GetClusterConfig()
788792
_, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig)
@@ -796,7 +800,8 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB
796800
id = idx
797801
}
798802
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap)
799-
}(idx, cluster)
803+
}
804+
impl.asyncRunnable.Execute(func() { runnableFunc(idx, cluster) })
800805
}
801806

802807
wg.Wait()

pkg/k8s/K8sCommonService.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"github.com/caarlos0/env"
23+
"github.com/devtron-labs/common-lib/async"
2324
"github.com/devtron-labs/common-lib/utils/k8s"
2425
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
2526
"github.com/devtron-labs/devtron/api/bean/AppView"
@@ -75,6 +76,7 @@ type K8sCommonServiceImpl struct {
7576
K8sApplicationServiceConfig *K8sApplicationServiceConfig
7677
argoApplicationConfigService config.ArgoApplicationConfigService
7778
ClusterReadService read.ClusterReadService
79+
asyncRunnable *async.Runnable
7880
}
7981
type K8sApplicationServiceConfig struct {
8082
BatchSize int `env:"BATCH_SIZE" envDefault:"5" description:"there is feature to get URL's of services/ingresses. so to extract those, we need to parse all the servcie and ingress objects of the application. this BATCH_SIZE flag controls the no of these objects get parsed in one go."`
@@ -83,7 +85,7 @@ type K8sApplicationServiceConfig struct {
8385

8486
func NewK8sCommonServiceImpl(Logger *zap.SugaredLogger, k8sUtils *k8s.K8sServiceImpl,
8587
argoApplicationConfigService config.ArgoApplicationConfigService,
86-
ClusterReadService read.ClusterReadService) *K8sCommonServiceImpl {
88+
ClusterReadService read.ClusterReadService, asyncRunnable *async.Runnable) *K8sCommonServiceImpl {
8789
cfg := &K8sApplicationServiceConfig{}
8890
err := env.Parse(cfg)
8991
if err != nil {
@@ -95,6 +97,7 @@ func NewK8sCommonServiceImpl(Logger *zap.SugaredLogger, k8sUtils *k8s.K8sService
9597
K8sApplicationServiceConfig: cfg,
9698
argoApplicationConfigService: argoApplicationConfigService,
9799
ClusterReadService: ClusterReadService,
100+
asyncRunnable: asyncRunnable,
98101
}
99102
}
100103

@@ -295,15 +298,18 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque
295298
var res []bean5.BatchResourceResponse
296299
ctx, cancel := context.WithTimeout(ctx, time.Duration(impl.K8sApplicationServiceConfig.TimeOutInSeconds)*time.Second)
297300
defer cancel()
298-
go func() {
301+
302+
runnableFunc := func() {
299303
ans := impl.getManifestsByBatch(ctx, requests)
300304
select {
301305
case <-ctx.Done():
302306
return
303307
default:
304308
ch <- ans
305309
}
306-
}()
310+
}
311+
impl.asyncRunnable.Execute(runnableFunc)
312+
307313
select {
308314
case ans := <-ch:
309315
res = ans
@@ -398,7 +404,7 @@ func (impl *K8sCommonServiceImpl) getManifestsByBatch(ctx context.Context, reque
398404
var wg sync.WaitGroup
399405
for j := 0; j < batchSize; j++ {
400406
wg.Add(1)
401-
go func(j int) {
407+
runnableFunc := func(j int) {
402408
resp := bean5.BatchResourceResponse{}
403409
response, err := impl.GetResource(ctx, &requests[i+j])
404410
if response != nil {
@@ -407,7 +413,8 @@ func (impl *K8sCommonServiceImpl) getManifestsByBatch(ctx context.Context, reque
407413
resp.Err = err
408414
res[i+j] = resp
409415
wg.Done()
410-
}(j)
416+
}
417+
impl.asyncRunnable.Execute(func() { runnableFunc(j) })
411418
}
412419
wg.Wait()
413420
i += batchSize

pkg/workflow/dag/WorkflowDagExecutor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,15 +1075,16 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger
10751075
for j := 0; j < batchSize; j++ {
10761076
wg.Add(1)
10771077
index := i + j
1078-
go func(index int) {
1078+
runnableFunc := func(index int) {
10791079
defer wg.Done()
10801080
ciArtifact := ciArtifactArr[index]
10811081
// handle individual CiArtifact success event
10821082
err = impl.handleCiSuccessEvent(triggerContext, ciArtifact, async, request.UserId)
10831083
if err != nil {
10841084
impl.logger.Errorw("error on handle ci success event", "ciArtifactId", ciArtifact.Id, "err", err)
10851085
}
1086-
}(index)
1086+
}
1087+
impl.asyncRunnable.Execute(func() { runnableFunc(index) })
10871088
}
10881089
wg.Wait()
10891090
i += batchSize

0 commit comments

Comments
 (0)