Skip to content

Commit e4bfda7

Browse files
gbbafnakkewwei
authored andcommitted
Update checkpoint from remote nodes replicas (opensearch-project#13888) (opensearch-project#14062) (opensearch-project#14094)
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 1cb5a32 commit e4bfda7

File tree

7 files changed

+45
-88
lines changed

7 files changed

+45
-88
lines changed

server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,11 @@ private Thread getIndexingThread() {
186186
indexSingleDoc(indexName);
187187
long currentDocCount = indexedDocs.incrementAndGet();
188188
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
189-
logger.info("--> [iteration {}] flushing index", currentDocCount);
190189
if (rarely()) {
190+
logger.info("--> [iteration {}] flushing index", currentDocCount);
191191
client().admin().indices().prepareFlush(indexName).get();
192192
} else {
193+
logger.info("--> [iteration {}] refreshing index", currentDocCount);
193194
client().admin().indices().prepareRefresh(indexName).get();
194195
}
195196
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ initalMetadataVersion < internalCluster().client()
273273
* After shard relocation completes, shuts down the docrep nodes and asserts remote
274274
* index settings are applied even when the index is in YELLOW state
275275
*/
276+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13737")
276277
public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Exception {
277278
internalCluster().startClusterManagerOnlyNode();
278279

@@ -329,6 +330,7 @@ public void testIndexSettingsUpdatedEvenForMisconfiguredReplicas() throws Except
329330
* After shard relocation completes, restarts the docrep node holding extra replica shard copy
330331
* and asserts remote index settings are applied as soon as the docrep replica copy is unassigned
331332
*/
333+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13871")
332334
public void testIndexSettingsUpdatedWhenDocrepNodeIsRestarted() throws Exception {
333335
internalCluster().startClusterManagerOnlyNode();
334336

@@ -469,6 +471,7 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E
469471
* exclude docrep nodes, assert that remote index path file exists
470472
* when shards start relocating to the remote nodes.
471473
*/
474+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13939")
472475
public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
473476
String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();
474477

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@
88

99
package org.opensearch.remotemigration;
1010

11-
import org.opensearch.action.DocWriteResponse;
1211
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
1312
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1413
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
1514
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
1615
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
17-
import org.opensearch.action.delete.DeleteResponse;
18-
import org.opensearch.action.index.IndexResponse;
1916
import org.opensearch.client.Client;
2017
import org.opensearch.client.Requests;
2118
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
@@ -66,8 +63,8 @@ public void testRemotePrimaryRelocation() throws Exception {
6663

6764
AtomicInteger numAutoGenDocs = new AtomicInteger();
6865
final AtomicBoolean finished = new AtomicBoolean(false);
69-
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
70-
66+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
67+
asyncIndexingService.startIndexing();
7168
refresh("test");
7269

7370
// add remote node in mixed mode cluster
@@ -141,17 +138,19 @@ public void testRemotePrimaryRelocation() throws Exception {
141138
logger.info("--> relocation from remote to remote complete");
142139

143140
finished.set(true);
144-
indexingThread.join();
141+
asyncIndexingService.stopIndexing();
145142
refresh("test");
146-
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
143+
OpenSearchAssertions.assertHitCount(
144+
client().prepareSearch("test").setTrackTotalHits(true).get(),
145+
asyncIndexingService.getIndexedDocs()
146+
);
147147
OpenSearchAssertions.assertHitCount(
148148
client().prepareSearch("test")
149149
.setTrackTotalHits(true)// extra paranoia ;)
150150
.setQuery(QueryBuilders.termQuery("auto", true))
151151
.get(),
152-
numAutoGenDocs.get()
152+
asyncIndexingService.getIndexedDocs()
153153
);
154-
155154
}
156155

157156
public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
@@ -165,9 +164,8 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
165164
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
166165
ensureGreen("test");
167166

168-
AtomicInteger numAutoGenDocs = new AtomicInteger();
169-
final AtomicBoolean finished = new AtomicBoolean(false);
170-
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
167+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
168+
asyncIndexingService.startIndexing();
171169

172170
refresh("test");
173171

@@ -209,27 +207,11 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
209207
assertEquals(actionGet.getRelocatingShards(), 0);
210208
assertEquals(docRepNode, primaryNodeName("test"));
211209

212-
finished.set(true);
213-
indexingThread.join();
210+
asyncIndexingService.stopIndexing();
214211
client().admin()
215212
.cluster()
216213
.prepareUpdateSettings()
217214
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
218215
.get();
219216
}
220-
221-
private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
222-
Thread indexingThread = new Thread(() -> {
223-
while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
224-
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
225-
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
226-
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
227-
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
228-
client().prepareIndex("test").setSource("auto", true).get();
229-
numAutoGenDocs.incrementAndGet();
230-
}
231-
});
232-
indexingThread.start();
233-
return indexingThread;
234-
}
235217
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java

Lines changed: 26 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,32 +8,27 @@
88

99
package org.opensearch.remotemigration;
1010

11-
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
12-
13-
import org.opensearch.action.DocWriteResponse;
1411
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1512
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
13+
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
1614
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
17-
import org.opensearch.action.delete.DeleteResponse;
18-
import org.opensearch.action.index.IndexResponse;
1915
import org.opensearch.cluster.metadata.IndexMetadata;
2016
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
2117
import org.opensearch.common.Priority;
2218
import org.opensearch.common.settings.Settings;
2319
import org.opensearch.common.unit.TimeValue;
20+
import org.opensearch.index.SegmentReplicationPerGroupStats;
2421
import org.opensearch.index.query.QueryBuilders;
2522
import org.opensearch.test.OpenSearchIntegTestCase;
2623
import org.opensearch.test.hamcrest.OpenSearchAssertions;
2724

28-
import java.util.concurrent.atomic.AtomicBoolean;
29-
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.TimeUnit;
3026

3127
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
3228
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
3329
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3430

3531
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
36-
3732
public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {
3833

3934
protected int maximumNumberOfShards() {
@@ -52,6 +47,7 @@ protected int minimumNumberOfReplicas() {
5247
Brings up new replica copies on remote and docrep nodes, when primary is on a remote node
5348
Live indexing is happening meanwhile
5449
*/
50+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13473")
5551
public void testReplicaRecovery() throws Exception {
5652
internalCluster().setBootstrapClusterManagerNodeIndex(0);
5753
String primaryNode = internalCluster().startNode();
@@ -63,10 +59,8 @@ public void testReplicaRecovery() throws Exception {
6359
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
6460
String replicaNode = internalCluster().startNode();
6561
ensureGreen("test");
66-
67-
AtomicInteger numAutoGenDocs = new AtomicInteger();
68-
final AtomicBoolean finished = new AtomicBoolean(false);
69-
Thread indexingThread = getThread(finished, numAutoGenDocs);
62+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
63+
asyncIndexingService.startIndexing();
7064

7165
refresh("test");
7266

@@ -78,12 +72,10 @@ public void testReplicaRecovery() throws Exception {
7872
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
7973
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
8074

81-
String remoteNode2 = internalCluster().startNode();
75+
internalCluster().startNode();
8276
internalCluster().validateClusterFormed();
8377

8478
// identify the primary
85-
86-
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
8779
logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode);
8880
client().admin()
8981
.cluster()
@@ -102,7 +94,6 @@ public void testReplicaRecovery() throws Exception {
10294

10395
assertEquals(0, clusterHealthResponse.getRelocatingShards());
10496
logger.info("--> relocation of primary from docrep to remote complete");
105-
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
10697

10798
logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
10899
// Increase replica count to 3
@@ -129,52 +120,33 @@ public void testReplicaRecovery() throws Exception {
129120
logger.info("--> replica is up now on another docrep now as well as remote node");
130121

131122
assertEquals(0, clusterHealthResponse.getRelocatingShards());
123+
asyncIndexingService.stopIndexing();
124+
refresh("test");
132125

133-
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
126+
// segrep lag should be zero
127+
assertBusy(() -> {
128+
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
129+
.indices()
130+
.prepareSegmentReplicationStats("test")
131+
.setDetailed(true)
132+
.execute()
133+
.actionGet();
134+
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0);
135+
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
136+
perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0));
137+
}, 20, TimeUnit.SECONDS);
134138

135-
// Stop replicas on docrep now.
136-
// ToDo : Remove once we have dual replication enabled
137-
client().admin()
138-
.indices()
139-
.updateSettings(
140-
new UpdateSettingsRequest("test").settings(
141-
Settings.builder()
142-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
143-
.put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode)
144-
.build()
145-
)
146-
)
147-
.get();
148-
149-
finished.set(true);
150-
indexingThread.join();
151-
refresh("test");
152-
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
139+
OpenSearchAssertions.assertHitCount(
140+
client().prepareSearch("test").setTrackTotalHits(true).get(),
141+
asyncIndexingService.getIndexedDocs()
142+
);
153143
OpenSearchAssertions.assertHitCount(
154144
client().prepareSearch("test")
155145
.setTrackTotalHits(true)// extra paranoia ;)
156146
.setQuery(QueryBuilders.termQuery("auto", true))
157-
// .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2))
158147
.get(),
159-
numAutoGenDocs.get()
148+
asyncIndexingService.getIndexedDocs()
160149
);
161150

162151
}
163-
164-
private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
165-
Thread indexingThread = new Thread(() -> {
166-
while (finished.get() == false && numAutoGenDocs.get() < 100) {
167-
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
168-
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
169-
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
170-
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
171-
client().prepareIndex("test").setSource("auto", true).get();
172-
numAutoGenDocs.incrementAndGet();
173-
logger.info("Indexed {} docs here", numAutoGenDocs.get());
174-
}
175-
});
176-
indexingThread.start();
177-
return indexingThread;
178-
}
179-
180152
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,6 @@ public boolean shouldSeedRemoteStore() {
525525
public Function<String, Boolean> isShardOnRemoteEnabledNode = nodeId -> {
526526
DiscoveryNode node = discoveryNodes.get(nodeId);
527527
if (node != null) {
528-
logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode());
529528
return node.isRemoteStoreNode();
530529
}
531530
return false;

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai
384384
protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) {
385385
// Update replication checkpoint on source via transport call only supported for remote store integration. For node-
386386
// node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call
387-
if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) {
387+
if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) {
388388
return;
389389
}
390390
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();

server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn
9191
// make sure the store is not released until we are done.
9292
this.cancellableThreads = new CancellableThreads();
9393
store.incRef();
94-
if (indexShard.indexSettings().isRemoteStoreEnabled()) {
94+
if (indexShard.indexSettings().isAssignedOnRemoteNode()) {
9595
indexShard.remoteStore().incRef();
9696
}
9797
}
@@ -284,7 +284,7 @@ protected void closeInternal() {
284284
try {
285285
store.decRef();
286286
} finally {
287-
if (indexShard.indexSettings().isRemoteStoreEnabled()) {
287+
if (indexShard.indexSettings().isAssignedOnRemoteNode()) {
288288
indexShard.remoteStore().decRef();
289289
}
290290
}

0 commit comments

Comments
 (0)