-
Notifications
You must be signed in to change notification settings - Fork 1k
maxwellHA on zookeeper #1948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
maxwellHA on zookeeper #1948
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
#!/bin/bash | ||
# Launcher for com.zendesk.maxwell.util.MaxwellLeaders; all script arguments are passed through to it. | ||
# Abort on the first command that fails. | ||
set -e | ||
|
||
# Locations are resolved relative to the parent of the directory containing this script. | ||
base_dir="$(dirname "$0")/.." | ||
lib_dir="$base_dir/lib" | ||
lib_dir_development="$base_dir/target/lib" | ||
|
||
# When lib/ is absent but target/lib exists, use the development jars and also add target/classes. | ||
if [ ! -e "$lib_dir" -a -e "$lib_dir_development" ]; then | ||
lib_dir="$lib_dir_development" | ||
CLASSPATH="$CLASSPATH:$base_dir/target/classes" | ||
fi | ||
|
||
# Append a "<lib dir>/*" wildcard entry to whatever CLASSPATH was inherited from the environment. | ||
CLASSPATH="$CLASSPATH:$lib_dir/*" | ||
|
||
# Use $JAVA_HOME/bin/java when JAVA_HOME is set; otherwise rely on "java" being found on PATH. | ||
if [ -z "$JAVA_HOME" ]; then | ||
JAVA="java" | ||
else | ||
JAVA="$JAVA_HOME/bin/java" | ||
fi | ||
|
||
export LANG="en_US.UTF-8" | ||
# exec replaces this shell with the JVM process. | ||
# NOTE(review): $JAVA and $CLASSPATH are expanded unquoted here, so both are subject to word splitting and filename expansion - confirm this is intended. | ||
exec $JAVA -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp $CLASSPATH com.zendesk.maxwell.util.MaxwellLeaders "$@" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,32 @@ | ||
package com.zendesk.maxwell; | ||
|
||
import com.zendesk.maxwell.util.CuratorUtils; | ||
import org.apache.curator.framework.CuratorFramework; | ||
import org.apache.curator.framework.recipes.leader.LeaderLatch; | ||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener; | ||
import org.jgroups.JChannel; | ||
import org.jgroups.protocols.raft.Log; | ||
import org.jgroups.protocols.raft.Role; | ||
import org.jgroups.raft.RaftHandle; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.net.InetAddress; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
/** | ||
* Class that joins a jgroups-raft cluster of servers | ||
* Class that joins a jgroups-raft cluster of servers or zookeeper | ||
*/ | ||
public class MaxwellHA { | ||
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellHA.class); | ||
|
||
private final Maxwell maxwell; | ||
private final String jgroupsConf, raftMemberID, clientID; | ||
private String jgroupsConf, raftMemberID, clientID; | ||
private String zookeeperServer; | ||
private int sessionTimeoutMs, connectionTimeoutMs, maxRetries, baseSleepTimeMs; | ||
private boolean hasRun = false; | ||
private AtomicBoolean isRaftLeader = new AtomicBoolean(false); | ||
|
||
|
@@ -33,6 +44,26 @@ public MaxwellHA(Maxwell maxwell, String jgroupsConf, String raftMemberID, Strin | |
this.clientID = clientID; | ||
} | ||
|
||
/** | ||
* Build a MaxwellHA object configured for zookeeper-based leader election. | ||
* The constructor only stores its arguments; they are handed to CuratorUtils when startHAZookeeper() is called. | ||
* @param maxwell The Maxwell instance that will be run when an election is won | ||
* @param zookeeperServer zookeeper server address, passed to CuratorUtils.setZookeeperServer | ||
* @param sessionTimeoutMs zookeeper session timeout (presumably milliseconds, per the name), passed to CuratorUtils.setSessionTimeoutMs | ||
* @param connectionTimeoutMs zookeeper connection timeout (presumably milliseconds, per the name), passed to CuratorUtils.setConnectionTimeoutMs | ||
* @param maxRetries retry limit passed to CuratorUtils.setMaxRetries | ||
* @param baseSleepTimeMs base sleep time (presumably milliseconds between retries - confirm against CuratorUtils), passed to CuratorUtils.setBaseSleepTimeMs | ||
* @param clientID The maxwell clientID. It is the root of the zookeeper paths used by startHAZookeeper(): "/" + clientID + "/services" for the election and "/" + clientID + "/leader" for the leader | ||
*/ | ||
public MaxwellHA(Maxwell maxwell, String zookeeperServer, int sessionTimeoutMs, int connectionTimeoutMs, int maxRetries, int baseSleepTimeMs, String clientID) { | ||
this.maxwell = maxwell; | ||
this.zookeeperServer = zookeeperServer; | ||
this.sessionTimeoutMs = sessionTimeoutMs; | ||
this.connectionTimeoutMs = connectionTimeoutMs; | ||
this.maxRetries = maxRetries; | ||
this.baseSleepTimeMs = baseSleepTimeMs; | ||
this.clientID = clientID; | ||
} | ||
|
||
private void run() { | ||
try { | ||
if (hasRun) | ||
|
@@ -53,7 +84,7 @@ private void run() { | |
* Does not return. | ||
* @throws Exception if there's any issues | ||
*/ | ||
public void startHA() throws Exception { | ||
public void startHAJGroups() throws Exception { | ||
JChannel ch=new JChannel(jgroupsConf); | ||
RaftHandle handle=new RaftHandle(ch, null); | ||
if ( raftMemberID != null ) | ||
|
@@ -83,4 +114,75 @@ public void startHA() throws Exception { | |
|
||
Thread.sleep(Long.MAX_VALUE); | ||
} | ||
|
||
/** | ||
* indicates that Ha is started in zookeeper mode | ||
* @throws Exception | ||
*/ | ||
public void startHAZookeeper() throws Exception { | ||
|
||
Lock lock = new ReentrantLock(); | ||
String hostAddress = InetAddress.getLocalHost().getHostAddress(); | ||
|
||
String electPath = "/" + clientID + "/services"; | ||
String masterPath = "/" + clientID + "/leader"; | ||
CuratorUtils cu = new CuratorUtils(); | ||
cu.setZookeeperServer(zookeeperServer); | ||
cu.setSessionTimeoutMs(sessionTimeoutMs); | ||
cu.setConnectionTimeoutMs(connectionTimeoutMs); | ||
cu.setMaxRetries(maxRetries); | ||
cu.setBaseSleepTimeMs(baseSleepTimeMs); | ||
cu.setClientId(clientID); | ||
cu.setElectPath(electPath); | ||
cu.setMasterPath(masterPath); | ||
cu.init(); | ||
CuratorFramework client = cu.getClient(); | ||
LeaderLatch leader = new LeaderLatch(client, cu.getElectPath(),hostAddress,LeaderLatch.CloseMode.NOTIFY_LEADER); | ||
leader.start(); | ||
LOGGER.info("this node:" + hostAddress + " is participating in the election of the leader ...."); | ||
leader.addListener(new LeaderLatchListener() { | ||
@Override | ||
public void isLeader() { | ||
try { | ||
lock.lock(); | ||
cu.register(); | ||
} catch (Exception e) { | ||
e.printStackTrace(); | ||
LOGGER.error("The node registration is abnormal, check whether the maxwell host communicates properly with the zookeeper network"); | ||
cu.stop(); | ||
System.exit(1); | ||
}finally { | ||
lock.unlock(); | ||
} | ||
LOGGER.info("node:" + hostAddress + " is current leader, starting Maxwell...."); | ||
LOGGER.info("hasLeadership = " + leader.hasLeadership()); | ||
|
||
run(); | ||
|
||
try { | ||
leader.close(); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} | ||
cu.stop(); | ||
} | ||
|
||
@Override | ||
public void notLeader() { | ||
try { | ||
lock.lock(); | ||
LOGGER.warn("node:" + hostAddress + " lost leader"); | ||
LOGGER.warn("master-slave switchover......"); | ||
LOGGER.warn("The leadership went from " + hostAddress + " to " + leader.getLeader()); | ||
Should this case shut down the current Maxwell process, given that it has lost its leadership status?
I don't understand what you mean.
Sorry, I mean that by the time we get this call, the current Maxwell process has lost leadership. That means we should probably stop the process altogether, or shut down the replicator and go back into election mode, waiting for our turn once again. Logging a warning does nothing: we would keep the replicator threads running and pumping duplicate data into whatever producer is configured. Additionally, the position store would start getting conflicting writes from two different processes. @osheroff Do I understand it correctly?
I understand. I have tested this point. When I try to kill the process, the code goes straight to this location and prints the results we want, and also prints the node information of the next leader. If there are other exceptions that make it impossible to execute this code, please tell me and I will solve it.
I'll leave it to Ben to make a call on what should happen in this case, but I feel simply shutting down the process gracefully may be the easiest way to avoid conflicts after a leadership loss.
Alternatively, it may be possible to call […]
In my opinion, when the […]
What do you feel it should mean for a Maxwell instance to become a follower? (AFAIU, there is no notion of a follower mode in the current Maxwell codebase.)
For example, if I start three Maxwell instances and label them 1, 2 and 3, where 1 is the leader and 2 and 3 are the followers: while 1 is the leader, 2 and 3 are just daemon processes that don't do anything. When 1 stops being the leader (not because Maxwell itself exited, but because of a server failure, for example a restart, a memory overflow, or a lack of disk space), then 2 or 3 takes over from 1 and continues the collection task. If the Maxwell process exits because of MySQL, the problem still persists no matter how many instances are started; that is not something high availability can fix. What I need to do is ensure that Maxwell itself is highly available.
Your solution absolutely solves the scenarios described. One thing I feel it is missing (or I may be confused!) is the scenario in which a leader, doing its leader work (replicating data, etc.), loses its leadership while remaining alive and seemingly healthy (due to ZK connectivity issues, a ZK restart, or any other issue that forces a new election). In those cases the old leader needs to step down, stop doing its usual leader things, and move to a quiet follower mode (stop the binlog replicator, stop writing to the position store, etc.).
I have tested the scenario you described and got corresponding results. Please let me know if you have any other problems. |
||
}catch (Exception e){ | ||
e.printStackTrace(); | ||
}finally { | ||
lock.unlock(); | ||
} | ||
} | ||
}); | ||
|
||
Thread.sleep(Long.MAX_VALUE); | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.