55
55
import org .opensearch .test .MockLogAppender ;
56
56
import org .opensearch .test .junit .annotations .TestLogging ;
57
57
58
+ import java .util .ArrayList ;
58
59
import java .util .Arrays ;
59
60
import java .util .Collections ;
60
61
import java .util .HashMap ;
61
62
import java .util .HashSet ;
63
+ import java .util .List ;
62
64
import java .util .Map ;
63
65
import java .util .Set ;
64
66
import java .util .concurrent .atomic .AtomicBoolean ;
67
69
import java .util .concurrent .atomic .AtomicReference ;
68
70
import java .util .function .LongSupplier ;
69
71
72
+ import static org .opensearch .cluster .routing .allocation .DiskThresholdSettings .CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE ;
70
73
import static org .hamcrest .Matchers .contains ;
71
74
import static org .hamcrest .Matchers .equalTo ;
72
75
@@ -581,12 +584,16 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
581
584
);
582
585
583
586
advanceTime .set (false ); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
584
- assertSingleWarningMessage (
585
- monitor ,
586
- aboveHighWatermark ,
587
+ final List <String > messages = new ArrayList <>();
588
+ messages .add (
587
589
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
588
590
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
589
591
);
592
+ messages .add (
593
+ "Putting index create block on cluster as all nodes are breaching high disk watermark. "
594
+ + "Number of nodes above high watermark: 1."
595
+ );
596
+ assertMultipleWarningMessages (monitor , aboveHighWatermark , messages );
590
597
591
598
advanceTime .set (true );
592
599
assertRepeatedWarningMessages (
@@ -605,22 +612,11 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
605
612
606
613
relocatingShardSizeRef .set (-5L );
607
614
advanceTime .set (true );
608
- assertSingleInfoMessage (
609
- monitor ,
610
- aboveHighWatermark ,
611
- "high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
612
- + "the node is expected to be below the high disk watermark when these relocations are complete"
613
- );
614
615
615
616
relocatingShardSizeRef .set (0L );
616
617
timeSupplier .getAsLong (); // advance time long enough to do another reroute
617
618
advanceTime .set (false ); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
618
- assertSingleWarningMessage (
619
- monitor ,
620
- aboveHighWatermark ,
621
- "high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
622
- + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
623
- );
619
+ assertMultipleWarningMessages (monitor , aboveHighWatermark , messages );
624
620
625
621
advanceTime .set (true );
626
622
assertRepeatedWarningMessages (
@@ -722,6 +718,113 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
722
718
assertTrue (countBlocksCalled .get () == 0 );
723
719
}
724
720
721
+ public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark () {
722
+ AllocationService allocation = createAllocationService (
723
+ Settings .builder ()
724
+ .put ("cluster.routing.allocation.node_concurrent_recoveries" , 10 )
725
+ .put ("cluster.blocks.create_index.enabled" , false )
726
+ .build ()
727
+ );
728
+ Metadata metadata = Metadata .builder ()
729
+ .put (
730
+ IndexMetadata .builder ("test" )
731
+ .settings (settings (Version .CURRENT ).put ("index.routing.allocation.require._id" , "node2" ))
732
+ .numberOfShards (1 )
733
+ .numberOfReplicas (0 )
734
+ )
735
+ .put (
736
+ IndexMetadata .builder ("test_1" )
737
+ .settings (settings (Version .CURRENT ).put ("index.routing.allocation.require._id" , "node1" ))
738
+ .numberOfShards (1 )
739
+ .numberOfReplicas (0 )
740
+ )
741
+ .put (
742
+ IndexMetadata .builder ("test_2" )
743
+ .settings (settings (Version .CURRENT ).put ("index.routing.allocation.require._id" , "node1" ))
744
+ .numberOfShards (1 )
745
+ .numberOfReplicas (0 )
746
+ )
747
+ .build ();
748
+ RoutingTable routingTable = RoutingTable .builder ()
749
+ .addAsNew (metadata .index ("test" ))
750
+ .addAsNew (metadata .index ("test_1" ))
751
+ .addAsNew (metadata .index ("test_2" ))
752
+ .build ();
753
+
754
+ final ClusterState clusterState = applyStartedShardsUntilNoChange (
755
+ ClusterState .builder (ClusterName .CLUSTER_NAME_SETTING .getDefault (Settings .EMPTY ))
756
+ .metadata (metadata )
757
+ .routingTable (routingTable )
758
+ .blocks (ClusterBlocks .builder ().addGlobalBlock (Metadata .CLUSTER_CREATE_INDEX_BLOCK ).build ())
759
+ .nodes (DiscoveryNodes .builder ().add (newNode ("node1" )).add (newNode ("node2" )))
760
+ .build (),
761
+ allocation
762
+ );
763
+ AtomicReference <Set <String >> indices = new AtomicReference <>();
764
+ AtomicInteger countBlocksCalled = new AtomicInteger ();
765
+ AtomicInteger countUnblockBlocksCalled = new AtomicInteger ();
766
+ AtomicLong currentTime = new AtomicLong ();
767
+ Settings settings = Settings .builder ().put (CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE .getKey (), true ).build ();
768
+ DiskThresholdMonitor monitor = new DiskThresholdMonitor (
769
+ settings ,
770
+ () -> clusterState ,
771
+ new ClusterSettings (settings , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS ),
772
+ null ,
773
+ currentTime ::get ,
774
+ (reason , priority , listener ) -> {
775
+ listener .onResponse (null );
776
+ }
777
+ ) {
778
+
779
+ @ Override
780
+ protected void updateIndicesReadOnly (Set <String > indicesToMarkReadOnly , ActionListener <Void > listener , boolean readOnly ) {
781
+ assertTrue (indices .compareAndSet (null , indicesToMarkReadOnly ));
782
+ assertTrue (readOnly );
783
+ listener .onResponse (null );
784
+ }
785
+
786
+ @ Override
787
+ protected void setIndexCreateBlock (ActionListener <Void > listener , boolean indexCreateBlock ) {
788
+ if (indexCreateBlock == true ) {
789
+ countBlocksCalled .set (countBlocksCalled .get () + 1 );
790
+ } else {
791
+ countUnblockBlocksCalled .set (countUnblockBlocksCalled .get () + 1 );
792
+ }
793
+
794
+ listener .onResponse (null );
795
+ }
796
+ };
797
+
798
+ Map <String , DiskUsage > builder = new HashMap <>();
799
+
800
+ // Initially all the nodes are breaching high watermark and IndexCreateBlock is already present on the cluster.
801
+ // Since block is already present, DiskThresholdMonitor should not again try to apply block.
802
+ builder .put ("node1" , new DiskUsage ("node1" , "node1" , "/foo/bar" , 100 , 9 ));
803
+ builder .put ("node2" , new DiskUsage ("node2" , "node2" , "/foo/bar" , 100 , 9 ));
804
+ monitor .onNewInfo (clusterInfo (builder ));
805
+ // Since Block is already present and nodes are below high watermark so neither block nor unblock will be called.
806
+ assertEquals (countBlocksCalled .get (), 0 );
807
+ assertEquals (countUnblockBlocksCalled .get (), 0 );
808
+
809
+ // Ensure DiskThresholdMonitor does not try to remove block in the next iteration if all nodes are breaching high watermark.
810
+ monitor .onNewInfo (clusterInfo (builder ));
811
+ assertEquals (countBlocksCalled .get (), 0 );
812
+ assertEquals (countUnblockBlocksCalled .get (), 0 );
813
+
814
+ builder = new HashMap <>();
815
+
816
+ // If any node is no longer breaching high watermark, DiskThresholdMonitor should remove IndexCreateBlock.
817
+ builder .put ("node1" , new DiskUsage ("node1" , "node1" , "/foo/bar" , 100 , 19 ));
818
+ builder .put ("node2" , new DiskUsage ("node2" , "node2" , "/foo/bar" , 100 , 1 ));
819
+ // Need to add delay in current time to allow nodes to be removed high watermark list.
820
+ currentTime .addAndGet (randomLongBetween (60001 , 120000 ));
821
+
822
+ monitor .onNewInfo (clusterInfo (builder ));
823
+ // Block will be removed if any nodes is no longer breaching high watermark.
824
+ assertEquals (countBlocksCalled .get (), 0 );
825
+ assertEquals (countUnblockBlocksCalled .get (), 1 );
826
+ }
827
+
725
828
private void assertNoLogging (DiskThresholdMonitor monitor , final Map <String , DiskUsage > diskUsages ) throws IllegalAccessException {
726
829
try (MockLogAppender mockAppender = MockLogAppender .createForLoggers (LogManager .getLogger (DiskThresholdMonitor .class ))) {
727
830
mockAppender .addExpectation (
@@ -756,10 +859,11 @@ private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, final M
756
859
}
757
860
}
758
861
759
- private void assertSingleWarningMessage (DiskThresholdMonitor monitor , final Map <String , DiskUsage > diskUsages , String message )
862
+ private void assertMultipleWarningMessages (DiskThresholdMonitor monitor , final Map <String , DiskUsage > diskUsages , List < String > messages )
760
863
throws IllegalAccessException {
761
- assertLogging (monitor , diskUsages , Level .WARN , message );
762
- assertNoLogging (monitor , diskUsages );
864
+ for (int index = 0 ; index < messages .size (); index ++) {
865
+ assertLogging (monitor , diskUsages , Level .WARN , messages .get (index ));
866
+ }
763
867
}
764
868
765
869
private void assertSingleInfoMessage (DiskThresholdMonitor monitor , final Map <String , DiskUsage > diskUsages , String message )
0 commit comments