CLOUDP-323997: Refactor replicaset controller #185

Draft: wants to merge 4 commits into base: master (changes from all commits)
31 changes: 0 additions & 31 deletions controllers/om/replicaset/om_replicaset.go
@@ -9,8 +9,6 @@ import (
mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
"github.com/mongodb/mongodb-kubernetes/controllers/om"
"github.com/mongodb/mongodb-kubernetes/controllers/om/process"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/scale"
"github.com/mongodb/mongodb-kubernetes/pkg/dns"
)

// BuildFromStatefulSet returns a replica set that can be set in the Automation Config
@@ -65,36 +63,7 @@ func PrepareScaleDownFromMap(omClient om.Connection, rsMembers map[string][]stri
log.Debugw("Marked replica set members as non-voting", "replica set with members", rsMembers)
}

// TODO practice shows that automation agents can get stuck on setting db to "disabled" also it seems that this process
Review comment (Collaborator, author): This comment dates from 2021.

// works correctly without explicit disabling - feel free to remove this code after some time when it is clear
// that everything works correctly without disabling

// Stage 2. Set disabled to true
//err = omClient.ReadUpdateDeployment(
// func(d om.Deployment) error {
// d.DisableProcesses(allProcesses)
// return nil
// },
//)
//
//if err != nil {
// return errors.New(fmt.Sprintf("Unable to set disabled to true, hosts: %v, err: %w", allProcesses, err))
//}
//log.Debugw("Disabled processes", "processes", allProcesses)

log.Infow("Performed some preliminary steps to support scale down", "hosts", processes)

return nil
}

func PrepareScaleDownFromStatefulSet(omClient om.Connection, statefulSet appsv1.StatefulSet, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error {
_, podNames := dns.GetDnsForStatefulSetReplicasSpecified(statefulSet, rs.Spec.GetClusterDomain(), rs.Status.Members, nil)
podNames = podNames[scale.ReplicasThisReconciliation(rs):rs.Status.Members]

if len(podNames) != 1 {
return xerrors.Errorf("dev error: the number of members being scaled down was > 1, scaling more than one member at a time is not possible! %s", podNames)
}

log.Debugw("Setting votes to 0 for members", "members", podNames)
return PrepareScaleDownFromMap(omClient, map[string][]string{rs.Name: podNames}, podNames, log)
}
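
Note: the surviving helper, PrepareScaleDownFromMap, is what demotes the members that are about to be removed. Judging from the log lines above ("Setting votes to 0 for members", "Marked replica set members as non-voting"), its core is assumed to look roughly like the sketch below; MarkRsMembersUnvoted and the exact signatures are assumptions, not the actual source.

	// Sketch: demote the soon-to-be-removed members to non-voting, zero-priority
	// nodes in the automation config before the StatefulSet is shrunk.
	err := omClient.ReadUpdateDeployment(
		func(d om.Deployment) error {
			for rsName, members := range rsMembers {
				// assumed deployment helper: sets votes=0 and priority=0
				if err := d.MarkRsMembersUnvoted(rsName, members); err != nil {
					log.Warnf("failed to mark members as non-voting: %s", err)
				}
			}
			return nil
		},
	)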
3 changes: 2 additions & 1 deletion controllers/operator/common_controller.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"encoding/pem"
"fmt"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/scale"
"reflect"

"github.com/blang/semver"
@@ -865,7 +866,7 @@ func publishAutomationConfigFirst(ctx context.Context, getter kubernetesClient.C
return true
}

if opts.Replicas < int(*currentSts.Spec.Replicas) {
if scale.IsScalingDown(&mdb) {
log.Debug("Scaling down operation. automationConfig needs to be updated first")
return true
}
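
Note: the raw replica comparison is replaced by scale.IsScalingDown from the community operator's pkg/util/scale package (the import added above). A minimal sketch of the assumed semantics, not the actual source:

	// Assumed: the resource is scaling down when the replica count targeted by
	// this reconciliation is lower than the members currently recorded in status.
	func IsScalingDown(scaler ReplicaSetScaler) bool {
		return ReplicasThisReconciliation(scaler) < scaler.CurrentReplicas()
	}

This also means the decision no longer depends on the live StatefulSet's Spec.Replicas: the ordering is derived from the MongoDB resource alone.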
151 changes: 91 additions & 60 deletions controllers/operator/mongodbreplicaset_controller.go
@@ -98,6 +98,7 @@ func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imag
// Reconcile reads that state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read
// and what is in the MongoDbReplicaSet.Spec
func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) {
// ==== 1. Initial checks and setup ====
log := zap.S().With("ReplicaSet", request.NamespacedName)
rs := &mdbv1.MongoDB{}

@@ -145,6 +146,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
return r.updateStatus(ctx, rs, status, log)
}

// ==== 2. Reconcile Certificates ====
status := certs.EnsureSSLCertsForStatefulSet(ctx, r.SecretClient, r.SecretClient, *rs.Spec.Security, certs.ReplicaSetConfig(*rs), log)
if !status.IsOK() {
return r.updateStatus(ctx, rs, status, log)
@@ -156,6 +158,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
return r.updateStatus(ctx, rs, workflow.Pending("%s", err.Error()), log)
}

// ==== 3. Reconcile Features and Authentication (Vault?) ====
if status := controlledfeature.EnsureFeatureControls(*rs, conn, conn.OpsManagerVersion(), log); !status.IsOK() {
return r.updateStatus(ctx, rs, status, log)
}
@@ -171,61 +174,14 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
return r.updateStatus(ctx, rs, status, log)
}

rsCertsConfig := certs.ReplicaSetConfig(*rs)

var vaultConfig vault.VaultConfiguration
var databaseSecretPath string
if r.VaultClient != nil {
vaultConfig = r.VaultClient.VaultConfig
databaseSecretPath = r.VaultClient.DatabaseSecretPath()
}

var automationAgentVersion string
if architectures.IsRunningStaticArchitecture(rs.Annotations) {
// In case the Agent *is* overridden, its version will be merged into the StatefulSet. The merging process
// happens after creating the StatefulSet definition.
if !rs.IsAgentImageOverridden() {
automationAgentVersion, err = r.getAgentVersion(conn, conn.OpsManagerVersion().VersionString, false, log)
if err != nil {
log.Errorf("Impossible to get agent version, please override the agent image by providing a pod template")
status := workflow.Failed(xerrors.Errorf("Failed to get agent version: %w", err))
return r.updateStatus(ctx, rs, status, log)
}
}
// ==== 4. Replicaset Construction ====
stsOpts, sts, err := r.getRsAndStsConfigs(ctx, rs, log, conn, projectConfig, currentAgentAuthMode, prometheusCertHash)
if err != nil {
return r.updateStatus(ctx, rs, workflow.Failed(err), log)
}

rsConfig := construct.ReplicaSetOptions(
PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)),
CurrentAgentAuthMechanism(currentAgentAuthMode),
CertificateHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.CertSecretName, databaseSecretPath, log)),
InternalClusterHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.InternalClusterSecretName, databaseSecretPath, log)),
PrometheusTLSCertHash(prometheusCertHash),
WithVaultConfig(vaultConfig),
WithLabels(rs.Labels),
WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()),
WithInitDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.InitDatabaseImageUrlEnv, r.initDatabaseNonStaticImageVersion)),
WithDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.NonStaticDatabaseEnterpriseImage, r.databaseNonStaticImageVersion)),
WithAgentImage(images.ContainerImage(r.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)),
WithMongodbImage(images.GetOfficialImage(r.imageUrls, rs.Spec.Version, rs.GetAnnotations())),
)

// ==== 5. Recovery logic ====
caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath)

if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil {
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)), log)
}

sts := construct.DatabaseStatefulSet(*rs, rsConfig, log)
if status := ensureRoles(rs.Spec.GetSecurity().Roles, conn, log); !status.IsOK() {
return r.updateStatus(ctx, rs, status, log)
}

if scale.ReplicasThisReconciliation(rs) < rs.Status.Members {
Review comment (@Julien-Ben, Collaborator, author, Jun 6, 2025):
Not needed anymore. Related Jira ticket (from 2017): https://jira.mongodb.org/browse/HELP-3818
From the ticket, this step was necessary to scale a replica set down to a single member. We have a test covering that case:

	def test_replica_set_can_be_scaled_to_single_member(replica_set: MongoDB):

Note that the multi-cluster replica set controller does not perform an equivalent step.

if err := replicaset.PrepareScaleDownFromStatefulSet(conn, sts, rs, log); err != nil {
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log)
}
}

agentCertSecretName := rs.GetSecurity().AgentClientCertificateSecretName(rs.Name).Name
agentCertSecretName += certs.OperatorGeneratedCertSuffix

@@ -235,20 +191,21 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) {
log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition)
automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
deploymentError := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log)
if deploymentError != nil {
log.Errorf("Recovery failed because of deployment errors, %w", deploymentError)
if reconcileStatus := r.reconcileMemberResources(ctx, rs, sts, stsOpts, log, conn); !reconcileStatus.IsOK() {
log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus)
}
if !automationConfigStatus.IsOK() {
log.Errorf("Recovery failed because of Automation Config update errors, %v", automationConfigStatus)
}
}

// ==== 6. Actual reconciliation: OM updates and Kubernetes resources updates ====
lastSpec, err := rs.GetLastSpec()
if err != nil {
lastSpec = &mdbv1.MongoDbSpec{}
}
status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, rsConfig, log),

status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, stsOpts, log),
func() workflow.Status {
return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
},
@@ -261,8 +218,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
_, _ = r.updateStatus(ctx, rs, workflow.Pending(""), log, workflowStatus.StatusOptions()...)
}

if err := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log); err != nil {
return workflow.Failed(xerrors.Errorf("Failed to create/update (Kubernetes reconciliation phase): %w", err))
if reconcileStatus := r.reconcileMemberResources(ctx, rs, sts, stsOpts, log, conn); !reconcileStatus.IsOK() {
return reconcileStatus
}

if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, r.client); !status.IsOK() {
@@ -277,6 +234,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
return r.updateStatus(ctx, rs, status, log)
}

// ==== 7. Final steps ====
if scale.IsStillScaling(rs) {
return r.updateStatus(ctx, rs, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), log, mdbstatus.MembersOption(rs))
}
@@ -308,6 +266,79 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
return r.updateStatus(ctx, rs, workflow.OK(), log, mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus())
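
Note: the ordering above hinges on publishAutomationConfigFirst: when scaling down, the automation config must be pushed before the StatefulSet shrinks, so the removed pods leave the replica set configuration first. The assumed contract of workflow.RunInGivenOrder, reconstructed from the call site (illustrative, not the actual source):

	// As it might appear inside the workflow package: run the two phases in
	// the order selected by the boolean, stopping on the first failure.
	func RunInGivenOrder(firstGroupFirst bool, first, second func() Status) Status {
		if !firstGroupFirst {
			first, second = second, first
		}
		if status := first(); !status.IsOK() {
			return status
		}
		return second()
	}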
}

// reconcileMemberResources handles the synchronization of Kubernetes resources such as StatefulSets, Services, etc.
// All the resources required in the Kubernetes cluster (as opposed to the automation config) for creating the replica set
// should be reconciled in this method.
func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Context, rs *mdbv1.MongoDB, sts appsv1.StatefulSet, stsOpts func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, log *zap.SugaredLogger, conn om.Connection) workflow.Status {

if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil {
return workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err))
}

if status := ensureRoles(rs.GetSecurity().Roles, conn, log); !status.IsOK() {
return status
}

return r.reconcileStatefulSet(ctx, rs, sts, stsOpts, log)
}

func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, rs *mdbv1.MongoDB, sts appsv1.StatefulSet, stsOpts func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, log *zap.SugaredLogger) workflow.Status {

if err := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, stsOpts, log); err != nil {
return workflow.Failed(xerrors.Errorf("Failed to create/update (Kubernetes reconciliation phase): %w", err))
}

if status := statefulset.GetStatefulSetStatus(ctx, rs.Namespace, rs.Name, r.client); !status.IsOK() {
return status
}
return workflow.OK()
}

func (r *ReconcileMongoDbReplicaSet) getRsAndStsConfigs(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger, conn om.Connection, projectConfig mdbv1.ProjectConfig, currentAgentAuthMode string, prometheusCertHash string) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, appsv1.StatefulSet, error) {

rsCertsConfig := certs.ReplicaSetConfig(*rs)

var vaultConfig vault.VaultConfiguration
var databaseSecretPath string
if r.VaultClient != nil {
vaultConfig = r.VaultClient.VaultConfig
databaseSecretPath = r.VaultClient.DatabaseSecretPath()
}

// Database container and versions configuration
var automationAgentVersion string
var err error
if architectures.IsRunningStaticArchitecture(rs.Annotations) {
// In case the Agent *is* overridden, its version will be merged into the StatefulSet. The merging process
// happens after creating the StatefulSet definition.
if !rs.IsAgentImageOverridden() {
automationAgentVersion, err = r.getAgentVersion(conn, conn.OpsManagerVersion().VersionString, false, log)
if err != nil {
return nil, appsv1.StatefulSet{}, xerrors.Errorf("Impossible to get agent version, please override the agent image by providing a pod template: %w", err)
}
}
}

opts := construct.ReplicaSetOptions(
PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)),
CurrentAgentAuthMechanism(currentAgentAuthMode),
CertificateHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.CertSecretName, databaseSecretPath, log)),
InternalClusterHash(enterprisepem.ReadHashFromSecret(ctx, r.SecretClient, rs.Namespace, rsCertsConfig.InternalClusterSecretName, databaseSecretPath, log)),
PrometheusTLSCertHash(prometheusCertHash),
WithVaultConfig(vaultConfig),
WithLabels(rs.Labels),
WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()),
WithInitDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.InitDatabaseImageUrlEnv, r.initDatabaseNonStaticImageVersion)),
WithDatabaseNonStaticImage(images.ContainerImage(r.imageUrls, util.NonStaticDatabaseEnterpriseImage, r.databaseNonStaticImageVersion)),
WithAgentImage(images.ContainerImage(r.imageUrls, architectures.MdbAgentImageRepo, automationAgentVersion)),
WithMongodbImage(images.GetOfficialImage(r.imageUrls, rs.Spec.Version, rs.GetAnnotations())),
)

sts := construct.DatabaseStatefulSet(*rs, opts, log)

return opts, sts, nil
}

func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigMap {
data := make(map[string]string)

@@ -408,7 +439,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
// Only "concrete" RS members should be observed
// - if scaling down, let's observe only members that will remain after scale-down operation
// - if scaling up, observe only current members, because new ones might not exist yet
err := agents.WaitForRsAgentsToRegister(set, util_int.Min(membersNumberBefore, int(*set.Spec.Replicas)), rs.Spec.GetClusterDomain(), conn, log, rs)
err := agents.WaitForRsAgentsToRegister(set, util_int.Min(membersNumberBefore, scale.ReplicasThisReconciliation(rs)), rs.Spec.GetClusterDomain(), conn, log, rs)
if err != nil && !isRecovering {
return workflow.Failed(err)
}
@@ -426,7 +457,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
// TLS to be completely disabled first.
updatedMembers = membersNumberBefore
} else {
updatedMembers = int(*set.Spec.Replicas)
updatedMembers = scale.ReplicasThisReconciliation(rs)
}

replicaSet := replicaset.BuildFromStatefulSetWithReplicas(r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, set, rs.GetSpec(), updatedMembers, rs.CalculateFeatureCompatibilityVersion())
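
Note: both call sites above replace the StatefulSet-derived *set.Spec.Replicas with scale.ReplicasThisReconciliation(rs), the helper that enforces member-by-member scaling. A simplified sketch of the assumed behaviour (the real helper in the community operator also covers first-time creation and forced individual scaling):

	// Scaling moves one member per reconciliation until status matches spec.
	func ReplicasThisReconciliation(scaler ReplicaSetScaler) int {
		current, desired := scaler.CurrentReplicas(), scaler.DesiredReplicas()
		if current == desired {
			return desired
		}
		if current < desired {
			return current + 1 // scaling up: add one member this pass
		}
		return current - 1 // scaling down: remove one member this pass
	}

This matches the invariant enforced by the removed PrepareScaleDownFromStatefulSet, which rejected scaling down more than one member at a time.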