Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1alpha1/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type BackupStatus struct {
BackupDate string `json:"backupDate,omitempty"`
// Get the backup Type
BackupType string `json:"backupType,omitempty"`
// Get the Gtid
Gtid string `json:"gtid,omitempty"`
// Conditions represents the backup resource conditions list.
Conditions []BackupCondition `json:"conditions,omitempty"`
}
Expand Down
3 changes: 3 additions & 0 deletions api/v1alpha1/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ type MysqlClusterStatus struct {
ReadyNodes int `json:"readyNodes,omitempty"`
// State
State ClusterState `json:"state,omitempty"`
// LastBackup
LastBackup string `json:"lastbackup,omitempty"`
LastBackupGtid string `json:"lastbackupGtid,omitempty"`
// Conditions contains the list of the cluster conditions fulfilled.
Conditions []ClusterCondition `json:"conditions,omitempty"`
// Nodes contains the list of the node status fulfilled.
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/mysqlcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *MysqlCluster) ValidateDelete() error {
return nil
}

// TODO: Add NFSServerAddress webhook & backup schedule.
// Add NFSServerAddress webhook & backup schedule.
func (r *MysqlCluster) validateNFSServerAddress(oldCluster *MysqlCluster) error {
isIP := net.ParseIP(r.Spec.NFSServerAddress) != nil
if len(r.Spec.NFSServerAddress) != 0 && !isIP {
Expand Down
18 changes: 14 additions & 4 deletions backup/cronbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,22 @@ func (j *CronJob) scheduledBackupsRunningCount() int {
backupsList := &apiv1alpha1.BackupList{}
// select all backups with labels recurrent=true and and not completed of the cluster
selector := j.backupSelector()
client.MatchingFields{"status.completed": "false"}.ApplyToList(selector)
// Because k8s do not support fieldSelector with custom resources
// https://github.com/kubernetes/kubernetes/issues/51046
// So this cannot use fields selector.
// client.MatchingFields{"status.completed": "false"}.ApplyToList(selector)

if err := j.Client.List(context.TODO(), backupsList, selector); err != nil {
if err := j.Client.List(context.TODO(), backupsList); err != nil {
log.Error(err, "failed getting backups", "selector", selector)
return 0
}

return len(backupsList.Items)
var rest []apiv1alpha1.Backup
for _, b := range backupsList.Items {
if !b.Status.Completed {
rest = append(rest, b)
}
}
return len(rest)
}

func (j *CronJob) backupSelector() *client.ListOptions {
Expand Down Expand Up @@ -133,10 +141,12 @@ func (j *CronJob) createBackup() (*apiv1alpha1.Backup, error) {
//RemoteDeletePolicy: j.BackupRemoteDeletePolicy,
HostName: fmt.Sprintf("%s-mysql-0", j.ClusterName),
},
Status: apiv1alpha1.BackupStatus{Completed: false},
}
if len(j.NFSServerAddress) > 0 {
backup.Spec.NFSServerAddress = j.NFSServerAddress
}

return backup, j.Client.Create(context.TODO(), backup)
}

Expand Down
12 changes: 9 additions & 3 deletions backup/syncer/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

v1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/backup"
Expand Down Expand Up @@ -101,6 +102,9 @@ func (s *jobSyncer) updateStatus(job *batchv1.Job) {
if backType := s.job.Annotations[utils.JobAnonationType]; backType != "" {
s.backup.Status.BackupType = backType
}
if gtid := s.job.Annotations[utils.JobAnonationGtid]; gtid != "" {
s.backup.Status.Gtid = gtid
}
}

// check for failed condition
Expand Down Expand Up @@ -152,13 +156,15 @@ func (s *jobSyncer) ensurePodSpec(in corev1.PodSpec) corev1.PodSpec {
"/bin/bash", "-c", "--",
}
backupToDir, DateTime := utils.BuildBackupName(s.backup.Spec.ClusterName)
// add the gtid script
strAnnonations := fmt.Sprintf(`curl -X PATCH -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" -H "Content-Type: application/json-patch+json" \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest a more native way to add annotations 😊

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will do it next time.

--cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/apis/batch/v1/namespaces/%s/jobs/%s \
-d '[{"op": "add", "path": "/metadata/annotations/backupName", "value": "%s"}, {"op": "add", "path": "/metadata/annotations/backupDate", "value": "%s"}, {"op": "add", "path": "/metadata/annotations/backupType", "value": "NFS"}]';`,
s.backup.Namespace, s.backup.GetNameForJob(), backupToDir, DateTime)
-d "[{\"op\": \"add\", \"path\": \"/metadata/annotations/backupName\", \"value\": \"%s\"}, {\"op\": \"add\", \"path\": \"/metadata/annotations/backupDate\", \"value\": \"%s\"},{\"op\": \"add\", \"path\": \"/metadata/annotations/gtid\", \"value\": \"$( cat /backup/%s/xtrabackup_binlog_info|awk '{print $3}')\"}, {\"op\": \"add\", \"path\": \"/metadata/annotations/backupType\", \"value\": \"NFS\"}]";`,
s.backup.Namespace, s.backup.GetNameForJob(), backupToDir, DateTime, backupToDir)
log.Log.Info(strAnnonations)
// Add the check DiskUsage
// use expr because shell cannot compare float number
checkUsage := `[ $(expr $(df /backup|awk 'NR>1 {print $4}') \> $(du /backup |awk 'END {if (NR > 1) {print $1 /(NR-1)} else print 0}')) -eq '1' ] || { echo disk available may be too small; exit 1;};`
checkUsage := `[ $(expr $(df /backup|awk 'NR>1 {print $4}') \> $(echo $(du -d1 /backup |awk 'END {if (NR > 1) {print $1 /(NR-1)} else print 0}')|cut -d. -f1)) -eq '1' ] || { echo disk available may be too small; exit 1;};`
in.Containers[0].Args = []string{
checkUsage + fmt.Sprintf("mkdir -p /backup/%s;"+
"curl --user $BACKUP_USER:$BACKUP_PASSWORD %s/download|xbstream -x -C /backup/%s; err1=${PIPESTATUS[0]};"+
Expand Down
3 changes: 3 additions & 0 deletions charts/mysql-operator/crds/mysql.radondb.com_backups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ spec:
- type
type: object
type: array
gtid:
description: Get the Gtid
type: string
required:
- completed
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,11 @@ spec:
- type
type: object
type: array
lastbackup:
description: LastBackup
type: string
lastbackupGtid:
type: string
nodes:
description: Nodes contains the list of the node status fulfilled.
items:
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/mysql.radondb.com_backups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ spec:
- type
type: object
type: array
gtid:
description: Get the Gtid
type: string
required:
- completed
type: object
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/mysql.radondb.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,11 @@ spec:
- type
type: object
type: array
lastbackup:
description: LastBackup
type: string
lastbackupGtid:
type: string
nodes:
description: Nodes contains the list of the node status fulfilled.
items:
Expand Down
2 changes: 1 addition & 1 deletion controllers/backupcron_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (r *BackupCronReconciler) updateClusterSchedule(ctx context.Context, cluste
log.V(1).Info("cluster already added to cron.", "key", cluster)

// change scheduler for already added crons
if !reflect.DeepEqual(entry.Schedule, schedule) {
if !reflect.DeepEqual(entry.Schedule, schedule) || j.NFSServerAddress != cluster.Spec.NFSServerAddress {
log.Info("update cluster scheduler", "key", cluster,
"scheduler", cluster.Spec.BackupSchedule)

Expand Down
37 changes: 36 additions & 1 deletion mysqlcluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package syncer
import (
"context"
"fmt"
"sort"
"strconv"
"time"

"github.com/presslabs/controller-util/pkg/syncer"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -153,11 +155,44 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
if len(s.Status.Conditions) > maxStatusesQuantity {
s.Status.Conditions = s.Status.Conditions[len(s.Status.Conditions)-maxStatusesQuantity:]
}

// update backup Status
s.updateLastBackup()
// Update all nodes' status.
return syncer.SyncResult{}, s.updateNodeStatus(ctx, s.cli, list.Items)
}

func (s *StatusSyncer) updateLastBackup() error {
// 1. fetch all finished backup cr
backupsList := &apiv1alpha1.BackupList{}
labelSet := labels.Set{"cluster": s.Name}
if err := s.cli.List(context.TODO(), backupsList, &client.ListOptions{
Namespace: s.Namespace, LabelSelector: labelSet.AsSelector(),
}); err != nil {
return err
}
var finisheds []apiv1alpha1.Backup
for _, b := range backupsList.Items {
if b.Status.Completed {
finisheds = append(finisheds, b)
}
}
// 2. sort descent
sort.Slice(finisheds, func(i, j int) bool {
return finisheds[i].ObjectMeta.CreationTimestamp.Before(&finisheds[j].ObjectMeta.CreationTimestamp)
})
// 3. get first backup which has backup Name
for _, b := range finisheds {
if len(b.Status.BackupName) != 0 {
s.Status.LastBackup = b.Status.BackupName
s.Status.LastBackupGtid = b.Status.Gtid
break
}

}

return nil
}

// updateClusterStatus update the cluster status and returns condition.
func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition {
clusterCondition := apiv1alpha1.ClusterCondition{
Expand Down
9 changes: 5 additions & 4 deletions sidecar/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not authenticated!", http.StatusForbidden)
return
}
backName, Datetime, err := RunTakeBackupCommand(s.cfg)
backName, Datetime, Gtid, err := RunTakeBackupCommand(s.cfg)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
msg, _ := json.Marshal(utils.JsonResult{Status: backupSuccessful, BackupName: backName, Date: Datetime})
msg, _ := json.Marshal(utils.JsonResult{Status: backupSuccessful, BackupName: backName, Gtid: Gtid, Date: Datetime})
w.Write(msg)
}
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func transportWithTimeout(connectTimeout time.Duration) http.RoundTripper {
}
}

func setAnnonations(cfg *Config, backname string, DateTime string, BackupType string) error {
func setAnnonations(cfg *Config, backname string, DateTime string, Gtid, BackupType string) error {
config, err := rest.InClusterConfig()
if err != nil {
return err
Expand All @@ -231,6 +231,7 @@ func setAnnonations(cfg *Config, backname string, DateTime string, BackupType st
}
job.Annotations[utils.JobAnonationName] = backname
job.Annotations[utils.JobAnonationDate] = DateTime
job.Annotations[utils.JobAnonationGtid] = Gtid
job.Annotations[utils.JobAnonationType] = BackupType
_, err = clientset.BatchV1().Jobs(cfg.NameSpace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
Expand Down Expand Up @@ -266,7 +267,7 @@ func requestABackup(cfg *Config, host string, endpoint string) (*http.Response,
var result utils.JsonResult
json.NewDecoder(resp.Body).Decode(&result)

err = setAnnonations(cfg, result.BackupName, result.Date, "S3") // set annotation
err = setAnnonations(cfg, result.BackupName, result.Date, result.Gtid, "S3") // set annotation
if err != nil {
return nil, fmt.Errorf("fail to set annotation: %s", err)
}
Expand Down
51 changes: 44 additions & 7 deletions sidecar/takebackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,66 @@ limitations under the License.
package sidecar

import (
"bufio"
"fmt"
"os"
"os/exec"
"strings"
"sync"
)

// RunTakeBackupCommand starts a backup command
func RunTakeBackupCommand(cfg *Config) (string, string, error) {
func RunTakeBackupCommand(cfg *Config) (string, string, string, error) {
// cfg->XtrabackupArgs()
xtrabackup := exec.Command(xtrabackupCommand, cfg.XtrabackupArgs()...)

var err error
backupName, DateTime := cfg.XBackupName()
Gtid := ""
xcloud := exec.Command(xcloudCommand, cfg.XCloudArgs(backupName)...)
log.Info("xargs ", "xargs", strings.Join(cfg.XCloudArgs(backupName), " "))
if xcloud.Stdin, err = xtrabackup.StdoutPipe(); err != nil {
log.Error(err, "failed to pipline")
return "", "", err
return "", "", "", err
}
xtrabackup.Stderr = os.Stderr
//xtrabackup.Stderr = os.Stderr
xcloud.Stderr = os.Stderr

var wg sync.WaitGroup
Stderr, err := xtrabackup.StderrPipe()
if err != nil {
return "", "", "", fmt.Errorf("RunCommand: cmd.StderrPipe(): %v", err)
}
if err := xtrabackup.Start(); err != nil {
log.Error(err, "failed to start xtrabackup command")
return "", "", err
return "", "", "", err
}
if err := xcloud.Start(); err != nil {
log.Error(err, "fail start xcloud ")
return "", "", err
return "", "", "", err
}
scanner := bufio.NewScanner(Stderr)
//scanner.Split(ScanLinesR)
wg.Add(1)
go func() {
for scanner.Scan() {
text := scanner.Text()
fmt.Println(text)
if index := strings.Index(text, "GTID"); index != -1 {
// Mysql5.7 examples: MySQL binlog position: filename 'mysql-bin.000002', position '588', GTID of the last change '319bd6eb-2ea2-11ed-bf40-7e1ef582b427:1-2'
// MySQL8.0 no gtid: MySQL binlog position: filename 'mysql-bin.000025', position '156'
length := len("GTID of the last change")
Gtid = strings.Trim(text[index+length:], " '") // trim space and \'
if len(Gtid) != 0 {
log.Info("Catch gtid: " + Gtid)
}

}
}
wg.Done()
}()

wg.Wait()
// pipe command fail one, whole things fail
errorChannel := make(chan error, 2)
go func() {
Expand All @@ -55,11 +85,18 @@ func RunTakeBackupCommand(cfg *Config) (string, string, error) {
go func() {
errorChannel <- xtrabackup.Wait()
}()
defer xtrabackup.Wait()
defer xcloud.Wait()

for i := 0; i < 2; i++ {
if err = <-errorChannel; err != nil {
return "", "", err
log.Info("catch error , need to stop")
_ = xtrabackup.Process.Kill()
_ = xcloud.Process.Kill()

return "", "", "", err
}
}
return backupName, DateTime, nil

return backupName, DateTime, Gtid, nil
}
3 changes: 3 additions & 0 deletions utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ const (
JobAnonationName = "backupName"
// Job Annonations date
JobAnonationDate = "backupDate"
// Job Anonations Gtid
JobAnonationGtid = "gtid"
// Job Annonations type
JobAnonationType = "backupType"
)
Expand Down Expand Up @@ -211,5 +213,6 @@ const (
type JsonResult struct {
Status string `json:"status"`
BackupName string `json:"backupName"`
Gtid string `json:"gtid"`
Date string `json:"date"`
}