Skip to content

Commit 120c6dd

Browse files
authored
Introduce and use Cluster.withLock to achieve cluster-wide locking (#872)
This is a backport of #869. JAVA-4471
1 parent 3fabb53 commit 120c6dd

15 files changed

+140
-95
lines changed

driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.Set;
3838
import java.util.concurrent.ConcurrentHashMap;
3939
import java.util.concurrent.ConcurrentMap;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.concurrent.atomic.AtomicReference;
4042

4143
import static com.mongodb.assertions.Assertions.isTrue;
4244
import static com.mongodb.connection.ClusterConnectionMode.MULTIPLE;
@@ -89,17 +91,17 @@ MongoException getSrvResolutionException() {
8991

9092
protected void initialize(final Collection<ServerAddress> serverAddresses) {
9193
ClusterDescription currentDescription = getCurrentDescription();
92-
ClusterDescription newDescription;
94+
AtomicReference<ClusterDescription> newDescription = new AtomicReference<>();
9395

9496
// synchronizing this code because addServer registers a callback which is re-entrant to this instance.
9597
// In other words, we are leaking a reference to "this" from the constructor.
96-
synchronized (this) {
98+
withLock(() -> {
9799
for (final ServerAddress serverAddress : serverAddresses) {
98100
addServer(serverAddress);
99101
}
100-
newDescription = updateDescription();
101-
}
102-
fireChangeEvent(newDescription, currentDescription);
102+
newDescription.set(updateDescription());
103+
});
104+
fireChangeEvent(newDescription.get(), currentDescription);
103105
}
104106

105107
@Override
@@ -111,14 +113,14 @@ protected void connect() {
111113

112114
@Override
113115
public void close() {
114-
synchronized (this) {
116+
withLock(() -> {
115117
if (!isClosed()) {
116118
for (final ServerTuple serverTuple : addressToServerTupleMap.values()) {
117119
serverTuple.server.close();
118120
}
119121
}
120122
super.close();
121-
}
123+
});
122124
}
123125

124126
@Override
@@ -141,7 +143,7 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event)
141143
}
142144

143145
void onChange(final Collection<ServerAddress> newHosts) {
144-
synchronized (this) {
146+
withLock(() -> {
145147
if (isClosed()) {
146148
return;
147149
}
@@ -165,15 +167,16 @@ void onChange(final Collection<ServerAddress> newHosts) {
165167
ClusterDescription newClusterDescription = updateDescription();
166168

167169
fireChangeEvent(newClusterDescription, oldClusterDescription);
168-
}
170+
});
169171
}
170172

171173
private void onChange(final ServerDescriptionChangedEvent event) {
172-
ClusterDescription oldClusterDescription = null;
173-
ClusterDescription newClusterDescription = null;
174-
boolean shouldUpdateDescription = true;
175-
synchronized (this) {
174+
AtomicReference<ClusterDescription> oldClusterDescription = new AtomicReference<>();
175+
AtomicReference<ClusterDescription> newClusterDescription = new AtomicReference<>();
176+
AtomicBoolean shouldUpdateDescription = new AtomicBoolean(true);
177+
withLock(() -> {
176178
if (isClosed()) {
179+
shouldUpdateDescription.set(false);
177180
return;
178181
}
179182

@@ -190,6 +193,7 @@ private void onChange(final ServerDescriptionChangedEvent event) {
190193
LOGGER.trace(format("Ignoring description changed event for removed server %s",
191194
newDescription.getAddress()));
192195
}
196+
shouldUpdateDescription.set(false);
193197
return;
194198
}
195199

@@ -203,27 +207,27 @@ private void onChange(final ServerDescriptionChangedEvent event) {
203207

204208
switch (clusterType) {
205209
case REPLICA_SET:
206-
shouldUpdateDescription = handleReplicaSetMemberChanged(newDescription);
210+
shouldUpdateDescription.set(handleReplicaSetMemberChanged(newDescription));
207211
break;
208212
case SHARDED:
209-
shouldUpdateDescription = handleShardRouterChanged(newDescription);
213+
shouldUpdateDescription.set(handleShardRouterChanged(newDescription));
210214
break;
211215
case STANDALONE:
212-
shouldUpdateDescription = handleStandAloneChanged(newDescription);
216+
shouldUpdateDescription.set(handleStandAloneChanged(newDescription));
213217
break;
214218
default:
215219
break;
216220
}
217221
}
218222

219-
if (shouldUpdateDescription) {
223+
if (shouldUpdateDescription.get()) {
220224
serverTuple.description = newDescription;
221-
oldClusterDescription = getCurrentDescription();
222-
newClusterDescription = updateDescription();
225+
oldClusterDescription.set(getCurrentDescription());
226+
newClusterDescription.set(updateDescription());
223227
}
224-
}
225-
if (shouldUpdateDescription) {
226-
fireChangeEvent(newClusterDescription, oldClusterDescription);
228+
});
229+
if (shouldUpdateDescription.get()) {
230+
fireChangeEvent(newClusterDescription.get(), oldClusterDescription.get());
227231
}
228232
}
229233

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,15 @@ public boolean isClosed() {
240240
return isClosed;
241241
}
242242

243-
protected synchronized void updateDescription(final ClusterDescription newDescription) {
244-
if (LOGGER.isDebugEnabled()) {
245-
LOGGER.debug(format("Updating cluster description to %s", newDescription.getShortDescription()));
246-
}
243+
protected void updateDescription(final ClusterDescription newDescription) {
244+
withLock(() -> {
245+
if (LOGGER.isDebugEnabled()) {
246+
LOGGER.debug(format("Updating cluster description to %s", newDescription.getShortDescription()));
247+
}
247248

248-
description = newDescription;
249-
updatePhase();
249+
description = newDescription;
250+
updatePhase();
251+
});
250252
}
251253

252254
protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) {
@@ -261,8 +263,13 @@ public ClusterDescription getCurrentDescription() {
261263
return description;
262264
}
263265

264-
private synchronized void updatePhase() {
265-
phase.getAndSet(new CountDownLatch(1)).countDown();
266+
@Override
267+
public synchronized void withLock(final Runnable action) {
268+
action.run();
269+
}
270+
271+
private void updatePhase() {
272+
withLock(() -> phase.getAndSet(new CountDownLatch(1)).countDown());
266273
}
267274

268275
private long getMaxWaitTimeNanos() {
@@ -381,8 +388,8 @@ private ServerSelector getCompositeServerSelector(final ServerSelector serverSel
381388

382389
protected ClusterableServer createServer(final ServerAddress serverAddress,
383390
final ServerDescriptionChangedListener serverDescriptionChangedListener) {
384-
return serverFactory.create(serverAddress, serverDescriptionChangedListener, createServerListener(serverFactory.getSettings()),
385-
clusterClock);
391+
return serverFactory.create(this, serverAddress, serverDescriptionChangedListener,
392+
createServerListener(serverFactory.getSettings()), clusterClock);
386393
}
387394

388395
private void throwIfIncompatible(final ClusterDescription curDescription) {
@@ -451,26 +458,30 @@ long getRemainingTime() {
451458
}
452459
}
453460

454-
private synchronized void notifyWaitQueueHandler(final ServerSelectionRequest request) {
455-
if (isClosed) {
456-
return;
457-
}
461+
private void notifyWaitQueueHandler(final ServerSelectionRequest request) {
462+
withLock(() -> {
463+
if (isClosed) {
464+
return;
465+
}
458466

459-
waitQueue.add(request);
467+
waitQueue.add(request);
460468

461-
if (waitQueueHandler == null) {
462-
waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + clusterId.getValue());
463-
waitQueueHandler.setDaemon(true);
464-
waitQueueHandler.start();
465-
} else {
466-
updatePhase();
467-
}
469+
if (waitQueueHandler == null) {
470+
waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + clusterId.getValue());
471+
waitQueueHandler.setDaemon(true);
472+
waitQueueHandler.start();
473+
} else {
474+
updatePhase();
475+
}
476+
});
468477
}
469478

470-
private synchronized void stopWaitQueueHandler() {
471-
if (waitQueueHandler != null) {
472-
waitQueueHandler.interrupt();
473-
}
479+
private void stopWaitQueueHandler() {
480+
withLock(() -> {
481+
if (waitQueueHandler != null) {
482+
waitQueueHandler.interrupt();
483+
}
484+
});
474485
}
475486

476487
private final class WaitQueueHandler implements Runnable {

driver-core/src/main/com/mongodb/internal/connection/Cluster.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,11 @@ public interface Cluster extends Closeable {
9292
* @return true if all the servers in this cluster have been closed
9393
*/
9494
boolean isClosed();
95+
96+
/**
97+
* Does the supplied {@code action} while holding a reentrant cluster-wide lock.
98+
*
99+
* @param action The action to {@linkplain Runnable#run() do}.
100+
*/
101+
void withLock(Runnable action);
95102
}

driver-core/src/main/com/mongodb/internal/connection/ClusterableServerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.mongodb.event.ServerListener;
2222

2323
public interface ClusterableServerFactory {
24-
ClusterableServer create(ServerAddress serverAddress, ServerDescriptionChangedListener serverDescriptionChangedListener,
24+
ClusterableServer create(Cluster cluster, ServerAddress serverAddress, ServerDescriptionChangedListener serverDescriptionChangedListener,
2525
ServerListener serverListener, ClusterClock clusterClock);
2626

2727
ServerSettings getSettings();

driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public DefaultClusterableServerFactory(final ClusterId clusterId, final ClusterS
7575
}
7676

7777
@Override
78-
public ClusterableServer create(final ServerAddress serverAddress,
78+
public ClusterableServer create(final Cluster cluster,
79+
final ServerAddress serverAddress,
7980
final ServerDescriptionChangedListener serverDescriptionChangedListener,
8081
final ServerListener serverListener,
8182
final ClusterClock clusterClock) {
@@ -90,7 +91,7 @@ mongoDriverInformation, emptyList(), null, serverApi),
9091
new InternalStreamConnectionFactory(clusterSettings.getMode(), streamFactory, credential, applicationName,
9192
mongoDriverInformation, compressorList, commandListener, serverApi),
9293
connectionPoolSettings, internalConnectionPoolSettings, sdamProvider);
93-
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(serverId, serverDescriptionChangedListener,
94+
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverDescriptionChangedListener,
9495
serverListener, serverMonitor, connectionPool, clusterSettings.getMode());
9596
sdamProvider.initialize(sdam);
9697
serverMonitor.start();

driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
import com.mongodb.event.ServerDescriptionChangedEvent;
2525
import com.mongodb.event.ServerListener;
2626

27-
import java.util.concurrent.locks.Lock;
28-
import java.util.concurrent.locks.ReentrantLock;
29-
3027
import static com.mongodb.assertions.Assertions.assertFalse;
3128
import static com.mongodb.assertions.Assertions.assertNotNull;
3229
import static com.mongodb.assertions.Assertions.assertTrue;
@@ -36,34 +33,34 @@
3633

3734
@ThreadSafe
3835
final class DefaultSdamServerDescriptionManager implements SdamServerDescriptionManager {
36+
private final Cluster cluster;
3937
private final ServerId serverId;
4038
private final ServerDescriptionChangedListener serverDescriptionChangedListener;
4139
private final ServerListener serverListener;
4240
private final ServerMonitor serverMonitor;
4341
private final ConnectionPool connectionPool;
4442
private final ClusterConnectionMode connectionMode;
45-
private final Lock lock;
4643
private volatile ServerDescription description;
4744

48-
DefaultSdamServerDescriptionManager(final ServerId serverId,
45+
DefaultSdamServerDescriptionManager(final Cluster cluster,
46+
final ServerId serverId,
4947
final ServerDescriptionChangedListener serverDescriptionChangedListener,
5048
final ServerListener serverListener, final ServerMonitor serverMonitor,
5149
final ConnectionPool connectionPool,
5250
final ClusterConnectionMode connectionMode) {
51+
this.cluster = cluster;
5352
this.serverId = assertNotNull(serverId);
5453
this.serverDescriptionChangedListener = assertNotNull(serverDescriptionChangedListener);
5554
this.serverListener = assertNotNull(serverListener);
5655
this.serverMonitor = assertNotNull(serverMonitor);
5756
this.connectionPool = assertNotNull(connectionPool);
5857
this.connectionMode = assertNotNull(connectionMode);
5958
description = unknownConnectingServerDescription(serverId, null);
60-
lock = new ReentrantLock();
6159
}
6260

6361
@Override
6462
public void update(final ServerDescription candidateDescription) {
65-
lock.lock();
66-
try {
63+
cluster.withLock(() -> {
6764
if (TopologyVersionHelper.newer(description.getTopologyVersion(), candidateDescription.getTopologyVersion())) {
6865
return;
6966
}
@@ -84,29 +81,17 @@ public void update(final ServerDescription candidateDescription) {
8481
assertFalse(markedPoolReady);
8582
connectionPool.invalidate();
8683
}
87-
} finally {
88-
lock.unlock();
89-
}
84+
});
9085
}
9186

9287
@Override
9388
public void handleExceptionBeforeHandshake(final SdamIssue sdamIssue) {
94-
lock.lock();
95-
try {
96-
handleException(sdamIssue, true);
97-
} finally {
98-
lock.unlock();
99-
}
89+
cluster.withLock(() -> handleException(sdamIssue, true));
10090
}
10191

10292
@Override
10393
public void handleExceptionAfterHandshake(final SdamIssue sdamIssue) {
104-
lock.lock();
105-
try {
106-
handleException(sdamIssue, false);
107-
} finally {
108-
lock.unlock();
109-
}
94+
cluster.withLock(() -> handleException(sdamIssue, false));
11095
}
11196

11297
@Override

driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private void init(final ClusterId clusterId, final ClusterableServerFactory serv
160160
.address(host)
161161
.build()),
162162
settings, serverFactory.getSettings());
163-
server = serverFactory.create(host, event -> { }, createServerListener(serverFactory.getSettings()), clusterClock);
163+
server = serverFactory.create(this, host, event -> { }, createServerListener(serverFactory.getSettings()), clusterClock);
164164

165165
clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description, initialDescription));
166166
}
@@ -283,6 +283,11 @@ public boolean isClosed() {
283283
return closed.get();
284284
}
285285

286+
@Override
287+
public void withLock(final Runnable action) {
288+
fail();
289+
}
290+
286291
private void handleServerSelectionRequest(final ServerSelectionRequest serverSelectionRequest) {
287292
assertTrue(initializationCompleted);
288293
if (srvRecordResolvedToMultipleHosts) {

driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public LoadBalancedClusterableServerFactory(final ClusterId clusterId, final Ser
6969
}
7070

7171
@Override
72-
public ClusterableServer create(final ServerAddress serverAddress,
72+
public ClusterableServer create(final Cluster cluster,
73+
final ServerAddress serverAddress,
7374
final ServerDescriptionChangedListener serverDescriptionChangedListener,
7475
final ServerListener serverListener, final ClusterClock clusterClock) {
7576
ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(clusterId, serverAddress),

0 commit comments

Comments
 (0)