From 50fbaca6d503306c455a6e0ab91edfd3f0961609 Mon Sep 17 00:00:00 2001
From: runkecheng <1131648942@qq.com>
Date: Wed, 3 Aug 2022 18:23:53 +0800
Subject: [PATCH 1/2] refactor(status): derive cluster status from StatefulSet
 status

---
 api/v1alpha1/mysqlcluster_status_types.go     |  92 ++++
 api/v1alpha1/mysqlcluster_types.go            | 123 -----
 api/v1alpha1/zz_generated.deepcopy.go         | 105 +---
 .../crds/mysql.radondb.com_mysqlclusters.yaml | 156 +++---
 .../mysql.radondb.com_mysqlclusters.yaml      | 156 +++---
 controllers/status_controller.go              |  38 +-
 mysqlcluster/syncer/statefulset.go            |  22 +-
 mysqlcluster/syncer/status.go                 | 450 ++----------------
 8 files changed, 349 insertions(+), 793 deletions(-)
 create mode 100644 api/v1alpha1/mysqlcluster_status_types.go

diff --git a/api/v1alpha1/mysqlcluster_status_types.go b/api/v1alpha1/mysqlcluster_status_types.go
new file mode 100644
index 00000000..dd61c9aa
--- /dev/null
+++ b/api/v1alpha1/mysqlcluster_status_types.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2021 RadonDB.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// ClusterState defines cluster state.
+type ClusterState string
+
+const (
+	// ClusterInitState indicates that the cluster is initializing.
+	ClusterInitState ClusterState = "Initializing"
+	// ClusterUpdateState indicates that the cluster is being updated.
+	ClusterUpdateState ClusterState = "Updating"
+	// ClusterReadyState indicates that all pods of the cluster are ready.
+	ClusterReadyState ClusterState = "Ready"
+	// ClusterCloseState indicates that the cluster is closed.
+	ClusterCloseState ClusterState = "Closed"
+	// ClusterScaleInState indicates that the number of cluster replicas is decreasing.
+	ClusterScaleInState ClusterState = "ScaleIn"
+	// ClusterScaleOutState indicates that the number of cluster replicas is increasing.
+	ClusterScaleOutState ClusterState = "ScaleOut"
+)
+
+// ClusterConditionType defines the type of a cluster condition.
+type ClusterConditionType string
+
+const (
+	ClusterInitialized ClusterConditionType = "Initialized"
+	ClusterAvaliable   ClusterConditionType = "Avaliable"
+	ClusterInUpgrade   ClusterConditionType = "InUpgrade"
+	ClusterAllReady    ClusterConditionType = "AllReady"
+)
+
+// MySQLClusterCondition defines type for cluster conditions.
+type MySQLClusterCondition struct {
+	// Type of cluster condition, one of (\"Initialized\", \"Avaliable\", \"InUpgrade\", \"AllReady\").
+	Type ClusterConditionType `json:"type"`
+	// Status of the condition, one of (\"True\", \"False\", \"Unknown\").
+	Status metav1.ConditionStatus `json:"status"`
+}
+
+// MysqlClusterStatus defines the observed state of MysqlCluster
+type MysqlClusterStatus struct {
+	// StatefulSetStatus is the status of the StatefulSet reconciled by the mysqlcluster controller.
+	StatefulSetStatus *appsv1.StatefulSetStatus `json:"statefulSetStatus,omitempty"`
+	// State is the state of the mysqlcluster.
+	State ClusterState `json:"state,omitempty"`
+	// Conditions contains the list of the mysqlcluster conditions.
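+	// Each condition pairs one of the ClusterConditionType values defined
+	// above with a status of (\"True\", \"False\", \"Unknown\").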
+ Conditions []MySQLClusterCondition `json:"conditions,omitempty"` +} + +func (s *MysqlClusterStatus) GetStatefulSetStatus() *appsv1.StatefulSetStatus { + return s.StatefulSetStatus +} + +func (s *MysqlClusterStatus) GetState() ClusterState { + return s.State +} + +func (s *MysqlClusterStatus) GetConditions() []MySQLClusterCondition { + return s.Conditions +} + +func (s *MysqlClusterStatus) SetStatefulSetStatus(status *appsv1.StatefulSetStatus) { + s.StatefulSetStatus = status +} + +func (s *MysqlClusterStatus) SetState(state ClusterState) { + s.State = state +} + +func (s *MysqlClusterStatus) SetConditions(conditions []MySQLClusterCondition) { + s.Conditions = conditions +} diff --git a/api/v1alpha1/mysqlcluster_types.go b/api/v1alpha1/mysqlcluster_types.go index 932a7e46..c2e9d5a7 100644 --- a/api/v1alpha1/mysqlcluster_types.go +++ b/api/v1alpha1/mysqlcluster_types.go @@ -270,129 +270,6 @@ type Persistence struct { Size string `json:"size,omitempty"` } -// ClusterState defines cluster state. -type ClusterState string - -const ( - // ClusterInitState indicates whether the cluster is initializing. - ClusterInitState ClusterState = "Initializing" - // ClusterUpdateState indicates whether the cluster is being updated. - ClusterUpdateState ClusterState = "Updating" - // ClusterReadyState indicates whether all containers in the pod are ready. - ClusterReadyState ClusterState = "Ready" - // ClusterCloseState indicates whether the cluster is closed. - ClusterCloseState ClusterState = "Closed" - // ClusterScaleInState indicates whether the cluster replicas is decreasing. - ClusterScaleInState ClusterState = "ScaleIn" - // ClusterScaleOutState indicates whether the cluster replicas is increasing. - ClusterScaleOutState ClusterState = "ScaleOut" -) - -// ClusterConditionType defines type for cluster condition type. -type ClusterConditionType string - -const ( - // ConditionInit indicates whether the cluster is initializing. - ConditionInit ClusterConditionType = "Initializing" - // ConditionUpdate indicates whether the cluster is being updated. - ConditionUpdate ClusterConditionType = "Updating" - // ConditionReady indicates whether all containers in the pod are ready. - ConditionReady ClusterConditionType = "Ready" - // ConditionClose indicates whether the cluster is closed. - ConditionClose ClusterConditionType = "Closed" - // ConditionError indicates whether there is an error in the cluster. - ConditionError ClusterConditionType = "Error" - // ConditionScaleIn indicates whether the cluster replicas is decreasing. - ConditionScaleIn ClusterConditionType = "ScaleIn" - // ConditionScaleOut indicates whether the cluster replicas is increasing. - ConditionScaleOut ClusterConditionType = "ScaleOut" -) - -// ClusterCondition defines type for cluster conditions. -type ClusterCondition struct { - // Type of cluster condition, values in (\"Initializing\", \"Ready\", \"Error\"). - Type ClusterConditionType `json:"type"` - // Status of the condition, one of (\"True\", \"False\", \"Unknown\"). - Status corev1.ConditionStatus `json:"status"` - - // The last time this Condition type changed. - LastTransitionTime metav1.Time `json:"lastTransitionTime"` - // One word, camel-case reason for current status of the condition. - Reason string `json:"reason,omitempty"` - // Full text reason for current status of the condition. - Message string `json:"message,omitempty"` -} - -// NodeStatus defines type for status of a node into cluster. -type NodeStatus struct { - // Name of the node. 
- Name string `json:"name"` - // Full text reason for current status of the node. - Message string `json:"message,omitempty"` - // RaftStatus is the raft status of the node. - RaftStatus RaftStatus `json:"raftStatus,omitempty"` - // Conditions contains the list of the node conditions fulfilled. - Conditions []NodeCondition `json:"conditions,omitempty"` -} - -type RaftStatus struct { - // Role is one of (LEADER/CANDIDATE/FOLLOWER/IDLE/INVALID) - Role string `json:"role,omitempty"` - // Leader is the name of the Leader of the current node. - Leader string `json:"leader,omitempty"` - // Nodes is a list of nodes that can be identified by the current node. - Nodes []string `json:"nodes,omitempty"` -} - -// NodeCondition defines type for representing node conditions. -type NodeCondition struct { - // Type of the node condition. - Type NodeConditionType `json:"type"` - // Status of the node, one of (\"True\", \"False\", \"Unknown\"). - Status corev1.ConditionStatus `json:"status"` - // The last time this Condition type changed. - LastTransitionTime metav1.Time `json:"lastTransitionTime"` -} - -// The index of the NodeStatus.Conditions. -type NodeConditionsIndex uint8 - -const ( - IndexLagged NodeConditionsIndex = iota - IndexLeader - IndexReadOnly - IndexReplicating -) - -// NodeConditionType defines type for node condition type. -type NodeConditionType string - -const ( - // NodeConditionLagged represents if the node is lagged. - NodeConditionLagged NodeConditionType = "Lagged" - // NodeConditionLeader represents if the node is leader or not. - NodeConditionLeader NodeConditionType = "Leader" - // NodeConditionReadOnly repesents if the node is read only or not - NodeConditionReadOnly NodeConditionType = "ReadOnly" - // NodeConditionReplicating represents if the node is replicating or not. - NodeConditionReplicating NodeConditionType = "Replicating" -) - -// MysqlClusterStatus defines the observed state of MysqlCluster -type MysqlClusterStatus struct { - // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster - // Important: Run "make" to regenerate code after modifying this file - - // ReadyNodes represents number of the nodes that are in ready state. - ReadyNodes int `json:"readyNodes,omitempty"` - // State - State ClusterState `json:"state,omitempty"` - // Conditions contains the list of the cluster conditions fulfilled. - Conditions []ClusterCondition `json:"conditions,omitempty"` - // Nodes contains the list of the node status fulfilled. - Nodes []NodeStatus `json:"nodes,omitempty"` -} - // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.readyNodes diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index a41a7223..ca55b626 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -21,7 +21,8 @@ limitations under the License. package v1alpha1 import ( - "k8s.io/api/core/v1" + "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -143,33 +144,32 @@ func (in *BackupStatus) DeepCopy() *BackupStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *ClusterCondition) DeepCopyInto(out *ClusterCondition) { +func (in *MetricsOpts) DeepCopyInto(out *MetricsOpts) { *out = *in - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) + in.Resources.DeepCopyInto(&out.Resources) } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterCondition. -func (in *ClusterCondition) DeepCopy() *ClusterCondition { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsOpts. +func (in *MetricsOpts) DeepCopy() *MetricsOpts { if in == nil { return nil } - out := new(ClusterCondition) + out := new(MetricsOpts) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *MetricsOpts) DeepCopyInto(out *MetricsOpts) { +func (in *MySQLClusterCondition) DeepCopyInto(out *MySQLClusterCondition) { *out = *in - in.Resources.DeepCopyInto(&out.Resources) } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsOpts. -func (in *MetricsOpts) DeepCopy() *MetricsOpts { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MySQLClusterCondition. +func (in *MySQLClusterCondition) DeepCopy() *MySQLClusterCondition { if in == nil { return nil } - out := new(MetricsOpts) + out := new(MySQLClusterCondition) in.DeepCopyInto(out) return out } @@ -283,19 +283,15 @@ func (in *MysqlClusterSpec) DeepCopy() *MysqlClusterSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MysqlClusterStatus) DeepCopyInto(out *MysqlClusterStatus) { *out = *in + if in.StatefulSetStatus != nil { + in, out := &in.StatefulSetStatus, &out.StatefulSetStatus + *out = new(v1.StatefulSetStatus) + (*in).DeepCopyInto(*out) + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]ClusterCondition, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } - if in.Nodes != nil { - in, out := &in.Nodes, &out.Nodes - *out = make([]NodeStatus, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } + *out = make([]MySQLClusterCondition, len(*in)) + copy(*out, *in) } } @@ -412,51 +408,12 @@ func (in *MysqlUserList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *NodeCondition) DeepCopyInto(out *NodeCondition) { - *out = *in - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeCondition. -func (in *NodeCondition) DeepCopy() *NodeCondition { - if in == nil { - return nil - } - out := new(NodeCondition) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { - *out = *in - in.RaftStatus.DeepCopyInto(&out.RaftStatus) - if in.Conditions != nil { - in, out := &in.Conditions, &out.Conditions - *out = make([]NodeCondition, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeStatus. 
-func (in *NodeStatus) DeepCopy() *NodeStatus { - if in == nil { - return nil - } - out := new(NodeStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Persistence) DeepCopyInto(out *Persistence) { *out = *in if in.AccessModes != nil { in, out := &in.AccessModes, &out.AccessModes - *out = make([]v1.PersistentVolumeAccessMode, len(*in)) + *out = make([]corev1.PersistentVolumeAccessMode, len(*in)) copy(*out, *in) } if in.StorageClass != nil { @@ -495,12 +452,12 @@ func (in *PodPolicy) DeepCopyInto(out *PodPolicy) { } if in.Affinity != nil { in, out := &in.Affinity, &out.Affinity - *out = new(v1.Affinity) + *out = new(corev1.Affinity) (*in).DeepCopyInto(*out) } if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations - *out = make([]v1.Toleration, len(*in)) + *out = make([]corev1.Toleration, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -518,26 +475,6 @@ func (in *PodPolicy) DeepCopy() *PodPolicy { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *RaftStatus) DeepCopyInto(out *RaftStatus) { - *out = *in - if in.Nodes != nil { - in, out := &in.Nodes, &out.Nodes - *out = make([]string, len(*in)) - copy(*out, *in) - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RaftStatus. -func (in *RaftStatus) DeepCopy() *RaftStatus { - if in == nil { - return nil - } - out := new(RaftStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SecretSelector) DeepCopyInto(out *SecretSelector) { *out = *in diff --git a/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml b/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml index 88d891c5..b3078b7d 100644 --- a/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml +++ b/charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml @@ -1346,22 +1346,10 @@ spec: description: MysqlClusterStatus defines the observed state of MysqlCluster properties: conditions: - description: Conditions contains the list of the cluster conditions - fulfilled. + description: Conditions contains the list of the mysqlcluster conditions. items: - description: ClusterCondition defines type for cluster conditions. + description: MySQLClusterCondition defines type for cluster conditions. properties: - lastTransitionTime: - description: The last time this Condition type changed. - format: date-time - type: string - message: - description: Full text reason for current status of the condition. - type: string - reason: - description: One word, camel-case reason for current status - of the condition. - type: string status: description: Status of the condition, one of (\"True\", \"False\", \"Unknown\"). @@ -1371,74 +1359,94 @@ spec: \"Ready\", \"Error\"). type: string required: - - lastTransitionTime - status - type type: object type: array - nodes: - description: Nodes contains the list of the node status fulfilled. - items: - description: NodeStatus defines type for status of a node into cluster. - properties: - conditions: - description: Conditions contains the list of the node conditions - fulfilled. - items: - description: NodeCondition defines type for representing node - conditions. 
- properties: - lastTransitionTime: - description: The last time this Condition type changed. - format: date-time - type: string - status: - description: Status of the node, one of (\"True\", \"False\", - \"Unknown\"). - type: string - type: - description: Type of the node condition. - type: string - required: - - lastTransitionTime - - status - - type - type: object - type: array - message: - description: Full text reason for current status of the node. - type: string - name: - description: Name of the node. - type: string - raftStatus: - description: RaftStatus is the raft status of the node. + state: + description: State is the state of the mysqlcluster. + type: string + statefulSetStatus: + description: StatefulSetStatus is the status of the StatefulSet reconciled + by the mysqlcluster controller. + properties: + collisionCount: + description: collisionCount is the count of hash collisions for + the StatefulSet. The StatefulSet controller uses this field + as a collision avoidance mechanism when it needs to create the + name for the newest ControllerRevision. + format: int32 + type: integer + conditions: + description: Represents the latest available observations of a + statefulset's current state. + items: + description: StatefulSetCondition describes the state of a statefulset + at a certain point. properties: - leader: - description: Leader is the name of the Leader of the current - node. + lastTransitionTime: + description: Last time the condition transitioned from one + status to another. + format: date-time type: string - nodes: - description: Nodes is a list of nodes that can be identified - by the current node. - items: - type: string - type: array - role: - description: Role is one of (LEADER/CANDIDATE/FOLLOWER/IDLE/INVALID) + message: + description: A human readable message indicating details + about the transition. type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, + Unknown. + type: string + type: + description: Type of statefulset condition. + type: string + required: + - status + - type type: object - required: - - name - type: object - type: array - readyNodes: - description: ReadyNodes represents number of the nodes that are in - ready state. - type: integer - state: - description: State - type: string + type: array + currentReplicas: + description: currentReplicas is the number of Pods created by + the StatefulSet controller from the StatefulSet version indicated + by currentRevision. + format: int32 + type: integer + currentRevision: + description: currentRevision, if not empty, indicates the version + of the StatefulSet used to generate Pods in the sequence [0,currentReplicas). + type: string + observedGeneration: + description: observedGeneration is the most recent generation + observed for this StatefulSet. It corresponds to the StatefulSet's + generation, which is updated on mutation by the API Server. + format: int64 + type: integer + readyReplicas: + description: readyReplicas is the number of Pods created by the + StatefulSet controller that have a Ready Condition. + format: int32 + type: integer + replicas: + description: replicas is the number of Pods created by the StatefulSet + controller. 
+ format: int32 + type: integer + updateRevision: + description: updateRevision, if not empty, indicates the version + of the StatefulSet used to generate Pods in the sequence [replicas-updatedReplicas,replicas) + type: string + updatedReplicas: + description: updatedReplicas is the number of Pods created by + the StatefulSet controller from the StatefulSet version indicated + by updateRevision. + format: int32 + type: integer + required: + - replicas + type: object type: object type: object served: true diff --git a/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml b/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml index 88d891c5..b3078b7d 100644 --- a/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml +++ b/config/crd/bases/mysql.radondb.com_mysqlclusters.yaml @@ -1346,22 +1346,10 @@ spec: description: MysqlClusterStatus defines the observed state of MysqlCluster properties: conditions: - description: Conditions contains the list of the cluster conditions - fulfilled. + description: Conditions contains the list of the mysqlcluster conditions. items: - description: ClusterCondition defines type for cluster conditions. + description: MySQLClusterCondition defines type for cluster conditions. properties: - lastTransitionTime: - description: The last time this Condition type changed. - format: date-time - type: string - message: - description: Full text reason for current status of the condition. - type: string - reason: - description: One word, camel-case reason for current status - of the condition. - type: string status: description: Status of the condition, one of (\"True\", \"False\", \"Unknown\"). @@ -1371,74 +1359,94 @@ spec: \"Ready\", \"Error\"). type: string required: - - lastTransitionTime - status - type type: object type: array - nodes: - description: Nodes contains the list of the node status fulfilled. - items: - description: NodeStatus defines type for status of a node into cluster. - properties: - conditions: - description: Conditions contains the list of the node conditions - fulfilled. - items: - description: NodeCondition defines type for representing node - conditions. - properties: - lastTransitionTime: - description: The last time this Condition type changed. - format: date-time - type: string - status: - description: Status of the node, one of (\"True\", \"False\", - \"Unknown\"). - type: string - type: - description: Type of the node condition. - type: string - required: - - lastTransitionTime - - status - - type - type: object - type: array - message: - description: Full text reason for current status of the node. - type: string - name: - description: Name of the node. - type: string - raftStatus: - description: RaftStatus is the raft status of the node. + state: + description: State is the state of the mysqlcluster. + type: string + statefulSetStatus: + description: StatefulSetStatus is the status of the StatefulSet reconciled + by the mysqlcluster controller. + properties: + collisionCount: + description: collisionCount is the count of hash collisions for + the StatefulSet. The StatefulSet controller uses this field + as a collision avoidance mechanism when it needs to create the + name for the newest ControllerRevision. + format: int32 + type: integer + conditions: + description: Represents the latest available observations of a + statefulset's current state. + items: + description: StatefulSetCondition describes the state of a statefulset + at a certain point. 
properties: - leader: - description: Leader is the name of the Leader of the current - node. + lastTransitionTime: + description: Last time the condition transitioned from one + status to another. + format: date-time type: string - nodes: - description: Nodes is a list of nodes that can be identified - by the current node. - items: - type: string - type: array - role: - description: Role is one of (LEADER/CANDIDATE/FOLLOWER/IDLE/INVALID) + message: + description: A human readable message indicating details + about the transition. type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, + Unknown. + type: string + type: + description: Type of statefulset condition. + type: string + required: + - status + - type type: object - required: - - name - type: object - type: array - readyNodes: - description: ReadyNodes represents number of the nodes that are in - ready state. - type: integer - state: - description: State - type: string + type: array + currentReplicas: + description: currentReplicas is the number of Pods created by + the StatefulSet controller from the StatefulSet version indicated + by currentRevision. + format: int32 + type: integer + currentRevision: + description: currentRevision, if not empty, indicates the version + of the StatefulSet used to generate Pods in the sequence [0,currentReplicas). + type: string + observedGeneration: + description: observedGeneration is the most recent generation + observed for this StatefulSet. It corresponds to the StatefulSet's + generation, which is updated on mutation by the API Server. + format: int64 + type: integer + readyReplicas: + description: readyReplicas is the number of Pods created by the + StatefulSet controller that have a Ready Condition. + format: int32 + type: integer + replicas: + description: replicas is the number of Pods created by the StatefulSet + controller. + format: int32 + type: integer + updateRevision: + description: updateRevision, if not empty, indicates the version + of the StatefulSet used to generate Pods in the sequence [replicas-updatedReplicas,replicas) + type: string + updatedReplicas: + description: updatedReplicas is the number of Pods created by + the StatefulSet controller from the StatefulSet version indicated + by updateRevision. 
+ format: int32 + type: integer + required: + - replicas + type: object type: object type: object served: true diff --git a/controllers/status_controller.go b/controllers/status_controller.go index 2aae60b9..ea30c0ac 100644 --- a/controllers/status_controller.go +++ b/controllers/status_controller.go @@ -23,12 +23,12 @@ import ( "time" "github.com/presslabs/controller-util/syncer" + appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" + // "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" + // "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -44,7 +44,7 @@ import ( ) // reconcileTimePeriod represents the time in which a cluster should be reconciled -var reconcileTimePeriod = time.Second * 5 +var reconcileTimePeriod = time.Second * 10 // StatusReconciler reconciles a Status object type StatusReconciler struct { @@ -96,7 +96,7 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr r.XenonExecutor.SetRootPassword(instance.Spec.MysqlOpts.RootPassword) - statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory, r.XenonExecutor) + statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client) if err := syncer.Sync(ctx, statusSyncer, r.Recorder); err != nil { return ctrl.Result{}, err } @@ -111,26 +111,7 @@ func (r *StatusReconciler) SetupWithManager(mgr ctrl.Manager) error { events := make(chan event.GenericEvent, 1024) bld := ctrl.NewControllerManagedBy(mgr). For(&apiv1alpha1.MysqlCluster{}). - Watches(&source.Kind{Type: &apiv1alpha1.MysqlCluster{}}, &handler.Funcs{ - CreateFunc: func(evt event.CreateEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { - log.Error(nil, "CreateEvent received with no metadata", "CreateEvent", evt) - return - } - - log.V(1).Info("register cluster in clusters list", "obj", evt.Object) - clusters.Store(getKey(evt.Object), event.GenericEvent(evt)) - }, - DeleteFunc: func(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { - log.Error(nil, "DeleteEvent received with no metadata", "DeleteEvent", evt) - return - } - - log.V(1).Info("remove cluster from clusters list", "obj", evt.Object) - clusters.Delete(getKey(evt.Object)) - }, - }). + Owns(&appsv1.StatefulSet{}). Watches(&source.Channel{Source: events}, &handler.EnqueueRequestForObject{}) // create a runnable function that dispatches events to events channel @@ -155,10 +136,3 @@ func (r *StatusReconciler) SetupWithManager(mgr ctrl.Manager) error { return bld.Complete(r) } -// getKey returns a string that represents the key under which cluster is registered -func getKey(obj klog.KMetadata) string { - return types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }.String() -} diff --git a/mysqlcluster/syncer/statefulset.go b/mysqlcluster/syncer/statefulset.go index 5f521506..f3fa9a8c 100644 --- a/mysqlcluster/syncer/statefulset.go +++ b/mysqlcluster/syncer/statefulset.go @@ -337,7 +337,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { // Check if the pod is healthy. 
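+	// Note: the "healthy" label is patched from outside this syncer (by the
+	// MySQL readiness checker introduced in the second patch of this series),
+	// so all we can do here is poll until it flips to "yes" or time out.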
 	err := wait.PollImmediate(time.Second*2, time.Second*30, func() (bool, error) {
 		s.cli.Get(ctx, client.ObjectKeyFromObject(&pod), &pod)
-		if pod.ObjectMeta.Labels["healthy"] == "yes" {
+		if PodHealthy(&pod) {
 			return true, nil
 		}
 		return false, nil
@@ -378,7 +378,7 @@ func (s *StatefulSetSyncer) mutate() error {
 	for k, v := range s.Spec.PodPolicy.Labels {
 		s.sfs.Spec.Template.ObjectMeta.Labels[k] = v
 	}
-	s.sfs.Spec.Template.ObjectMeta.Labels["role"] = string(utils.Candidate)
+	s.sfs.Spec.Template.ObjectMeta.Labels["role"] = string(utils.Follower)
 	s.sfs.Spec.Template.ObjectMeta.Labels["healthy"] = "no"
 
 	s.sfs.Spec.Template.Annotations = s.Spec.PodPolicy.Annotations
@@ -495,7 +495,7 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
 		return fmt.Errorf("update revision is empty")
 	}
 	// Check version, if not latest, delete node.
-	if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
+	if s.podIsUpdated(pod) {
 		s.log.Info("pod is already updated", "pod name", pod.Name)
 	} else {
 		s.Status.State = apiv1alpha1.ClusterUpdateState
@@ -508,7 +508,7 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
 		if pod.DeletionTimestamp != nil {
 			return false, nil
 		}
-		if pod.ObjectMeta.Labels["controller-revision-hash"] != s.sfs.Status.UpdateRevision {
+		if !s.podIsUpdated(pod) {
 			if err := s.cli.Delete(ctx, pod); err != nil {
 				return false, err
 			}
@@ -542,16 +542,14 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
 			return false, fmt.Errorf("pod %s is in failed phase", pod.Name)
 		}
 
-		if pod.ObjectMeta.Labels["healthy"] == "yes" &&
-			pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
+		if PodHealthy(pod) && s.podIsUpdated(pod) {
 			return true, nil
 		}
 
 		// Fix for issue #219: during a 2 -> 5 rolling update, the PDB (minAvailable 50%)
 		// forces the StatefulSet spec to be scaled to 3 first and only then to 5. In that
 		// window a pod can be healthy while its controller-revision-hash never matches the
 		// update revision, so return an error here instead of waiting out the 2-hour timeout.
// https://kubernetes.io/zh/docs/tasks/run-application/configure-pdb/ - if pod.ObjectMeta.Labels["healthy"] == "yes" && - pod.ObjectMeta.Labels["controller-revision-hash"] != s.sfs.Status.UpdateRevision { + if PodHealthy(pod) && !s.podIsUpdated(pod) { return false, fmt.Errorf("pod %s is ready, wait next schedule", pod.Name) } return false, nil @@ -622,3 +620,11 @@ func (s *StatefulSetSyncer) podsAllUpdated(ctx context.Context) bool { } return len(podlist.Items) == 0 } + +func (s *StatefulSetSyncer) podIsUpdated(pod *corev1.Pod) bool { + return pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision +} + +func PodHealthy(pod *corev1.Pod) bool { + return pod.ObjectMeta.Labels["healthy"] == "yes" +} diff --git a/mysqlcluster/syncer/status.go b/mysqlcluster/syncer/status.go index cea4b69c..3108edce 100644 --- a/mysqlcluster/syncer/status.go +++ b/mysqlcluster/syncer/status.go @@ -18,30 +18,31 @@ package syncer import ( "context" - "fmt" - "time" "github.com/presslabs/controller-util/syncer" - corev1 "k8s.io/api/core/v1" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/go-logr/logr" - apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" - "github.com/radondb/radondb-mysql-kubernetes/internal" + "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" "github.com/radondb/radondb-mysql-kubernetes/mysqlcluster" "github.com/radondb/radondb-mysql-kubernetes/utils" ) -// The max quantity of the statuses. -const maxStatusesQuantity = 10 +var ( + Initialized = v1alpha1.ClusterInitialized + AllReady = v1alpha1.ClusterAllReady + Avaliable = v1alpha1.ClusterAvaliable + InUpgrade = v1alpha1.ClusterInUpgrade -// The retry time for check node status. -const checkNodeStatusRetry = 3 + CondTrue = metav1.ConditionTrue + CondFalse = metav1.ConditionFalse + CondUnknown = metav1.ConditionUnknown +) // StatusSyncer used to update the status. type StatusSyncer struct { @@ -49,22 +50,25 @@ type StatusSyncer struct { cli client.Client - // Mysql query runner. - internal.SQLRunnerFactory - // XenonExecutor is used to execute Xenon HTTP instructions. - internal.XenonExecutor + statefulset *appsv1.StatefulSet // Logger log logr.Logger } // NewStatusSyncer returns a pointer to StatusSyncer. -func NewStatusSyncer(c *mysqlcluster.MysqlCluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory, xenonExecutor internal.XenonExecutor) *StatusSyncer { +func NewStatusSyncer(c *mysqlcluster.MysqlCluster, cli client.Client) *StatusSyncer { return &StatusSyncer{ - MysqlCluster: c, - cli: cli, - SQLRunnerFactory: sqlRunnerFactory, - XenonExecutor: xenonExecutor, - log: logf.Log.WithName("syncer.StatusSyncer"), + MysqlCluster: c, + cli: cli, + statefulset: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.GetNameForResource(utils.StatefulSet), + Namespace: c.Namespace, + }, + }, + // SQLRunnerFactory: sqlRunnerFactory, + // XenonExecutor: xenonExecutor, + log: logf.Log.WithName("syncer.StatusSyncer"), } } @@ -84,387 +88,37 @@ func (s *StatusSyncer) GetOwner() runtime.Object { return s.MysqlCluster } // Sync persists data into the external store. 
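+// After this refactor it only fetches the StatefulSet owned by the cluster
+// and mirrors its status into MysqlClusterStatus; the conditions and state
+// written below are placeholders (see the TODOs).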
func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { - clusterCondition := s.updateClusterStatus() - - list := corev1.PodList{} - err := s.cli.List( - ctx, - &list, - &client.ListOptions{ - Namespace: s.Namespace, - LabelSelector: s.GetLabels().AsSelector(), - }, - ) - if err != nil { + // Get statefulset. + if err := s.cli.Get(ctx, client.ObjectKeyFromObject(s.statefulset), s.statefulset); err != nil { return syncer.SyncResult{}, err } - - // get ready nodes. - var readyNodes []corev1.Pod - for _, pod := range list.Items { - if pod.ObjectMeta.Labels[utils.LableRebuild] == "true" { - if err := s.AutoRebuild(ctx, &pod); err != nil { - s.log.Error(err, "failed to AutoRebuild", "pod", pod.Name, "namespace", pod.Namespace) - } - continue - } - for _, cond := range pod.Status.Conditions { - switch cond.Type { - case corev1.ContainersReady: - if cond.Status == corev1.ConditionTrue { - readyNodes = append(readyNodes, pod) - } - case corev1.PodScheduled: - if cond.Reason == corev1.PodReasonUnschedulable { - // When an error occurs, it is first recorded in the condition, - // but the cluster status is not updated immediately. - clusterCondition = apiv1alpha1.ClusterCondition{ - Type: apiv1alpha1.ConditionError, - Status: corev1.ConditionTrue, - LastTransitionTime: metav1.NewTime(time.Now()), - Reason: corev1.PodReasonUnschedulable, - Message: cond.Message, - } - } - } - } - } - - s.Status.ReadyNodes = len(readyNodes) - if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 { - if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil { - clusterCondition.Message = fmt.Sprintf("%s", err) - clusterCondition.Type = apiv1alpha1.ConditionError - } else { - s.Status.State = apiv1alpha1.ClusterReadyState - clusterCondition.Type = apiv1alpha1.ConditionReady - } - } - - if len(s.Status.Conditions) == 0 { - s.Status.Conditions = append(s.Status.Conditions, clusterCondition) - } else { - lastCond := s.Status.Conditions[len(s.Status.Conditions)-1] - if lastCond.Type != clusterCondition.Type { - s.Status.Conditions = append(s.Status.Conditions, clusterCondition) - } - } - if len(s.Status.Conditions) > maxStatusesQuantity { - s.Status.Conditions = s.Status.Conditions[len(s.Status.Conditions)-maxStatusesQuantity:] - } - - // Update all nodes' status. - return syncer.SyncResult{}, s.updateNodeStatus(ctx, s.cli, list.Items) -} - -// updateClusterStatus update the cluster status and returns condition. -func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition { - clusterCondition := apiv1alpha1.ClusterCondition{ - Type: apiv1alpha1.ConditionInit, - Status: corev1.ConditionTrue, - LastTransitionTime: metav1.NewTime(time.Now()), - } - - oldState := s.Status.State - // If the state does not exist, the cluster is being initialized. - if oldState == "" { - s.Status.State = apiv1alpha1.ClusterInitState - return clusterCondition - } - // If the expected number of replicas and the actual number - // of replicas are both 0, the cluster has been closed. - if int(*s.Spec.Replicas) == 0 && s.Status.ReadyNodes == 0 { - clusterCondition.Type = apiv1alpha1.ConditionClose - s.Status.State = apiv1alpha1.ClusterCloseState - return clusterCondition - } - // When the cluster is ready or closed, the number of replicas changes, - // indicating that the cluster is updating nodes. 
- if oldState == apiv1alpha1.ClusterReadyState || oldState == apiv1alpha1.ClusterCloseState { - if int(*s.Spec.Replicas) > s.Status.ReadyNodes { - clusterCondition.Type = apiv1alpha1.ConditionScaleOut - s.Status.State = apiv1alpha1.ClusterScaleOutState - return clusterCondition - } else if int(*s.Spec.Replicas) < s.Status.ReadyNodes { - clusterCondition.Type = apiv1alpha1.ConditionScaleIn - s.Status.State = apiv1alpha1.ClusterScaleInState - return clusterCondition - } - } - - clusterCondition.Type = apiv1alpha1.ClusterConditionType(oldState) - return clusterCondition -} - -// Rebuild Pod by deleting and creating it. -// Notice: This function just delete Pod and PVC, -// then after k8s recreate pod, it will clone and initial it. -func (s *StatusSyncer) AutoRebuild(ctx context.Context, pod *corev1.Pod) error { - ordinal, err := utils.GetOrdinal(pod.Name) - if err != nil { - return err - - } - // Set Pod UnHealthy. - pod.Labels["healthy"] = "no" - if err := s.cli.Update(ctx, pod); err != nil { - return err - } - // Delete the Pod. - if err := s.cli.Delete(ctx, pod); err != nil { - return err - } - // Delete the pvc. - pvcName := fmt.Sprintf("%s-%s-%d", utils.DataVolumeName, - s.GetNameForResource(utils.StatefulSet), ordinal) - pvc := corev1.PersistentVolumeClaim{} - - if err := s.cli.Get(ctx, - types.NamespacedName{Name: pvcName, Namespace: s.Namespace}, - &pvc); err != nil { - return err - } - if err := s.cli.Delete(ctx, &pvc); err != nil { - return err - } - return nil -} - -// updateNodeStatus update the node status. -func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error { - closeCh := make(chan func()) - for _, pod := range pods { - podName := pod.Name - host := fmt.Sprintf("%s.%s.%s", podName, s.GetNameForResource(utils.HeadlessSVC), s.Namespace) - index := s.getNodeStatusIndex(host) - node := &s.Status.Nodes[index] - node.Message = "" - - if err := s.updateNodeRaftStatus(node); err != nil { - s.log.V(1).Info("failed to get/update node raft status", "node", node.Name, "error", err) - node.Message = err.Error() - } - - isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown - var sqlRunner internal.SQLRunner - var closeConn func() - errCh := make(chan error) - go func(sqlRunner *internal.SQLRunner, errCh chan error, closeCh chan func()) { - var err error - *sqlRunner, closeConn, err = s.SQLRunnerFactory(internal.NewConfigFromClusterKey( - s.cli, s.MysqlCluster.GetClusterKey(), host)) - if err != nil { - s.log.V(1).Info("failed to get sql runner", "node", node.Name, "error", err) - errCh <- err - return - } - if closeConn != nil { - closeCh <- closeConn - return - } - errCh <- nil - }(&sqlRunner, errCh, closeCh) - - var err error - select { - case <-errCh: - case closeConn := <-closeCh: - defer closeConn() - case <-time.After(time.Second * 5): - } - if sqlRunner != nil { - isLagged, isReplicating, err = internal.CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry) - if err != nil { - s.log.V(1).Info("failed to check slave status", "node", node.Name, "error", err) - node.Message = err.Error() - } - - isReadOnly, err = internal.CheckReadOnly(sqlRunner) - if err != nil { - s.log.V(1).Info("failed to check read only", "node", node.Name, "error", err) - node.Message = err.Error() - } - - if !utils.ExistUpdateFile() && - node.RaftStatus.Role == string(utils.Leader) && - isReadOnly != corev1.ConditionFalse { - s.log.V(1).Info("try to correct the leader writeable", "node", node.Name) - 
sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL read_only=off")) - sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=off")) - } - } - - // update apiv1alpha1.NodeConditionLagged. - s.updateNodeCondition(node, int(apiv1alpha1.IndexLagged), isLagged) - // update apiv1alpha1.NodeConditionReplicating. - s.updateNodeCondition(node, int(apiv1alpha1.IndexReplicating), isReplicating) - // update apiv1alpha1.NodeConditionReadOnly. - s.updateNodeCondition(node, int(apiv1alpha1.IndexReadOnly), isReadOnly) - - if err = s.updatePodLabel(ctx, &pod, node); err != nil { - s.log.V(1).Info("failed to update labels", "pod", pod.Name, "error", err) - } - } - - // Delete node status of nodes that have been deleted. - if len(s.Status.Nodes) > len(pods) { - s.Status.Nodes = s.Status.Nodes[:len(pods)] - } - return nil -} - -// getNodeStatusIndex get the node index in the status. -func (s *StatusSyncer) getNodeStatusIndex(name string) int { - len := len(s.Status.Nodes) - for i := 0; i < len; i++ { - if s.Status.Nodes[i].Name == name { - return i - } - } - - lastTransitionTime := metav1.NewTime(time.Now()) - status := apiv1alpha1.NodeStatus{ - Name: name, - Conditions: []apiv1alpha1.NodeCondition{ - { - Type: apiv1alpha1.NodeConditionLagged, - Status: corev1.ConditionUnknown, - LastTransitionTime: lastTransitionTime, - }, - { - Type: apiv1alpha1.NodeConditionLeader, - Status: corev1.ConditionUnknown, - LastTransitionTime: lastTransitionTime, - }, - { - Type: apiv1alpha1.NodeConditionReadOnly, - Status: corev1.ConditionUnknown, - LastTransitionTime: lastTransitionTime, - }, - { - Type: apiv1alpha1.NodeConditionReplicating, - Status: corev1.ConditionUnknown, - LastTransitionTime: lastTransitionTime, - }, + // Set the status. + s.Status.SetStatefulSetStatus(&s.statefulset.Status) + + // TODO: conditions + s.Status.SetConditions([]v1alpha1.MySQLClusterCondition{ + { + Type: Initialized, + Status: CondTrue, }, - } - s.Status.Nodes = append(s.Status.Nodes, status) - return len -} - -// updateNodeCondition update the node condition. -func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int, status corev1.ConditionStatus) { - if node.Conditions[idx].Status != status { - t := time.Now() - s.log.V(3).Info(fmt.Sprintf("Found status change for node %q condition %q: %q -> %q; setting lastTransitionTime to %v", - node.Name, node.Conditions[idx].Type, node.Conditions[idx].Status, status, t)) - node.Conditions[idx].Status = status - node.Conditions[idx].LastTransitionTime = metav1.NewTime(t) - } -} - -// updateNodeRaftStatus Update Node RaftStatus. -func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error { - isLeader := corev1.ConditionFalse - node.RaftStatus = apiv1alpha1.RaftStatus{ - Role: string(utils.Unknown), - Leader: "UNKNOWN", - Nodes: nil, - } - - raftStatus, err := s.XenonExecutor.RaftStatus(node.Name) - if err == nil && raftStatus != nil { - node.RaftStatus = *raftStatus - if raftStatus.Role == string(utils.Leader) { - isLeader = corev1.ConditionTrue - } - } - - // update apiv1alpha1.NodeConditionLeader. 
- s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader) - return err -} - -func (s *StatusSyncer) reconcileXenon(readyNodes int) error { - expectXenonNodes := s.getExpectXenonNodes(readyNodes) - for _, nodeStatus := range s.Status.Nodes { - toRemove := utils.StringDiffIn(nodeStatus.RaftStatus.Nodes, expectXenonNodes) - if err := s.removeNodesFromXenon(nodeStatus.Name, toRemove); err != nil { - return err - } - toAdd := utils.StringDiffIn(expectXenonNodes, nodeStatus.RaftStatus.Nodes) - if err := s.addNodesInXenon(nodeStatus.Name, toAdd); err != nil { - return err - } - } - return nil -} - -func (s *StatusSyncer) getExpectXenonNodes(readyNodes int) []string { - expectXenonNodes := []string{} - for i := 0; i < readyNodes; i++ { - expectXenonNodes = append(expectXenonNodes, fmt.Sprintf("%s:%d", s.GetPodHostName(i), utils.XenonPort)) - } - return expectXenonNodes -} - -func (s *StatusSyncer) removeNodesFromXenon(host string, toRemove []string) error { - if err := s.XenonExecutor.XenonPing(host); err != nil { - return err - } - for _, removeHost := range toRemove { - if err := s.XenonExecutor.ClusterRemove(host, removeHost); err != nil { - return err - } - } - return nil -} + { + Type: AllReady, + Status: CondTrue, + }, + { + Type: Avaliable, + Status: CondTrue, + }, + { + Type: InUpgrade, + Status: CondTrue, + }, + }) -func (s *StatusSyncer) addNodesInXenon(host string, toAdd []string) error { - if err := s.XenonExecutor.XenonPing(host); err != nil { - return err - } - for _, addHost := range toAdd { - if err := s.XenonExecutor.ClusterAdd(host, addHost); err != nil { - return err - } - } - return nil -} + // TODO: state + s.Status.SetState(v1alpha1.ClusterReadyState) -// updatePodLabel update the pod lables. -func (s *StatusSyncer) updatePodLabel(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error { - oldPod := pod.DeepCopy() - healthy := "no" - isPodLabelsUpdated := false - if node.Conditions[apiv1alpha1.IndexLagged].Status == corev1.ConditionFalse { - if node.Conditions[apiv1alpha1.IndexLeader].Status == corev1.ConditionFalse && - node.Conditions[apiv1alpha1.IndexReadOnly].Status == corev1.ConditionTrue && - node.Conditions[apiv1alpha1.IndexReplicating].Status == corev1.ConditionTrue { - healthy = "yes" - } else if node.Conditions[apiv1alpha1.IndexLeader].Status == corev1.ConditionTrue && - node.Conditions[apiv1alpha1.IndexReplicating].Status == corev1.ConditionFalse && - node.Conditions[apiv1alpha1.IndexReadOnly].Status == corev1.ConditionFalse { - healthy = "yes" - } - } - if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning { - healthy = "no" - node.RaftStatus.Role = string(utils.Unknown) - } + // TODO: rebuild - if pod.Labels["healthy"] != healthy { - pod.Labels["healthy"] = healthy - isPodLabelsUpdated = true - } - if pod.Labels["role"] != node.RaftStatus.Role { - pod.Labels["role"] = node.RaftStatus.Role - isPodLabelsUpdated = true - } - if isPodLabelsUpdated { - if err := s.cli.Patch(ctx, pod, client.MergeFrom(oldPod)); client.IgnoreNotFound(err) != nil { - return err - } - } - return nil + return syncer.SyncResult{}, nil } From 46ed5e6885501a2d267d7170bed6559e0ad62f75 Mon Sep 17 00:00:00 2001 From: runkecheng <1131648942@qq.com> Date: Wed, 3 Aug 2022 18:24:39 +0800 Subject: [PATCH 2/2] refactor(cluster): move mysql check to probe. 
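
Move the MySQL health check out of the operator's status syncer and into
the pod itself: a new mysqlchecker binary (cmd/mysql) is baked into the
MySQL image and invoked by the container probes, and the xenon sidecar now
patches the role/rebuild labels itself through an in-cluster client.

The probe wiring lands in mysqlcluster/container/mysql.go (listed in the
diffstat below but not shown in this excerpt). A minimal sketch of what
that wiring could look like, assuming the checker is installed at
/mysqlchecker as in the Dockerfile and with made-up delay/period values:

    // corev1 is k8s.io/api/core/v1 (v0.21.x, where Probe embeds Handler).
    livenessProbe := &corev1.Probe{
        Handler: corev1.Handler{
            Exec: &corev1.ExecAction{
                Command: []string{"/mysqlchecker", "liveness"},
            },
        },
        InitialDelaySeconds: 30, // assumption, not taken from this patch
        PeriodSeconds:       10, // assumption, not taken from this patch
    }
    readinessProbe := &corev1.Probe{
        Handler: corev1.Handler{
            Exec: &corev1.ExecAction{
                Command: []string{"/mysqlchecker", "readiness"},
            },
        },
        PeriodSeconds: 10, // assumption, not taken from this patch
    }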
---
 build/mysql/Dockerfile          |  14 +-
 build/xenon/Dockerfile          |   3 +
 cmd/mysql/main.go               | 138 +++++++++++++++++
 cmd/xenon/main.go               |  74 +++++----
 go.mod                          |   1 -
 internal/incluster.go           | 255 ++++++++++++++++++++++++++++++++
 internal/xenon_executor.go      |   8 +-
 mysqlcluster/container/mysql.go |  38 +++--
 mysqlcluster/container/xenon.go |  10 +-
 mysqlcluster/syncer/role.go     |   2 +-
 sidecar/config.go               |  44 ------
 utils/incluster.go              |  81 ----------
 12 files changed, 486 insertions(+), 182 deletions(-)
 create mode 100644 cmd/mysql/main.go
 create mode 100644 internal/incluster.go
 delete mode 100644 utils/incluster.go

diff --git a/build/mysql/Dockerfile b/build/mysql/Dockerfile
index 94e101c9..4ff543a8 100644
--- a/build/mysql/Dockerfile
+++ b/build/mysql/Dockerfile
@@ -12,15 +12,15 @@ RUN if [ $(date +%z) = "+0800" ] ; then go env -w GOPROXY=https://goproxy.cn,dir
 RUN go mod download
 
 # Copy the go source
-COPY cmd/healthcheck/main.go cmd/healthcheck/main.go
-COPY utils/ utils/
-COPY internal/ internal/
-# COPY api/ api/
-# COPY mysqlcluster/ mysqlcluster/
+COPY cmd/mysql/main.go main.go
+COPY internal/ internal/
+COPY api/ api/
+COPY utils/ utils/
+COPY mysqlcluster/ mysqlcluster/
 
 # Build
-RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o bin/healthcheck cmd/healthcheck/main.go
+RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o bin/mysqlchecker main.go
 
 FROM percona/percona-server:5.7.34
 
-COPY --from=builder /bin/healthcheck /healthcheck
\ No newline at end of file
+COPY --from=builder /bin/mysqlchecker /mysqlchecker
\ No newline at end of file
diff --git a/build/xenon/Dockerfile b/build/xenon/Dockerfile
index 268ed3dc..8c692f82 100644
--- a/build/xenon/Dockerfile
+++ b/build/xenon/Dockerfile
@@ -18,6 +18,9 @@ WORKDIR /workspace
 # Copy the go source
 COPY cmd/xenon/main.go cmd/xenon/main.go
 COPY utils/ utils/
+COPY api/ api/
+COPY internal/ internal/
+COPY mysqlcluster/ mysqlcluster/
 COPY go.mod go.mod
 COPY go.sum go.sum
 RUN go env -w GO111MODULE=on && go mod download
diff --git a/cmd/mysql/main.go b/cmd/mysql/main.go
new file mode 100644
index 00000000..4d5c86d4
--- /dev/null
+++ b/cmd/mysql/main.go
@@ -0,0 +1,138 @@
+/*
+Copyright 2022 RadonDB.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"os"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/radondb/radondb-mysql-kubernetes/internal"
+)
+
+var (
+	ns      string
+	podName string
+
+	newMySQLChecker = internal.NewMysqlChecker
+	getMyRole       = internal.GetMyRole
+	getMyHealthy    = internal.GetMyHealthy
+)
+
+const (
+	mysqlUser = "root"
+	mysqlHost = "127.0.0.1"
+	mysqlPort = 3306
+	mysqlPwd  = ""
+)
+
+func init() {
+	ns = os.Getenv("NAMESPACE")
+	podName = os.Getenv("POD_NAME")
+}
+
+func main() {
+	if len(os.Args) < 2 {
+		log.Fatalf("Usage: %s liveness|readiness|command|startup", os.Args[0])
+	}
+	switch os.Args[1] {
+	case "liveness":
+		if err := liveness(); err != nil {
+			log.Fatalf("liveness failed: %s", err.Error())
+		}
+		log.Info("node is alive")
+	case "readiness":
+		if err := readiness(); err != nil {
+			log.Fatalf("readiness failed: %s", err.Error())
+		}
+		log.Info("node is ready")
+	case "startup":
+		if err := startup(); err != nil {
+			log.Fatalf("startup failed: %s", err.Error())
+		}
+	case "command":
+		if err := command(); err != nil {
+			log.Fatalf("command failed: %s", err.Error())
+		}
+	default:
+		log.Fatalf("Usage: %s liveness|readiness|command|startup", os.Args[0])
+	}
+}
+
+func liveness() error {
+	if internal.SleepForever() {
+		log.Infof("sleep-forever is set, skip liveness check")
+		return nil
+	}
+	sqlrunner, closeFn, err := internal.NewSQLRunner(localMySQLConfig())
+	if err != nil {
+		return err
+	}
+	defer closeFn()
+
+	mc := newMySQLChecker(sqlrunner, localClientOptions())
+	return mc.CheckMySQLLiveness()
+}
+
+func readiness() error {
+	if internal.SleepForever() {
+		log.Infof("sleep-forever is set, skip readiness check")
+		return nil
+	}
+
+	sqlrunner, closeFn, err := internal.NewSQLRunner(localMySQLConfig())
+	if err != nil {
+		log.Errorf("failed to create sqlrunner: %s", err.Error())
+		return err
+	}
+	defer closeFn()
+
+	mc := newMySQLChecker(sqlrunner, localClientOptions())
+	labels := mc.ReadLabels()
+	role, oldHealthy := getMyRole(labels), getMyHealthy(labels) == "yes"
+	log.Infof("role: %s, healthy: %t", role, oldHealthy)
+
+	if newHealthy := mc.CheckMySQLHealthy(role); newHealthy != oldHealthy {
+		healthyLabel := internal.ConvertHealthy(newHealthy)
+		log.Infof("patch healthy label: %s", healthyLabel)
+
+		if err := mc.PatchLabel("healthy", healthyLabel); err != nil {
+			log.Errorf("failed to patch healthy label: %s", err.Error())
+		}
+	}
+	return nil
+}
+
+func command() error {
+	return nil
+}
+
+func startup() error {
+	return nil
+}
+
+func localMySQLConfig() *internal.Config {
+	return &internal.Config{
+		User:     mysqlUser,
+		Host:     mysqlHost,
+		Port:     mysqlPort,
+		Password: mysqlPwd,
+	}
+}
+
+func localClientOptions() *internal.ClientOptions {
+	return &internal.ClientOptions{NameSpace: ns, PodName: podName}
+}
diff --git a/cmd/xenon/main.go b/cmd/xenon/main.go
index 0547a458..2c0691ee 100644
--- a/cmd/xenon/main.go
+++ b/cmd/xenon/main.go
@@ -1,3 +1,19 @@
+/*
+Copyright 2022 RadonDB.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + package main import ( @@ -14,12 +30,12 @@ import ( "time" _ "github.com/go-sql-driver/mysql" + "github.com/radondb/radondb-mysql-kubernetes/internal" + "github.com/radondb/radondb-mysql-kubernetes/utils" . "github.com/radondb/radondb-mysql-kubernetes/utils" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -114,13 +130,21 @@ func main() { } } -// TODO func leaderStart() error { + log.Info("leader start") + cli := internal.NewInclusterClient(localClientOptions()) + if err := updateRoleLabel(cli, string(utils.Leader)); err != nil { + return err + } return nil } func leaderStop() error { log.Infof("leader stop started") + cli := internal.NewInclusterClient(localClientOptions()) + if err := updateRoleLabel(cli, string(utils.Follower)); err != nil { + return err + } conn, err := getLocalMySQLConn() if err != nil { return fmt.Errorf("failed to get the connection of local MySQL: %s", err.Error()) @@ -208,13 +232,17 @@ func leaderStop() error { } func liveness() error { - return XenonPingMyself() + log.Info("liveness probe starts") + return internal.XenonPingMyself() } +// TODO: readiness fail sometimes. func readiness() error { - role := GetRole() + log.Info("readiness probe starts") + role := internal.GetRole() if role != string(Leader) { - return PatchRoleLabelTo(myself(role)) + cli := internal.NewInclusterClient(localClientOptions()) + return updateRoleLabel(cli, role) } return nil } @@ -281,7 +309,8 @@ func postStart() error { if !isSubset { // Step 5: Rebuild me log.Infof("mySet is not a subset of leaderSet, rebuild me, mySet: %s, leaderSet: %s", mySet, leaderSet) - if err := PatchRebuildLabelTo(myself("")); err != nil { + cli := internal.NewInclusterClient(localClientOptions()) + if err := rebuildMe(cli); err != nil { return err } return nil @@ -298,12 +327,8 @@ func preStop() error { return nil } -func myself(role string) MySQLNode { - return MySQLNode{ - PodName: podName, - Namespace: ns, - Role: role, - } +func localClientOptions() *internal.ClientOptions { + return &internal.ClientOptions{NameSpace: ns, PodName: podName} } func enableAutoRebuild() bool { @@ -477,7 +502,7 @@ func disableMyRaft() error { func WaitXenonAvailable(timeout time.Duration) error { err := wait.PollImmediate(2*time.Second, timeout, func() (bool, error) { - if err := XenonPingMyself(); err != nil { + if err := internal.XenonPingMyself(); err != nil { return false, nil } return true, nil @@ -545,23 +570,18 @@ func getLeader() string { return "" } -func PatchRebuildLabelTo(n MySQLNode) error { - patch := `{"metadata":{"labels":{"rebuild":"true"}}}` - err := patchPodLabel(n, patch) - if err != nil { - return fmt.Errorf("failed to patch pod rebuild label: %s", err.Error()) +func updateRoleLabel(cli internal.Client, role string) error { + log.Infof("updating role label to %s", role) + if err := cli.PatchLabel("role", role); err != nil { + return fmt.Errorf("failed to update role label: %s", err.Error()) } return nil } -func patchPodLabel(n MySQLNode, patch string) error { - clientset, err := GetClientSet() - if err != nil { - return fmt.Errorf("failed to create clientset: %s", err.Error()) - } - _, err = clientset.CoreV1().Pods(n.Namespace).Patch(context.TODO(), n.PodName, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) - if err != nil { - return err +func rebuildMe(cli 
+	log.Info("rebuilding me")
+	if err := cli.PatchLabel("rebuild", "true"); err != nil {
+		return fmt.Errorf("failed to patch label rebuild: %s", err.Error())
 	}
 	return nil
 }
diff --git a/go.mod b/go.mod
index ba247d3f..04ea47d2 100644
--- a/go.mod
+++ b/go.mod
@@ -22,6 +22,5 @@ require (
 	k8s.io/api v0.21.3
 	k8s.io/apimachinery v0.21.3
 	k8s.io/client-go v0.21.3
-	k8s.io/klog/v2 v2.8.0
 	sigs.k8s.io/controller-runtime v0.9.5
 )
diff --git a/internal/incluster.go b/internal/incluster.go
new file mode 100644
index 00000000..0aeb3bb6
--- /dev/null
+++ b/internal/incluster.go
@@ -0,0 +1,255 @@
+package internal
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"os/exec"
+
+	log "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	"github.com/radondb/radondb-mysql-kubernetes/utils"
+)
+
+type MySQLChecker interface {
+	CheckMySQLHealthy(role string) bool
+	CheckMySQLLiveness() error
+	Client
+}
+
+// mysqlChecker is an implementation of MySQLChecker; it checks the local MySQL instance and updates the Pod labels.
+type mysqlChecker struct {
+	SQLRunner
+	Client
+}
+
+// Client reads and patches the labels of the Pod it runs in.
+type Client interface {
+	ReadLabels() map[string]string
+	PatchLabel(k, v string) error
+}
+
+// inClusterClient is an implementation of Client; it operates on the Pod itself through the clientset.
+type inClusterClient struct {
+	*kubernetes.Clientset
+	ClientOptions
+}
+
+type ClientOptions struct {
+	NameSpace string
+	PodName   string
+}
+
+type RaftStatus struct {
+	Leader string   `json:"leader"`
+	State  string   `json:"state"`
+	Nodes  []string `json:"nodes"`
+}
+
+func GetInclusterClientSet() *kubernetes.Clientset {
+	// creates the in-cluster config
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		log.Fatalf("failed to create InClusterConfig: %s", err.Error())
+	}
+	// creates the clientset
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		log.Fatalf("failed to get clientset: %s", err.Error())
+	}
+	return clientset
+}
+
+func XenonPingMyself() error {
+	args := []string{"xenon", "ping"}
+	cmd := exec.Command("xenoncli", args...)
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to exec xenoncli xenon ping: %v", err)
+	}
+	return nil
+}
+
+func GetRaftStatus() *RaftStatus {
+	args := []string{"raft", "status"}
+	cmd := exec.Command("xenoncli", args...)
+	res, err := cmd.Output()
+	if err != nil {
+		log.Fatalf("failed to exec xenoncli raft status: %v", err)
+	}
+	raftStatus := RaftStatus{}
+	if err := json.Unmarshal(res, &raftStatus); err != nil {
+		log.Fatalf("failed to unmarshal raft status: %v", err)
+	}
+	return &raftStatus
+}
+
+func GetRole() string {
+	return GetRaftStatus().State
+}
+
+func NewInclusterClient(options *ClientOptions) *inClusterClient {
+	return &inClusterClient{
+		GetInclusterClientSet(),
+		*options,
+	}
+}
+
+// PatchLabel updates a single label through a read-modify-write of the Pod,
+// which is why the Role reconciled for the cluster needs the "update" verb.
+func (c *inClusterClient) PatchLabel(k, v string) error {
+	pod, err := c.Clientset.CoreV1().Pods(c.NameSpace).Get(context.TODO(), c.PodName, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	pod.Labels[k] = v
+	_, err = c.Clientset.CoreV1().Pods(c.NameSpace).Update(context.TODO(), pod, metav1.UpdateOptions{})
+	return err
+}
+
+func (c *inClusterClient) ReadLabels() map[string]string {
+	pod, err := c.Clientset.CoreV1().Pods(c.NameSpace).Get(context.TODO(), c.PodName, metav1.GetOptions{})
+	if err != nil {
+		log.Errorf("failed to get pod: %s", err.Error())
+		return nil
+	}
+	return pod.Labels
+}
+
+func GetMyRole(labels map[string]string) string {
+	if role, ok := labels["role"]; ok {
+		return role
+	}
+	return string(utils.Unknown)
+}
+
+func GetMyHealthy(labels map[string]string) string {
+	if health, ok := labels["healthy"]; ok {
+		return health
+	}
+	return "no"
+}
+
+func ConvertHealthy(isHealthy bool) string {
+	if isHealthy {
+		return "yes"
+	}
+	return "no"
+}
+
+func NewMysqlChecker(s SQLRunner, options *ClientOptions) MySQLChecker {
+	return &mysqlChecker{
+		s,
+		NewInclusterClient(options),
+	}
+}
+
+func SleepForever() bool {
+	_, err := os.Stat("/var/lib/mysql/sleep-forever")
+	if os.IsNotExist(err) {
+		return false
+	}
+	if err != nil {
+		log.Errorf("failed to check sleep-forever: %s", err.Error())
+		return false
+	}
+	return true
+}
+
+func (m *mysqlChecker) CheckMySQLLiveness() error {
+	return m.select1()
+}
+
+func (m *mysqlChecker) select1() error {
+	if _, err := m.QueryRows(NewQuery("select 1;")); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (m *mysqlChecker) CheckMySQLHealthy(role string) bool {
+	res := m.checkMySQL()
+	switch role {
+	case string(utils.Leader):
+		return m.checkLeader(res)
+	case string(utils.Follower):
+		return m.checkFollower(res)
+	default:
+		log.Infof("checking role %s", role)
+		return m.isReadonly()
+	}
+}
+
+func (m *mysqlChecker) isReadonly() bool {
+	var readOnly uint8
+	if err := m.QueryRow(NewQuery("select @@global.read_only;"), &readOnly); err != nil {
+		log.Errorf("failed to get read_only: %s", err.Error())
+		return false
+	}
+	return readOnly == 1
+}
+
+type checkResult struct {
+	readOnly, isLagged, isReplicating bool
+}
+
+func (m *mysqlChecker) checkMySQL() checkResult {
+	isLagged, isReplicating := m.showSlaveStatus()
+	return checkResult{
+		readOnly:      m.isReadonly(),
+		isLagged:      isLagged,
+		isReplicating: isReplicating,
+	}
+}
+
+func (m *mysqlChecker) checkLeader(res checkResult) bool {
+	log.Infof("check leader, readonly: %t, isLagged: %t, isReplicating: %t", res.readOnly, res.isLagged, res.isReplicating)
+
+	if !m.existUpdateFile() && res.readOnly {
+		log.Errorf("I am the leader but read_only is on")
+		if err := m.setGlobalReadOnly(false); err != nil {
+			log.Errorf("failed to set read_only to off: %s", err.Error())
+			return false
+		}
+	}
+	return !res.readOnly && !res.isReplicating
+}
+
+func (m *mysqlChecker) checkFollower(res checkResult) bool {
+	log.Infof("check follower, readonly: %t, isLagged: %t, isReplicating: %t", res.readOnly, res.isLagged, res.isReplicating)
+	return res.readOnly && !res.isLagged && res.isReplicating
+}
+
+func (m *mysqlChecker) showSlaveStatus() (bool, bool) {
+	lagged, replicating, err := CheckSlaveStatusWithRetry(m.SQLRunner, 3)
+	if err != nil {
+		log.Errorf("failed to show slave status: %s", err.Error())
+		return false, false
+	}
+	isLagged := lagged == corev1.ConditionTrue
+	isReplicating := replicating == corev1.ConditionTrue
+	return isLagged, isReplicating
+}
+
+func (m *mysqlChecker) setGlobalReadOnly(on bool) error {
+	option := "OFF"
+	if on {
+		option = "ON"
+	}
+	log.Infof("trying to set read_only to %s", option)
+
+	if err := m.SQLRunner.QueryExec(NewQuery("SET GLOBAL read_only = ?;", option)); err != nil {
+		log.Errorf("failed to set global read_only: %s", err.Error())
+		return err
+	}
+	if err := m.SQLRunner.QueryExec(NewQuery("SET GLOBAL super_read_only = ?;", option)); err != nil {
+		log.Errorf("failed to set global super_read_only: %s", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (m *mysqlChecker) existUpdateFile() bool {
+	return utils.ExistUpdateFile()
+}
diff --git a/internal/xenon_executor.go b/internal/xenon_executor.go
index d5276827..4e88a95c 100644
--- a/internal/xenon_executor.go
+++ b/internal/xenon_executor.go
@@ -20,7 +20,7 @@ import (
 	"fmt"
 	"net/http"
 
-	apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
+	// apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
 	"github.com/radondb/radondb-mysql-kubernetes/utils"
 )
 
@@ -32,7 +32,7 @@ type xenonExecutor struct {
 type XenonExecutor interface {
 	GetRootPassword() string
 	SetRootPassword(rootPassword string)
-	RaftStatus(host string) (*apiv1alpha1.RaftStatus, error)
+	RaftStatus(host string) (*RaftStatus, error)
 	XenonPing(host string) error
 	RaftTryToLeader(host string) error
 	ClusterAdd(host string, toAdd string) error
@@ -52,7 +52,7 @@ func (executor *xenonExecutor) SetRootPassword(rootPassword string) {
 }
 
 // RaftStatus gets the raft status of incoming host through http.
-func (executor *xenonExecutor) RaftStatus(host string) (*apiv1alpha1.RaftStatus, error) {
+func (executor *xenonExecutor) RaftStatus(host string) (*RaftStatus, error) {
 	req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.rootPassword, utils.RaftStatus, nil))
 	if err != nil {
 		return nil, err
@@ -73,7 +73,7 @@ func (executor *xenonExecutor) RaftStatus(host string) (*apiv1alpha1.RaftStatus,
 	for _, node := range nodesJson {
 		nodes = append(nodes, node.(string))
 	}
-	return &apiv1alpha1.RaftStatus{Role: out["state"].(string), Leader: out["leader"].(string), Nodes: nodes}, nil
+	return &RaftStatus{State: out["state"].(string), Leader: out["leader"].(string), Nodes: nodes}, nil
 }
 
 // RaftTryToLeader try setting up incoming host to the leader node.
diff --git a/mysqlcluster/container/mysql.go b/mysqlcluster/container/mysql.go
index ddfb42fe..f877efca 100644
--- a/mysqlcluster/container/mysql.go
+++ b/mysqlcluster/container/mysql.go
@@ -53,16 +53,34 @@ func (c *mysql) getCommand() []string {
 
 // getEnvVars get the container env.
func (c *mysql) getEnvVars() []corev1.EnvVar { - if c.Spec.MysqlOpts.InitTokuDB { - return []corev1.EnvVar{ - { - Name: "INIT_TOKUDB", - Value: "1", + envVar := []corev1.EnvVar{ + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.name", + }, }, - } + }, + } + if c.Spec.MysqlOpts.InitTokuDB { + envVar = append(envVar, corev1.EnvVar{ + Name: "INIT_TOKUDB", + Value: "1", + }) } - return nil + return envVar } // getLifecycle get the container lifecycle. @@ -98,7 +116,7 @@ func (c *mysql) getProbeSet() *ProbeSet { Command: []string{ "sh", "-c", - "if [ -f '/var/lib/mysql/sleep-forever' ] ;then exit 0 ; fi; pgrep mysqld", + "/mysqlchecker liveness", }, }, }, @@ -114,11 +132,11 @@ func (c *mysql) getProbeSet() *ProbeSet { Command: []string{ "sh", "-c", - `if [ -f '/var/lib/mysql/sleep-forever' ] ;then exit 0 ; fi; test $(mysql -usuper -h127.0.0.1 -NB -e "SELECT 1") -eq 1`, + `/mysqlchecker readiness`, }, }, }, - InitialDelaySeconds: 10, + InitialDelaySeconds: 15, TimeoutSeconds: 5, PeriodSeconds: 10, SuccessThreshold: 1, diff --git a/mysqlcluster/container/xenon.go b/mysqlcluster/container/xenon.go index 024e1fbb..347066fb 100644 --- a/mysqlcluster/container/xenon.go +++ b/mysqlcluster/container/xenon.go @@ -123,11 +123,7 @@ func (c *xenon) getProbeSet() *ProbeSet { LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{ - "sh", - "-c", - "pgrep xenon && xenoncli xenon ping", - }, + Command: []string{"sh", "-c", "/xenonchecker liveness"}, }, }, InitialDelaySeconds: 30, @@ -139,14 +135,14 @@ func (c *xenon) getProbeSet() *ProbeSet { ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{"sh", "-c", "xenoncli xenon ping"}, + Command: []string{"sh", "-c", "/xenonchecker readiness"}, }, }, InitialDelaySeconds: 10, TimeoutSeconds: 5, PeriodSeconds: 10, SuccessThreshold: 1, - FailureThreshold: 3, + FailureThreshold: 6, }, StartupProbe: nil, } diff --git a/mysqlcluster/syncer/role.go b/mysqlcluster/syncer/role.go index 26e236e9..d1fa9547 100644 --- a/mysqlcluster/syncer/role.go +++ b/mysqlcluster/syncer/role.go @@ -42,7 +42,7 @@ func NewRoleSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) syncer.Inter return syncer.NewObjectSyncer("Role", c.Unwrap(), role, cli, func() error { role.Rules = []rbacv1.PolicyRule{ { - Verbs: []string{"get", "patch"}, + Verbs: []string{"get", "patch", "update"}, APIGroups: []string{""}, Resources: []string{"pods"}, }, diff --git a/sidecar/config.go b/sidecar/config.go index e387ddd9..d345ea60 100644 --- a/sidecar/config.go +++ b/sidecar/config.go @@ -427,50 +427,6 @@ INSTALL PLUGIN validate_password SONAME 'validate_password.so'; return utils.StringToBytes(sql) } -// // buildClientConfig used to build client.conf. 
-// func (cfg *Config) buildClientConfig() (*ini.File, error) { -// conf := ini.Empty() -// sec := conf.Section("client") - -// if _, err := sec.NewKey("host", "127.0.0.1"); err != nil { -// return nil, err -// } - -// if _, err := sec.NewKey("port", fmt.Sprintf("%d", utils.MysqlPort)); err != nil { -// return nil, err -// } - -// if _, err := sec.NewKey("user", cfg.OperatorUser); err != nil { -// return nil, err -// } - -// if _, err := sec.NewKey("password", cfg.OperatorPassword); err != nil { -// return nil, err -// } - -// return conf, nil -// } - -// // buildLeaderStart build the leader-start.sh. -// func (cfg *Config) buildLeaderStart() []byte { -// str := fmt.Sprintf(`#!/usr/bin/env bash -// curl -X PATCH -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" -H "Content-Type: application/json-patch+json" \ -// --cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/api/v1/namespaces/%s/pods/$HOSTNAME \ -// -d '[{"op": "replace", "path": "/metadata/labels/role", "value": "leader"}]' -// `, cfg.NameSpace) -// return utils.StringToBytes(str) -// } - -// // buildLeaderStop build the leader-stop.sh. -// func (cfg *Config) buildLeaderStop() []byte { -// str := fmt.Sprintf(`#!/usr/bin/env bash -// curl -X PATCH -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" -H "Content-Type: application/json-patch+json" \ -// --cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/api/v1/namespaces/%s/pods/$HOSTNAME \ -// -d '[{"op": "replace", "path": "/metadata/labels/role", "value": "follower"}]' -// `, cfg.NameSpace) -// return utils.StringToBytes(str) -// } - /* The function is equivalent to the following shell script template: #!/bin/sh if [ ! -d {{.DataDir}} ] ; then diff --git a/utils/incluster.go b/utils/incluster.go deleted file mode 100644 index 2fc738e4..00000000 --- a/utils/incluster.go +++ /dev/null @@ -1,81 +0,0 @@ -package utils - -import ( - "context" - "encoding/json" - "fmt" - "log" - "os/exec" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" -) - -type raftStatus struct { - Leader string `json:"leader"` - State string `json:"state"` - Nodes []string `json:"nodes"` -} - -type MySQLNode struct { - PodName string - Namespace string - Role string -} - -func GetClientSet() (*kubernetes.Clientset, error) { - // Creates the in-cluster config - config, err := rest.InClusterConfig() - if err != nil { - return nil, fmt.Errorf("failed to create in-cluster config: %v", err) - } - // Creates the clientset - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, fmt.Errorf("failed to create clientset: %v", err) - } - return clientset, nil -} - -func PatchRoleLabelTo(n MySQLNode) error { - // Creates the clientset - clientset, err := GetClientSet() - if err != nil { - return fmt.Errorf("failed to create clientset: %v", err) - } - patch := fmt.Sprintf(`{"metadata":{"labels":{"role":"%s"}}}`, n.Role) - _, err = clientset.CoreV1().Pods(n.Namespace).Patch(context.TODO(), n.PodName, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) - if err != nil { - return fmt.Errorf("failed to patch pod role label: %v", err) - } - return nil -} - -func XenonPingMyself() error { - args := []string{"xenon", "ping"} - cmd := exec.Command("xenoncli", args...) 
- if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to exec xenoncli xenon ping: %v", err) - } - return nil -} - -func GetRaftStatus() *raftStatus { - args := []string{"raft", "status"} - cmd := exec.Command("xenoncli", args...) - res, err := cmd.Output() - if err != nil { - log.Fatalf("failed to exec xenoncli raft status: %v", err) - } - raftStatus := raftStatus{} - if err := json.Unmarshal(res, &raftStatus); err != nil { - log.Fatalf("failed to unmarshal raft status: %v", err) - } - return &raftStatus -} - -func GetRole() string { - return GetRaftStatus().State -}
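
Note on testing the new label flow: readiness() in cmd/mysql/main.go resolves its collaborators through the package-level variables newMySQLChecker, getMyRole and getMyHealthy, and the new internal.Client interface requires only ReadLabels and PatchLabel, so the label round-trip can be exercised without an API server. A minimal sketch follows; the fakeClient type is hypothetical and not part of this patch:

// fakeClient is a hypothetical in-memory implementation of internal.Client.
type fakeClient struct {
	labels map[string]string
}

// ReadLabels returns the stored label map, mirroring inClusterClient.ReadLabels.
func (f *fakeClient) ReadLabels() map[string]string { return f.labels }

// PatchLabel records the label update in memory instead of calling the API server.
func (f *fakeClient) PatchLabel(k, v string) error {
	f.labels[k] = v
	return nil
}

Seeding such a fake with {"role": "follower", "healthy": "no"} and wiring newMySQLChecker to a stub whose CheckMySQLHealthy returns true should leave the map with healthy set to "yes" after readiness() runs.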