Skip to content

Commit da1bcf1

Browse files
committed
panic recovered go-routines
1 parent 0a29c22 commit da1bcf1

File tree

10 files changed

+100
-56
lines changed

10 files changed

+100
-56
lines changed

pkg/app/AppService.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"net/url"
24+
"strconv"
25+
"time"
26+
2327
health2 "github.com/argoproj/gitops-engine/pkg/health"
2428
argoApplication "github.com/devtron-labs/devtron/client/argocdServer/bean"
2529
"github.com/devtron-labs/devtron/internal/sql/models"
@@ -40,12 +44,10 @@ import (
4044
"github.com/devtron-labs/devtron/pkg/deployment/manifest/deploymentTemplate/read"
4145
bean4 "github.com/devtron-labs/devtron/pkg/deployment/trigger/devtronApps/bean"
4246
"github.com/devtron-labs/devtron/pkg/workflow/cd"
43-
"net/url"
44-
"strconv"
45-
"time"
4647

4748
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
4849
"github.com/caarlos0/env"
50+
"github.com/devtron-labs/common-lib/async"
4951
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
5052
"github.com/devtron-labs/common-lib/utils/k8s/health"
5153
"github.com/devtron-labs/devtron/api/bean"
@@ -124,6 +126,7 @@ type AppServiceImpl struct {
124126
deploymentConfigService common2.DeploymentConfigService
125127
envConfigOverrideReadService read.EnvConfigOverrideService
126128
cdWorkflowRunnerService cd.CdWorkflowRunnerService
129+
asyncRunnable *async.Runnable
127130
}
128131

129132
type AppService interface {
@@ -151,7 +154,7 @@ func NewAppService(
151154
cdWorkflowRepository pipelineConfig.CdWorkflowRepository,
152155
commonService commonService.CommonService,
153156
chartTemplateService internalUtil.ChartTemplateService,
154-
cdPipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository,
157+
pipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository,
155158
pipelineStatusTimelineResourcesService status2.PipelineStatusTimelineResourcesService,
156159
pipelineStatusSyncDetailService status2.PipelineStatusSyncDetailService,
157160
pipelineStatusTimelineService status2.PipelineStatusTimelineService,
@@ -164,7 +167,8 @@ func NewAppService(
164167
appListingService AppListingService,
165168
deploymentConfigService common2.DeploymentConfigService,
166169
envConfigOverrideReadService read.EnvConfigOverrideService,
167-
cdWorkflowRunnerService cd.CdWorkflowRunnerService) *AppServiceImpl {
170+
cdWorkflowRunnerService cd.CdWorkflowRunnerService,
171+
asyncRunnable *async.Runnable) *AppServiceImpl {
168172
appServiceImpl := &AppServiceImpl{
169173
mergeUtil: mergeUtil,
170174
pipelineOverrideRepository: pipelineOverrideRepository,
@@ -178,7 +182,7 @@ func NewAppService(
178182
cdWorkflowRepository: cdWorkflowRepository,
179183
commonService: commonService,
180184
chartTemplateService: chartTemplateService,
181-
pipelineStatusTimelineRepository: cdPipelineStatusTimelineRepo,
185+
pipelineStatusTimelineRepository: pipelineStatusTimelineRepo,
182186
pipelineStatusTimelineResourcesService: pipelineStatusTimelineResourcesService,
183187
pipelineStatusSyncDetailService: pipelineStatusSyncDetailService,
184188
pipelineStatusTimelineService: pipelineStatusTimelineService,
@@ -195,6 +199,7 @@ func NewAppService(
195199
deploymentConfigService: deploymentConfigService,
196200
envConfigOverrideReadService: envConfigOverrideReadService,
197201
cdWorkflowRunnerService: cdWorkflowRunnerService,
202+
asyncRunnable: asyncRunnable,
198203
}
199204
return appServiceImpl
200205
}
@@ -320,7 +325,7 @@ func (impl *AppServiceImpl) UpdateDeploymentStatusForGitOpsPipelines(app *v1alph
320325
}
321326
if isSucceeded {
322327
impl.logger.Infow("writing cd success event", "gitHash", gitHash, "pipelineOverride", pipelineOverride)
323-
go impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride)
328+
impl.asyncRunnable.Execute(func() { impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) })
324329
}
325330
} else {
326331
impl.logger.Debugw("event received for older triggered revision", "gitHash", gitHash)

pkg/clusterTerminalAccess/UserTerminalAccessService.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,18 @@ import (
2121
"encoding/json"
2222
"errors"
2323
"fmt"
24+
"strconv"
25+
"strings"
26+
"sync"
27+
"time"
28+
2429
"github.com/caarlos0/env/v6"
30+
"github.com/devtron-labs/common-lib/async"
2531
k8s2 "github.com/devtron-labs/common-lib/utils/k8s"
2632
"github.com/devtron-labs/devtron/api/helm-app/service/bean"
2733
"github.com/devtron-labs/devtron/internal/sql/models"
2834
"github.com/devtron-labs/devtron/internal/sql/repository"
35+
"github.com/devtron-labs/devtron/pkg/asyncProvider"
2936
utils1 "github.com/devtron-labs/devtron/pkg/clusterTerminalAccess/clusterTerminalUtils"
3037
"github.com/devtron-labs/devtron/pkg/k8s"
3138
bean2 "github.com/devtron-labs/devtron/pkg/k8s/bean"
@@ -43,10 +50,6 @@ import (
4350
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4451
"k8s.io/apimachinery/pkg/runtime"
4552
"k8s.io/apimachinery/pkg/runtime/schema"
46-
"strconv"
47-
"strings"
48-
"sync"
49-
"time"
5053
)
5154

5255
type UserTerminalAccessService interface {
@@ -75,6 +78,7 @@ type UserTerminalAccessServiceImpl struct {
7578
terminalSessionHandler terminal.TerminalSessionHandler
7679
K8sCapacityService capacity.K8sCapacityService
7780
k8sUtil *k8s2.K8sServiceImpl
81+
asyncRunnable *async.Runnable
7882
}
7983

8084
type UserTerminalAccessSessionData struct {
@@ -114,14 +118,15 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR
114118
terminalSessionHandler: terminalSessionHandler,
115119
K8sCapacityService: K8sCapacityService,
116120
k8sUtil: k8sUtil,
121+
asyncRunnable: asyncProvider.NewAsyncRunnable(logger),
117122
}
118123
podStatusSyncCron.Start()
119124
_, err := podStatusSyncCron.AddFunc(fmt.Sprintf("@every %ds", config.TerminalPodStatusSyncTimeInSecs), accessServiceImpl.SyncPodStatus)
120125
if err != nil {
121126
logger.Errorw("error occurred while starting cron job", "time in secs", config.TerminalPodStatusSyncTimeInSecs)
122127
return nil, err
123128
}
124-
go accessServiceImpl.SyncRunningInstances()
129+
accessServiceImpl.asyncRunnable.Execute(func() { accessServiceImpl.SyncRunningInstances() })
125130
return accessServiceImpl, err
126131
}
127132
func (impl *UserTerminalAccessServiceImpl) ValidateShell(podName, namespace, shellName, containerName string, clusterId int) (bool, string, error) {

pkg/deployment/deployedApp/status/resourceTree/ResourceTreeService.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ package resourceTree
1919
import (
2020
"context"
2121
"fmt"
22+
"github.com/devtron-labs/common-lib/async"
23+
"strconv"
24+
"time"
25+
2226
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
2327
"github.com/argoproj/gitops-engine/pkg/health"
2428
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
@@ -42,8 +46,6 @@ import (
4246
application2 "github.com/devtron-labs/devtron/pkg/k8s/application"
4347
util2 "github.com/devtron-labs/devtron/util"
4448
"go.uber.org/zap"
45-
"strconv"
46-
"time"
4749
)
4850

4951
type Service interface {
@@ -66,6 +68,7 @@ type ServiceImpl struct {
6668
k8sApplicationService application2.K8sApplicationService
6769
k8sCommonService k8s.K8sCommonService
6870
environmentReadService read2.EnvironmentReadService
71+
asyncRunnable *async.Runnable
6972
}
7073

7174
func NewServiceImpl(logger *zap.SugaredLogger,
@@ -78,6 +81,7 @@ func NewServiceImpl(logger *zap.SugaredLogger,
7881
k8sApplicationService application2.K8sApplicationService,
7982
k8sCommonService k8s.K8sCommonService,
8083
environmentReadService read2.EnvironmentReadService,
84+
asyncRunnable *async.Runnable,
8185
) *ServiceImpl {
8286
serviceImpl := &ServiceImpl{
8387
logger: logger,
@@ -90,6 +94,7 @@ func NewServiceImpl(logger *zap.SugaredLogger,
9094
k8sApplicationService: k8sApplicationService,
9195
k8sCommonService: k8sCommonService,
9296
environmentReadService: environmentReadService,
97+
asyncRunnable: asyncRunnable,
9398
}
9499
return serviceImpl
95100
}
@@ -164,7 +169,7 @@ func (impl *ServiceImpl) FetchResourceTree(ctx context.Context, appId int, envId
164169
}
165170
}
166171
resourceTree = util2.InterfaceToMapAdapter(resp)
167-
go func() {
172+
impl.asyncRunnable.Execute(func() {
168173
if resp.Status == string(health.HealthStatusHealthy) {
169174
err = impl.cdApplicationStatusUpdateHandler.SyncPipelineStatusForResourceTreeCall(cdPipeline)
170175
if err != nil {
@@ -176,7 +181,7 @@ func (impl *ServiceImpl) FetchResourceTree(ctx context.Context, appId int, envId
176181
if err != nil {
177182
impl.logger.Warnw("error in updating app status", "err", err, "appId", cdPipeline.AppId, "envId", cdPipeline.EnvironmentId)
178183
}
179-
}()
184+
})
180185
k8sAppDetail := AppView.AppDetailContainer{
181186
DeploymentDetailContainer: AppView.DeploymentDetailContainer{
182187
ClusterId: cdPipeline.Environment.ClusterId,

pkg/deployment/trigger/devtronApps/HandlerService.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ package devtronApps
1919
import (
2020
"bufio"
2121
"context"
22+
"os"
23+
"time"
24+
25+
"github.com/devtron-labs/common-lib/async"
2226
pubsub "github.com/devtron-labs/common-lib/pubsub-lib"
2327
util5 "github.com/devtron-labs/common-lib/utils/k8s"
2428
bean3 "github.com/devtron-labs/devtron/api/bean"
@@ -36,6 +40,7 @@ import (
3640
"github.com/devtron-labs/devtron/pkg/app"
3741
bean4 "github.com/devtron-labs/devtron/pkg/app/bean"
3842
"github.com/devtron-labs/devtron/pkg/app/status"
43+
"github.com/devtron-labs/devtron/pkg/asyncProvider"
3944
"github.com/devtron-labs/devtron/pkg/attributes"
4045
"github.com/devtron-labs/devtron/pkg/auth/user"
4146
userBean "github.com/devtron-labs/devtron/pkg/auth/user/bean"
@@ -70,8 +75,6 @@ import (
7075
util2 "github.com/devtron-labs/devtron/util/event"
7176
"github.com/devtron-labs/devtron/util/rbac"
7277
"go.uber.org/zap"
73-
"os"
74-
"time"
7578
)
7679

7780
/*
@@ -166,6 +169,7 @@ type HandlerServiceImpl struct {
166169
ciLogService pipeline.CiLogService
167170
workflowService executor.WorkflowService
168171
blobConfigStorageService pipeline.BlobStorageConfigService
172+
asyncRunnable *async.Runnable
169173
}
170174

171175
func NewHandlerServiceImpl(logger *zap.SugaredLogger,
@@ -293,6 +297,7 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
293297
ciLogService: ciLogService,
294298
workflowService: workflowService,
295299
blobConfigStorageService: blobConfigStorageService,
300+
asyncRunnable: asyncProvider.NewAsyncRunnable(logger),
296301
}
297302
config, err := types.GetCdConfig()
298303
if err != nil {

pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"net/http"
24+
"path"
25+
"regexp"
26+
"strconv"
27+
"strings"
28+
"time"
29+
2330
bean3 "github.com/devtron-labs/devtron/api/bean"
2431
"github.com/devtron-labs/devtron/api/bean/gitOps"
2532
bean6 "github.com/devtron-labs/devtron/api/helm-app/bean"
@@ -57,12 +64,6 @@ import (
5764
"google.golang.org/grpc/codes"
5865
status2 "google.golang.org/grpc/status"
5966
"helm.sh/helm/v3/pkg/chart"
60-
"net/http"
61-
"path"
62-
"regexp"
63-
"strconv"
64-
"strings"
65-
"time"
6667
)
6768

6869
func (impl *HandlerServiceImpl) TriggerStageForBulk(triggerRequest bean.TriggerRequest) error {
@@ -741,7 +742,9 @@ func (impl *HandlerServiceImpl) triggerPipeline(overrideRequest *bean3.ValuesOve
741742
}
742743
}
743744

744-
go impl.writeCDTriggerEvent(overrideRequest, valuesOverrideResponse.Artifact, valuesOverrideResponse.PipelineOverride.PipelineReleaseCounter, valuesOverrideResponse.PipelineOverride.Id, overrideRequest.WfrId)
745+
impl.asyncRunnable.Execute(func() {
746+
impl.writeCDTriggerEvent(overrideRequest, valuesOverrideResponse.Artifact, valuesOverrideResponse.PipelineOverride.PipelineReleaseCounter, valuesOverrideResponse.PipelineOverride.Id, overrideRequest.WfrId)
747+
})
745748

746749
_ = impl.markImageScanDeployed(newCtx, overrideRequest.AppId, overrideRequest.EnvId, overrideRequest.ClusterId,
747750
valuesOverrideResponse.Artifact.ImageDigest, valuesOverrideResponse.Artifact.ScanEnabled, valuesOverrideResponse.Artifact.Image)

pkg/eventProcessor/in/WorkflowEventProcessorService.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@ import (
2121
"encoding/json"
2222
"errors"
2323
"fmt"
24+
"slices"
25+
"strconv"
26+
"sync"
27+
"time"
28+
2429
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
30+
"github.com/devtron-labs/common-lib/async"
31+
commonConstants "github.com/devtron-labs/common-lib/constants"
2532
pubsub "github.com/devtron-labs/common-lib/pubsub-lib"
2633
"github.com/devtron-labs/common-lib/pubsub-lib/model"
2734
"github.com/devtron-labs/common-lib/utils/registry"
@@ -63,10 +70,6 @@ import (
6370
"go.uber.org/zap"
6471
"gopkg.in/go-playground/validator.v9"
6572
"k8s.io/utils/pointer"
66-
"slices"
67-
"strconv"
68-
"sync"
69-
"time"
7073
)
7174

7275
type WorkflowEventProcessorImpl struct {
@@ -90,6 +93,7 @@ type WorkflowEventProcessorImpl struct {
9093
cdPipelineConfigService pipeline.CdPipelineConfigService
9194
userDeploymentRequestService service.UserDeploymentRequestService
9295
ucid ucid.Service
96+
asyncRunnable *async.Runnable
9397

9498
devtronAppReleaseContextMap map[int]bean.DevtronAppReleaseContextType
9599
devtronAppReleaseContextMapLock *sync.Mutex
@@ -156,14 +160,15 @@ func NewWorkflowEventProcessorImpl(logger *zap.SugaredLogger,
156160
cdWorkflowRepository: cdWorkflowRepository,
157161
deploymentConfigService: deploymentConfigService,
158162
ciHandlerService: ciHandlerService,
163+
asyncRunnable: async.NewAsyncRunnable(logger, commonConstants.Orchestrator),
159164
}
160165
appServiceConfig, err := app.GetAppServiceConfig()
161166
if err != nil {
162167
return nil, err
163168
}
164169
impl.appServiceConfig = appServiceConfig
165170
// handle incomplete deployment requests after restart
166-
go impl.ProcessIncompleteDeploymentReq()
171+
impl.asyncRunnable.Execute(func() { impl.ProcessIncompleteDeploymentReq() })
167172
return impl, nil
168173
}
169174

0 commit comments

Comments
 (0)