Skip to content

Commit cded50f

Browse files
authored
Read the same medata file that is locked during restore of shallow snapshot (#10992)
Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 5966e76 commit cded50f

File tree

5 files changed

+94
-4
lines changed

5 files changed

+94
-4
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.nio.file.Path;
4444
import java.util.ArrayList;
4545
import java.util.Arrays;
46+
import java.util.List;
4647
import java.util.Optional;
4748
import java.util.concurrent.ExecutionException;
4849
import java.util.stream.Collectors;
@@ -617,4 +618,71 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
617618
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
618619
}
619620

621+
public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionException, InterruptedException {
622+
String indexName1 = "testindex1";
623+
String snapshotRepoName = "test-restore-snapshot-repo";
624+
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
625+
String snapshotName1 = "test-restore-snapshot1";
626+
Path absolutePath1 = randomRepoPath().toAbsolutePath();
627+
Path absolutePath2 = randomRepoPath().toAbsolutePath();
628+
String[] pathTokens = absolutePath1.toString().split("/");
629+
String basePath = pathTokens[pathTokens.length - 1];
630+
Arrays.copyOf(pathTokens, pathTokens.length - 1);
631+
Path location = PathUtils.get(String.join("/", pathTokens));
632+
pathTokens = absolutePath2.toString().split("/");
633+
String basePath2 = pathTokens[pathTokens.length - 1];
634+
Arrays.copyOf(pathTokens, pathTokens.length - 1);
635+
Path location2 = PathUtils.get(String.join("/", pathTokens));
636+
logger.info("Path 1 [{}]", absolutePath1);
637+
logger.info("Path 2 [{}]", absolutePath2);
638+
String restoredIndexName1 = indexName1 + "-restored";
639+
640+
createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true));
641+
642+
Client client = client();
643+
Settings indexSettings = Settings.builder()
644+
.put(super.indexSettings())
645+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
646+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
647+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
648+
.build();
649+
createIndex(indexName1, indexSettings);
650+
651+
int numDocsInIndex1 = randomIntBetween(2, 5);
652+
indexDocuments(client, indexName1, numDocsInIndex1);
653+
654+
ensureGreen(indexName1);
655+
656+
logger.info("--> snapshot");
657+
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
658+
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
659+
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
660+
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));
661+
662+
int extraNumDocsInIndex1 = randomIntBetween(20, 50);
663+
indexDocuments(client, indexName1, extraNumDocsInIndex1);
664+
refresh(indexName1);
665+
666+
client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
667+
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
668+
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
669+
.cluster()
670+
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
671+
.setWaitForCompletion(true)
672+
.setIndices(indexName1)
673+
.setRenamePattern(indexName1)
674+
.setRenameReplacement(restoredIndexName1)
675+
.setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated)
676+
.get();
677+
678+
assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0);
679+
ensureGreen(restoredIndexName1);
680+
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);
681+
682+
// indexing some new docs and validating
683+
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
684+
ensureGreen(restoredIndexName1);
685+
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
686+
}
687+
620688
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4917,8 +4917,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
49174917
remoteStore.incRef();
49184918
}
49194919
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
4920-
.initializeToSpecificCommit(primaryTerm, commitGeneration)
4921-
.getMetadata();
4920+
.getSegmentsUploadedToRemoteStore();
49224921
final Directory storeDirectory = store.directory();
49234922
store.incRef();
49244923

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,11 @@ void recoverFromSnapshotAndRemoteStore(
410410
indexUUID,
411411
shardId
412412
);
413+
sourceRemoteDirectory.initializeToSpecificCommit(
414+
primaryTerm,
415+
commitGeneration,
416+
recoverySource.snapshot().getSnapshotId().getUUID()
417+
);
413418
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
414419
final Store store = indexShard.store();
415420
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.index.store.lockmanager.FileLockInfo;
3535
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
3636
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
37+
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
3738
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
3839
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
3940
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -160,8 +161,9 @@ public RemoteSegmentMetadata init() throws IOException {
160161
*
161162
* @throws IOException if there were any failures in reading the metadata file
162163
*/
163-
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException {
164-
String metadataFile = getMetadataFileForCommit(primaryTerm, commitGeneration);
164+
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration, String acquirerId) throws IOException {
165+
String metadataFilePrefix = MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, commitGeneration);
166+
String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLock(metadataFilePrefix, acquirerId);
165167
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
166168
if (remoteSegmentMetadata != null) {
167169
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());

server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414
import org.apache.lucene.store.IndexOutput;
1515
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
1616

17+
import java.io.FileNotFoundException;
1718
import java.io.IOException;
1819
import java.nio.file.NoSuchFileException;
1920
import java.util.Collection;
21+
import java.util.List;
2022
import java.util.Objects;
23+
import java.util.stream.Collectors;
2124

2225
/**
2326
* A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to
@@ -70,6 +73,19 @@ public void release(LockInfo lockInfo) throws IOException {
7073
}
7174
}
7275

76+
public String fetchLock(String filenamePrefix, String acquirerId) throws IOException {
77+
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix);
78+
List<String> lockFilesForAcquirer = lockFiles.stream()
79+
.filter(lockFile -> acquirerId.equals(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFile)))
80+
.map(FileLockInfo.LockFileUtils::getFileToLockNameFromLock)
81+
.collect(Collectors.toList());
82+
if (lockFilesForAcquirer.size() == 0) {
83+
throw new FileNotFoundException("No lock file found for prefix: " + filenamePrefix + " and acquirerId: " + acquirerId);
84+
}
85+
assert lockFilesForAcquirer.size() == 1;
86+
return lockFilesForAcquirer.get(0);
87+
}
88+
7389
/**
7490
* Checks whether a given file have any lock on it or not.
7591
* @param lockInfo File Lock Info instance for which we need to check if lock is acquired.

0 commit comments

Comments
 (0)