@@ -20,6 +20,8 @@ import (
20
20
"context"
21
21
"encoding/json"
22
22
"fmt"
23
+ cronUtil "github.com/devtron-labs/devtron/util/cron"
24
+ "github.com/robfig/cron/v3"
23
25
"log"
24
26
"net/http"
25
27
"net/url"
@@ -43,7 +45,7 @@ import (
43
45
"github.com/devtron-labs/devtron/internal/constants"
44
46
"github.com/devtron-labs/devtron/internal/util"
45
47
"github.com/devtron-labs/devtron/pkg/cluster/repository"
46
- util2 "github.com/devtron-labs/devtron/util"
48
+ globalUtil "github.com/devtron-labs/devtron/util"
47
49
"github.com/go-pg/pg"
48
50
"go.uber.org/zap"
49
51
)
@@ -201,7 +203,9 @@ type ClusterServiceImpl struct {
201
203
func NewClusterServiceImpl (repository repository.ClusterRepository , logger * zap.SugaredLogger ,
202
204
K8sUtil * k8s.K8sServiceImpl , K8sInformerFactory informer.K8sInformerFactory ,
203
205
userAuthRepository repository3.UserAuthRepository , userRepository repository3.UserRepository ,
204
- roleGroupRepository repository3.RoleGroupRepository ) * ClusterServiceImpl {
206
+ roleGroupRepository repository3.RoleGroupRepository ,
207
+ envVariables * globalUtil.EnvironmentVariables ,
208
+ cronLogger * cronUtil.CronLoggerImpl ) (* ClusterServiceImpl , error ) {
205
209
clusterService := & ClusterServiceImpl {
206
210
clusterRepository : repository ,
207
211
logger : logger ,
@@ -211,8 +215,19 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
211
215
userRepository : userRepository ,
212
216
roleGroupRepository : roleGroupRepository ,
213
217
}
218
+ // initialise cron
219
+ newCron := cron .New (cron .WithChain (cron .Recover (cronLogger )))
220
+ newCron .Start ()
221
+ cfg := envVariables .GlobalClusterConfig
222
+ // add function into cron
223
+ _ , err := newCron .AddFunc (fmt .Sprintf ("@every %dm" , cfg .ClusterStatusCronTime ), clusterService .getAndUpdateClusterConnectionStatus )
224
+ if err != nil {
225
+ fmt .Println ("error in adding cron function into cluster cron service" )
226
+ return clusterService , err
227
+ }
228
+ logger .Infow ("cluster cron service started successfully!" , "cronTime" , cfg .ClusterStatusCronTime )
214
229
go clusterService .buildInformer ()
215
- return clusterService
230
+ return clusterService , nil
216
231
}
217
232
218
233
func (impl * ClusterServiceImpl ) ConvertClusterBeanToCluster (clusterBean * ClusterBean , userId int32 ) * repository.Cluster {
@@ -242,6 +257,20 @@ func (impl *ClusterServiceImpl) ConvertClusterBeanToCluster(clusterBean *Cluster
242
257
return model
243
258
}
244
259
260
+ // getAndUpdateClusterConnectionStatus is a cron function to update the connection status of all clusters
261
+ func (impl * ClusterServiceImpl ) getAndUpdateClusterConnectionStatus () {
262
+ impl .logger .Debug ("starting cluster connection status fetch thread" )
263
+ defer impl .logger .Debug ("stopped cluster connection status fetch thread" )
264
+
265
+ //getting all clusters
266
+ clusters , err := impl .FindAllExceptVirtual ()
267
+ if err != nil {
268
+ impl .logger .Errorw ("error in getting all clusters" , "err" , err )
269
+ return
270
+ }
271
+ impl .ConnectClustersInBatch (clusters , true )
272
+ }
273
+
245
274
func (impl * ClusterServiceImpl ) Save (parent context.Context , bean * ClusterBean , userId int32 ) (* ClusterBean , error ) {
246
275
//validating config
247
276
@@ -289,7 +318,7 @@ func (impl *ClusterServiceImpl) Save(parent context.Context, bean *ClusterBean,
289
318
290
319
//on successful creation of new cluster, update informer cache for namespace group by cluster
291
320
//here sync for ea mode only
292
- if util2 .IsBaseStack () {
321
+ if globalUtil .IsBaseStack () {
293
322
impl .SyncNsInformer (bean )
294
323
}
295
324
impl .logger .Info ("saving secret for cluster informer" )
@@ -530,7 +559,7 @@ func (impl *ClusterServiceImpl) Update(ctx context.Context, bean *ClusterBean, u
530
559
bean .Id = model .Id
531
560
532
561
//here sync for ea mode only
533
- if bean .HasConfigOrUrlChanged && util2 .IsBaseStack () {
562
+ if bean .HasConfigOrUrlChanged && globalUtil .IsBaseStack () {
534
563
impl .SyncNsInformer (bean )
535
564
}
536
565
impl .logger .Infow ("saving secret for cluster informer" )
@@ -643,7 +672,7 @@ func (impl *ClusterServiceImpl) buildInformer() {
643
672
impl .K8sInformerFactory .BuildInformer (clusterInfo )
644
673
}
645
674
646
- func (impl ClusterServiceImpl ) DeleteFromDb (bean * ClusterBean , userId int32 ) error {
675
+ func (impl * ClusterServiceImpl ) DeleteFromDb (bean * ClusterBean , userId int32 ) error {
647
676
existingCluster , err := impl .clusterRepository .FindById (bean .Id )
648
677
if err != nil {
649
678
impl .logger .Errorw ("No matching entry found for delete." , "id" , bean .Id )
@@ -668,7 +697,7 @@ func (impl ClusterServiceImpl) DeleteFromDb(bean *ClusterBean, userId int32) err
668
697
return nil
669
698
}
670
699
671
- func (impl ClusterServiceImpl ) CheckIfConfigIsValid (cluster * ClusterBean ) error {
700
+ func (impl * ClusterServiceImpl ) CheckIfConfigIsValid (cluster * ClusterBean ) error {
672
701
clusterConfig := cluster .GetClusterConfig ()
673
702
response , err := impl .K8sUtil .DiscoveryClientGetLiveZCall (clusterConfig )
674
703
if err != nil {
@@ -1068,7 +1097,7 @@ func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sCli
1068
1097
mutex .Unlock ()
1069
1098
}
1070
1099
1071
- func (impl ClusterServiceImpl ) ConvertClusterBeanObjectToCluster (bean * ClusterBean ) * v1alpha1.Cluster {
1100
+ func (impl * ClusterServiceImpl ) ConvertClusterBeanObjectToCluster (bean * ClusterBean ) * v1alpha1.Cluster {
1072
1101
configMap := bean .Config
1073
1102
serverUrl := bean .ServerUrl
1074
1103
bearerToken := ""
@@ -1097,7 +1126,7 @@ func (impl ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBe
1097
1126
return cl
1098
1127
}
1099
1128
1100
- func (impl ClusterServiceImpl ) GetClusterConfigByClusterId (clusterId int ) (* k8s.ClusterConfig , error ) {
1129
+ func (impl * ClusterServiceImpl ) GetClusterConfigByClusterId (clusterId int ) (* k8s.ClusterConfig , error ) {
1101
1130
clusterBean , err := impl .FindById (clusterId )
1102
1131
if err != nil {
1103
1132
impl .logger .Errorw ("error in getting clusterBean by cluster id" , "err" , err , "clusterId" , clusterId )
@@ -1108,7 +1137,7 @@ func (impl ClusterServiceImpl) GetClusterConfigByClusterId(clusterId int) (*k8s.
1108
1137
return clusterConfig , nil
1109
1138
}
1110
1139
1111
- func (impl ClusterServiceImpl ) IsClusterReachable (clusterId int ) (bool , error ) {
1140
+ func (impl * ClusterServiceImpl ) IsClusterReachable (clusterId int ) (bool , error ) {
1112
1141
cluster , err := impl .clusterRepository .FindById (clusterId )
1113
1142
if err != nil {
1114
1143
impl .logger .Errorw ("error in finding cluster from clusterId" , "envId" , clusterId )
0 commit comments