Skip to content

Commit 9c788fb

Browse files
author
Rahul Karajgikar
committed
cleanup
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 36e473d commit 9c788fb

File tree

10 files changed

+77
-157
lines changed

10 files changed

+77
-157
lines changed

server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java

Lines changed: 20 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,8 @@
3333
package org.opensearch.cluster.coordination;
3434

3535
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
36-
import org.opensearch.cluster.ClusterChangedEvent;
37-
import org.opensearch.cluster.ClusterStateApplier;
3836
import org.opensearch.cluster.NodeConnectionsService;
3937
import org.opensearch.cluster.metadata.IndexMetadata;
40-
import org.opensearch.cluster.node.DiscoveryNode;
4138
import org.opensearch.cluster.service.ClusterService;
4239
import org.opensearch.common.settings.Settings;
4340
import org.opensearch.index.MockEngineFactoryPlugin;
@@ -51,9 +48,7 @@
5148
import org.opensearch.test.store.MockFSIndexStore;
5249
import org.opensearch.test.transport.MockTransportService;
5350
import org.opensearch.test.transport.StubbableTransport;
54-
import org.opensearch.transport.Transport;
5551
import org.opensearch.transport.TransportChannel;
56-
import org.opensearch.transport.TransportConnectionListener;
5752
import org.opensearch.transport.TransportRequest;
5853
import org.opensearch.transport.TransportRequestHandler;
5954
import org.opensearch.transport.TransportService;
@@ -104,22 +99,14 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
10499
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
105100
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
106101
.build();
107-
// start a cluster-manager node
102+
// start a 3 node cluster with 1 cluster-manager
108103
final String cm = internalCluster().startNode(nodeSettings);
109-
110-
logger.info("--> spawning node t1");
111-
final String blueNodeName = internalCluster().startNode(
112-
Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()
113-
);
114-
logger.info("--> spawning node t2");
104+
internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
115105
final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
116106

117-
logger.info("--> initial health check");
118107
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
119108
assertThat(response.isTimedOut(), is(false));
120-
logger.info("--> done initial health check");
121109

122-
logger.info("--> creating index");
123110
client().admin()
124111
.indices()
125112
.prepareCreate(indexName)
@@ -130,63 +117,22 @@ public void testTransientErrorsDuringRecovery1AreRetried() throws Exception {
130117
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
131118
)
132119
.get();
133-
logger.info("--> done creating index");
134-
MockTransportService cmTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
135-
MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance(
136-
TransportService.class,
137-
redNodeName
138-
);
139120

