-
Notifications
You must be signed in to change notification settings - Fork 9
CLOUDP-323997: Refactor replicaset controller #185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -98,6 +98,7 @@ func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imag | |||
// Reconcile reads that state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read | ||||
// and what is in the MongoDbReplicaSet.Spec | ||||
func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) { | ||||
// ==== 1. Initial checks and setup ==== | ||||
log := zap.S().With("ReplicaSet", request.NamespacedName) | ||||
rs := &mdbv1.MongoDB{} | ||||
|
||||
|
@@ -145,6 +146,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
return r.updateStatus(ctx, rs, status, log) | ||||
} | ||||
|
||||
// ==== 2. Reconcile Certificates ==== | ||||
status := certs.EnsureSSLCertsForStatefulSet(ctx, r.SecretClient, r.SecretClient, *rs.Spec.Security, certs.ReplicaSetConfig(*rs), log) | ||||
if !status.IsOK() { | ||||
return r.updateStatus(ctx, rs, status, log) | ||||
|
@@ -156,6 +158,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
return r.updateStatus(ctx, rs, workflow.Pending("%s", err.Error()), log) | ||||
} | ||||
|
||||
// ==== 3. Reconcile Features and Authentication (Vault?) ==== | ||||
if status := controlledfeature.EnsureFeatureControls(*rs, conn, conn.OpsManagerVersion(), log); !status.IsOK() { | ||||
return r.updateStatus(ctx, rs, status, log) | ||||
} | ||||
|
@@ -171,61 +174,14 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
return r.updateStatus(ctx, rs, status, log) | ||||
} | ||||
|
||||
rsCertsConfig := certs.ReplicaSetConfig(*rs) | ||||
|
||||
var vaultConfig vault.VaultConfiguration | ||||
var databaseSecretPath string | ||||
if r.VaultClient != nil { | ||||
vaultConfig = r.VaultClient.VaultConfig | ||||
databaseSecretPath = r.VaultClient.DatabaseSecretPath() | ||||
} | ||||
|
||||
var automationAgentVersion string | ||||
if architectures.IsRunningStaticArchitecture(rs.Annotations) { | ||||
// In case the Agent *is* overridden, its version will be merged into the StatefulSet. The merging process | ||||
// happens after creating the StatefulSet definition. | ||||
if !rs.IsAgentImageOverridden() { | ||||
automationAgentVersion, err = r.getAgentVersion(conn, conn.OpsManagerVersion().VersionString, false, log) | ||||
if err != nil { | ||||
log.Errorf("Impossible to get agent version, please override the agent image by providing a pod template") | ||||
status := workflow.Failed(xerrors.Errorf("Failed to get agent version: %w", err)) | ||||
return r.updateStatus(ctx, rs, status, log) | ||||
} | ||||
} | ||||
// ==== 4. Replicaset Construction ==== | ||||
stsOpts, sts, err := r.getRsAndStsConfigs(ctx, rs, log, conn, projectConfig, currentAgentAuthMode, prometheusCertHash) | ||||
if err != nil { | ||||
return r.updateStatus(ctx, rs, workflow.Failed(err), log) | ||||
} | ||||
|
||||
rsConfig := construct.ReplicaSetOptions( | ||||
PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)), | ||||
CurrentAgentAuthMechanism(currentAgentAuthMode), | ||||
CertificateHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.CertSecretName, databaseSecretPath, log)), | ||||
InternalClusterHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.InternalClusterSecretName, databaseSecretPath, log)), | ||||
PrometheusTLSCertHash(prometheusCertHash), | ||||
WithVaultConfig(vaultConfig), | ||||
WithLabels(rs.Labels), | ||||
WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()), | ||||
WithInitDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.InitDatabaseImageUrlEnv, r.initDatabaseNonStaticImageVersion)), | ||||
WithDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.NonStaticDatabaseEnterpriseImage, r.databaseNonStaticImageVersion)), | ||||
WithAgentImage(images.ContainerImage(r.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)), | ||||
WithMongodbImage(images.GetOfficialImage(r.imageUrls, rs.Spec.Version, rs.GetAnnotations())), | ||||
) | ||||
|
||||
// ==== 5. Recovery logic ==== | ||||
caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath) | ||||
|
||||
if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil { | ||||
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)), log) | ||||
} | ||||
|
||||
sts := construct.DatabaseStatefulSet(*rs, rsConfig, log) | ||||
if status := ensureRoles(rs.Spec.GetSecurity().Roles, conn, log); !status.IsOK() { | ||||
return r.updateStatus(ctx, rs, status, log) | ||||
} | ||||
|
||||
if scale.ReplicasThisReconciliation(rs) < rs.Status.Members { | ||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not needed anymore. From the ticket, this step was necessary to scale a replica set to a single member. mongodb-kubernetes/docker/mongodb-kubernetes-tests/tests/replicaset/replica_set.py Line 336 in f29ac63
Note that the multi cluster replica set controller does not perform an equivalent step. |
||||
if err := replicaset.PrepareScaleDownFromStatefulSet(conn, sts, rs, log); err != nil { | ||||
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log) | ||||
} | ||||
} | ||||
|
||||
agentCertSecretName := rs.GetSecurity().AgentClientCertificateSecretName(rs.Name).Name | ||||
agentCertSecretName += certs.OperatorGeneratedCertSuffix | ||||
|
||||
|
@@ -235,20 +191,21 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) { | ||||
log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition) | ||||
automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") | ||||
deploymentError := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log) | ||||
if deploymentError != nil { | ||||
log.Errorf("Recovery failed because of deployment errors, %w", deploymentError) | ||||
if reconcileStatus := r.reconcileMemberResources(ctx, rs, sts, stsOpts, log, conn); !reconcileStatus.IsOK() { | ||||
log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus) | ||||
} | ||||
if !automationConfigStatus.IsOK() { | ||||
log.Errorf("Recovery failed because of Automation Config update errors, %v", automationConfigStatus) | ||||
} | ||||
} | ||||
|
||||
// ==== 6. Actual reconciliation: OM updates and Kubernetes resources updates ==== | ||||
lastSpec, err := rs.GetLastSpec() | ||||
if err != nil { | ||||
lastSpec = &mdbv1.MongoDbSpec{} | ||||
} | ||||
status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, rsConfig, log), | ||||
|
||||
status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, stsOpts, log), | ||||
func() workflow.Status { | ||||
return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") | ||||
}, | ||||
|
@@ -261,8 +218,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
_, _ = r.updateStatus(ctx, rs, workflow.Pending(""), log, workflowStatus.StatusOptions()...) | ||||
} | ||||
|
||||
if err := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log); err != nil { | ||||
return workflow.Failed(xerrors.Errorf("Failed to create/update (Kubernetes reconciliation phase): %w", err)) | ||||
if reconcileStatus := r.reconcileMemberResources(ctx, rs, sts, stsOpts, log, conn); !reconcileStatus.IsOK() { | ||||
return reconcileStatus | ||||
} | ||||
|
||||
if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, r.client); !status.IsOK() { | ||||
|
@@ -277,6 +234,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
return r.updateStatus(ctx, rs, status, log) | ||||
} | ||||
|
||||
// ==== 7. Final steps ==== | ||||
if scale.IsStillScaling(rs) { | ||||
return r.updateStatus(ctx, rs, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), log, mdbstatus.MembersOption(rs)) | ||||
} | ||||
|
@@ -308,6 +266,79 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco | |||
return r.updateStatus(ctx, rs, workflow.OK(), log, mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) | ||||
} | ||||
|
||||
// reconcileMemberResources handles the synchronization of kubernetes resources, which can be statefulsets, services etc. | ||||
// All the resources required in the k8s cluster (as opposed to the automation config) for creating the replicaset | ||||
// should be reconciled in this method. | ||||
func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Context, rs *mdbv1.MongoDB, sts appsv1.StatefulSet, stsOpts func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, log *zap.SugaredLogger, conn om.Connection) workflow.Status { | ||||
|
||||
if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil { | ||||
return workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)) | ||||
} | ||||
|
||||
if status := ensureRoles(rs.GetSecurity().Roles, conn, log); !status.IsOK() { | ||||
return status | ||||
} | ||||
|
||||
return r.reconcileStatefulSet(ctx, rs, sts, stsOpts, log) | ||||
} | ||||
|
||||
func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, rs *mdbv1.MongoDB, sts appsv1.StatefulSet, stsOpts func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, log *zap.SugaredLogger) workflow.Status { | ||||
|
||||
if err := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, stsOpts, log); err != nil { | ||||
return workflow.Failed(xerrors.Errorf("Failed to create/update (Kubernetes reconciliation phase): %w", err)) | ||||
} | ||||
|
||||
if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, r.client); !status.IsOK() { | ||||
return status | ||||
} | ||||
return workflow.OK() | ||||
} | ||||
|
||||
func (r *ReconcileMongoDbReplicaSet) getRsAndStsConfigs(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger, conn om.Connection, projectConfig mdbv1.ProjectConfig, currentAgentAuthMode string, prometheusCertHash string) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, appsv1.StatefulSet, error) { | ||||
|
||||
rsCertsConfig := certs.ReplicaSetConfig(*rs) | ||||
|
||||
var vaultConfig vault.VaultConfiguration | ||||
var databaseSecretPath string | ||||
if r.VaultClient != nil { | ||||
vaultConfig = r.VaultClient.VaultConfig | ||||
databaseSecretPath = r.VaultClient.DatabaseSecretPath() | ||||
} | ||||
|
||||
// Database container and versions configuration | ||||
var automationAgentVersion string | ||||
var err error | ||||
if architectures.IsRunningStaticArchitecture(rs.Annotations) { | ||||
// In case the Agent *is* overridden, its version will be merged into the StatefulSet. The merging process | ||||
// happens after creating the StatefulSet definition. | ||||
if !rs.IsAgentImageOverridden() { | ||||
automationAgentVersion, err = r.getAgentVersion(conn, conn.OpsManagerVersion().VersionString, false, log) | ||||
if err != nil { | ||||
return nil, appsv1.StatefulSet{}, xerrors.Errorf("Impossible to get agent version, please override the agent image by providing a pod template: %w", err) | ||||
} | ||||
} | ||||
} | ||||
|
||||
opts := construct.ReplicaSetOptions( | ||||
PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)), | ||||
CurrentAgentAuthMechanism(currentAgentAuthMode), | ||||
CertificateHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.CertSecretName, databaseSecretPath, log)), | ||||
InternalClusterHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.InternalClusterSecretName, databaseSecretPath, log)), | ||||
PrometheusTLSCertHash(prometheusCertHash), | ||||
WithVaultConfig(vaultConfig), | ||||
WithLabels(rs.Labels), | ||||
WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()), | ||||
WithInitDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.InitDatabaseImageUrlEnv, r.initDatabaseNonStaticImageVersion)), | ||||
WithDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.NonStaticDatabaseEnterpriseImage, r.databaseNonStaticImageVersion)), | ||||
WithAgentImage(images.ContainerImage(r.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)), | ||||
WithMongodbImage(images.GetOfficialImage(r.imageUrls, rs.Spec.Version, rs.GetAnnotations())), | ||||
) | ||||
|
||||
sts := construct.DatabaseStatefulSet(*rs, opts, log) | ||||
|
||||
return opts, sts, nil | ||||
} | ||||
|
||||
func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigMap { | ||||
data := make(map[string]string) | ||||
|
||||
|
@@ -408,7 +439,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c | |||
// Only "concrete" RS members should be observed | ||||
// - if scaling down, let's observe only members that will remain after scale-down operation | ||||
// - if scaling up, observe only current members, because new ones might not exist yet | ||||
err := agents.WaitForRsAgentsToRegister(set, util_int.Min(membersNumberBefore, int(*set.Spec.Replicas)), rs.Spec.GetClusterDomain(), conn, log, rs) | ||||
err := agents.WaitForRsAgentsToRegister(set, util_int.Min(membersNumberBefore, scale.ReplicasThisReconciliation(rs)), rs.Spec.GetClusterDomain(), conn, log, rs) | ||||
if err != nil && !isRecovering { | ||||
return workflow.Failed(err) | ||||
} | ||||
|
@@ -426,7 +457,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c | |||
// TLS to be completely disabled first. | ||||
updatedMembers = membersNumberBefore | ||||
} else { | ||||
updatedMembers = int(*set.Spec.Replicas) | ||||
updatedMembers = scale.ReplicasThisReconciliation(rs) | ||||
} | ||||
|
||||
replicaSet := replicaset.BuildFromStatefulSetWithReplicas(r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, set, rs.GetSpec(), updatedMembers, rs.CalculateFeatureCompatibilityVersion()) | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment dates from 2021