17
17
import org .opensearch .common .util .concurrent .OpenSearchRejectedExecutionException ;
18
18
import org .opensearch .index .shard .IndexShard ;
19
19
import org .opensearch .indices .replication .SegmentReplicationBaseIT ;
20
+ import org .opensearch .indices .replication .common .ReplicationType ;
20
21
import org .opensearch .plugins .Plugin ;
21
22
import org .opensearch .rest .RestStatus ;
22
23
import org .opensearch .test .OpenSearchIntegTestCase ;
@@ -52,12 +53,24 @@ protected Settings nodeSettings(int nodeOrdinal) {
52
53
.build ();
53
54
}
54
55
56
+ @ Override
57
+ public Settings indexSettings () {
58
+ // we want to control refreshes
59
+ return Settings .builder ()
60
+ .put (super .indexSettings ())
61
+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , SHARD_COUNT )
62
+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , REPLICA_COUNT )
63
+ .put (IndexModule .INDEX_QUERY_CACHE_ENABLED_SETTING .getKey (), false )
64
+ .put (IndexMetadata .SETTING_REPLICATION_TYPE , ReplicationType .SEGMENT )
65
+ .put ("index.refresh_interval" , -1 )
66
+ .build ();
67
+ }
68
+
55
69
@ Override
56
70
protected Collection <Class <? extends Plugin >> nodePlugins () {
57
71
return asList (MockTransportService .TestPlugin .class );
58
72
}
59
73
60
- @ AwaitsFix (bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671" )
61
74
public void testWritesRejected () throws Exception {
62
75
final String primaryNode = internalCluster ().startNode ();
63
76
createIndex (INDEX_NAME );
@@ -76,6 +89,10 @@ public void testWritesRejected() throws Exception {
76
89
indexingThread .start ();
77
90
indexingThread .join ();
78
91
latch .await ();
92
+
93
+ indexDoc ();
94
+ totalDocs .incrementAndGet ();
95
+ refresh (INDEX_NAME );
79
96
// index again while we are stale.
80
97
assertBusy (() -> {
81
98
expectThrows (OpenSearchRejectedExecutionException .class , () -> {
@@ -90,6 +107,7 @@ public void testWritesRejected() throws Exception {
90
107
91
108
// index another doc showing there is no pressure enforced.
92
109
indexDoc ();
110
+ refresh (INDEX_NAME );
93
111
waitForSearchableDocs (totalDocs .incrementAndGet (), replicaNodes .toArray (new String [] {}));
94
112
verifyStoreContent ();
95
113
}
@@ -98,7 +116,6 @@ public void testWritesRejected() throws Exception {
98
116
* This test ensures that a replica can be added while the index is under write block.
99
117
* Ensuring that only write requests are blocked.
100
118
*/
101
- @ AwaitsFix (bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671" )
102
119
public void testAddReplicaWhileWritesBlocked () throws Exception {
103
120
final String primaryNode = internalCluster ().startNode ();
104
121
createIndex (INDEX_NAME );
@@ -118,6 +135,9 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
118
135
indexingThread .start ();
119
136
indexingThread .join ();
120
137
latch .await ();
138
+ indexDoc ();
139
+ totalDocs .incrementAndGet ();
140
+ refresh (INDEX_NAME );
121
141
// index again while we are stale.
122
142
assertBusy (() -> {
123
143
expectThrows (OpenSearchRejectedExecutionException .class , () -> {
@@ -142,6 +162,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
142
162
143
163
// index another doc showing there is no pressure enforced.
144
164
indexDoc ();
165
+ refresh (INDEX_NAME );
145
166
waitForSearchableDocs (totalDocs .incrementAndGet (), replicaNodes .toArray (new String [] {}));
146
167
verifyStoreContent ();
147
168
}
@@ -258,7 +279,7 @@ private void assertFailedRequests(BulkResponse response) {
258
279
}
259
280
260
281
private void indexDoc () {
261
- client ().prepareIndex (INDEX_NAME ).setId (UUIDs .base64UUID ()).setSource ("{}" , "{}" ).get ();
282
+ client ().prepareIndex (INDEX_NAME ).setId (UUIDs .base64UUID ()).setSource ("{}" , "{}" ).execute (). actionGet ();
262
283
}
263
284
264
285
private void assertEqualSegmentInfosVersion (List <String > replicaNames , IndexShard primaryShard ) {
0 commit comments