Skip to content

Commit 4e43b8f

Browse files
author
Rahul Karajgikar
committed
Move all logic to transportService/ClusterConnectionManager as per comments
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 0009018 commit 4e43b8f

File tree

10 files changed

+37
-79
lines changed

10 files changed

+37
-79
lines changed

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
172172
}
173173

174174
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
175-
logger.info("NodeConnectionsService - disconnecting from node [{}] in loop", discoveryNode);
176175
runnables.add(targetsByNode.get(discoveryNode).disconnect());
177176
}
178177
}
@@ -390,13 +389,8 @@ public String toString() {
390389
protected void doRun() {
391390
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
392391
transportService.disconnectFromNode(discoveryNode);
393-
transportService.markPendingLeftAsCompleted(discoveryNode);
394392
consecutiveFailureCount.set(0);
395-
logger.debug(
396-
"disconnected from {} and marked pending left as completed. " + "pending lefts: [{}]",
397-
discoveryNode,
398-
transportService.getNodesLeftInProgress()
399-
);
393+
logger.debug("disconnected from {}", discoveryNode);
400394
onCompletion(ActivityType.DISCONNECTING, null, connectActivity);
401395
}
402396

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -630,16 +630,6 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
630630
);
631631
return;
632632
}
633-
// if node-left is still in progress, we fail the joinRequest early
634-
if (transportService.getNodesLeftInProgress().contains(joinRequest.getSourceNode())) {
635-
joinCallback.onFailure(
636-
new IllegalStateException(
637-
"cannot join node [" + joinRequest.getSourceNode() + "] because node-left is currently in progress for this node"
638-
639-
)
640-
);
641-
return;
642-
}
643633

644634
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
645635
final ClusterState stateForJoinValidation = getStateForClusterManagerService();
@@ -1369,13 +1359,13 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13691359
currentPublication = Optional.of(publication);
13701360

13711361
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
1362+
// marking pending disconnects before publish
1363+
// if we try to joinRequest during pending disconnect, it should fail
1364+
transportService.markPendingDisconnects(clusterChangedEvent.nodesDelta());
13721365
leaderChecker.setCurrentNodes(publishNodes);
13731366
followersChecker.setCurrentNodes(publishNodes);
13741367
lagDetector.setTrackedNodes(publishNodes);
13751368
coordinationState.get().handlePrePublish(clusterState);
1376-
// trying to mark pending disconnects before publish
1377-
// if we try to joinRequest during pending disconnect, it should fail
1378-
transportService.markPendingConnections(clusterChangedEvent.nodesDelta());
13791369
publication.start(followersChecker.getFaultyNodes());
13801370
}
13811371
} catch (Exception e) {

server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ public FollowersChecker(
161161
transportService.addConnectionListener(new TransportConnectionListener() {
162162
@Override
163163
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
164-
logger.info("in transport listener onNodeDisconnected");
165164
handleDisconnectedNode(node);
166165
}
167166
});
@@ -391,7 +390,6 @@ public void handleException(TransportException exp) {
391390
failureCountSinceLastSuccess++;
392391

393392
final String reason;
394-
395393
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
396394
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
397395
reason = "disconnected";

server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ private void handleLeaderCheck(LeaderCheckRequest request) {
231231
"rejecting leader check since [" + request.getSender() + "] has been removed from the cluster"
232232
);
233233
} else {
234-
logger.debug("handling {}", request);
234+
logger.trace("handling {}", request);
235235
}
236236
}
237237

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
580580

581581
logger.debug("apply cluster state with version {}", newClusterState.version());
582582
callClusterStateAppliers(clusterChangedEvent, stopWatch);
583+
logger.debug("completed calling appliers of cluster state for version {}", newClusterState.version());
583584

584585
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
585586

@@ -592,11 +593,11 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
592593
+ " on "
593594
+ newClusterState.nodes().getLocalNode();
594595

595-
logger.info("set locally applied cluster state to version {}", newClusterState.version());
596+
logger.debug("set locally applied cluster state to version {}", newClusterState.version());
596597
state.set(newClusterState);
597598

598599
callClusterStateListeners(clusterChangedEvent, stopWatch);
599-
logger.info("completed appliers and listeners of cluster state for version {}", newClusterState.version());
600+
logger.debug("completed calling listeners of cluster state for version {}", newClusterState.version());
600601
}
601602

