Skip to content

Commit 0009018

Browse files
author
Rahul Karajgikar
committed
cleanup unused code and remove added log lines
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 551f30b commit 0009018

File tree

15 files changed

+43
-169
lines changed

15 files changed

+43
-169
lines changed

server/src/main/java/org/opensearch/cluster/NodeConnectionsService.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,8 @@
5252
import org.opensearch.common.util.concurrent.AbstractRunnable;
5353
import org.opensearch.core.action.ActionListener;
5454
import org.opensearch.threadpool.ThreadPool;
55-
import org.opensearch.transport.ConnectTransportException;
5655
import org.opensearch.transport.TransportService;
5756

58-
import java.io.IOException;
5957
import java.util.ArrayList;
6058
import java.util.Collection;
6159
import java.util.HashMap;
@@ -181,12 +179,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
181179
runnables.forEach(Runnable::run);
182180
}
183181

184-
public void markPendingJoinsAsComplete(List<DiscoveryNode> nodesConnected) {
185-
for (final DiscoveryNode discoveryNode : nodesConnected) {
186-
transportService.markPendingJoinAsCompleted(discoveryNode);
187-
}
188-
}
189-
190182
void ensureConnections(Runnable onCompletion) {
191183
// Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have
192184
// been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection.
@@ -343,30 +335,20 @@ private class ConnectionTarget {
343335
final AbstractRunnable abstractRunnable = this;
344336

345337
@Override
346-
protected void doRun() throws IOException {
338+
protected void doRun() {
347339
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
348-
// if we are trying a connect activity while a node left is in progress, fail
349-
if (transportService.getNodesLeftInProgress().contains(discoveryNode)) {
350-
throw new ConnectTransportException(discoveryNode, "failed to connect while node-left in progress");
351-
}
352340
if (transportService.nodeConnected(discoveryNode)) {
353341
// transportService.connectToNode is a no-op if already connected, but we don't want any DEBUG logging in this case
354342
// since we run this for every node on every cluster state update.
355343
logger.trace("still connected to {}", discoveryNode);
356344
onConnected();
357345
} else {
358-
logger.info("connecting to {}", discoveryNode);
346+
logger.debug("connecting to {}", discoveryNode);
359347
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
360348
@Override
361349
public void onResponse(Void aVoid) {
362350
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
363-
logger.info("connected to {}", discoveryNode);
364-
transportService.markPendingJoinAsCompleted(discoveryNode);
365-
logger.info(
366-
"marked pending join for {} as completed. new list of nodes pending join: {}",
367-
discoveryNode,
368-
transportService.getNodesJoinInProgress()
369-
);
351+
logger.debug("connected to {}", discoveryNode);
370352
onConnected();
371353
}
372354

@@ -407,11 +389,10 @@ public String toString() {
407389
@Override
408390
protected void doRun() {
409391
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
410-
logger.info("disconnecting from {}", discoveryNode);
411392
transportService.disconnectFromNode(discoveryNode);
412393
transportService.markPendingLeftAsCompleted(discoveryNode);
413394
consecutiveFailureCount.set(0);
414-
logger.info(
395+
logger.debug(
415396
"disconnected from {} and marked pending left as completed. " + "pending lefts: [{}]",
416397
discoveryNode,
417398
transportService.getNodesLeftInProgress()
@@ -444,7 +425,6 @@ Runnable connect(@Nullable ActionListener<Void> listener) {
444425
}
445426

446427
Runnable disconnect() {
447-
logger.info("running runnable disconnect");
448428
return addListenerAndStartActivity(
449429
null,
450430
ActivityType.DISCONNECTING,

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
417417

418418
synchronized (mutex) {
419419
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getClusterManagerNode();
420-
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
421-
logger.info(
420+
logger.debug(
422421
"handlePublishRequest: handling version [{}] from [{}]",
423422
publishRequest.getAcceptedState().getVersion(),
424423
sourceNode
@@ -631,7 +630,7 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
631630
);
632631
return;
633632
}
634-
// add a check here, if node-left is still in progress, we fail the connection
633+
// if node-left is still in progress, we fail the joinRequest early
635634
if (transportService.getNodesLeftInProgress().contains(joinRequest.getSourceNode())) {
636635
joinCallback.onFailure(
637636
new IllegalStateException(
@@ -642,7 +641,6 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
642641
return;
643642
}
644643

645-
// cluster manager connects to the node
646644
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
647645
final ClusterState stateForJoinValidation = getStateForClusterManagerService();
648646
if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
@@ -781,7 +779,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
781779
if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
782780
logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}", method, leaderNode, getCurrentTerm());
783781
} else {
784-
logger.info(
782+
logger.debug(
785783
"{}: coordinator becoming FOLLOWER of [{}] in term {} (was {}, lastKnownLeader was [{}])",
786784
method,
787785
leaderNode,
@@ -928,7 +926,7 @@ public DiscoveryStats stats() {
928926
@Override
929927
public void startInitialJoin() {
930928
synchronized (mutex) {
931-
logger.info("Starting initial join, becoming candidate");
929+
logger.trace("Starting initial join, becoming candidate");
932930
becomeCandidate("startInitialJoin");
933931
}
934932
clusterBootstrapService.scheduleUnconfiguredBootstrap();
@@ -1371,13 +1369,12 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13711369
currentPublication = Optional.of(publication);
13721370

13731371
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
1374-
13751372
leaderChecker.setCurrentNodes(publishNodes);
13761373
followersChecker.setCurrentNodes(publishNodes);
13771374
lagDetector.setTrackedNodes(publishNodes);
13781375
coordinationState.get().handlePrePublish(clusterState);
1379-
// trying to mark pending connects/disconnects before publish
1380-
// if we try to connect during pending disconnect or vice versa - fail
1376+
// mark pending disconnects before publishing
1377+
// if a join request arrives while a disconnect is still pending, it should fail
13811378
transportService.markPendingConnections(clusterChangedEvent.nodesDelta());
13821379
publication.start(followersChecker.getFaultyNodes());
13831380
}

server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
176176
* Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones.
177177
*/
178178
public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
179-
logger.info("[{}]Setting followerschecker currentnodes to {}", Thread.currentThread().getName(), discoveryNodes);
180179
synchronized (mutex) {
181180
final Predicate<DiscoveryNode> isUnknownNode = n -> discoveryNodes.nodeExists(n) == false;
182181
followerCheckers.keySet().removeIf(isUnknownNode);
@@ -358,9 +357,6 @@ private void handleWakeUp() {
358357

359358
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
360359
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
361-
if (discoveryNode.getName().equals("node_t2")) {
362-
logger.info("handleWakeUp: checking {} with {}", discoveryNode, request);
363-
}
364360

365361
transportService.sendRequest(
366362
discoveryNode,
@@ -396,19 +392,6 @@ public void handleException(TransportException exp) {
396392

397393
final String reason;
398394

399-
// if (exp instanceof NodeNotConnectedException || exp.getCause() instanceof NodeNotConnectedException){
400-
// // NodeNotConnectedException will only happen if getConnection fails in TransportService.sendRequest
401-
// // This only happens if clusterConnectionManager.getConnection() does not find the entry in connectedNodes list
402-
// // This happens on node disconnection
403-
// // Need to validate that this only gets triggered from node-left side. we want to ensure actual disconnections
404-
// work
405-
// failureCountSinceLastSuccess--;
406-
// logger.info(() -> new ParameterizedMessage("{} cache entry not found, but node is still in cluster state.
407-
// ignoring this failure", FollowerChecker.this), exp);
408-
// scheduleNextWakeUp();
409-
// return;
410-
// }
411-
412395
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
413396
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
414397
reason = "disconnected";

server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
379379
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
380380
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
381381
if (pendingOutgoingJoins.add(dedupKey)) {
382-
logger.info("attempting to join {} with {}", destination, joinRequest);
382+
logger.debug("attempting to join {} with {}", destination, joinRequest);
383383
transportService.sendRequest(
384384
destination,
385385
JOIN_ACTION_NAME,
@@ -394,7 +394,7 @@ public Empty read(StreamInput in) {
394394
@Override
395395
public void handleResponse(Empty response) {
396396
pendingOutgoingJoins.remove(dedupKey);
397-
logger.info("successfully joined {} with {}", destination, joinRequest);
397+
logger.debug("successfully joined {} with {}", destination, joinRequest);
398398
lastFailedJoinAttempt.set(null);
399399
nodeCommissioned.accept(true);
400400
onCompletion.run();

server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,20 +218,20 @@ private void handleLeaderCheck(LeaderCheckRequest request) {
218218
+ "since node is unhealthy ["
219219
+ statusInfo.getInfo()
220220
+ "]";
221-
logger.info(message);
221+
logger.debug(message);
222222
throw new NodeHealthCheckFailureException(message);
223223
} else if (discoveryNodes.isLocalNodeElectedClusterManager() == false) {
224-
logger.info("rejecting leader check on non-cluster-manager {}", request);
224+
logger.debug("rejecting leader check on non-cluster-manager {}", request);
225225
throw new CoordinationStateRejectedException(
226226
"rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the cluster-manager"
227227
);
228228
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
229-
logger.info("rejecting leader check from removed node: {}", request);
229+
logger.debug("rejecting leader check from removed node: {}", request);
230230
throw new CoordinationStateRejectedException(
231231
"rejecting leader check since [" + request.getSender() + "] has been removed from the cluster"
232232
);
233233
} else {
234-
logger.info("handling {}", request);
234+
logger.debug("handling {}", request);
235235
}
236236
}
237237

@@ -306,17 +306,17 @@ public void handleException(TransportException exp) {
306306
return;
307307
}
308308
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
309-
logger.info(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
309+
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
310310
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
311311
return;
312312
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
313-
logger.info(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
313+
logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
314314
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
315315
return;
316316
}
317317
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
318318
if (failureCount >= leaderCheckRetryCount) {
319-
logger.info(
319+
logger.debug(
320320
new ParameterizedMessage(
321321
"leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:",
322322
leader,

server/src/main/java/org/opensearch/cluster/coordination/Publication.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,9 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS
8686

8787
public void start(Set<DiscoveryNode> faultyNodes) {
8888
logger.trace("publishing {} to {}", publishRequest, publicationTargets);
89-
logger.info("publishing {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
89+
logger.info("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
9090

9191
for (final DiscoveryNode faultyNode : faultyNodes) {
92-
logger.info("in publish.start, found faulty node: [{}]", faultyNode);
9392
onFaultyNode(faultyNode);
9493
}
9594
onPossibleCommitFailure();
@@ -334,7 +333,7 @@ void setFailed(Exception e) {
334333

335334
void onFaultyNode(DiscoveryNode faultyNode) {
336335
if (isActive() && discoveryNode.equals(faultyNode)) {
337-
logger.info("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this);
336+
logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this);
338337
setFailed(new OpenSearchException("faulty node"));
339338
onPossibleCommitFailure();
340339
}

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,21 @@ public void onFailure(Exception e) {
499499
} else {
500500
responseActionListener = listener;
501501
}
502+
<<<<<<< HEAD
502503
sendClusterState(destination, responseActionListener);
504+
=======
505+
// TODO Decide to send remote state before starting publication by checking remote publication on all nodes
506+
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
507+
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
508+
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
509+
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
510+
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
511+
sendFullClusterState(destination, responseActionListener);
512+
} else {
513+
logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
514+
sendClusterStateDiff(destination, responseActionListener);
515+
}
516+
>>>>>>> f0cf40ce03f (cleanup unused code and remove added log lines)
503517
}
504518

505519
public void sendApplyCommit(
@@ -590,7 +604,7 @@ private void sendClusterState(
590604
logger.debug("resending full cluster state to node {} reason {}", destination, exp.getDetailedMessage());
591605
sendFullClusterState(destination, listener);
592606
} else {
593-
logger.info(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
607+
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
594608
listener.onFailure(exp);
595609
}
596610
};

0 commit comments

Comments
 (0)