140121
ClusterService cmClsService = internalCluster().getInstance(ClusterService.class, cm);
141-
// simulate a slow applier on the cm
142-
cmClsService.addStateApplier(new ClusterStateApplier() {
143-
@Override
144-
public void applyClusterState(ClusterChangedEvent event) {
145-
if (event.nodesRemoved()) {
146-
try {
147-
Thread.sleep(3000);
148-
} catch (InterruptedException e) {
149-
throw new RuntimeException(e);
150-
}
122+
// Simulate a slow applier on the cm to delay node-left state application
123+
cmClsService.addStateApplier(event -> {
124+
if (event.nodesRemoved()) {
125+
try {
126+
Thread.sleep(3000);
127+
} catch (InterruptedException e) {
128+
throw new RuntimeException(e);
151129
}
152130
}
153131
});
154-
cmTransportService.connectionManager().addListener(new TransportConnectionListener() {
155-
156-
@Override
157-
public void onConnectionOpened(Transport.Connection connection) {
158-
// try {
159-
// Thread.sleep(500);
160-
// } catch (InterruptedException e) {
161-
// throw new RuntimeException(e);
162-
// }
163-
164-
}
165-
166-
@Override
167-
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
168-
// if (node.getName().equals("node_t2")) {
169-
// try {
170-
// Thread.sleep(250);
171-
// } catch (InterruptedException e) {
172-
// throw new RuntimeException(e);
173-
// }
174-
// }
175-
}
176-
177-
// @Override
178-
// public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
179-
// try {
180-
// Thread.sleep(5000);
181-
// } catch (InterruptedException e) {
182-
// throw new RuntimeException(e);
183-
// }
184-
// }
185-
});
186132
AtomicBoolean bb = new AtomicBoolean();
187-
// simulate followerchecker failure
188133

189-
ConnectionDelay handlingBehavior = new ConnectionDelay(FOLLOWER_CHECK_ACTION_NAME, () -> {
134+
// Simulate followerchecker failure on 1 node when bb is false
135+
ConnectionDelay handlingBehavior = new ConnectionDelay(() -> {
190136
if (bb.get()) {
191137
return;
192138
}
@@ -197,55 +143,39 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
197143
}
198144
throw new NodeHealthCheckFailureException("non writable exception");
199145
});
146+
MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance(
147+
TransportService.class,
148+
redNodeName
149+
);
200150
redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, handlingBehavior);
201151

152+
// Loop runs 10 times to ensure race condition gets reproduced
202153
for (int i = 0; i < 10; i++) {
203-
bb.set(false); // fail followerchecker by force to trigger node disconnect
204-
logger.info("--> disconnecting from red node, iteration: " + i);
205-
// cmTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
154+
bb.set(false);
155+
// fail followerchecker by force to trigger node disconnect
206156
// now followerchecker should fail and trigger node left
207-
logger.info("--> checking cluster health 2 nodes, iteration: " + i);
208157
ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
209158
assertThat(response1.isTimedOut(), is(false));
210-
logger.info("--> completed checking cluster health 2 nodes, iteration: " + i);
211159

212160
// once we know a node has left, we can re-enable followerchecker to work normally
213161
bb.set(true);
214-
Thread.sleep(1500);
215-
logger.info("--> checking cluster health 3 nodes, iteration: " + i);
216162
ClusterHealthResponse response2 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
217163
assertThat(response2.isTimedOut(), is(false));
218-
logger.info("--> completed checking cluster health 3 nodes, iteration: " + i);
219-
220-
Thread.sleep(1500);
221164

222-
// Checking again
223-
logger.info("--> checking cluster health 3 nodes again, iteration: " + i);
165+
// Checking again to validate stability
224166
ClusterHealthResponse response3 = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
225167
assertThat(response3.isTimedOut(), is(false));
226-
logger.info("--> completed checking cluster health 3 nodes again, iteration: " + i);
227168
}
228169

229170
bb.set(true);
230-
logger.info("-->first validation outside loop");
231-
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
232-
assertThat(response.isTimedOut(), is(false));
233-
234-
logger.info("-->sleeping for 20s");
235-
Thread.sleep(20000);
236-
237-
logger.info("-->second validation outside loop after sleep");
238171
response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
239172
assertThat(response.isTimedOut(), is(false));
240173
}
241174

242175
private class ConnectionDelay implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
243-
244-
private final String actionName;
245176
private final Runnable connectionBreaker;
246177

247-
private ConnectionDelay(String actionName, Runnable connectionBreaker) {
248-
this.actionName = actionName;
178+
private ConnectionDelay(Runnable connectionBreaker) {
249179
this.connectionBreaker = connectionBreaker;
250180
}
251181

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
171171
for (final DiscoveryNode discoveryNode : discoveryNodes) {
172172
nodesToDisconnect.remove(discoveryNode);
173173
}
174-
logger.info(" targetsByNode is {}", targetsByNode.keySet());
175-
logger.info(" nodes to disconnect set is [{}]", nodesToDisconnect);
176174

177175
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
178176
runnables.add(targetsByNode.get(discoveryNode).disconnect());
@@ -181,7 +179,7 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
181179
// There might be some stale nodes that are in pendingDisconnect set from before but are not connected anymore
182180
// This code block clears the pending disconnect for these nodes to avoid permanently blocking node joins
183181
// This situation should ideally not happen
184-
transportService.markDisconnectAsCompleted(
182+
transportService.removePendingDisconnections(
185183
transportService.getPendingDisconnections()
186184
.stream()
187185
.filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode))

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,6 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
371371
} else if (joinHelper.isJoinPending()) {
372372
logger.trace("onFollowerCheckRequest: rejoining cluster-manager, responding successfully to {}", followerCheckRequest);
373373
} else {
374-
logger.info("Mode: {}, ", mode);
375374
logger.trace("onFollowerCheckRequest: received check from faulty cluster-manager, rejecting {}", followerCheckRequest);
376375
throw new CoordinationStateRejectedException(
377376
"onFollowerCheckRequest: received check from faulty cluster-manager, rejecting " + followerCheckRequest
@@ -1361,7 +1360,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13611360
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
13621361
// marking pending disconnects before publish
13631362
// if we try to joinRequest during pending disconnect, it should fail
1364-
transportService.markPendingDisconnects(clusterChangedEvent.nodesDelta());
1363+
transportService.setPendingDisconnections(clusterChangedEvent.nodesDelta());
13651364
leaderChecker.setCurrentNodes(publishNodes);
13661365
followersChecker.setCurrentNodes(publishNodes);
13671366
lagDetector.setTrackedNodes(publishNodes);
@@ -1466,7 +1465,6 @@ private class CoordinatorPeerFinder extends PeerFinder {
14661465
protected void onActiveClusterManagerFound(DiscoveryNode clusterManagerNode, long term) {
14671466
synchronized (mutex) {
14681467
ensureTermAtLeast(clusterManagerNode, term);
1469-
logger.info("sending join request to {}", clusterManagerNode);
14701468
joinHelper.sendJoinRequest(clusterManagerNode, getCurrentTerm(), joinWithDestination(lastJoin, clusterManagerNode, term));
14711469
}
14721470
}

server/src/main/java/org/opensearch/cluster/coordination/Publication.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS
8585
}
8686

8787
public void start(Set<DiscoveryNode> faultyNodes) {
88-
logger.trace("publishing {} to {}", publishRequest, publicationTargets);
89-
logger.info("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
88+
logger.debug("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
9089

9190
for (final DiscoveryNode faultyNode : faultyNodes) {
9291
onFaultyNode(faultyNode);

server/src/main/java/org/opensearch/cluster/service/MasterService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,6 @@ private void handleException(String summary, long startTimeMillis, ClusterState
464464
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
465465
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary);
466466
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
467-
logger.debug("in cluster compute, finished computing new cluster state for version: {}", newClusterState.getVersion());
468467
return new TaskOutputs(
469468
taskInputs,
470469
previousClusterState,

server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545

4646
import java.util.Collections;
4747
import java.util.Iterator;
48-
import java.util.List;
4948
import java.util.Map;
5049
import java.util.Set;
5150
import java.util.concurrent.ConcurrentMap;
@@ -118,23 +117,6 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
118117
internalOpenConnection(node, resolvedProfile, listener);
119118
}
120119

121-
@Override
122-
public void markPendingDisconnects(List<DiscoveryNode> nodes) {
123-
logger.info("marking pending disconnects for nodes: [{}]", nodes);
124-
pendingDisconnections.addAll(nodes);
125-
}
126-
127-
@Override
128-
public Set<DiscoveryNode> getPendingDisconnections() {
129-
return pendingDisconnections;
130-
}
131-
132-
@Override
133-
public void markDisconnectAsCompleted(Set<DiscoveryNode> nodes) {
134-
logger.debug("marking disconnect as completed for nodes: [{}]", nodes);
135-
pendingDisconnections.removeAll(nodes);
136-
}
137-
138120
/**
139121
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
140122
* Once a successful is established, it can be validated before being exposed.
@@ -147,7 +129,7 @@ public void connectToNode(
147129
ConnectionValidator connectionValidator,
148130
ActionListener<Void> listener
149131
) throws ConnectTransportException {
150-
logger.info("[{}]connecting to node [{}]", Thread.currentThread().getName(), node);
132+
logger.trace("[{}]connecting to node [{}]", Thread.currentThread().getName(), node);
151133
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
152134
if (node == null) {
153135
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
@@ -195,16 +177,16 @@ public void connectToNode(
195177
assert Transports.assertNotTransportThread("connection validator success");
196178
try {
197179
if (connectedNodes.putIfAbsent(node, conn) != null) {
198-
logger.info("existing connection to node [{}], closing new redundant connection", node);
180+
logger.debug("existing connection to node [{}], closing new redundant connection", node);
199181
IOUtils.closeWhileHandlingException(conn);
200182
} else {
201-
logger.info("connected to node [{}]", node);
183+
logger.debug("connected to node [{}]", node);
202184
try {
203185
connectionListener.onNodeConnected(node, conn);
204186
} finally {
205187
final Transport.Connection finalConnection = conn;
206188
conn.addCloseListener(ActionListener.wrap(() -> {
207-
logger.info("unregistering {} after connection close and marking as disconnected", node);
189+
logger.trace("unregistering {} after connection close and marking as disconnected", node);
208190
connectedNodes.remove(node, finalConnection);
209191
connectionListener.onNodeDisconnected(node, conn);
210192
}));
@@ -263,7 +245,24 @@ public void disconnectFromNode(DiscoveryNode node) {
263245
nodeChannels.close();
264246
}
265247
pendingDisconnections.remove(node);
266-
logger.info("Removed node {} from pending disconnects list", node);
248+
logger.debug("Removed node [{}] from pending disconnections list", node);
249+
}
250+
251+
@Override
252+
public Set<DiscoveryNode> getPendingDisconnections() {
253+
return pendingDisconnections;
254+
}
255+
256+
@Override
257+
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
258+
logger.debug("set pending disconnection for nodes: [{}]", nodes);
259+
pendingDisconnections.addAll(nodes);
260+
}
261+
262+
@Override
263+
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
264+
logger.debug("marking disconnection as completed for nodes: [{}]", nodes);
265+
pendingDisconnections.removeAll(nodes);
267266
}
268267

269268
/**
@@ -317,7 +316,6 @@ private void internalOpenConnection(
317316
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
318317
}
319318
if (connection.isClosed()) {
320-
logger.info("channel closed while connecting, throwing exception");
321319
throw new ConnectTransportException(node, "a channel closed while connecting");
322320
}
323321
return connection;

server/src/main/java/org/opensearch/transport/ConnectionManager.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.opensearch.core.action.ActionListener;
3737

3838
import java.io.Closeable;
39-
import java.util.List;
4039
import java.util.Set;
4140
import java.util.concurrent.CopyOnWriteArrayList;
4241

@@ -53,10 +52,6 @@ public interface ConnectionManager extends Closeable {
5352

5453
void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener);
5554

56-
Set<DiscoveryNode> getPendingDisconnections();
57-
58-
void markDisconnectAsCompleted(Set<DiscoveryNode> nodes);
59-
6055
void connectToNode(
6156
DiscoveryNode node,
6257
ConnectionProfile connectionProfile,
@@ -68,10 +63,14 @@ void connectToNode(
6863

6964
boolean nodeConnected(DiscoveryNode node);
7065

71-
void markPendingDisconnects(List<DiscoveryNode> nodes);
72-
7366
void disconnectFromNode(DiscoveryNode node);
7467

68+
Set<DiscoveryNode> getPendingDisconnections();
69+
70+
void setPendingDisconnections(Set<DiscoveryNode> nodes);
71+
72+
void removePendingDisconnections(Set<DiscoveryNode> nodes);
73+
7574
Set<DiscoveryNode> getAllConnectedNodes();
7675

7776
int size();

0 commit comments

Comments
 (0)