Skip to content

Commit 5f2bff3

Browse files
authored
[Remote Store] Fix sleep time bug during remote store sync (#14342)
* [Remote Store] Fix sleep time bug during remote store sync (#14037) --------- Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> * Fix remote migration ITs Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> --------- Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 79405ed commit 5f2bff3

File tree

5 files changed

+62
-61
lines changed

5 files changed

+62
-61
lines changed

server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,24 @@
99
package org.opensearch.remotemigration;
1010

1111
import org.opensearch.action.DocWriteResponse;
12+
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
13+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1214
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
1315
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
1416
import org.opensearch.action.bulk.BulkRequest;
1517
import org.opensearch.action.bulk.BulkResponse;
1618
import org.opensearch.action.delete.DeleteResponse;
1719
import org.opensearch.action.index.IndexRequest;
1820
import org.opensearch.action.index.IndexResponse;
21+
import org.opensearch.client.Requests;
1922
import org.opensearch.cluster.ClusterState;
23+
import org.opensearch.cluster.health.ClusterHealthStatus;
2024
import org.opensearch.cluster.metadata.RepositoryMetadata;
2125
import org.opensearch.cluster.routing.RoutingNode;
26+
import org.opensearch.common.Priority;
2227
import org.opensearch.common.UUIDs;
2328
import org.opensearch.common.settings.Settings;
29+
import org.opensearch.common.unit.TimeValue;
2430
import org.opensearch.repositories.fs.ReloadableFsRepository;
2531
import org.opensearch.test.OpenSearchIntegTestCase;
2632
import org.junit.Before;
@@ -39,6 +45,7 @@
3945
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
4046
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
4147
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
48+
import static org.hamcrest.Matchers.equalTo;
4249

4350
public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
4451
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
@@ -114,6 +121,10 @@ public void initDocRepToRemoteMigration() {
114121
);
115122
}
116123

124+
public ClusterHealthStatus ensureGreen(String... indices) {
125+
return ensureGreen(TimeValue.timeValueSeconds(60), indices);
126+
}
127+
117128
public BulkResponse indexBulk(String indexName, int numDocs) {
118129
BulkRequest bulkRequest = new BulkRequest();
119130
for (int i = 0; i < numDocs; i++) {
@@ -181,14 +192,12 @@ private Thread getIndexingThread() {
181192
long currentDocCount = indexedDocs.incrementAndGet();
182193
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
183194
if (rarely()) {
184-
logger.info("--> [iteration {}] flushing index", currentDocCount);
185195
client().admin().indices().prepareFlush(indexName).get();
196+
logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount);
186197
} else {
187-
logger.info("--> [iteration {}] refreshing index", currentDocCount);
188198
client().admin().indices().prepareRefresh(indexName).get();
189199
}
190200
}
191-
logger.info("Completed ingestion of {} docs", currentDocCount);
192201
}
193202
});
194203
}
@@ -218,4 +227,38 @@ public void stopShardRebalancing() {
218227
.get()
219228
);
220229
}
230+
231+
public ClusterHealthStatus waitForRelocation() {
232+
ClusterHealthRequest request = Requests.clusterHealthRequest()
233+
.waitForNoRelocatingShards(true)
234+
.timeout(TimeValue.timeValueSeconds(60))
235+
.waitForEvents(Priority.LANGUID);
236+
ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
237+
if (actionGet.isTimedOut()) {
238+
logger.info(
239+
"waitForRelocation timed out, cluster state:\n{}\n{}",
240+
client().admin().cluster().prepareState().get().getState(),
241+
client().admin().cluster().preparePendingClusterTasks().get()
242+
);
243+
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
244+
}
245+
return actionGet.getStatus();
246+
}
247+
248+
public ClusterHealthStatus waitForRelocation(TimeValue t) {
249+
ClusterHealthRequest request = Requests.clusterHealthRequest()
250+
.waitForNoRelocatingShards(true)
251+
.timeout(t)
252+
.waitForEvents(Priority.LANGUID);
253+
ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
254+
if (actionGet.isTimedOut()) {
255+
logger.info(
256+
"waitForRelocation timed out, cluster state:\n{}\n{}",
257+
client().admin().cluster().prepareState().get().getState(),
258+
client().admin().cluster().preparePendingClusterTasks().get()
259+
);
260+
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
261+
}
262+
return actionGet.getStatus();
263+
}
221264
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,7 @@ public void testRemotePrimaryRelocation() throws Exception {
9999
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
100100
.execute()
101101
.actionGet();
102-
ClusterHealthResponse clusterHealthResponse = client().admin()
103-
.cluster()
104-
.prepareHealth()
105-
.setTimeout(TimeValue.timeValueSeconds(60))
106-
.setWaitForEvents(Priority.LANGUID)
107-
.setWaitForNoRelocatingShards(true)
108-
.execute()
109-
.actionGet();
110-
111-
assertEquals(0, clusterHealthResponse.getRelocatingShards());
102+
waitForRelocation();
112103
assertEquals(remoteNode, primaryNodeName("test"));
113104
logger.info("--> relocation from docrep to remote complete");
114105

@@ -123,16 +114,7 @@ public void testRemotePrimaryRelocation() throws Exception {
123114
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
124115
.execute()
125116
.actionGet();
126-
clusterHealthResponse = client().admin()
127-
.cluster()
128-
.prepareHealth()
129-
.setTimeout(TimeValue.timeValueSeconds(60))
130-
.setWaitForEvents(Priority.LANGUID)
131-
.setWaitForNoRelocatingShards(true)
132-
.execute()
133-
.actionGet();
134-
135-
assertEquals(0, clusterHealthResponse.getRelocatingShards());
117+
waitForRelocation();
136118
assertEquals(remoteNode2, primaryNodeName("test"));
137119

138120
logger.info("--> relocation from remote to remote complete");
@@ -155,7 +137,6 @@ public void testRemotePrimaryRelocation() throws Exception {
155137

156138
public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
157139
String docRepNode = internalCluster().startNode();
158-
Client client = internalCluster().client(docRepNode);
159140
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
160141
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
161142
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@
88

99
package org.opensearch.remotemigration;
1010

11-
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1211
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1312
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
1413
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1514
import org.opensearch.cluster.metadata.IndexMetadata;
1615
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
17-
import org.opensearch.common.Priority;
1816
import org.opensearch.common.settings.Settings;
19-
import org.opensearch.common.unit.TimeValue;
2017
import org.opensearch.index.SegmentReplicationPerGroupStats;
2118
import org.opensearch.index.query.QueryBuilders;
2219
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -83,16 +80,8 @@ public void testReplicaRecovery() throws Exception {
8380
.add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode))
8481
.execute()
8582
.actionGet();
86-
ClusterHealthResponse clusterHealthResponse = client().admin()
87-
.cluster()
88-
.prepareHealth()
89-
.setTimeout(TimeValue.timeValueSeconds(60))
90-
.setWaitForEvents(Priority.LANGUID)
91-
.setWaitForNoRelocatingShards(true)
92-
.execute()
93-
.actionGet();
9483

95-
assertEquals(0, clusterHealthResponse.getRelocatingShards());
84+
waitForRelocation();
9685
logger.info("--> relocation of primary from docrep to remote complete");
9786

9887
logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
@@ -109,17 +98,7 @@ public void testReplicaRecovery() throws Exception {
10998
)
11099
.get();
111100

112-
client().admin()
113-
.cluster()
114-
.prepareHealth()
115-
.setTimeout(TimeValue.timeValueSeconds(60))
116-
.setWaitForEvents(Priority.LANGUID)
117-
.setWaitForGreenStatus()
118-
.execute()
119-
.actionGet();
120-
logger.info("--> replica is up now on another docrep now as well as remote node");
121-
122-
assertEquals(0, clusterHealthResponse.getRelocatingShards());
101+
waitForRelocation();
123102
asyncIndexingService.stopIndexing();
124103
refresh("test");
125104

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@
88

99
package org.opensearch.remotemigration;
1010

11-
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1211
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
1312
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
1413
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1514
import org.opensearch.client.Client;
1615
import org.opensearch.cluster.metadata.IndexMetadata;
17-
import org.opensearch.common.Priority;
1816
import org.opensearch.common.settings.Settings;
1917
import org.opensearch.common.unit.TimeValue;
2018
import org.opensearch.common.util.FeatureFlags;
@@ -28,6 +26,7 @@
2826
import java.util.List;
2927
import java.util.Map;
3028

29+
import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
3130
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
3231
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
3332
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -48,6 +47,10 @@ protected Settings featureFlagSettings() {
4847
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
4948
}
5049

50+
protected int maximumNumberOfShards() {
51+
return 5;
52+
}
53+
5154
public void testMixedModeAddRemoteNodes() throws Exception {
5255
internalCluster().setBootstrapClusterManagerNodeIndex(0);
5356
List<String> cmNodes = internalCluster().startNodes(1);
@@ -155,7 +158,11 @@ public void testEndToEndRemoteMigration() throws Exception {
155158
internalCluster().setBootstrapClusterManagerNodeIndex(0);
156159
List<String> docRepNodes = internalCluster().startNodes(2);
157160
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
158-
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
161+
updateSettingsRequest.persistentSettings(
162+
Settings.builder()
163+
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
164+
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maximumNumberOfShards())
165+
);
159166
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
160167
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
161168
ensureGreen("test");
@@ -189,16 +196,7 @@ public void testEndToEndRemoteMigration() throws Exception {
189196
)
190197
.get()
191198
);
192-
193-
ClusterHealthResponse clusterHealthResponse = client().admin()
194-
.cluster()
195-
.prepareHealth()
196-
.setTimeout(TimeValue.timeValueSeconds(45))
197-
.setWaitForEvents(Priority.LANGUID)
198-
.setWaitForNoRelocatingShards(true)
199-
.execute()
200-
.actionGet();
201-
assertTrue(clusterHealthResponse.getRelocatingShards() == 0);
199+
waitForRelocation(TimeValue.timeValueSeconds(90));
202200
logger.info("---> Stopping indexing thread");
203201
asyncIndexingService.stopIndexing();
204202
Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2146,7 +2146,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
21462146
segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
21472147
}
21482148
try {
2149-
Thread.sleep(TimeValue.timeValueSeconds(30).seconds());
2149+
Thread.sleep(TimeValue.timeValueSeconds(30).millis());
21502150
} catch (InterruptedException ie) {
21512151
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
21522152
}

0 commit comments

Comments
 (0)