Skip to content

Commit eb5d8e1

Browse files
authored
[Remote Store] Fix couple of Remote Store flaky test and use bulk api for ingestion (opensearch-project#9190) (opensearch-project#9251)
1 parent 32e8187 commit eb5d8e1

File tree

3 files changed

+49
-23
lines changed

3 files changed

+49
-23
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
package org.opensearch.remotestore;
1010

1111
import org.junit.After;
12+
import org.opensearch.action.bulk.BulkItemResponse;
13+
import org.opensearch.action.bulk.BulkRequest;
14+
import org.opensearch.action.bulk.BulkResponse;
15+
import org.opensearch.action.index.IndexRequest;
1216
import org.opensearch.action.index.IndexResponse;
1317
import org.opensearch.cluster.metadata.IndexMetadata;
1418
import org.opensearch.common.UUIDs;
@@ -31,10 +35,10 @@
3135
import java.util.Map;
3236
import java.util.concurrent.atomic.AtomicInteger;
3337

34-
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
35-
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
3638
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING;
39+
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
3740
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING;
41+
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
3842
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3943

4044
public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
@@ -74,13 +78,18 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
7478
indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed);
7579
refreshedOrFlushedOperations = totalOperations;
7680
int numberOfOperations = randomIntBetween(20, 50);
77-
for (int j = 0; j < numberOfOperations; j++) {
78-
IndexResponse response = indexSingleDoc(index);
79-
maxSeqNo = response.getSeqNo();
80-
shardId = response.getShardId().id();
81-
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
81+
int numberOfBulk = randomIntBetween(1, 5);
82+
for (int j = 0; j < numberOfBulk; j++) {
83+
BulkResponse res = indexBulk(index, numberOfOperations);
84+
for (BulkItemResponse singleResp : res.getItems()) {
85+
indexingStats.put(
86+
MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(),
87+
singleResp.getResponse().getSeqNo()
88+
);
89+
maxSeqNo = singleResp.getResponse().getSeqNo();
90+
}
91+
totalOperations += numberOfOperations;
8292
}
83-
totalOperations += numberOfOperations;
8493
}
8594

8695
indexingStats.put(TOTAL_OPERATIONS, totalOperations);
@@ -123,6 +132,18 @@ protected IndexResponse indexSingleDoc(String indexName) {
123132
.get();
124133
}
125134

135+
protected BulkResponse indexBulk(String indexName, int numDocs) {
136+
BulkRequest bulkRequest = new BulkRequest();
137+
for (int i = 0; i < numDocs; i++) {
138+
final IndexRequest request = client().prepareIndex(indexName)
139+
.setId(UUIDs.randomBase64UUID())
140+
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
141+
.request();
142+
bulkRequest.add(request);
143+
}
144+
return client().bulk(bulkRequest).actionGet();
145+
}
146+
126147
public static Settings remoteStoreClusterSettings(String segmentRepoName) {
127148
return remoteStoreClusterSettings(segmentRepoName, segmentRepoName);
128149
}
@@ -170,10 +191,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
170191
return remoteStoreIndexSettings(numberOfReplicas, 1);
171192
}
172193

173-
protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) {
194+
protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) {
174195
return Settings.builder()
175196
.put(remoteStoreIndexSettings(numberOfReplicas))
176197
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit)
198+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh))
177199
.build();
178200
}
179201

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88

99
package org.opensearch.remotestore;
1010

11+
import org.hamcrest.MatcherAssert;
1112
import org.junit.Before;
1213
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
1314
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
1415
import org.opensearch.action.index.IndexResponse;
1516
import org.opensearch.cluster.metadata.IndexMetadata;
1617
import org.opensearch.cluster.routing.RecoverySource;
1718
import org.opensearch.common.settings.Settings;
18-
import org.opensearch.index.shard.RemoteStoreRefreshListener;
1919
import org.opensearch.indices.recovery.RecoveryState;
2020
import org.opensearch.plugins.Plugin;
2121
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -29,7 +29,9 @@
2929
import java.util.concurrent.TimeUnit;
3030

3131
import static org.hamcrest.Matchers.comparesEqualTo;
32-
import static org.hamcrest.Matchers.comparesEqualTo;
32+
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.oneOf;
34+
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
3335
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3436
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
3537

@@ -143,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception {
143145
verifyRemoteStoreCleanup();
144146
}
145147

146-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
147148
public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
148-
internalCluster().startDataOnlyNodes(3);
149-
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
149+
internalCluster().startDataOnlyNodes(1);
150+
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
150151
int numberOfIterations = randomIntBetween(5, 15);
151152
indexData(numberOfIterations, true, INDEX_NAME);
152153
String indexUUID = client().admin()
@@ -158,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
158159
// Delete is async.
159160
assertBusy(() -> {
160161
int actualFileCount = getFileCount(indexPath);
161-
if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) {
162-
assertEquals(numberOfIterations, actualFileCount);
162+
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
163+
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
163164
} else {
164165
// As delete is async its possible that the file gets created before the deletion or after
165166
// deletion.
166-
assertTrue(actualFileCount >= 10 || actualFileCount <= 11);
167+
MatcherAssert.assertThat(
168+
actualFileCount,
169+
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
170+
);
167171
}
168172
}, 30, TimeUnit.SECONDS);
169173
}
170174

171-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
172175
public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
173-
internalCluster().startDataOnlyNodes(3);
174-
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
176+
internalCluster().startDataOnlyNodes(1);
177+
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
175178
int numberOfIterations = randomIntBetween(5, 15);
176179
indexData(numberOfIterations, false, INDEX_NAME);
177180
String indexUUID = client().admin()
@@ -180,6 +183,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
180183
.get()
181184
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
182185
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
183-
assertEquals(numberOfIterations, getFileCount(indexPath));
186+
int actualFileCount = getFileCount(indexPath);
187+
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
188+
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
184189
}
185190
}

server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception {
122122
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
123123
}
124124

125-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130")
126125
public void testFailoverWhileIndexing() throws Exception {
127126
internalCluster().startNode();
128127
internalCluster().startNode();
@@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception {
143142
.setSource("field", numAutoGenDocs.get())
144143
.get();
145144

146-
if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
145+
if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) {
147146
numAutoGenDocs.incrementAndGet();
148147
if (numAutoGenDocs.get() == docCount / 2) {
149148
if (random().nextInt(3) == 0) {

0 commit comments

Comments
 (0)