From 99125308d2baaf3ed699dea06c681aaedc15fa38 Mon Sep 17 00:00:00 2001 From: Tim Cuthbertson Date: Tue, 1 Feb 2022 13:43:49 +1100 Subject: [PATCH 1/6] wip --- .../java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java | 2 +- src/main/java/com/zendesk/maxwell/replication/Position.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 470626c61..a3eef207a 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -97,7 +97,7 @@ private void publishFallback(RecordMetadata md, Exception e) { // with no fallback topic to avoid infinite loops KafkaCallback cb = new KafkaCallback(cc, position, key, json, succeededMessageCount, failedMessageCount, succeededMessageMeter, - failedMessageMeter, topic, null, context, producer); + failedMessageMeter, fallbackTopic, null, context, producer); producer.enqueueFallbackRow(fallbackTopic, key, cb, md, e); } diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 2eb0a97bf..4f13391ba 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -24,6 +24,7 @@ public Position withHeartbeat(long lastHeartbeatRead) { } public static Position capture(Connection c, boolean gtidMode) throws SQLException { + //query for heartbeats table, fallback to 0 return new Position(BinlogPosition.capture(c, gtidMode), 0L); } From 8c7d10cead1fd1f1fea78e43cb23b4341a2c061f Mon Sep 17 00:00:00 2001 From: Matt Venz Date: Fri, 4 Feb 2022 13:36:20 +1100 Subject: [PATCH 2/6] Check for last heartbeat in db when capturing the master position --- .../java/com/zendesk/maxwell/Maxwell.java | 3 +- .../com/zendesk/maxwell/MaxwellContext.java | 4 ++ .../zendesk/maxwell/replication/Position.java | 7 ++- .../maxwell/schema/MysqlPositionStore.java | 49 ++++++++++++------- 4 files changed, 41 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index e488241b9..b5e52b866 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -177,7 +177,8 @@ protected Position getInitialPosition() throws Exception { /* fourth method: capture the current master position. */ if ( initial == null ) { try ( Connection c = context.getReplicationConnection() ) { - initial = Position.capture(c, config.gtidMode); + long lastHeartbeatRead = context.getLastHeartbeat(); + initial = Position.capture(c, lastHeartbeatRead, config.gtidMode); } } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 74b49373b..fd357fb7c 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -156,6 +156,10 @@ public void start() throws IOException { getPositionStoreThread(); // boot up thread explicitly. } + public long getLastHeartbeat() throws Exception { + return this.positionStore.getHeartbeat(); + } + public long heartbeat() throws Exception { return this.positionStore.heartbeat(); } diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 4f13391ba..9bb56d18a 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -24,8 +24,11 @@ public Position withHeartbeat(long lastHeartbeatRead) { } public static Position capture(Connection c, boolean gtidMode) throws SQLException { - //query for heartbeats table, fallback to 0 - return new Position(BinlogPosition.capture(c, gtidMode), 0L); + return capture(c, 0L, gtidMode); + } + + public static Position capture(Connection c, long lastHeartbeatRead, boolean gtidMode) throws SQLException { + return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead); } public long getLastHeartbeatRead() { diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java index 92c3a0765..e65f58d8e 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java @@ -16,10 +16,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class MysqlPositionStore { static final Logger LOGGER = LoggerFactory.getLogger(MysqlPositionStore.class); - private static final Long DEFAULT_GTID_SERVER_ID = new Long(0); + private static final Long DEFAULT_GTID_SERVER_ID = 0L; private final Long serverID; private String clientID; private final boolean gtidMode; @@ -41,7 +40,7 @@ public void set(Position newPosition) throws SQLException, DuplicateProcessExcep if ( newPosition == null ) return; - Long heartbeat = newPosition.getLastHeartbeatRead(); + long heartbeat = newPosition.getLastHeartbeatRead(); String sql = "INSERT INTO `positions` set " + "server_id = ?, " @@ -87,6 +86,10 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup }); } + public long getHeartbeat() throws SQLException { + return getHeartbeat(connectionPool.getConnection()); + } + /* * the heartbeat system performs two functions: * 1 - it leaves pointers in the binlog in order to facilitate master recovery @@ -95,9 +98,23 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup private Long lastHeartbeat = null; - private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { - String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?"; + private long getHeartbeat(Connection c) throws SQLException { + try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { + s.setLong(1, serverID); + s.setString(2, clientID); + try ( ResultSet rs = s.executeQuery() ) { + if ( !rs.next() ) { + return 0L; + } else { + return rs.getLong("heartbeat"); + } + } + } + } + + private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { + String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?"; try ( PreparedStatement s = c.prepareStatement(heartbeatInsert) ) { s.setLong(1, thisHeartbeat); @@ -105,7 +122,6 @@ private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti s.setString(3, clientID); s.execute(); - return thisHeartbeat; } catch ( SQLIntegrityConstraintViolationException e ) { throw new DuplicateProcessException("Found heartbeat row for client,position while trying to insert. Is another maxwell running?"); } @@ -113,19 +129,14 @@ private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti private void heartbeat(Connection c, long thisHeartbeat) throws SQLException, DuplicateProcessException { if ( lastHeartbeat == null ) { - try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { - s.setLong(1, serverID); - s.setString(2, clientID); - - try ( ResultSet rs = s.executeQuery() ) { - if ( !rs.next() ) { - insertHeartbeat(c, thisHeartbeat); - lastHeartbeat = thisHeartbeat; - return; - } else { - lastHeartbeat = rs.getLong("heartbeat"); - } - } + long storedHeartbeat = getHeartbeat(c); + + if (storedHeartbeat > 0) { + lastHeartbeat = storedHeartbeat; + } else { + insertHeartbeat(c, thisHeartbeat); + lastHeartbeat = thisHeartbeat; + return; } } From a36f25ad26920ee0338e69c902f28320a0ba0740 Mon Sep 17 00:00:00 2001 From: Matt Venz Date: Mon, 7 Feb 2022 12:14:23 +1100 Subject: [PATCH 3/6] Wrap connection in try-with-resources --- .../java/com/zendesk/maxwell/schema/MysqlPositionStore.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java index e65f58d8e..00f30e5ea 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java @@ -87,7 +87,9 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup } public long getHeartbeat() throws SQLException { - return getHeartbeat(connectionPool.getConnection()); + try (Connection connection = connectionPool.getConnection()) { + return getHeartbeat(connection); + } } /* From 5fb84eed81d66d1cc0008eed6f660b3a6c0f5428 Mon Sep 17 00:00:00 2001 From: Matt Venz Date: Mon, 7 Feb 2022 13:42:37 +1100 Subject: [PATCH 4/6] Change order of arguments --- src/main/java/com/zendesk/maxwell/Maxwell.java | 2 +- src/main/java/com/zendesk/maxwell/replication/Position.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index b5e52b866..b1a180eeb 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -178,7 +178,7 @@ protected Position getInitialPosition() throws Exception { if ( initial == null ) { try ( Connection c = context.getReplicationConnection() ) { long lastHeartbeatRead = context.getLastHeartbeat(); - initial = Position.capture(c, lastHeartbeatRead, config.gtidMode); + initial = Position.capture(c, config.gtidMode, lastHeartbeatRead); } } diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 9bb56d18a..951ddda98 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -24,10 +24,10 @@ public Position withHeartbeat(long lastHeartbeatRead) { } public static Position capture(Connection c, boolean gtidMode) throws SQLException { - return capture(c, 0L, gtidMode); + return capture(c, gtidMode, 0L); } - public static Position capture(Connection c, long lastHeartbeatRead, boolean gtidMode) throws SQLException { + public static Position capture(Connection c, boolean gtidMode, long lastHeartbeatRead) throws SQLException { return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead); } From ac08b5a03babaeb7f87678775a2b526513128277 Mon Sep 17 00:00:00 2001 From: Matt Venz Date: Mon, 7 Feb 2022 15:56:56 +1100 Subject: [PATCH 5/6] Capture schema if heartbeat is 0 --- .../java/com/zendesk/maxwell/Maxwell.java | 5 +-- .../com/zendesk/maxwell/MaxwellContext.java | 4 -- .../zendesk/maxwell/replication/Position.java | 6 +-- .../maxwell/schema/MysqlPositionStore.java | 45 +++++++------------ 4 files changed, 18 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index b1a180eeb..517515ec0 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -177,8 +177,7 @@ protected Position getInitialPosition() throws Exception { /* fourth method: capture the current master position. */ if ( initial == null ) { try ( Connection c = context.getReplicationConnection() ) { - long lastHeartbeatRead = context.getLastHeartbeat(); - initial = Position.capture(c, config.gtidMode, lastHeartbeatRead); + initial = Position.capture(c, config.gtidMode); } } @@ -267,7 +266,7 @@ private void startInner() throws Exception { this.context.startSchemaCompactor(); - if (config.recaptureSchema) { + if (config.recaptureSchema || initPosition.getLastHeartbeatRead() <= 0) { mysqlSchemaStore.captureAndSaveSchema(); } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index fd357fb7c..74b49373b 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -156,10 +156,6 @@ public void start() throws IOException { getPositionStoreThread(); // boot up thread explicitly. } - public long getLastHeartbeat() throws Exception { - return this.positionStore.getHeartbeat(); - } - public long heartbeat() throws Exception { return this.positionStore.heartbeat(); } diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 951ddda98..2eb0a97bf 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -24,11 +24,7 @@ public Position withHeartbeat(long lastHeartbeatRead) { } public static Position capture(Connection c, boolean gtidMode) throws SQLException { - return capture(c, gtidMode, 0L); - } - - public static Position capture(Connection c, boolean gtidMode, long lastHeartbeatRead) throws SQLException { - return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead); + return new Position(BinlogPosition.capture(c, gtidMode), 0L); } public long getLastHeartbeatRead() { diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java index 00f30e5ea..d6a381abe 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java @@ -86,12 +86,6 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup }); } - public long getHeartbeat() throws SQLException { - try (Connection connection = connectionPool.getConnection()) { - return getHeartbeat(connection); - } - } - /* * the heartbeat system performs two functions: * 1 - it leaves pointers in the binlog in order to facilitate master recovery @@ -100,22 +94,7 @@ public long getHeartbeat() throws SQLException { private Long lastHeartbeat = null; - private long getHeartbeat(Connection c) throws SQLException { - try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { - s.setLong(1, serverID); - s.setString(2, clientID); - - try ( ResultSet rs = s.executeQuery() ) { - if ( !rs.next() ) { - return 0L; - } else { - return rs.getLong("heartbeat"); - } - } - } - } - - private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { + private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?"; try ( PreparedStatement s = c.prepareStatement(heartbeatInsert) ) { @@ -124,6 +103,7 @@ private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti s.setString(3, clientID); s.execute(); + return thisHeartbeat; } catch ( SQLIntegrityConstraintViolationException e ) { throw new DuplicateProcessException("Found heartbeat row for client,position while trying to insert. Is another maxwell running?"); } @@ -131,14 +111,19 @@ private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti private void heartbeat(Connection c, long thisHeartbeat) throws SQLException, DuplicateProcessException { if ( lastHeartbeat == null ) { - long storedHeartbeat = getHeartbeat(c); - - if (storedHeartbeat > 0) { - lastHeartbeat = storedHeartbeat; - } else { - insertHeartbeat(c, thisHeartbeat); - lastHeartbeat = thisHeartbeat; - return; + try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { + s.setLong(1, serverID); + s.setString(2, clientID); + + try ( ResultSet rs = s.executeQuery() ) { + if ( !rs.next() ) { + insertHeartbeat(c, thisHeartbeat); + lastHeartbeat = thisHeartbeat; + return; + } else { + lastHeartbeat = rs.getLong("heartbeat"); + } + } } } From 2428d6278dc066fbe694fc9e23624f37cfd942d7 Mon Sep 17 00:00:00 2001 From: Matt Venz Date: Tue, 8 Feb 2022 12:20:52 +1100 Subject: [PATCH 6/6] Revert "Capture schema if heartbeat is 0" This reverts commit ac08b5a03babaeb7f87678775a2b526513128277. --- .../java/com/zendesk/maxwell/Maxwell.java | 5 ++- .../com/zendesk/maxwell/MaxwellContext.java | 4 ++ .../zendesk/maxwell/replication/Position.java | 6 ++- .../maxwell/schema/MysqlPositionStore.java | 45 ++++++++++++------- 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index 517515ec0..b1a180eeb 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -177,7 +177,8 @@ protected Position getInitialPosition() throws Exception { /* fourth method: capture the current master position. */ if ( initial == null ) { try ( Connection c = context.getReplicationConnection() ) { - initial = Position.capture(c, config.gtidMode); + long lastHeartbeatRead = context.getLastHeartbeat(); + initial = Position.capture(c, config.gtidMode, lastHeartbeatRead); } } @@ -266,7 +267,7 @@ private void startInner() throws Exception { this.context.startSchemaCompactor(); - if (config.recaptureSchema || initPosition.getLastHeartbeatRead() <= 0) { + if (config.recaptureSchema) { mysqlSchemaStore.captureAndSaveSchema(); } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 74b49373b..fd357fb7c 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -156,6 +156,10 @@ public void start() throws IOException { getPositionStoreThread(); // boot up thread explicitly. } + public long getLastHeartbeat() throws Exception { + return this.positionStore.getHeartbeat(); + } + public long heartbeat() throws Exception { return this.positionStore.heartbeat(); } diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 2eb0a97bf..951ddda98 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -24,7 +24,11 @@ public Position withHeartbeat(long lastHeartbeatRead) { } public static Position capture(Connection c, boolean gtidMode) throws SQLException { - return new Position(BinlogPosition.capture(c, gtidMode), 0L); + return capture(c, gtidMode, 0L); + } + + public static Position capture(Connection c, boolean gtidMode, long lastHeartbeatRead) throws SQLException { + return new Position(BinlogPosition.capture(c, gtidMode), lastHeartbeatRead); } public long getLastHeartbeatRead() { diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java index d6a381abe..00f30e5ea 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java @@ -86,6 +86,12 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup }); } + public long getHeartbeat() throws SQLException { + try (Connection connection = connectionPool.getConnection()) { + return getHeartbeat(connection); + } + } + /* * the heartbeat system performs two functions: * 1 - it leaves pointers in the binlog in order to facilitate master recovery @@ -94,7 +100,22 @@ public synchronized void heartbeat(long heartbeatValue) throws SQLException, Dup private Long lastHeartbeat = null; - private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { + private long getHeartbeat(Connection c) throws SQLException { + try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { + s.setLong(1, serverID); + s.setString(2, clientID); + + try ( ResultSet rs = s.executeQuery() ) { + if ( !rs.next() ) { + return 0L; + } else { + return rs.getLong("heartbeat"); + } + } + } + } + + private void insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLException, DuplicateProcessException { String heartbeatInsert = "insert into `heartbeats` set `heartbeat` = ?, `server_id` = ?, `client_id` = ?"; try ( PreparedStatement s = c.prepareStatement(heartbeatInsert) ) { @@ -103,7 +124,6 @@ private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti s.setString(3, clientID); s.execute(); - return thisHeartbeat; } catch ( SQLIntegrityConstraintViolationException e ) { throw new DuplicateProcessException("Found heartbeat row for client,position while trying to insert. Is another maxwell running?"); } @@ -111,19 +131,14 @@ private Long insertHeartbeat(Connection c, Long thisHeartbeat) throws SQLExcepti private void heartbeat(Connection c, long thisHeartbeat) throws SQLException, DuplicateProcessException { if ( lastHeartbeat == null ) { - try ( PreparedStatement s = c.prepareStatement("SELECT `heartbeat` from `heartbeats` where server_id = ? and client_id = ?") ) { - s.setLong(1, serverID); - s.setString(2, clientID); - - try ( ResultSet rs = s.executeQuery() ) { - if ( !rs.next() ) { - insertHeartbeat(c, thisHeartbeat); - lastHeartbeat = thisHeartbeat; - return; - } else { - lastHeartbeat = rs.getLong("heartbeat"); - } - } + long storedHeartbeat = getHeartbeat(c); + + if (storedHeartbeat > 0) { + lastHeartbeat = storedHeartbeat; + } else { + insertHeartbeat(c, thisHeartbeat); + lastHeartbeat = thisHeartbeat; + return; } }