35
35
import org .apache .logging .log4j .Level ;
36
36
import org .apache .logging .log4j .LogManager ;
37
37
import org .apache .logging .log4j .Logger ;
38
+ import org .apache .logging .log4j .core .LoggerContext ;
39
+ import org .apache .logging .log4j .core .config .Configuration ;
40
+ import org .apache .logging .log4j .core .config .LoggerConfig ;
38
41
import org .opensearch .OpenSearchTimeoutException ;
39
42
import org .opensearch .Version ;
40
43
import org .opensearch .action .support .PlainActionFuture ;
53
56
import org .opensearch .telemetry .tracing .noop .NoopTracer ;
54
57
import org .opensearch .test .MockLogAppender ;
55
58
import org .opensearch .test .OpenSearchTestCase ;
59
+ import org .opensearch .test .TestLogsAppender ;
56
60
import org .opensearch .test .junit .annotations .TestLogging ;
57
61
import org .opensearch .threadpool .TestThreadPool ;
58
62
import org .opensearch .threadpool .ThreadPool ;
63
+ import org .opensearch .transport .ClusterConnectionManager ;
59
64
import org .opensearch .transport .ConnectTransportException ;
60
65
import org .opensearch .transport .ConnectionProfile ;
61
66
import org .opensearch .transport .Transport ;
69
74
import org .junit .Before ;
70
75
71
76
import java .util .ArrayList ;
77
+ import java .util .Arrays ;
72
78
import java .util .Collections ;
73
79
import java .util .HashSet ;
74
80
import java .util .List ;
@@ -94,6 +100,8 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase {
94
100
private ThreadPool threadPool ;
95
101
private TransportService transportService ;
96
102
private Map <DiscoveryNode , CheckedRunnable <Exception >> nodeConnectionBlocks ;
103
+ private TestLogsAppender testLogsAppender ;
104
+ private LoggerContext loggerContext ;
97
105
98
106
private List <DiscoveryNode > generateNodes () {
99
107
List <DiscoveryNode > nodes = new ArrayList <>();
@@ -516,7 +524,7 @@ public void testConnectionCheckerRetriesIfPendingDisconnection() throws Interrup
516
524
517
525
// setup the connections
518
526
final DiscoveryNode node = new DiscoveryNode ("node0" , buildNewFakeTransportAddress (), Version .CURRENT );
519
- ;
527
+
520
528
final DiscoveryNodes nodes = DiscoveryNodes .builder ().add (node ).build ();
521
529
522
530
final AtomicBoolean connectionCompleted = new AtomicBoolean ();
@@ -526,13 +534,15 @@ public void testConnectionCheckerRetriesIfPendingDisconnection() throws Interrup
526
534
527
535
// now trigger a disconnect, and then set pending disconnections to true to fail any new connections
528
536
final long maxDisconnectionTime = 1000 ;
529
- final long disconnectionTime = 100 ;
530
- deterministicTaskQueue .scheduleAt (disconnectionTime , new Runnable () {
537
+ deterministicTaskQueue .scheduleNow (new Runnable () {
531
538
@ Override
532
539
public void run () {
533
540
transportService .disconnectFromNode (node );
534
541
logger .info ("--> setting pending disconnections to fail next connection attempts" );
535
542
service .setPendingDisconnections (new HashSet <>(Collections .singleton (node )));
543
+ // we reset the connection count during the first disconnection
544
+ // we also clear the captured logs as we want to assert for exceptions that show up after this
545
+ testLogsAppender .clearCapturedLogs ();
536
546
transportService .resetConnectToNodeCallCount ();
537
547
}
538
548
@@ -541,20 +551,39 @@ public String toString() {
541
551
return "scheduled disconnection of " + node ;
542
552
}
543
553
});
554
+ final long maxReconnectionTime = 2000 ;
555
+ final int expectedReconnectionAttempts = 5 ;
544
556
545
- // ensure the disconnect task completes, give extra time also for connection checker tasks
546
- runTasksUntil (deterministicTaskQueue , maxDisconnectionTime );
547
-
548
- // verify that connectionchecker is trying to call connectToNode multiple times
557
+ // ensure the disconnect task completes, and run for additional time to check for reconnections
558
+ // exit early if we see enough reconnection attempts
549
559
logger .info ("--> verifying connectionchecker is trying to reconnect" );
560
+ runTasksUntilExpectedReconnectionAttempts (
561
+ deterministicTaskQueue ,
562
+ maxDisconnectionTime + maxReconnectionTime ,
563
+ transportService ,
564
+ expectedReconnectionAttempts
565
+ );
566
+
567
+ // assert that we saw at least the required number of reconnection attempts, and the exceptions that showed up are as expected
550
568
logger .info ("--> number of reconnection attempts: {}" , transportService .getConnectToNodeCallCount ());
551
- assertThat ("ConnectToNode should be called multiple times" , transportService .getConnectToNodeCallCount (), greaterThan (5 ));
569
+ assertThat (
570
+ "Did not see enough reconnection attempts from connection checker" ,
571
+ transportService .getConnectToNodeCallCount (),
572
+ greaterThan (expectedReconnectionAttempts )
573
+ );
574
+ boolean logFound = testLogsAppender .waitForLog ("failed to connect" , 1 , TimeUnit .SECONDS )
575
+ && testLogsAppender .waitForLog (
576
+ "IllegalStateException: cannot make a new connection as disconnect to node" ,
577
+ 1 ,
578
+ TimeUnit .SECONDS
579
+ );
580
+ assertTrue ("Expected log for reconnection failure was not found in the required time period" , logFound );
552
581
assertFalse ("connected to " + node , transportService .nodeConnected (node ));
553
582
554
583
// clear the pending disconnections and ensure the connection gets re-established automatically by connectionchecker
555
584
logger .info ("--> clearing pending disconnections to allow connections to re-establish" );
556
585
service .clearPendingDisconnections ();
557
- runTasksUntil (deterministicTaskQueue , maxDisconnectionTime + 2 * reconnectIntervalMillis );
586
+ runTasksUntil (deterministicTaskQueue , maxDisconnectionTime + maxReconnectionTime + 2 * reconnectIntervalMillis );
558
587
assertConnectedExactlyToNodes (transportService , nodes );
559
588
}
560
589
@@ -569,6 +598,24 @@ private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long e
569
598
deterministicTaskQueue .runAllRunnableTasks ();
570
599
}
571
600
601
+ private void runTasksUntilExpectedReconnectionAttempts (
602
+ DeterministicTaskQueue deterministicTaskQueue ,
603
+ long endTimeMillis ,
604
+ TestTransportService transportService ,
605
+ int expectedReconnectionAttempts
606
+ ) {
607
+ // break the loop if we timeout or if we have enough reconnection attempts
608
+ while ((deterministicTaskQueue .getCurrentTimeMillis () < endTimeMillis )
609
+ && (transportService .getConnectToNodeCallCount () <= expectedReconnectionAttempts )) {
610
+ if (deterministicTaskQueue .hasRunnableTasks () && randomBoolean ()) {
611
+ deterministicTaskQueue .runRandomTask ();
612
+ } else if (deterministicTaskQueue .hasDeferredTasks ()) {
613
+ deterministicTaskQueue .advanceTime ();
614
+ }
615
+ }
616
+ deterministicTaskQueue .runAllRunnableTasks ();
617
+ }
618
+
572
619
private void ensureConnections (NodeConnectionsService service ) {
573
620
final PlainActionFuture <Void > future = new PlainActionFuture <>();
574
621
service .ensureConnections (() -> future .onResponse (null ));
@@ -594,6 +641,16 @@ private void assertConnected(TransportService transportService, Iterable<Discove
594
641
@ Before
595
642
public void setUp () throws Exception {
596
643
super .setUp ();
644
+ // Add any other specific messages you want to capture
645
+ List <String > messagesToCapture = Arrays .asList ("failed to connect" , "IllegalStateException" );
646
+ testLogsAppender = new TestLogsAppender (messagesToCapture );
647
+ loggerContext = (LoggerContext ) LogManager .getContext (false );
648
+ Configuration config = loggerContext .getConfiguration ();
649
+ LoggerConfig loggerConfig = config .getLoggerConfig (NodeConnectionsService .class .getName ());
650
+ loggerConfig .addAppender (testLogsAppender , null , null );
651
+ loggerConfig = config .getLoggerConfig (ClusterConnectionManager .class .getName ());
652
+ loggerConfig .addAppender (testLogsAppender , null , null );
653
+ loggerContext .updateLoggers ();
597
654
ThreadPool threadPool = new TestThreadPool (getClass ().getName ());
598
655
this .threadPool = threadPool ;
599
656
nodeConnectionBlocks = newConcurrentMap ();
@@ -605,6 +662,14 @@ public void setUp() throws Exception {
605
662
@ Override
606
663
@ After
607
664
public void tearDown () throws Exception {
665
+ testLogsAppender .clearCapturedLogs ();
666
+ loggerContext = (LoggerContext ) LogManager .getContext (false );
667
+ Configuration config = loggerContext .getConfiguration ();
668
+ LoggerConfig loggerConfig = config .getLoggerConfig (NodeConnectionsService .class .getName ());
669
+ loggerConfig .removeAppender (testLogsAppender .getName ());
670
+ loggerConfig = config .getLoggerConfig (ClusterConnectionManager .class .getName ());
671
+ loggerConfig .removeAppender (testLogsAppender .getName ());
672
+ loggerContext .updateLoggers ();
608
673
transportService .stop ();
609
674
ThreadPool .terminate (threadPool , 30 , TimeUnit .SECONDS );
610
675
threadPool = null ;
0 commit comments