Skip to content

fix: unimplemented cluster cron service #5781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/cluster/wire_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
var ClusterWireSet = wire.NewSet(
repository.NewClusterRepositoryImpl,
wire.Bind(new(repository.ClusterRepository), new(*repository.ClusterRepositoryImpl)),
cluster.NewClusterServiceImpl,
cluster.NewClusterServiceImplExtended,
wire.Bind(new(cluster.ClusterService), new(*cluster.ClusterServiceImplExtended)),

Expand Down
1 change: 1 addition & 0 deletions api/k8s/application/k8sApplicationRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
handler.logger.Infow("get pod logs request", "request", request)
handler.requestValidationAndRBAC(w, r, token, request)
lastEventId := r.Header.Get(bean2.LastEventID)
isReconnect := false
Expand Down
3 changes: 0 additions & 3 deletions api/k8s/wire_k8sApp.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,4 @@ var K8sApplicationWireSet = wire.NewSet(
informer.NewGlobalMapClusterNamespace,
informer.NewK8sInformerFactoryImpl,
wire.Bind(new(informer.K8sInformerFactory), new(*informer.K8sInformerFactoryImpl)),

cluster.NewClusterCronServiceImpl,
wire.Bind(new(cluster.ClusterCronService), new(*cluster.ClusterCronServiceImpl)),
)
7 changes: 5 additions & 2 deletions cmd/external-app/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 0 additions & 72 deletions pkg/cluster/ClusterCronService.go

This file was deleted.

49 changes: 39 additions & 10 deletions pkg/cluster/ClusterService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
cronUtil "github.com/devtron-labs/devtron/util/cron"
"github.com/robfig/cron/v3"
"log"
"net/http"
"net/url"
Expand All @@ -43,7 +45,7 @@ import (
"github.com/devtron-labs/devtron/internal/constants"
"github.com/devtron-labs/devtron/internal/util"
"github.com/devtron-labs/devtron/pkg/cluster/repository"
util2 "github.com/devtron-labs/devtron/util"
globalUtil "github.com/devtron-labs/devtron/util"
"github.com/go-pg/pg"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -201,7 +203,9 @@ type ClusterServiceImpl struct {
func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.SugaredLogger,
K8sUtil *k8s.K8sServiceImpl, K8sInformerFactory informer.K8sInformerFactory,
userAuthRepository repository3.UserAuthRepository, userRepository repository3.UserRepository,
roleGroupRepository repository3.RoleGroupRepository) *ClusterServiceImpl {
roleGroupRepository repository3.RoleGroupRepository,
envVariables *globalUtil.EnvironmentVariables,
cronLogger *cronUtil.CronLoggerImpl) (*ClusterServiceImpl, error) {
clusterService := &ClusterServiceImpl{
clusterRepository: repository,
logger: logger,
Expand All @@ -211,8 +215,19 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
userRepository: userRepository,
roleGroupRepository: roleGroupRepository,
}
// initialise cron
newCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
newCron.Start()
cfg := envVariables.GlobalClusterConfig
// add function into cron
_, err := newCron.AddFunc(fmt.Sprintf("@every %dm", cfg.ClusterStatusCronTime), clusterService.getAndUpdateClusterConnectionStatus)
if err != nil {
fmt.Println("error in adding cron function into cluster cron service")
return clusterService, err
}
logger.Infow("cluster cron service started successfully!", "cronTime", cfg.ClusterStatusCronTime)
go clusterService.buildInformer()
return clusterService
return clusterService, nil
}

func (impl *ClusterServiceImpl) ConvertClusterBeanToCluster(clusterBean *ClusterBean, userId int32) *repository.Cluster {
Expand Down Expand Up @@ -242,6 +257,20 @@ func (impl *ClusterServiceImpl) ConvertClusterBeanToCluster(clusterBean *Cluster
return model
}

// getAndUpdateClusterConnectionStatus is a cron function to update the connection status of all clusters
func (impl *ClusterServiceImpl) getAndUpdateClusterConnectionStatus() {
impl.logger.Debug("starting cluster connection status fetch thread")
defer impl.logger.Debug("stopped cluster connection status fetch thread")

//getting all clusters
clusters, err := impl.FindAllExceptVirtual()
if err != nil {
impl.logger.Errorw("error in getting all clusters", "err", err)
return
}
impl.ConnectClustersInBatch(clusters, true)
}

func (impl *ClusterServiceImpl) Save(parent context.Context, bean *ClusterBean, userId int32) (*ClusterBean, error) {
//validating config

Expand Down Expand Up @@ -289,7 +318,7 @@ func (impl *ClusterServiceImpl) Save(parent context.Context, bean *ClusterBean,

//on successful creation of new cluster, update informer cache for namespace group by cluster
//here sync for ea mode only
if util2.IsBaseStack() {
if globalUtil.IsBaseStack() {
impl.SyncNsInformer(bean)
}
impl.logger.Info("saving secret for cluster informer")
Expand Down Expand Up @@ -530,7 +559,7 @@ func (impl *ClusterServiceImpl) Update(ctx context.Context, bean *ClusterBean, u
bean.Id = model.Id

//here sync for ea mode only
if bean.HasConfigOrUrlChanged && util2.IsBaseStack() {
if bean.HasConfigOrUrlChanged && globalUtil.IsBaseStack() {
impl.SyncNsInformer(bean)
}
impl.logger.Infow("saving secret for cluster informer")
Expand Down Expand Up @@ -643,7 +672,7 @@ func (impl *ClusterServiceImpl) buildInformer() {
impl.K8sInformerFactory.BuildInformer(clusterInfo)
}

func (impl ClusterServiceImpl) DeleteFromDb(bean *ClusterBean, userId int32) error {
func (impl *ClusterServiceImpl) DeleteFromDb(bean *ClusterBean, userId int32) error {
existingCluster, err := impl.clusterRepository.FindById(bean.Id)
if err != nil {
impl.logger.Errorw("No matching entry found for delete.", "id", bean.Id)
Expand All @@ -668,7 +697,7 @@ func (impl ClusterServiceImpl) DeleteFromDb(bean *ClusterBean, userId int32) err
return nil
}

func (impl ClusterServiceImpl) CheckIfConfigIsValid(cluster *ClusterBean) error {
func (impl *ClusterServiceImpl) CheckIfConfigIsValid(cluster *ClusterBean) error {
clusterConfig := cluster.GetClusterConfig()
response, err := impl.K8sUtil.DiscoveryClientGetLiveZCall(clusterConfig)
if err != nil {
Expand Down Expand Up @@ -1068,7 +1097,7 @@ func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sCli
mutex.Unlock()
}

func (impl ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBean) *v1alpha1.Cluster {
func (impl *ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBean) *v1alpha1.Cluster {
configMap := bean.Config
serverUrl := bean.ServerUrl
bearerToken := ""
Expand Down Expand Up @@ -1097,7 +1126,7 @@ func (impl ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBe
return cl
}

func (impl ClusterServiceImpl) GetClusterConfigByClusterId(clusterId int) (*k8s.ClusterConfig, error) {
func (impl *ClusterServiceImpl) GetClusterConfigByClusterId(clusterId int) (*k8s.ClusterConfig, error) {
clusterBean, err := impl.FindById(clusterId)
if err != nil {
impl.logger.Errorw("error in getting clusterBean by cluster id", "err", err, "clusterId", clusterId)
Expand All @@ -1108,7 +1137,7 @@ func (impl ClusterServiceImpl) GetClusterConfigByClusterId(clusterId int) (*k8s.
return clusterConfig, nil
}

func (impl ClusterServiceImpl) IsClusterReachable(clusterId int) (bool, error) {
func (impl *ClusterServiceImpl) IsClusterReachable(clusterId int) (bool, error) {
cluster, err := impl.clusterRepository.FindById(clusterId)
if err != nil {
impl.logger.Errorw("error in finding cluster from clusterId", "envId", clusterId)
Expand Down
27 changes: 6 additions & 21 deletions pkg/cluster/ClusterServiceExtended.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,13 @@ import (
cluster3 "github.com/argoproj/argo-cd/v2/pkg/apiclient/cluster"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/devtron-labs/common-lib/utils/k8s"
repository5 "github.com/devtron-labs/devtron/pkg/auth/user/repository"
"github.com/devtron-labs/devtron/pkg/k8s/informer"

cluster2 "github.com/devtron-labs/devtron/client/argocdServer/cluster"
"github.com/devtron-labs/devtron/client/grafana"
"github.com/devtron-labs/devtron/internal/constants"
"github.com/devtron-labs/devtron/internal/util"
appStoreBean "github.com/devtron-labs/devtron/pkg/appStore/bean"
repository2 "github.com/devtron-labs/devtron/pkg/appStore/installedApp/repository"
"github.com/devtron-labs/devtron/pkg/cluster/repository"
"go.uber.org/zap"
)

// extends ClusterServiceImpl and enhances method of ClusterService with full mode specific errors
Expand All @@ -50,30 +46,19 @@ type ClusterServiceImplExtended struct {
*ClusterServiceImpl
}

func NewClusterServiceImplExtended(repository repository.ClusterRepository, environmentRepository repository.EnvironmentRepository,
grafanaClient grafana.GrafanaClient, logger *zap.SugaredLogger, installedAppRepository repository2.InstalledAppRepository,
K8sUtil *k8s.K8sServiceImpl,
clusterServiceCD cluster2.ServiceClient, K8sInformerFactory informer.K8sInformerFactory,
userAuthRepository repository5.UserAuthRepository,
userRepository repository5.UserRepository, roleGroupRepository repository5.RoleGroupRepository,
gitOpsConfigReadService config.GitOpsConfigReadService) *ClusterServiceImplExtended {
func NewClusterServiceImplExtended(environmentRepository repository.EnvironmentRepository,
grafanaClient grafana.GrafanaClient, installedAppRepository repository2.InstalledAppRepository,
clusterServiceCD cluster2.ServiceClient,
gitOpsConfigReadService config.GitOpsConfigReadService,
clusterServiceImpl *ClusterServiceImpl) *ClusterServiceImplExtended {
clusterServiceExt := &ClusterServiceImplExtended{
environmentRepository: environmentRepository,
grafanaClient: grafanaClient,
installedAppRepository: installedAppRepository,
clusterServiceCD: clusterServiceCD,
gitOpsConfigReadService: gitOpsConfigReadService,
ClusterServiceImpl: &ClusterServiceImpl{
clusterRepository: repository,
logger: logger,
K8sUtil: K8sUtil,
K8sInformerFactory: K8sInformerFactory,
userAuthRepository: userAuthRepository,
userRepository: userRepository,
roleGroupRepository: roleGroupRepository,
},
ClusterServiceImpl: clusterServiceImpl,
}
go clusterServiceExt.buildInformer()
return clusterServiceExt
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/capacity/k8sCapacityService.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (impl *K8sCapacityServiceImpl) GetClusterCapacityDetail(ctx context.Context
if err != nil {
if client.IsClusterUnReachableError(err) {
impl.logger.Errorw("k8s cluster unreachable", "err", err)
return nil, &util.ApiError{HttpStatusCode: http.StatusBadRequest, UserMessage: err.Error()}
return nil, &util.ApiError{HttpStatusCode: http.StatusBadRequest, UserMessage: err.Error(), InternalMessage: err.Error()}
}
impl.logger.Errorw("error in getting node list", "err", err, "clusterId", cluster.Id)
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions util/GlobalConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type EnvironmentVariables struct {
DevtronSecretConfig *DevtronSecretConfig
DeploymentServiceTypeConfig *DeploymentServiceTypeConfig
TerminalEnvVariables *TerminalEnvVariables
GlobalClusterConfig *GlobalClusterConfig
}

type DeploymentServiceTypeConfig struct {
Expand All @@ -43,6 +44,10 @@ type GlobalEnvVariables struct {
ExecuteWireNilChecker bool `env:"EXECUTE_WIRE_NIL_CHECKER" envDefault:"false"`
}

type GlobalClusterConfig struct {
ClusterStatusCronTime int `env:"CLUSTER_STATUS_CRON_TIME" envDefault:"15"`
}

type DevtronSecretConfig struct {
DevtronSecretName string `env:"DEVTRON_SECRET_NAME" envDefault:"devtron-secret"`
DevtronDexSecretNamespace string `env:"DEVTRON_DEX_SECRET_NAMESPACE" envDefault:"devtroncd"`
Expand All @@ -58,6 +63,7 @@ func GetEnvironmentVariables() (*EnvironmentVariables, error) {
DevtronSecretConfig: &DevtronSecretConfig{},
DeploymentServiceTypeConfig: &DeploymentServiceTypeConfig{},
TerminalEnvVariables: &TerminalEnvVariables{},
GlobalClusterConfig: &GlobalClusterConfig{},
}
err := env.Parse(cfg)
if err != nil {
Expand Down
Loading
Loading