6
6
"database/sql"
7
7
"encoding/json"
8
8
"fmt"
9
+ "io"
9
10
"os"
10
11
"os/exec"
11
12
"strconv"
@@ -15,11 +16,26 @@ import (
15
16
_ "github.com/go-sql-driver/mysql"
16
17
. "github.com/radondb/radondb-mysql-kubernetes/utils"
17
18
log "github.com/sirupsen/logrus"
19
+ corev1 "k8s.io/api/core/v1"
18
20
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
+ "k8s.io/apimachinery/pkg/runtime"
19
22
"k8s.io/apimachinery/pkg/types"
20
23
"k8s.io/apimachinery/pkg/util/wait"
24
+ "k8s.io/client-go/kubernetes"
25
+ "k8s.io/client-go/rest"
26
+ "k8s.io/client-go/tools/clientcmd"
27
+ "k8s.io/client-go/tools/remotecommand"
21
28
)
22
29
30
+ type KubeAPI struct {
31
+ Client * kubernetes.Clientset
32
+ Config * rest.Config
33
+ }
34
+
35
+ type runRemoteCommandConfig struct {
36
+ container , namespace , podName string
37
+ }
38
+
23
39
const (
24
40
leaderStopCommand = "kill -9 $(pidof mysqld)"
25
41
mysqlUser = "root"
@@ -115,65 +131,80 @@ func leaderStop() error {
115
131
log .Info ("I am readonly, skip the leader stop" )
116
132
os .Exit (0 )
117
133
}
134
+ ch := make (chan error )
135
+ go func () {
136
+ defer func () {
137
+ if err := enableMyRaft (); err != nil {
138
+ log .Error (err )
139
+ }
140
+ }()
118
141
119
- // Step 1: disable event scheduler
120
- log .Info ("Disabling event scheduler" )
121
- stmt , err := SetEventScheduler (conn , false )
122
- if err != nil {
123
- return fmt .Errorf ("failed to disable event scheduler: %s" , err .Error ())
124
- }
125
- log .Infof ("set event scheduler to false: %s" , stmt )
142
+ log .Info ("Raft disable" )
143
+ if err := disableMyRaft (); err != nil {
144
+ log .Errorf ("failed to failover: %v" , err )
145
+ ch <- err
146
+ }
126
147
127
- // Step 2: set readonly
128
- log .Info ("Setting readonly " )
129
- stmt , err = SetReadOnly (conn , true )
130
- if err != nil {
131
- return fmt . Errorf ( "failed to set readonly: %s" , err . Error ())
132
- }
133
- log .Infof ("set readonly to true : %s" , stmt )
148
+ // Step 1: disable event scheduler
149
+ log .Info ("Disabling event scheduler " )
150
+ stmt , err := SetEventScheduler (conn , false )
151
+ if err != nil {
152
+ ch <- err
153
+ }
154
+ log .Infof ("set event scheduler to false : %s" , stmt )
134
155
135
- // Step 3: check long running writes
136
- log .Info ("Checking long running writes " )
137
- num , stmt , err := CheckLongRunningWrites (conn , 4 )
138
- if err != nil {
139
- return fmt . Errorf ( "failed to check long running writes: %s" , err . Error ())
140
- }
141
- log .Infof ("%d,long running writes : %s" , num , stmt )
156
+ // Step 2: set readonly
157
+ log .Info ("Setting readonly " )
158
+ stmt , err = SetReadOnly (conn , true )
159
+ if err != nil {
160
+ ch <- err
161
+ }
162
+ log .Infof ("set readonly to true : %s" , stmt )
142
163
143
- // TODO: Step 4: set max connections
164
+ // Step 3: check long running writes
165
+ log .Info ("Checking long running writes" )
166
+ num , stmt , err := CheckLongRunningWrites (conn , 4 )
167
+ if err != nil {
168
+ ch <- err
169
+ }
170
+ log .Infof ("%d,long running writes: %s" , num , stmt )
144
171
145
- // Step 5: kill threads
146
- log .Info ("Killing threads" )
147
- err = KillThreads (conn )
148
- if err != nil {
149
- return fmt .Errorf ("failed to kill threads: %s" , err .Error ())
150
- }
172
+ // TODO: Step 4: set max connections
151
173
152
- // Step 6: FlushTablesWithReadLock
153
- log .Info ("Flushing tables with read lock" )
154
- stmt , err = FlushTablesWithReadLock (conn )
155
- if err != nil {
156
- return fmt .Errorf ("failed to flush tables with read lock: %s" , err .Error ())
157
- }
158
- log .Info ("flushed tables with read lock: " , stmt )
174
+ // Step 5: kill threads
175
+ log .Info ("Killing threads" )
176
+ err = KillThreads (conn )
177
+ if err != nil {
178
+ ch <- err
179
+ }
159
180
160
- // Step 7: FlushBinaryLogs
161
- log .Info ("Flushing binary logs " )
162
- stmt , err = FlushBinaryLogs (conn )
163
- if err != nil {
164
- return fmt . Errorf ( "failed to flush binary logs: %s" , err . Error ())
165
- }
166
- log .Info ("flushed binary logs: " , stmt )
181
+ // Step 6: FlushTablesWithReadLock
182
+ log .Info ("Flushing tables with read lock " )
183
+ stmt , err = FlushTablesWithReadLock (conn )
184
+ if err != nil {
185
+ ch <- err
186
+ }
187
+ log .Info ("flushed tables with read lock: " , stmt )
167
188
168
- // Step 8: Failover
169
- log .Info ("Failover" )
170
- time .Sleep (2 * time .Second )
171
- if err := DoFailOver (); err != nil {
172
- return fmt .Errorf ("failed to failover: %s" , err .Error ())
189
+ // Step 7: FlushBinaryLogs
190
+ log .Info ("Flushing binary logs" )
191
+ stmt , err = FlushBinaryLogs (conn )
192
+ if err != nil {
193
+ ch <- err
194
+ }
195
+ log .Info ("flushed binary logs:" , stmt )
196
+ }()
197
+ select {
198
+ case err := <- ch :
199
+ return err
200
+ case <- time .After (5 * time .Second ):
201
+ log .Info ("timeout" )
202
+ if err := killMysqld (); err != nil {
203
+ return err
204
+ }
205
+ return nil
173
206
}
174
207
175
- log .Info ("leader stop finished" )
176
- return nil
177
208
}
178
209
179
210
func liveness () error {
@@ -424,7 +455,7 @@ func DoFailOver() error {
424
455
if err := disableMyRaft (); err != nil {
425
456
return err
426
457
}
427
- return WaitForNewLeader ( time . Minute * 2 )
458
+ return nil
428
459
}
429
460
430
461
func enableMyRaft () error {
@@ -534,3 +565,91 @@ func patchPodLabel(n MySQLNode, patch string) error {
534
565
}
535
566
return nil
536
567
}
568
+
569
+ func (k * KubeAPI ) Exec (namespace , pod , container string , stdin io.Reader , command []string ) (string , string , error ) {
570
+ var stdout , stderr bytes.Buffer
571
+
572
+ var Scheme = runtime .NewScheme ()
573
+ if err := corev1 .AddToScheme (Scheme ); err != nil {
574
+ log .Fatalf ("failed to add to scheme: %v" , err )
575
+ return "" , "" , err
576
+ }
577
+ var ParameterCodec = runtime .NewParameterCodec (Scheme )
578
+
579
+ request := k .Client .CoreV1 ().RESTClient ().Post ().
580
+ Resource ("pods" ).SubResource ("exec" ).
581
+ Namespace (namespace ).Name (pod ).
582
+ VersionedParams (& corev1.PodExecOptions {
583
+ Container : container ,
584
+ Command : command ,
585
+ Stdin : stdin != nil ,
586
+ Stdout : true ,
587
+ Stderr : true ,
588
+ }, ParameterCodec )
589
+
590
+ exec , err := remotecommand .NewSPDYExecutor (k .Config , "POST" , request .URL ())
591
+
592
+ if err == nil {
593
+ err = exec .Stream (remotecommand.StreamOptions {
594
+ Stdin : stdin ,
595
+ Stdout : & stdout ,
596
+ Stderr : & stderr ,
597
+ })
598
+ }
599
+
600
+ return stdout .String (), stderr .String (), err
601
+ }
602
+
603
+ func runRemoteCommand (kubeapi * KubeAPI , cfg runRemoteCommandConfig , cmd []string ) (string , string , error ) {
604
+ bashCmd := []string {"bash" }
605
+ reader := strings .NewReader (strings .Join (cmd , " " ))
606
+ return kubeapi .Exec (cfg .namespace , cfg .podName , cfg .container , reader , bashCmd )
607
+ }
608
+
609
+ func NewForConfig (config * rest.Config ) (* KubeAPI , error ) {
610
+ var api KubeAPI
611
+ var err error
612
+
613
+ api .Config = config
614
+ api .Client , err = kubernetes .NewForConfig (api .Config )
615
+
616
+ return & api , err
617
+ }
618
+
619
+ func NewConfig () (* rest.Config , error ) {
620
+ // The default loading rules try to read from the files specified in the
621
+ // environment or from the home directory.
622
+ loader := clientcmd .NewDefaultClientConfigLoadingRules ()
623
+
624
+ // The deferred loader tries an in-cluster config if the default loading
625
+ // rules produce no results.
626
+ return clientcmd .NewNonInteractiveDeferredLoadingClientConfig (
627
+ loader , & clientcmd.ConfigOverrides {}).ClientConfig ()
628
+ }
629
+
630
+ func killMysqld () error {
631
+ config , err := NewConfig ()
632
+ if err != nil {
633
+ panic (err )
634
+ }
635
+ k , err := NewForConfig (config )
636
+ if err != nil {
637
+ panic (err )
638
+ }
639
+ cfg := runRemoteCommandConfig {
640
+ podName : podName ,
641
+ namespace : ns ,
642
+ container : "mysql" ,
643
+ }
644
+
645
+ killMySQLCommand := []string {leaderStopCommand }
646
+ log .Infof ("killing mysql command: %s" , leaderStopCommand )
647
+ var output , stderr string
648
+ output , stderr , err = runRemoteCommand (k , cfg , killMySQLCommand )
649
+ log .Info ("output=[" + output + "]" )
650
+ log .Info ("stderr=[" + stderr + "]" )
651
+ if err != nil {
652
+ log .Fatal (err )
653
+ }
654
+ return nil
655
+ }
0 commit comments