Skip to content

Commit 1b9ae7c

Browse files
author
Rahul Karajgikar
committed
Address comments + minor changes
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent d76ce02 commit 1b9ae7c

File tree

8 files changed

+111
-52
lines changed

8 files changed

+111
-52
lines changed

server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,87 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
172172
assertThat(response.isTimedOut(), is(false));
173173
}
174174

175+
public void testRestartDataNode() throws Exception {
176+
final String indexName = "test";
177+
final Settings nodeSettings = Settings.builder()
178+
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")
179+
.put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s")
180+
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms")
181+
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
182+
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
183+
.build();
184+
// start a 3 node cluster
185+
internalCluster().startNode(nodeSettings);
186+
internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
187+
final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
188+
189+
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
190+
assertThat(response.isTimedOut(), is(false));
191+
192+
client().admin()
193+
.indices()
194+
.prepareCreate(indexName)
195+
.setSettings(
196+
Settings.builder()
197+
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue")
198+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
199+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
200+
)
201+
.get();
202+
203+
Settings redNodeDataPathSettings = internalCluster().dataPathSettings(redNodeName);
204+
logger.info("-> stopping data node");
205+
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(redNodeName));
206+
response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
207+
assertThat(response.isTimedOut(), is(false));
208+
209+
logger.info("-> restarting stopped node");
210+
internalCluster().startNode(Settings.builder().put("node.name", redNodeName).put(redNodeDataPathSettings).build());
211+
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
212+
assertThat(response.isTimedOut(), is(false));
213+
}
214+
215+
public void testRestartCmNode() throws Exception {
216+
final String indexName = "test";
217+
final Settings nodeSettings = Settings.builder()
218+
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")
219+
.put(NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10s")
220+
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "200ms")
221+
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
222+
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
223+
.build();
224+
// start a 3 node cluster
225+
final String cm = internalCluster().startNode(Settings.builder().put("node.attr.color", "yellow").put(nodeSettings).build());
226+
internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
227+
internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
228+
229+
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
230+
assertThat(response.isTimedOut(), is(false));
231+
232+
client().admin()
233+
.indices()
234+
.prepareCreate(indexName)
235+
.setSettings(
236+
Settings.builder()
237+
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue")
238+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
239+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
240+
)
241+
.get();
242+
243+
Settings cmNodeSettings = internalCluster().dataPathSettings(cm);
244+
245+
logger.info("-> stopping cluster-manager node");
246+
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(cm));
247+
response = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
248+
assertThat(response.isTimedOut(), is(false));
249+
250+
logger.info("-> restarting stopped node");
251+
internalCluster().startNode(Settings.builder().put("node.name", cm).put(cmNodeSettings).build());
252+
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
253+
assertThat(response.isTimedOut(), is(false));
254+
}
255+
175256
private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
176257
private final Runnable connectionBreaker;
177258

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,15 +177,16 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
177177
}
178178

179179
// There might be some stale nodes that are in pendingDisconnect set from before but are not connected anymore
180-
// This code block clears the pending disconnect for these nodes to avoid permanently blocking node joins
181-
// This situation should ideally not happen
180+
// So these nodes would not be there in targetsByNode and would not have disconnect() called for them
181+
// This code block clears the pending disconnect for these nodes that don't have entries in targetsByNode
182+
// to avoid permanently blocking node joins
183+
// This situation should ideally not happen, this is just for extra safety
182184
transportService.removePendingDisconnections(
183-
transportService.getPendingDisconnections()
185+
targetsByNode.keySet()
184186
.stream()
185187
.filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode))
186188
.collect(Collectors.toSet())
187189
);
188-
189190
}
190191
runnables.forEach(Runnable::run);
191192
}

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1360,7 +1360,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13601360
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
13611361
// marking pending disconnects before publish
13621362
// if a nodes tries to send a joinRequest while it is pending disconnect, it should fail
1363-
transportService.setPendingDisconnections(clusterChangedEvent.nodesDelta());
1363+
transportService.setPendingDisconnections(new HashSet<>(clusterChangedEvent.nodesDelta().removedNodes()));
13641364
leaderChecker.setCurrentNodes(publishNodes);
13651365
followersChecker.setCurrentNodes(publishNodes);
13661366
lagDetector.setTrackedNodes(publishNodes);

