Skip to content

Commit 062a2bb

Browse files
committed
JAVA-2069: Use both setVersion and electionId to detect a stale primary
1 parent 0a3e00d commit 062a2bb

File tree

7 files changed

+86
-15
lines changed

7 files changed

+86
-15
lines changed

src/main/com/mongodb/BaseCluster.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ protected void fireChangeEvent() {
197197
clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description));
198198
}
199199

200+
ClusterDescription getCurrentDescription() {
201+
return description;
202+
}
203+
200204
// gets a random server that still exists in the cluster. Returns null if there are none.
201205
private ClusterableServer getRandomServer(final List<ServerDescription> serverDescriptions) {
202206
while (!serverDescriptions.isEmpty()) {

src/main/com/mongodb/MultiServerCluster.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ final class MultiServerCluster extends BaseCluster {
4545
private ClusterType clusterType;
4646
private String replicaSetName;
4747
private ObjectId maxElectionId;
48+
private Integer maxSetVersion;
4849
private final ConcurrentMap<ServerAddress, ServerTuple> addressToServerTupleMap =
4950
new ConcurrentHashMap<ServerAddress, ServerTuple>();
5051

@@ -201,10 +202,13 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
201202
}
202203

203204
if (newDescription.isPrimary()) {
204-
if (newDescription.getElectionId() != null) {
205-
if (maxElectionId != null && maxElectionId.compareTo(newDescription.getElectionId()) > 0) {
206-
LOGGER.info(format("Invalidating potential primary %s whose election id %s is less than the max election id seen so "
207-
+ "far %s", newDescription.getAddress(), newDescription.getElectionId(), maxElectionId));
205+
if (newDescription.getSetVersion() != null && newDescription.getElectionId() != null) {
206+
if (isStalePrimary(newDescription)) {
207+
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
208+
+ "is less than one already seen of (%d, %s)",
209+
newDescription.getAddress(),
210+
newDescription.getSetVersion(), newDescription.getElectionId(),
211+
maxSetVersion, maxElectionId));
208212
addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate();
209213
return false;
210214
}
@@ -216,6 +220,13 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
216220
}
217221
}
218222

223+
if (newDescription.getSetVersion() != null
224+
&& (maxSetVersion == null || newDescription.getSetVersion().compareTo(maxSetVersion) > 0)) {
225+
LOGGER.info(format("Setting max set version to %d from replica set primary %s", newDescription.getSetVersion(),
226+
newDescription.getAddress()));
227+
maxSetVersion = newDescription.getSetVersion();
228+
}
229+
219230
if (isNotAlreadyPrimary(newDescription.getAddress())) {
220231
LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress()));
221232
}
@@ -224,6 +235,15 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
224235
return true;
225236
}
226237

