Skip to content

Merge users from parent clusters with current users without changing credentials #1125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cluster/cluster_staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ func (cluster *Cluster) ReseedFromParentCluster(parent *Cluster, target *ServerM
}
err = target.JobReseedMysqldump(backupfile, false)
} else if backtype == config.ConstBackupLogicalTypeMydumper {
err = target.JobReseedMyLoader(backupfile, false)
restoreflag := pmaster.LastBackupMeta.Logical != nil && pmaster.LastBackupMeta.Logical.SplitUser && cluster.Conf.BackupRestoreMysqlUser
err = target.JobReseedMyLoader(backupfile, restoreflag)
meta, err2 := cluster.JobMyLoaderParseMeta(backupfile)
if err2 != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "MyLoader metadata parsing: %s", err2)
Expand Down
85 changes: 67 additions & 18 deletions cluster/srv_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,20 +1025,55 @@ func (server *ServerMonitor) JobZFSSnapBack() (int64, error) {
return server.JobInsertTask("zfssnapback", "0", cluster.Conf.MonitorAddress)
}

func (server *ServerMonitor) JobReseedMysqldumpUser(dir string) error {
cluster := server.ClusterGroup
sql_log_bin := 0
resetmaster := "RESET MASTER;"
if server.DBVersion.IsMySQLOrPerconaGreater84() {
resetmaster = "RESET BINARY LOGS AND GTIDS;"
}

if server.URL == cluster.GetMaster().URL {
sql_log_bin = 1
resetmaster = ""
}

cmdstring := "%sSET sql_log_bin=%d;SET long_query_time=10;"
if server.DBVersion.IsMySQLOrPerconaGreater84() {
cmdstring = "%sSET sql_log_bin=%d;SET long_query_time=10;"
}
cmdstring = fmt.Sprintf(cmdstring, resetmaster, sql_log_bin)

usergzfile, err := server.ReadMysqldumpUser(dir)
if err != nil {
return fmt.Errorf("Error opening mysql.user file %s", err)
}

cliParams := append(cluster.GetDumpCredentials(server), server.GetSSLClientParam("client")...)
cliParams = append(cliParams, strings.Split(cluster.Conf.BackupMysqlclientOptions, " ")...)
clientCmd := exec.Command(cluster.GetMysqlclientPath(), misc.RemoveEmptyString(cliParams)...)

clientCmd.Stdin = io.MultiReader(bytes.NewBufferString(cmdstring), usergzfile)
var out bytes.Buffer
clientCmd.Stdout = &out

if err := clientCmd.Run(); err != nil {
return fmt.Errorf("Error restoring users %s", err)
}

return nil
}

