Skip to content

Commit 9bf99b4

Browse files
authored
Add support to run SegRep integ tests using remote store settings (#7361)
Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 63fbd0b commit 9bf99b4

File tree

6 files changed

+109
-34
lines changed

6 files changed

+109
-34
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.junit.After;
12+
import org.junit.Before;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.indices.replication.SegmentReplicationIT;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
19+
import java.nio.file.Path;
20+
21+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
22+
23+
/**
24+
* The aim of this class is to run Segment Replication integ tests by enabling remote store specific settings.
25+
* This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
26+
* remote store.
27+
*/
28+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
29+
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {
30+
31+
private static final String REPOSITORY_NAME = "test-remore-store-repo";
32+
33+
@Override
34+
public Settings indexSettings() {
35+
return Settings.builder()
36+
.put(super.indexSettings())
37+
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
38+
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
39+
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
40+
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
41+
.build();
42+
}
43+
44+
@Override
45+
protected Settings featureFlagSettings() {
46+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
47+
}
48+
49+
@Before
50+
public void setup() {
51+
internalCluster().startClusterManagerOnlyNode();
52+
Path absolutePath = randomRepoPath().toAbsolutePath();
53+
assertAcked(
54+
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
55+
);
56+
}
57+
58+
@After
59+
public void teardown() {
60+
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
61+
}
62+
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@
159159
import org.opensearch.index.store.Store.MetadataSnapshot;
160160
import org.opensearch.index.store.StoreFileMetadata;
161161
import org.opensearch.index.store.StoreStats;
162+
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
163+
import org.opensearch.index.translog.RemoteFsTranslog;
162164
import org.opensearch.index.translog.Translog;
163165
import org.opensearch.index.translog.TranslogConfig;
164166
import org.opensearch.index.translog.TranslogFactory;
@@ -2234,6 +2236,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
22342236
if (indexSettings.isRemoteStoreEnabled()) {
22352237
syncSegmentsFromRemoteSegmentStore(false);
22362238
}
2239+
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
2240+
syncRemoteTranslogAndUpdateGlobalCheckpoint();
2241+
}
22372242
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
22382243
final Engine newEngine = engineFactory.newReadWriteEngine(config);
22392244
onNewEngine(newEngine);
@@ -2520,10 +2525,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
25202525
storeRecovery.recoverFromStore(this, listener);
25212526
}
25222527

2523-
public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
2528+
public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
25242529
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
25252530
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
2526-
storeRecovery.recoverFromRemoteStore(this, repository, listener);
2531+
storeRecovery.recoverFromRemoteStore(this, listener);
25272532
}
25282533

25292534
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
@@ -3324,14 +3329,7 @@ public void startRecovery(
33243329
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
33253330
break;
33263331
case REMOTE_STORE:
3327-
final Repository remoteTranslogRepo;
3328-
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
3329-
if (remoteTranslogRepoName != null) {
3330-
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
3331-
} else {
3332-
remoteTranslogRepo = null;
3333-
}
3334-
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
3332+
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
33353333
break;
33363334
case PEER:
33373335
try {
@@ -4406,6 +4404,9 @@ public void close() throws IOException {
44064404
if (indexSettings.isRemoteStoreEnabled()) {
44074405
syncSegmentsFromRemoteSegmentStore(false);
44084406
}
4407+
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
4408+
syncRemoteTranslogAndUpdateGlobalCheckpoint();
4409+
}
44094410
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
44104411
onNewEngine(newEngineReference.get());
44114412
}
@@ -4439,6 +4440,18 @@ public void close() throws IOException {
44394440
onSettingsChanged();
44404441
}
44414442

4443+
private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
4444+
syncTranslogFilesFromRemoteTranslog();
4445+
loadGlobalCheckpointToReplicationTracker();
4446+
}
4447+
4448+
public void syncTranslogFilesFromRemoteTranslog() throws IOException {
4449+
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
4450+
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
4451+
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
4452+
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
4453+
}
4454+
44424455
/**
44434456
* Downloads segments from remote segment store. This method will download segments till
44444457
* last refresh checkpoint.

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,11 @@
6060
import org.opensearch.index.seqno.SequenceNumbers;
6161
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
6262
import org.opensearch.index.store.Store;
63-
import org.opensearch.index.translog.RemoteFsTranslog;
6463
import org.opensearch.index.translog.Translog;
65-
import org.opensearch.index.translog.transfer.FileTransferTracker;
66-
import org.opensearch.index.translog.transfer.TranslogTransferManager;
6764
import org.opensearch.indices.recovery.RecoveryState;
6865
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
6966
import org.opensearch.repositories.IndexId;
7067
import org.opensearch.repositories.Repository;
71-
import org.opensearch.repositories.blobstore.BlobStoreRepository;
7268

7369
import java.io.IOException;
7470
import java.util.Arrays;
@@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste
118114
}
119115
}
120116

121-
void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
117+
void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
122118
if (canRecover(indexShard)) {
123119
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
124120
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
125121
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
126122
logger.debug("starting recovery from remote store ...");
127-
recoverFromRemoteStore(indexShard, repository);
123+
recoverFromRemoteStore(indexShard);
128124
return true;
129125
});
130126
} else {
@@ -441,7 +437,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
441437
});
442438
}
443439

444-
private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
440+
private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
445441
final Store remoteStore = indexShard.remoteStore();
446442
if (remoteStore == null) {
447443
throw new IndexShardRecoveryException(
@@ -462,8 +458,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
462458
if (store.directory().listAll().length == 0) {
463459
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
464460
}
465-
if (repository != null) {
466-
syncTranslogFilesFromRemoteTranslog(indexShard, repository);
461+
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
462+
indexShard.syncTranslogFilesFromRemoteTranslog();
467463
} else {
468464
bootstrap(indexShard, store);
469465
}
@@ -482,19 +478,6 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
482478
}
483479
}
484480

485-
private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
486-
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
487-
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
488-
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
489-
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
490-
blobStoreRepository,
491-
indexShard.getThreadPool(),
492-
shardId,
493-
fileTransferTracker
494-
);
495-
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
496-
}
497-
498481
/**
499482
* Recovers the state of the shard from the store.
500483
*/

server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,8 @@ public Translog newTranslog(
7171
primaryModeSupplier
7272
);
7373
}
74+
75+
public Repository getRepository() {
76+
return repository;
77+
}
7478
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.index.translog.transfer.TranslogTransferManager;
2323
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
2424
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
25+
import org.opensearch.repositories.Repository;
2526
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2627
import org.opensearch.threadpool.ThreadPool;
2728

@@ -116,8 +117,20 @@ public RemoteFsTranslog(
116117
}
117118
}
118119

119-
public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
120+
public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException {
121+
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
122+
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
123+
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
124+
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
125+
blobStoreRepository,
126+
threadPool,
127+
shardId,
128+
fileTransferTracker
129+
);
130+
RemoteFsTranslog.download(translogTransferManager, location);
131+
}
120132

133+
public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
121134
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
122135
if (translogMetadata != null) {
123136
if (Files.notExists(location)) {

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2873,7 +2873,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
28732873
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
28742874
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
28752875
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
2876-
target.restoreFromRemoteStore(null, future);
2876+
target.restoreFromRemoteStore(future);
28772877
target.remoteStore().decRef();
28782878

28792879
assertTrue(future.actionGet());

0 commit comments

Comments
 (0)