server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class ClusterConnectionManager implements ConnectionManager {
6969
Nodes are marked as pending disconnect right before cluster state publish phase.
7070
They are cleared up as part of cluster state apply commit phase
7171
This is to avoid connections from being made to nodes that are in the process of leaving the cluster
72+
Note: If a disconnect is initiated while a connect is in progress, this Set will not handle this case.
73+
Callers need to ensure that connects and disconnects are sequenced.
7274
*/
7375
private final Set<DiscoveryNode> pendingDisconnections = ConcurrentCollections.newConcurrentSet();
7476
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
@@ -129,7 +131,7 @@ public void connectToNode(
129131
ConnectionValidator connectionValidator,
130132
ActionListener<Void> listener
131133
) throws ConnectTransportException {
132-
logger.trace("[{}]connecting to node [{}]", Thread.currentThread().getName(), node);
134+
logger.trace("connecting to node [{}]", node);
133135
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
134136
if (node == null) {
135137
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
@@ -138,11 +140,7 @@ public void connectToNode(
138140

139141
// if node-left is still in progress, we fail the connect request early
140142
if (pendingDisconnections.contains(node)) {
141-
listener.onFailure(
142-
new IllegalStateException(
143-
"blocked connection to node [" + node + "] because node-left is currently in progress for this node"
144-
)
145-
);
143+
listener.onFailure(new IllegalStateException("cannot make a new connection as disconnect to node [" + node + "] is pending"));
146144
return;
147145
}
148146

@@ -188,6 +186,7 @@ public void connectToNode(
188186
conn.addCloseListener(ActionListener.wrap(() -> {
189187
logger.trace("unregistering {} after connection close and marking as disconnected", node);
190188
connectedNodes.remove(node, finalConnection);
189+
pendingDisconnections.remove(node);
191190
connectionListener.onNodeDisconnected(node, conn);
192191
}));
193192
}
@@ -249,20 +248,15 @@ public void disconnectFromNode(DiscoveryNode node) {
249248
}
250249

251250
@Override
252-
public Set<DiscoveryNode> getPendingDisconnections() {
253-
return pendingDisconnections;
251+
public void setPendingDisconnection(DiscoveryNode node) {
252+
logger.debug("marking disconnection as pending for node: [{}]", node);
253+
pendingDisconnections.add(node);
254254
}
255255

256256
@Override
257-
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
258-
logger.debug("set pending disconnection for nodes: [{}]", nodes);
259-
pendingDisconnections.addAll(nodes);
260-
}
261-
262-
@Override
263-
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
264-
logger.debug("marking disconnection as completed for nodes: [{}]", nodes);
265-
pendingDisconnections.removeAll(nodes);
257+
public void removePendingDisconnection(DiscoveryNode node) {
258+
logger.debug("marking disconnection as completed for node: [{}]", node);
259+
pendingDisconnections.remove(node);
266260
}
267261

268262
/**

server/src/main/java/org/opensearch/transport/ConnectionManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,9 @@ void connectToNode(
6565

6666
void disconnectFromNode(DiscoveryNode node);
6767

68-
Set<DiscoveryNode> getPendingDisconnections();
68+
void setPendingDisconnection(DiscoveryNode node);
6969

70-
void setPendingDisconnections(Set<DiscoveryNode> nodes);
71-
72-
void removePendingDisconnections(Set<DiscoveryNode> nodes);
70+
void removePendingDisconnection(DiscoveryNode node);
7371

7472
Set<DiscoveryNode> getAllConnectedNodes();
7573

server/src/main/java/org/opensearch/transport/RemoteConnectionManager.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,18 +115,13 @@ public void disconnectFromNode(DiscoveryNode node) {
115115
}
116116

117117
@Override
118-
public Set<DiscoveryNode> getPendingDisconnections() {
119-
return delegate.getPendingDisconnections();
118+
public void setPendingDisconnection(DiscoveryNode node) {
119+
delegate.setPendingDisconnection(node);
120120
}
121121

122122
@Override
123-
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
124-
delegate.setPendingDisconnections(nodes);
125-
}
126-
127-
@Override
128-
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
129-
delegate.removePendingDisconnections(nodes);
123+
public void removePendingDisconnection(DiscoveryNode node) {
124+
delegate.removePendingDisconnection(node);
130125
}
131126

132127
@Override

server/src/main/java/org/opensearch/transport/TransportService.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.opensearch.action.support.PlainActionFuture;
4242
import org.opensearch.cluster.ClusterName;
4343
import org.opensearch.cluster.node.DiscoveryNode;
44-
import org.opensearch.cluster.node.DiscoveryNodes;
4544
import org.opensearch.common.Nullable;
4645
import org.opensearch.common.io.stream.Streamables;
4746
import org.opensearch.common.lease.Releasable;
@@ -774,16 +773,12 @@ public void disconnectFromNode(DiscoveryNode node) {
774773
connectionManager.disconnectFromNode(node);
775774
}
776775

777-
public Set<DiscoveryNode> getPendingDisconnections() {
778-
return connectionManager.getPendingDisconnections();
779-
}
780-
781-
public void setPendingDisconnections(DiscoveryNodes.Delta nodesDelta) {
782-
connectionManager.setPendingDisconnections(new HashSet<>(nodesDelta.removedNodes()));
776+
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
777+
nodes.forEach(connectionManager::setPendingDisconnection);
783778
}
784779

785780
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
786-
connectionManager.removePendingDisconnections(nodes);
781+
nodes.forEach(connectionManager::removePendingDisconnection);
787782
}
788783

789784
public void addMessageListener(TransportMessageListener listener) {

test/framework/src/main/java/org/opensearch/test/transport/StubbableConnectionManager.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,13 @@ public void disconnectFromNode(DiscoveryNode node) {
124124
}
125125

126126
@Override
127-
public Set<DiscoveryNode> getPendingDisconnections() {
128-
return delegate.getPendingDisconnections();
127+
public void setPendingDisconnection(DiscoveryNode node) {
128+
delegate.setPendingDisconnection(node);
129129
}
130130

131131
@Override
132-
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
133-
delegate.setPendingDisconnections(nodes);
134-
}
135-
136-
@Override
137-
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
138-
delegate.removePendingDisconnections(nodes);
132+
public void removePendingDisconnection(DiscoveryNode node) {
133+
delegate.removePendingDisconnection(node);
139134
}
140135

141136
@Override

0 commit comments

Comments
 (0)