func (server *ServerMonitor) JobReseedMyLoader(backupdir string, restoreUser bool) error {
var err error
cluster := server.ClusterGroup
threads := strconv.Itoa(cluster.Conf.BackupLogicalLoadThreads)

if restoreUser {
//walk dir
files, err := os.ReadDir(backupdir)
if err == nil {
if len(files) > 0 {
for _, file := range files {
if strings.HasPrefix(file.Name(), "mysql.user") && strings.HasSuffix(file.Name(), ".sql.gz.skip") {
os.Rename(filepath.Join(backupdir, file.Name()), filepath.Join(backupdir, strings.TrimSuffix(file.Name(), ".skip")))
}
}
if server.LastBackupMeta.Logical != nil && server.LastBackupMeta.Logical.SplitUser {
err = server.JobReseedMysqldumpUser(filepath.Dir(backupdir))
if err != nil {
return fmt.Errorf("Error restoring users %s", err)
}
}
} else {
Expand Down Expand Up @@ -1140,8 +1175,8 @@ func (server *ServerMonitor) JobReseedMysqldump(backupfile string, restoreUser b
cmdstring = fmt.Sprintf(cmdstring, resetmaster, sql_log_bin)

var usergzfile io.Reader
if restoreUser {
usergzfile, err = server.ReadMysqldumpUser(backupfile)
if restoreUser && server.LastBackupMeta.Logical != nil && server.LastBackupMeta.Logical.SplitUser {
usergzfile, err = server.ReadMysqldumpUser(filepath.Dir(backupfile))
if err != nil {
return fmt.Errorf("Error opening mysql.user file %s", err)
}
Expand Down Expand Up @@ -1176,11 +1211,10 @@ func (server *ServerMonitor) JobReseedMysqldump(backupfile string, restoreUser b
return nil
}

func (server *ServerMonitor) ReadMysqldumpUser(backupfile string) (io.Reader, error) {
func (server *ServerMonitor) ReadMysqldumpUser(dir string) (io.Reader, error) {
cluster := server.ClusterGroup
var err error

dir := filepath.Dir(backupfile)
if _, err := os.Stat(dir); os.IsNotExist(err) {
return nil, fmt.Errorf("Directory %s does not exist", dir)
}
Expand All @@ -1192,7 +1226,7 @@ func (server *ServerMonitor) ReadMysqldumpUser(backupfile string) (io.Reader, er

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Opening mysql.user file %s", userpath)

gzfile, err := os.Open(backupfile)
gzfile, err := os.Open(userpath)
if err != nil {
return nil, fmt.Errorf("[%s] Failed opening backup file in backup server for reseed: %s ", server.URL, err)
}
Expand All @@ -1202,7 +1236,21 @@ func (server *ServerMonitor) ReadMysqldumpUser(backupfile string) (io.Reader, er
return nil, fmt.Errorf("[%s] Failed to unzip backup file in backup server for reseed: %s ", server.URL, err)
}

return fz, nil
pw := new(bytes.Buffer)
scanner := bufio.NewScanner(fz)
for scanner.Scan() {
line := scanner.Text()
cleaned := line
// REMOVE GRANT SUBSTRING FOR CONTAINING "IDENTIFIED BY" OR "IDENTIFIED WITH"
if strings.HasPrefix(line, "GRANT") && strings.Contains(line, "IDENTIFIED BY") || strings.Contains(line, "IDENTIFIED WITH") {
// Remove substring (IDENTIFIED .*) and exclude WITH
reIdentified := regexp.MustCompile(`(?i)\s+IDENTIFIED\s+(?:WITH\s+\S+(?:\s+(?:BY|AS)\s+'.*?')?|BY\s+(?:PASSWORD\s+)?'.*?')`)
cleaned = reIdentified.ReplaceAllString(line, "")
}
_, _ = pw.Write([]byte(cleaned + "\n"))
}

return pw, nil
}

// JobReseedBackupScript will execute the backup load script
Expand Down Expand Up @@ -1872,7 +1920,7 @@ func (server *ServerMonitor) JobBackupMysqldumpUser() error {
var err error

dir := server.GetMyBackupDirectory()
userpath := filepath.Join(dir, "mysql.users.sql.gz")
userpath := filepath.Join(dir, "mysql.user.sql.gz")

dumpargs := append(cluster.GetDumpCredentials(server), server.GetSSLClientParam("client-dump")...)
dumpargs = append(dumpargs, "--insert-ignore", "--system=user")
Expand Down Expand Up @@ -2163,9 +2211,10 @@ func (server *ServerMonitor) JobBackupLogical() error {
// Removing previous valid backup state and start
server.DelBackupLogicalCookie()

if cluster.Conf.BackupSplitMysqlUser {
// Always dump users for reusable backup for both mysqldump and mydumper
if server.IsMariaDB() && server.DBVersion.GreaterEqual("10.4") {
server.LastBackupMeta.Logical.SplitUser = true
err := server.JobBackupMysqldumpUser()
err = server.JobBackupMysqldumpUser()
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Error mysqldump backup request: %s", err.Error())
return err
Expand Down