Skip to content

GOA-522: Use last heartbeat if available when getting initial position #1823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ protected Position getInitialPosition() throws Exception {
/* fourth method: capture the current master position. */
if ( initial == null ) {
try ( Connection c = context.getReplicationConnection() ) {
initial = Position.capture(c, config.gtidMode);
long lastHeartbeatRead = context.getLastHeartbeat();
initial = Position.capture(c, lastHeartbeatRead, config.gtidMode);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public void start() throws IOException {
getPositionStoreThread(); // boot up thread explicitly.
}

public long getLastHeartbeat() throws Exception {
return this.positionStore.getHeartbeat();
}

public long heartbeat() throws Exception {
return this.positionStore.heartbeat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void publishFallback(RecordMetadata md, Exception e) {
// with no fallback topic to avoid infinite loops
KafkaCallback cb = new KafkaCallback(cc, position, key, json,
succeededMessageCount, failedMessageCount, succeededMessageMeter,
failedMessageMeter, topic, null, context, producer);
failedMessageMeter, fallbackTopic, null, context, producer);
producer.enqueueFallbackRow(fallbackTopic, key, cb, md, e);
}

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/zendesk/maxwell/replication/Position.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ public Position withHeartbeat(long lastHeartbeatRead) {
}

public static Position capture(Connection c, boolean gtidMode) throws SQLException {
return new Position(BinlogPosition.capture(c, gtidMode), 0L);
return capture(c, 0L, gtidMode);
}

public static Position capture(Connection c, long lastHeartbeatRead, boolean gtidMode) throws SQLException {
return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead);
}

public long getLastHeartbeatRead() {
Expand Down
49 changes: 30 additions & 19 deletions src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class MysqlPositionStore {
static final Logger LOGGER = LoggerFactory.getLogger(MysqlPositionStore.class);
private static final Long DEFAULT_GTID_SERVER_ID = new Long(0);
private static final Long DEFAULT_GTID_SERVER_ID = 0L;
private final Long serverID;
private String clientID;
private final boolean gtidMode;
Expand All @@ -41,7 +40,7 @@ public void set(Position newPosition) throws SQLException, DuplicateProcessExcep
if ( newPosition == null )
return;

Long heartbeat = newPosition.getLastHeartbeatRead();
long heartbeat = newPosition.getLastHeartbeatRead();

String sql = "INSERT INTO `positions` set "
+ "server_id = ?, "
Expand Down Expand Up @@ -87,6 +86,10 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup
});
}

public long getHeartbeat() throws SQLException {
return getHeartbeat(connectionPool.getConnection());
}

/*
* the heartbeat system performs two functions:
* 1 - it leaves pointers in the binlog in order to facilitate master recovery
Expand All @@ -95,37 +98,45 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup

private Long lastHeartbeat = null;

private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException {
String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?";
private long getHeartbeat(Connection c) throws SQLException {
try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) {
s.setLong(1, serverID);
s.setString(2, clientID);

try ( ResultSet rs = s.executeQuery() ) {
if ( !rs.next() ) {
return 0L;
} else {
return rs.getLong("heartbeat");
}
}
}
}

private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException {
String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?";

try ( PreparedStatement s = c.prepareStatement(heartbeatInsert) ) {
s.setLong(1, thisHeartbeat);
s.setLong(2, serverID);
s.setString(3, clientID);

s.execute();
return thisHeartbeat;
} catch ( SQLIntegrityConstraintViolationException e ) {
throw new DuplicateProcessException("Found heartbeat row for client,position while trying to insert. Is another maxwell running?");
}
}

private void heartbeat(Connection c, long thisHeartbeat) throws SQLException, DuplicateProcessException {
if ( lastHeartbeat == null ) {
try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) {
s.setLong(1, serverID);
s.setString(2, clientID);

try ( ResultSet rs = s.executeQuery() ) {
if ( !rs.next() ) {
insertHeartbeat(c, thisHeartbeat);
lastHeartbeat = thisHeartbeat;
return;
} else {
lastHeartbeat = rs.getLong("heartbeat");
}
}
long storedHeartbeat = getHeartbeat(c);

if (storedHeartbeat > 0) {
lastHeartbeat = storedHeartbeat;
} else {
insertHeartbeat(c, thisHeartbeat);
lastHeartbeat = thisHeartbeat;
return;
}
}

Expand Down