238+
private boolean isStalePrimary(final ServerDescription newDescription) {
239+
if (maxSetVersion == null || maxElectionId == null) {
240+
return false;
241+
}
242+
243+
return (maxSetVersion.compareTo(newDescription.getSetVersion()) > 0
244+
|| (maxSetVersion.equals(newDescription.getSetVersion()) && maxElectionId.compareTo(newDescription.getElectionId()) > 0));
245+
}
246+
227247
private boolean isNotAlreadyPrimary(final ServerAddress address) {
228248
ServerTuple serverTuple = addressToServerTupleMap.get(address);
229249
return serverTuple == null || !serverTuple.description.isPrimary();

src/main/com/mongodb/ServerDescription.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class ServerDescription {
7474
private final int maxWireVersion;
7575

7676
private final ObjectId electionId;
77+
private final Integer setVersion;
7778

7879
private final Throwable exception;
7980

@@ -97,6 +98,7 @@ static class Builder {
9798
private int minWireVersion = 0;
9899
private int maxWireVersion = 0;
99100
private ObjectId electionId;
101+
private Integer setVersion;
100102
private Throwable exception;
101103

102104
// CHECKSTYLE:OFF
@@ -202,6 +204,17 @@ public Builder electionId(final ObjectId electionId) {
202204
return this;
203205
}
204206

207+
/**
208+
* Sets the setVersion reported by this server.
209+
*
210+
* @param setVersion the set version
211+
* @return this
212+
*/
213+
public Builder setVersion(final Integer setVersion) {
214+
this.setVersion = setVersion;
215+
return this;
216+
}
217+
205218
public Builder exception(final Throwable exception) {
206219
this.exception = exception;
207220
return this;
@@ -343,6 +356,15 @@ public ObjectId getElectionId() {
343356
return electionId;
344357
}
345358

359+
/**
360+
* The replica set setVersion reported by this MongoDB server.
361+
*
362+
* @return the setVersion, which may be null
363+
*/
364+
public Integer getSetVersion() {
365+
return setVersion;
366+
}
367+
346368
/**
347369
* Returns true if the server has the given tags. A server of either type {@code ServerType.StandAlone} or
348370
* {@code ServerType.ShardRouter} is considered to have all tags, so this method will always return true for instances of either of
@@ -467,7 +489,9 @@ public boolean equals(final Object o) {
467489
if (electionId != null ? !electionId.equals(that.electionId) : that.electionId != null) {
468490
return false;
469491
}
470-
492+
if (setVersion != null ? !setVersion.equals(that.setVersion) : that.setVersion != null) {
493+
return false;
494+
}
471495
// Compare class equality and message as exceptions rarely override equals
472496
Class thisExceptionClass = exception != null ? exception.getClass() : null;
473497
Class thatExceptionClass = that.exception != null ? that.exception.getClass() : null;
@@ -505,6 +529,7 @@ public int hashCode() {
505529
result = 31 * result + minWireVersion;
506530
result = 31 * result + maxWireVersion;
507531
result = 31 * result + (electionId != null ? electionId.hashCode() : 0);
532+
result = 31 * result + (setVersion != null ? setVersion.hashCode() : 0);
508533
result = 31 * result + (exception == null ? 0 : exception.getClass().hashCode());
509534
result = 31 * result + (exception == null ? 0 : exception.getMessage().hashCode());
510535
return result;
@@ -524,6 +549,7 @@ public String toString() {
524549
+ ", maxMessageSize=" + maxMessageSize
525550
+ ", maxWriteBatchSize=" + maxWriteBatchSize
526551
+ ", electionId=" + electionId
552+
+ ", setVersion=" + setVersion
527553
+ ", tagSet=" + tagSet
528554
+ ", setName='" + setName + '\''
529555
+ ", averageLatencyNanos=" + averageLatencyNanos
@@ -587,6 +613,7 @@ private String getAverageLatencyFormattedInMilliseconds() {
587613
minWireVersion = builder.minWireVersion;
588614
maxWireVersion = builder.maxWireVersion;
589615
electionId = builder.electionId;
616+
setVersion = builder.setVersion;
590617
exception = builder.exception;
591618
}
592619
}

src/main/com/mongodb/ServerMonitor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ private ServerDescription lookupServerDescription(final DBPort connection) throw
239239
}
240240

241241
@SuppressWarnings("unchecked")
242-
private ServerDescription createDescription(final CommandResult commandResult, final ServerVersion serverVersion,
243-
final long averageLatencyNanos) {
242+
ServerDescription createDescription(final CommandResult commandResult, final ServerVersion serverVersion,
243+
final long averageLatencyNanos) {
244244
return ServerDescription.builder()
245245
.state(ServerConnectionState.Connected)
246246
.version(serverVersion)
@@ -260,6 +260,7 @@ private ServerDescription createDescription(final CommandResult commandResult, f
260260
.minWireVersion(commandResult.getInt("minWireVersion", ServerDescription.getDefaultMinWireVersion()))
261261
.maxWireVersion(commandResult.getInt("maxWireVersion", ServerDescription.getDefaultMaxWireVersion()))
262262
.electionId(commandResult.containsKey("electionId") ? commandResult.getObjectId("electionId") : null)
263+
.setVersion(commandResult.containsKey("setVersion") ? commandResult.getInt("setVersion") : null)
263264
.averageLatency(averageLatencyNanos, TimeUnit.NANOSECONDS)
264265
.ok(commandResult.ok()).build();
265266
}

src/test/com/mongodb/MultiServerClusterSpecification.groovy

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -510,23 +510,24 @@ class MultiServerClusterSpecification extends Specification {
510510

511511
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, String setName,
512512
ServerAddress trueAddress) {
513-
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, setName, null, trueAddress)
514-
.build())
513+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, setName, null, null,
514+
trueAddress).build())
515515
}
516516

517517
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, List<ServerAddress> passives,
518518
String setName) {
519-
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null, null)
520-
.build())
519+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null, null,
520+
null).build())
521521
}
522522

523523
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, ObjectId electionId) {
524-
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', electionId, null)
525-
.build())
524+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', 2, electionId,
525+
null).build())
526526
}
527527

528528
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, boolean ok) {
529-
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null, null).build())
529+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null, null, null)
530+
.build())
530531
}
531532

532533
def getClusterDescription(MultiServerCluster cluster) {
@@ -542,7 +543,7 @@ class MultiServerClusterSpecification extends Specification {
542543
}
543544

544545
def getBuilder(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, List<ServerAddress> passives, boolean ok,
545-
String setName, ObjectId electionId, ServerAddress trueAddress) {
546+
String setName, Integer setVersion, ObjectId electionId, ServerAddress trueAddress) {
546547
ServerDescription.builder()
547548
.address(serverAddress)
548549
.type(serverType)
@@ -553,5 +554,6 @@ class MultiServerClusterSpecification extends Specification {
553554
.passives(passives*.toString() as Set)
554555
.setName(setName)
555556
.electionId(electionId)
557+
.setVersion(setVersion)
556558
}
557559
}

src/test/com/mongodb/ServerDescriptionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public void testDefaults() throws UnknownHostException {
8888
assertEquals(0, serverDescription.getMinWireVersion());
8989
assertEquals(0, serverDescription.getMaxWireVersion());
9090
assertNull(serverDescription.getElectionId());
91+
assertNull(serverDescription.getSetVersion());
9192
assertNull(serverDescription.getException());
9293
}
9394

@@ -117,6 +118,7 @@ public void testBuilder() throws UnknownHostException {
117118
.minWireVersion(1)
118119
.maxWireVersion(2)
119120
.electionId(new ObjectId("123412341234123412341234"))
121+
.setVersion(2)
120122
.exception(exception)
121123
.build();
122124

@@ -151,6 +153,7 @@ public void testBuilder() throws UnknownHostException {
151153
assertEquals(1, serverDescription.getMinWireVersion());
152154
assertEquals(2, serverDescription.getMaxWireVersion());
153155
assertEquals(new ObjectId("123412341234123412341234"), serverDescription.getElectionId());
156+
assertEquals(new Integer(2), serverDescription.getSetVersion());
154157
assertEquals(exception, serverDescription.getException());
155158
}
156159

@@ -176,6 +179,7 @@ public void testObjectOverrides() throws UnknownHostException {
176179
.minWireVersion(1)
177180
.maxWireVersion(2)
178181
.electionId(new ObjectId())
182+
.setVersion(2)
179183
.exception(new IllegalArgumentException("This is illegal"));
180184
assertEquals(builder.build(), builder.build());
181185
assertEquals(builder.build().hashCode(), builder.build().hashCode());

src/test/com/mongodb/ServerMonitorSpecification.groovy

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,19 @@ class ServerMonitorSpecification extends FunctionalSpecification {
7272
newDescription.electionId == expected
7373
}
7474

75+
def 'should set setVersion'() {
76+
given:
77+
initializeServerMonitor(new ServerAddress())
78+
CommandResult commandResult = database.command(new BasicDBObject('ismaster', 1))
79+
def expected = commandResult.get('setVersion')
80+
81+
when:
82+
latch.await()
83+
84+
then:
85+
newDescription.setVersion == expected
86+
}
87+
7588
@IgnoreIf( { serverIsAtLeastVersion(2.6) } )
7689
def 'should set default max wire batch size when not provided by server'() {
7790
given:

0 commit comments

Comments
 (0)