602603
protected void connectToNodesAndWait(ClusterState newClusterState) {

server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ public class ClusterConnectionManager implements ConnectionManager {
6565

6666
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
6767
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
68-
private final Set<DiscoveryNode> pendingLeft = ConcurrentCollections.newConcurrentSet();
68+
/**
69+
This set is used only by cluster-manager nodes.
70+
Nodes are marked as pending disconnect right before cluster state publish phase.
71+
They are cleared up as part of cluster state apply commit phase
72+
This is to avoid connections from being made to nodes that are in the process of leaving the cluster
73+
*/
74+
private final Set<DiscoveryNode> pendingDisconnections = ConcurrentCollections.newConcurrentSet();
6975
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
7076
@Override
7177
protected void closeInternal() {
@@ -113,19 +119,9 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
113119
}
114120

115121
@Override
116-
public Set<DiscoveryNode> getNodesLeftInProgress() {
117-
return this.pendingLeft;
118-
}
119-
120-
@Override
121-
public void markPendingLefts(List<DiscoveryNode> nodes) {
122-
logger.info("marking pending left for nodes: [{}]", nodes);
123-
pendingLeft.addAll(nodes);
124-
}
125-
126-
@Override
127-
public boolean markPendingLeftCompleted(DiscoveryNode discoveryNode) {
128-
return pendingLeft.remove(discoveryNode);
122+
public void markPendingDisconnects(List<DiscoveryNode> nodes) {
123+
logger.info("marking pending disconnects for nodes: [{}]", nodes);
124+
pendingDisconnections.addAll(nodes);
129125
}
130126

131127
/**
@@ -147,6 +143,16 @@ public void connectToNode(
147143
return;
148144
}
149145

146+
// if node-left is still in progress, we fail the connect request early
147+
if (pendingDisconnections.contains(node)) {
148+
listener.onFailure(
149+
new IllegalStateException(
150+
"blocked connection to node [" + node + "] because node-left is currently in progress for this node"
151+
)
152+
);
153+
return;
154+
}
155+
150156
if (connectingRefCounter.tryIncRef() == false) {
151157
listener.onFailure(new IllegalStateException("connection manager is closed"));
152158
return;
@@ -246,6 +252,8 @@ public void disconnectFromNode(DiscoveryNode node) {
246252
// if we found it and removed it we close
247253
nodeChannels.close();
248254
}
255+
pendingDisconnections.remove(node);
256+
logger.info("Removed node {} from pending disconnects list", node);
249257
}
250258

251259
/**

server/src/main/java/org/opensearch/transport/ConnectionManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,7 @@ void connectToNode(
6464

6565
boolean nodeConnected(DiscoveryNode node);
6666

67-
Set<DiscoveryNode> getNodesLeftInProgress();
68-
69-
boolean markPendingLeftCompleted(DiscoveryNode node);
70-
71-
void markPendingLefts(List<DiscoveryNode> nodes);
67+
void markPendingDisconnects(List<DiscoveryNode> nodes);
7268

7369
void disconnectFromNode(DiscoveryNode node);
7470

server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,8 @@ public ConnectionProfile getConnectionProfile() {
120120
}
121121

122122
@Override
123-
public Set<DiscoveryNode> getNodesLeftInProgress() {
124-
throw new UnsupportedOperationException("not implemented");
125-
}
126-
127-
@Override
128-
public boolean markPendingLeftCompleted(DiscoveryNode node) {
129-
throw new UnsupportedOperationException("not implemented");
130-
}
131-
132-
@Override
133-
public void markPendingLefts(List<DiscoveryNode> nodes) {
134-
throw new UnsupportedOperationException("not implemented");
123+
public void markPendingDisconnects(List<DiscoveryNode> nodes) {
124+
delegate.markPendingDisconnects(nodes);
135125
}
136126

137127
public Transport.Connection getAnyRemoteConnection() {

server/src/main/java/org/opensearch/transport/TransportService.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -483,18 +483,9 @@ public void connectToNode(DiscoveryNode node, ActionListener<Void> listener) thr
483483
connectToNode(node, null, listener);
484484
}
485485

486-
public Set<DiscoveryNode> getNodesLeftInProgress() {
487-
return connectionManager.getNodesLeftInProgress();
488-
}
489-
490-
// this
491-
public void markPendingConnections(DiscoveryNodes.Delta nodesDelta) {
492-
// connectionManager.markPendingJoins(nodesDelta.addedNodes());
493-
connectionManager.markPendingLefts(nodesDelta.removedNodes());
494-
}
495486

496-
public boolean markPendingLeftAsCompleted(DiscoveryNode node) {
497-
return connectionManager.markPendingLeftCompleted(node);
487+
public void markPendingDisconnects(DiscoveryNodes.Delta nodesDelta) {
488+
connectionManager.markPendingDisconnects(nodesDelta.removedNodes());
498489
}
499490

500491
public void connectToExtensionNode(DiscoveryNode node, ActionListener<Void> listener) throws ConnectTransportException {

test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,8 @@ public void connectToNode(
120120
}
121121

122122
@Override
123-
public Set<DiscoveryNode> getNodesLeftInProgress() {
124-
return delegate.getNodesLeftInProgress();
125-
}
126-
127-
@Override
128-
public boolean markPendingLeftCompleted(DiscoveryNode node) {
129-
return delegate.markPendingLeftCompleted(node);
130-
}
131-
132-
@Override
133-
public void markPendingLefts(List<DiscoveryNode> nodes) {
134-
delegate.markPendingLefts(nodes);
123+
public void markPendingDisconnects(List<DiscoveryNode> nodes) {
124+
delegate.markPendingDisconnects(nodes);
135125
}
136126

137127
@Override

0 commit comments

Comments
 (0)