diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index e488241b9..b1a180eeb 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -177,7 +177,8 @@ protected Position getInitialPosition() throws Exception { /* fourth method: capture the current master position. */ if ( initial == null ) { try ( Connection c = context.getReplicationConnection() ) { - initial = Position.capture(c, config.gtidMode); + long lastHeartbeatRead = context.getLastHeartbeat(); + initial = Position.capture(c, config.gtidMode, lastHeartbeatRead); } } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 74b49373b..fd357fb7c 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -156,6 +156,10 @@ public void start() throws IOException { getPositionStoreThread(); // boot up thread explicitly. } + public long getLastHeartbeat() throws Exception { + return this.positionStore.getHeartbeat(); + } + public long heartbeat() throws Exception { return this.positionStore.heartbeat(); } diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 470626c61..a3eef207a 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -97,7 +97,7 @@ private void publishFallback(RecordMetadata md, Exception e) { // with no fallback topic to avoid infinite loops KafkaCallback cb = new KafkaCallback(cc, position, key, json, succeededMessageCount, failedMessageCount, succeededMessageMeter, - failedMessageMeter, topic, null, context, producer); + failedMessageMeter, fallbackTopic, null, context, producer); producer.enqueueFallbackRow(fallbackTopic, key, cb, md, e); } diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 2eb0a97bf..951ddda98 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -24,7 +24,11 @@ public Position withHeartbeat(long lastHeartbeatRead) { } public static Position capture(Connection c, boolean gtidMode) throws SQLException { - return new Position(BinlogPosition.capture(c, gtidMode), 0L); + return capture(c, gtidMode, 0L); + } + + public static Position capture(Connection c, boolean gtidMode, long lastHeartbeatRead) throws SQLException { + return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead); } public long getLastHeartbeatRead() { diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java index 92c3a0765..00f30e5ea 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java @@ -16,10 +16,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class MysqlPositionStore { static final Logger LOGGER = LoggerFactory.getLogger(MysqlPositionStore.class); - private static final Long DEFAULT_GTID_SERVER_ID = new Long(0); + private static final Long DEFAULT_GTID_SERVER_ID = 0L; private final Long serverID; private String clientID; private final boolean gtidMode; @@ -41,7 +40,7 @@ public void set(Position newPosition) throws SQLException, DuplicateProcessExcep if ( newPosition == null ) return; - Long heartbeat = newPosition.getLastHeartbeatRead(); + long heartbeat = newPosition.getLastHeartbeatRead(); String sql = "INSERT INTO `positions` set " + "server_id = ?, " @@ -87,6 +86,12 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup }); } + public long getHeartbeat() throws SQLException { + try (Connection connection = connectionPool.getConnection()) { + return getHeartbeat(connection); + } + } + /* * the heartbeat system performs two functions: * 1 - it leaves pointers in the binlog in order to facilitate master recovery @@ -95,9 +100,23 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup private Long lastHeartbeat = null; - private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { - String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?"; + private long getHeartbeat(Connection c) throws SQLException { + try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { + s.setLong(1, serverID); + s.setString(2, clientID); + + try ( ResultSet rs = s.executeQuery() ) { + if ( !rs.next() ) { + return 0L; + } else { + return rs.getLong("heartbeat"); + } + } + } + } + private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { + String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?"; try ( PreparedStatement s = c.prepareStatement(heartbeatInsert) ) { s.setLong(1, thisHeartbeat); @@ -105,7 +124,6 @@ private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti s.setString(3, clientID); s.execute(); - return thisHeartbeat; } catch ( SQLIntegrityConstraintViolationException e ) { throw new DuplicateProcessException("Found heartbeat row for client,position while trying to insert. Is another maxwell running?"); } @@ -113,19 +131,14 @@ private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti private void heartbeat(Connection c, long thisHeartbeat) throws SQLException, DuplicateProcessException { if ( lastHeartbeat == null ) { - try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { - s.setLong(1, serverID); - s.setString(2, clientID); - - try ( ResultSet rs = s.executeQuery() ) { - if ( !rs.next() ) { - insertHeartbeat(c, thisHeartbeat); - lastHeartbeat = thisHeartbeat; - return; - } else { - lastHeartbeat = rs.getLong("heartbeat"); - } - } + long storedHeartbeat = getHeartbeat(c); + + if (storedHeartbeat > 0) { + lastHeartbeat = storedHeartbeat; + } else { + insertHeartbeat(c, thisHeartbeat); + lastHeartbeat = thisHeartbeat; + return; } }