Skip to content

Commit 0baa746

Browse files
committed
fix the problem
1 parent af085ab commit 0baa746

File tree

6 files changed

+65
-70
lines changed

6 files changed

+65
-70
lines changed

docs/docs/high_availability.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ Prepare two or more servers to serve as the maxwell host server and a zookeeper
6363
Example Running Scripts:
6464

6565
```
66-
bin/maxwell --log_level='INFO' --user='<user>' --password='<passwd>' --host='<host>' --producer=stdout --client_id='<client_id>' --ha=true --zookeeper_server ='<host1:port>,<host2:port>,<host3:port>' --ha_modality='zookeeper'
66+
bin/maxwell --log_level='INFO' --user='<user>' --password='<passwd>' --host='<host>' --producer=stdout --client_id='<client_id>' --ha='zookeeper' --zookeeper_server='<host1:port>,<host2:port>,<host3:port>'
6767
```
6868

6969
Run the preceding command on each maxwell host.
7070

7171
Example script to find out which host is currently the leader:
7272
```
73-
bin/maxwell-leaders --ha=true --ha_modality='zookeeper' --zookeeper_server ='<host1:port>,<host2:port>,<host3:port>' --client_id='<client_id>'
73+
bin/maxwell-leaders --ha='zookeeper' --zookeeper_server='<host1:port>,<host2:port>,<host3:port>' --client_id='<client_id>'
7474
```
7575
You can get:
7676
```
@@ -80,9 +80,9 @@ You can get:
8080
## Getting deeper
8181
If the connection between the maxwell host and the zookeeper cluster times out or becomes unstable because of network problems, you can tune the following parameters:
8282
```
83-
--session_time_out_ms=<session timeout duration>
84-
--connection_time_out_ms=<internal default wait time for the client to establish a connection with the zk>
85-
--max_retries=<number of retries>
86-
--base_sleep_time_ms=<retry time interval>
83+
--zookeeper_session_timeout_ms=<session timeout duration>
84+
--zookeeper_connection_timeout_ms=<internal default wait time for the client to establish a connection with the zk>
85+
--zookeeper_max_retries=<number of retries>
86+
--zookeeper_retry_wait_ms=<retry time interval>
8787
```
8888

src/main/java/com/zendesk/maxwell/Maxwell.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -332,19 +332,18 @@ public void run() {
332332

333333
LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);
334334

335-
if ( config.haMode ) {
336-
337-
if ( "jgroups-raft".equals(config.haModality)) {
338-
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
339-
} else if ("zookeeper".equals(config.haModality)){
335+
if ( null != config.haMode){
336+
if ( "jgroups-raft".equals(config.haMode)){
337+
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHAJGroups();
338+
} else if ( "zookeeper".equals(config.haMode)){
340339
if( StringUtils.isBlank(config.zookeeperServer)){
341340
throw new Exception("In high availability mode 'zookeeperServer' does not allow Null. --zookeeper_server = " + config.zookeeperServer);
342341
}
343-
new MaxwellHA(maxwell, config.zookeeperServer, config.sessionTimeoutMs, config.connectionTimeoutMs, config.maxRetries, config.baseSleepTimeMs, config.clientID).startHA(config.haModality);
344-
}else {
345-
throw new Exception("Currently only jgroups-raft and zookeeper are supported. Check that the value of '--ha_modality' is one of them");
342+
new MaxwellHA(maxwell, config.zookeeperServer, config.zookeeperSessionTimeoutMs, config.zookeeperConnectionTimeoutMs, config.zookeeperMaxRetries, config.zookeeperRetryWaitMs, config.clientID).startHAZookeeper();
343+
} else {
344+
throw new Exception("The value of ha is not in (jgroups-raft,zookeeper). ha = " + config.haMode);
346345
}
347-
} else {
346+
} else{
348347
maxwell.start();
349348
}
350349
} catch ( SQLException e ) {

src/main/java/com/zendesk/maxwell/MaxwellConfig.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ public class MaxwellConfig extends AbstractConfig {
611611
/**
612612
* Enable high availability support (via jgroups-raft or zookeeper)
613613
*/
614-
public boolean haMode;
614+
public String haMode;
615615

616616
/**
617617
* Path to raft.xml file that configures high availability support
@@ -637,27 +637,23 @@ public class MaxwellConfig extends AbstractConfig {
637637
/**
638638
* session time
639639
*/
640-
public int sessionTimeoutMs;
640+
public int zookeeperSessionTimeoutMs;
641641

642642
/**
643643
* connection time
644644
*/
645-
public int connectionTimeoutMs;
645+
public int zookeeperConnectionTimeoutMs;
646646

647647
/**
648648
* maxRetries
649649
*/
650-
public int maxRetries;
650+
public int zookeeperMaxRetries;
651651

652652
/**
653-
* baseSleepTimeMs
653+
* retryWaitMs
654654
*/
655-
public int baseSleepTimeMs;
655+
public int zookeeperRetryWaitMs;
656656

657-
/**
658-
* jgroups-raft or zookeeper
659-
*/
660-
public String haModality;
661657

662658
/**
663659
* Build a default configuration object.
@@ -772,23 +768,21 @@ protected MaxwellOptionParser buildOptionParser() {
772768
parser.separator();
773769

774770
parser.accepts( "ha", "enable high-availability mode via jgroups-raft or zookeeper" )
775-
.withOptionalArg().ofType(Boolean.class);
771+
.withOptionalArg();
776772
parser.accepts( "jgroups_config", "location of jgroups xml configuration file" )
777773
.withRequiredArg();
778774
parser.accepts( "raft_member_id", "raft memberID. (may also be specified in raft.xml)" )
779775
.withRequiredArg();
780776
parser.accepts("zookeeper_server","enable maxwell High Availability using zookeeper")
781777
.withRequiredArg();
782-
parser.accepts("session_time_out_ms","session timeout duration (maxwellHA on zk)")
778+
parser.accepts("zookeeper_session_timeout_ms","session timeout duration (maxwellHA on zk)")
783779
.withRequiredArg().ofType(Integer.class);
784-
parser.accepts("connection_time_out_ms","connection timeout duration (maxwellHA on zk)")
780+
parser.accepts("zookeeper_connection_timeout_ms","connection timeout duration (maxwellHA on zk)")
785781
.withRequiredArg().ofType(Integer.class);
786-
parser.accepts("max_retries","maximum retry (maxwellHA on zk)")
782+
parser.accepts("zookeeper_max_retries","maximum retry (maxwellHA on zk)")
787783
.withRequiredArg().ofType(Integer.class);
788-
parser.accepts("base_sleep_time_ms","initial retry wait time (maxwellHA on zk)")
784+
parser.accepts("zookeeper_retry_wait_ms","initial retry wait time (maxwellHA on zk)")
789785
.withRequiredArg().ofType(Integer.class);
790-
parser.accepts("ha_modality","high availability mode : jgroups-raft or zookeeper")
791-
.withRequiredArg();
792786

793787
parser.separator();
794788

@@ -1248,17 +1242,16 @@ private void setup(OptionSet options, Properties properties) {
12481242

12491243
setupEncryptionOptions(options, properties);
12501244

1251-
this.haMode = fetchBooleanOption("ha", options, properties, false);
1245+
this.haMode = fetchStringOption("ha", options, properties, null);
12521246
this.jgroupsConf = fetchStringOption("jgroups_config", options, properties, "raft.xml");
12531247
this.raftMemberID = fetchStringOption("raft_member_id", options, properties, null);
12541248
this.replicationReconnectionRetries = fetchIntegerOption("replication_reconnection_retries", options, properties, 1);
12551249

12561250
this.zookeeperServer = fetchStringOption("zookeeper_server", options, properties, null);
1257-
this.sessionTimeoutMs = fetchIntegerOption("session_time_out_ms", options, properties, 6000);
1258-
this.connectionTimeoutMs = fetchIntegerOption("connection_time_out_ms", options, properties, 6000);
1259-
this.maxRetries = fetchIntegerOption("max_retries", options, properties, 3);
1260-
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms", options, properties, 1000);
1261-
this.haModality = fetchStringOption("ha_modality", options, properties, "jgroups-raft");
1251+
this.zookeeperSessionTimeoutMs = fetchIntegerOption("zookeeper_session_timeout_ms", options, properties, 6000);
1252+
this.zookeeperConnectionTimeoutMs = fetchIntegerOption("zookeeper_connection_timeout_ms", options, properties, 6000);
1253+
this.zookeeperMaxRetries = fetchIntegerOption("zookeeper_max_retries", options, properties, 3);
1254+
this.zookeeperRetryWaitMs = fetchIntegerOption("zookeeper_retry_wait_ms", options, properties, 1000);
12621255

12631256
this.binlogEventQueueSize = fetchIntegerOption("binlog_event_queue_size", options, properties, BinlogConnectorReplicator.BINLOG_QUEUE_SIZE);
12641257
}

src/main/java/com/zendesk/maxwell/MaxwellHA.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private void run() {
7979
* Does not return.
8080
* @throws Exception if there's any issues
8181
*/
82-
public void startHA() throws Exception {
82+
public void startHAJGroups() throws Exception {
8383
JChannel ch=new JChannel(jgroupsConf);
8484
RaftHandle handle=new RaftHandle(ch, null);
8585
if ( raftMemberID != null )
@@ -112,10 +112,9 @@ public void startHA() throws Exception {
112112

113113
/**
114114
* indicates that Ha is started in zookeeper mode
115-
* @param flag the overloading of the method facilitates the subsequent expansion of multiple modes
116115
* @throws Exception
117116
*/
118-
public void startHA(String flag) throws Exception {
117+
public void startHAZookeeper() throws Exception {
119118
String electPath = "/" + clientID + "/services";
120119
String masterPath = "/" + clientID + "/leader";
121120
CuratorUtils cu = new CuratorUtils();
@@ -131,17 +130,26 @@ public void startHA(String flag) throws Exception {
131130
CuratorFramework client = cu.getClient();
132131
LeaderLatch leader = new LeaderLatch(client, cu.getElectPath());
133132
leader.start();
133+
LOGGER.info("this node is participating in the election of the leader ....");
134134
leader.addListener(new LeaderLatchListener() {
135135
@Override
136136
public void isLeader() {
137-
cu.register();
137+
try {
138+
cu.register();
139+
} catch (Exception e) {
140+
e.printStackTrace();
141+
LOGGER.error("The node registration is abnormal, check whether the maxwell host communicates properly with the zookeeper network");
142+
cu.stop();
143+
System.exit(1);
144+
}
145+
LOGGER.info("node is current leader, starting Maxwell....");
138146
run();
139-
LOGGER.debug("starting maxwell");
147+
cu.stop();
140148
}
141149

142150
@Override
143151
public void notLeader() {
144-
152+
//LeaderLatch.CloseMode.SILENT mode will not invoke this method
145153
}
146154
});
147155

src/main/java/com/zendesk/maxwell/util/CuratorUtils.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,13 @@
55
import org.apache.curator.framework.CuratorFrameworkFactory;
66
import org.apache.curator.retry.ExponentialBackoffRetry;
77
import org.apache.zookeeper.CreateMode;
8-
import org.slf4j.Logger;
9-
import org.slf4j.LoggerFactory;
108

119
import java.net.InetAddress;
1210
import java.util.ArrayList;
1311
import java.util.List;
1412

1513
public class CuratorUtils {
1614

17-
static final Logger LOGGER = LoggerFactory.getLogger(CuratorUtils.class);
18-
1915
private CuratorFramework client;
2016
private String zookeeperServer;
2117
private int sessionTimeoutMs;
@@ -104,27 +100,19 @@ public CuratorFramework getClient() {
104100
return client;
105101
}
106102

107-
public void register() {
108-
try {
109-
String rootPath = masterPath;
110-
String hostAddress = InetAddress.getLocalHost().getHostAddress();
111-
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + hostAddress);
112-
} catch (Exception e) {
113-
LOGGER.error("register exception", e);
114-
}
103+
public void register() throws Exception {
104+
String rootPath = masterPath;
105+
String hostAddress = InetAddress.getLocalHost().getHostAddress();
106+
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + hostAddress);
115107
}
116108

117-
public List<String> getChildren(String path) {
109+
public List<String> getChildren(String path) throws Exception {
118110
List<String> childrenList = new ArrayList<>();
119-
try {
120-
childrenList = client.getChildren().forPath(path);
121-
} catch (Exception e) {
122-
LOGGER.error("There was an error getting the child nodes", e);
123-
}
111+
childrenList = client.getChildren().forPath(path);
124112
return childrenList;
125113
}
126114

127-
public List<String> getInstances() {
115+
public List<String> getInstances() throws Exception {
128116
return getChildren(masterPath);
129117
}
130118
}

src/main/java/com/zendesk/maxwell/util/MaxwellLeaders.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,35 @@ public static void main(String[] args) {
2323
Logging.setLevel("INFO");
2424
}
2525

26-
if( config.haMode && "zookeeper".equals(config.haModality)){
26+
if( "zookeeper".equals(config.haMode)){
2727
CuratorUtils cu = new CuratorUtils();
2828
cu.setZookeeperServer(config.zookeeperServer);
29-
cu.setSessionTimeoutMs(config.sessionTimeoutMs);
30-
cu.setConnectionTimeoutMs(config.connectionTimeoutMs);
31-
cu.setMaxRetries(config.maxRetries);
32-
cu.setBaseSleepTimeMs(config.baseSleepTimeMs);
29+
cu.setSessionTimeoutMs(config.zookeeperSessionTimeoutMs);
30+
cu.setConnectionTimeoutMs(config.zookeeperConnectionTimeoutMs);
31+
cu.setMaxRetries(config.zookeeperMaxRetries);
32+
cu.setBaseSleepTimeMs(config.zookeeperRetryWaitMs);
3333
cu.setClientId(config.clientID);
3434
String electPath = "/" + config.clientID + "/services";
3535
String masterPath = "/" + config.clientID + "/leader";
3636
cu.setElectPath(electPath);
3737
cu.setMasterPath(masterPath);
3838
cu.init();
39-
List<String> instances = cu.getInstances();
39+
List<String> instances = null;
40+
try {
41+
instances = cu.getInstances();
42+
} catch (Exception e) {
43+
e.printStackTrace();
44+
LOGGER.error("The path does not exist or is empty. Please check whether the clientID is correct. clientID = " + config.clientID);
45+
System.exit(1);
46+
}
4047

4148
if(0 == instances.size()){
4249
LOGGER.info("Maxwell is not a high availability mode Or maxwell is not started");
4350
}else {
4451
LOGGER.info("clientID:"+config.clientID + ":leaders now are -> {}",instances.get(0));
4552
}
4653
}else {
47-
LOGGER.info("Maxwell is not a high availability mode");
54+
LOGGER.error("make sure ha = 'zookeeper'. ha = " + config.haMode);
4855
}
4956
}
5057
}

0 commit comments

Comments
 (0)