Skip to content

Writable warm replica replication/recovery #17390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;

/**
* This class runs Segment Replication Integ test suite with remote store enabled.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
} else {
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource
|| recoverySource instanceof RecoverySource.ExistingStoreRecoverySource) : recoverySource;
allocationId = updates.initializedPrimary.allocationId().getId();
}
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Queue;
import java.util.Set;

import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;

/**
* A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations
* for remote shards within the cluster.
Expand Down Expand Up @@ -345,7 +347,8 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
// to re-fetch any shard blocks from the repository.
if (shard.primary()) {
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false) {
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false
&& isPartialIndex(allocation.metadata().getIndexSafe(shard.index())) == false) {
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
// Skip flushing for indices with partial locality (warm indices)
// For these indices, we don't need to commit as we will sync from the remote store on re-open
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
return;

Check warning on line 374 in server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java#L374

Added line #L374 was not covered by tests
}
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
try (final ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Expand Down Expand Up @@ -442,7 +447,9 @@
latestSegmentInfos.changed();
}
try {
commitSegmentInfos(latestSegmentInfos);
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
commitSegmentInfos(latestSegmentInfos);
}
} catch (IOException e) {
// mark the store corrupted unless we are closing as result of engine failure.
// in this case Engine#failShard will handle store corruption.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5142,7 +5142,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
if (indexSettings.isStoreLocalityPartial() == false) {
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
}

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
Expand All @@ -5158,7 +5160,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
|| indexSettings.isStoreLocalityPartial() : "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.lucene.index.IndexFileNames.SEGMENTS;

/**
* Composite Directory will contain both local and remote directory
Expand Down Expand Up @@ -74,12 +78,37 @@
);
}

/**
 * Returns the names of all files stored in the local directory.
 * The names are returned exactly as reported by {@code localDirectory.listAll()};
 * no filtering is applied in this method.
 *
 * @return the file names reported by the local directory
 * @throws IOException in case of I/O error
 */
private String[] listLocalFiles() throws IOException {
    ensureOpen();
    // The trace message names this method so that log output can be matched to the call site.
    logger.trace("Composite Directory[{}]: listLocalFiles() called", this::toString);
    return localDirectory.listAll();
}

/**
 * Collects the locally stored file names that belong to the given file: the file itself,
 * when it is present locally, plus every local file whose name starts with the given name
 * followed by {@code FileTypeUtils.BLOCK_FILE_IDENTIFIER}.
 *
 * @param fileName the name of the file whose local entry and block files are looked up
 * @return the matching local file names; empty when nothing matches
 * @throws IOException in case of I/O error while listing the local files
 */
private List<String> listBlockFiles(String fileName) throws IOException {
    // Block files are recognised by this prefix: "<fileName>" + block identifier.
    final String blockFilePrefix = fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER;
    final String[] localFileNames = listLocalFiles();
    return Arrays.stream(localFileNames)
        .filter(candidate -> candidate.equals(fileName) || candidate.startsWith(blockFilePrefix))
        .collect(Collectors.toList());
}

/**
* Returns names of all files stored in this directory in sorted order
* Does not include locally stored block files (having _block_ in their names) and files pending deletion
*
* @throws IOException in case of I/O error
*/
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17527
@Override
public String[] listAll() throws IOException {
ensureOpen();
Expand All @@ -105,6 +134,7 @@
* Currently deletes only from the local directory: files in the remote directory must not be deleted here, as that is handled by the remote directory's garbage collection logic
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException when file does not exist in the directory
*/
@Override
public void deleteFile(String name) throws IOException {
Expand All @@ -115,7 +145,21 @@
} else if (Arrays.asList(listAll()).contains(name) == false) {
throw new NoSuchFileException("File " + name + " not found in directory");
} else {
fileCache.remove(getFilePath(name));
List<String> blockFiles = listBlockFiles(name);
if (blockFiles.isEmpty()) {
// Remove this condition when this issue is addressed.
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17526
logger.debug("The file [{}] or its block files do not exist in local directory", name);

Check warning on line 152 in server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java#L152

Added line #L152 was not covered by tests
} else {
for (String blockFile : blockFiles) {
if (fileCache.get(getFilePath(blockFile)) == null) {
logger.debug("The file [{}] exists in local but not part of FileCache, deleting it from local", blockFile);
localDirectory.deleteFile(blockFile);

Check warning on line 157 in server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java#L156-L157

Added lines #L156 - L157 were not covered by tests
} else {
fileCache.remove(getFilePath(blockFile));
}
}
}
}
}

Expand Down Expand Up @@ -254,6 +298,15 @@
public void close() throws IOException {
ensureOpen();
logger.trace("Composite Directory[{}]: close() called", this::toString);
String[] localFiles = listLocalFiles();
for (String localFile : localFiles) {
// Delete segments_N file with ref count 1 created during index creation on replica shards
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17534
if (localFile.startsWith(SEGMENTS)) {
fileCache.remove(getFilePath(localFile));

Check warning on line 306 in server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java#L306

Added line #L306 was not covered by tests
}
}
fileCache.prune();
localDirectory.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
@ExperimentalApi
public class FileTypeUtils {

/** Marker substring that {@link #isBlockFile(String)} looks for in a file name; declared final so it cannot be reassigned. */
public static final String BLOCK_FILE_IDENTIFIER = "_block_";

/**
 * Checks whether the given file name denotes a temporary file.
 *
 * @param name the file name to inspect
 * @return {@code true} if {@code name} ends with ".tmp", {@code false} otherwise
 */
public static boolean isTempFile(String name) {
return name.endsWith(".tmp");
}

public static boolean isBlockFile(String name) {
return name.contains("_block_");
return name.contains(BLOCK_FILE_IDENTIFIER);
}

public static boolean isExtraFSFile(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,20 @@
public void logCurrentState() {
lock.lock();
try {
String allFiles = "\n";
final StringBuilder allFiles = new StringBuilder("\n");

Check warning on line 313 in server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java#L313

Added line #L313 was not covered by tests
for (Map.Entry<K, Node<K, V>> entry : data.entrySet()) {
String path = entry.getKey().toString();
String file = path.substring(path.lastIndexOf('/'));
allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n";
allFiles.append(file)
.append(" [RefCount: ")
.append(entry.getValue().refCount)
.append(" , Weight: ")
.append(entry.getValue().weight)
.append(" ]\n");
}

Check warning on line 323 in server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java#L317-L323

Added lines #L317 - L323 were not covered by tests
if (allFiles.length() > 1) {
logger.trace(() -> "Cache entries : " + allFiles);

Check warning on line 325 in server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java#L325

Added line #L325 was not covered by tests
}
logger.trace("Cache entries : " + allFiles);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@
public void logCurrentState() {
int i = 0;
for (RefCountedCache<K, V> cache : table) {
logger.trace("SegmentedCache " + i);
((LRUCache<K, V>) cache).logCurrentState();
if (cache.size() > 0) {
final int segmentIndex = i;
logger.trace(() -> "SegmentedCache " + segmentIndex);
((LRUCache<K, V>) cache).logCurrentState();

Check warning on line 196 in server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java#L194-L196

Added lines #L194 - L196 were not covered by tests
}
i++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand Down Expand Up @@ -202,6 +203,12 @@
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);

// Return an empty list for warm indices. In this case, replica shards don't need to download files from remote storage,
// as replicas will sync all files from remote in case of failure.
if (indexShard.indexSettings().isStoreLocalityPartial()) {
return Collections.emptyList();

Check warning on line 210 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java#L210

Added line #L210 was not covered by tests
}
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
// local files
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,16 @@ public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTest
private FSDirectory localDirectory;
private CompositeDirectory compositeDirectory;

private final static String[] LOCAL_FILES = new String[] { "_1.cfe", "_2.cfe", "_0.cfe_block_7", "_0.cfs_block_7", "temp_file.tmp" };
private final static String[] LOCAL_FILES = new String[] {
"_1.cfe",
"_1.cfe_block_0",
"_1.cfe_block_1",
"_2.cfe",
"_0.cfe_block_7",
"_0.cfs_block_7",
"temp_file.tmp" };
private final static String FILE_PRESENT_LOCALLY = "_1.cfe";
private final static String BLOCK_FILE_PRESENT_LOCALLY = "_1.cfe_block_0";
private final static String FILE_PRESENT_IN_REMOTE_ONLY = "_0.si";
private final static String NON_EXISTENT_FILE = "non_existent_file";
private final static String NEW_FILE = "new_file";
Expand All @@ -67,9 +75,11 @@ public void testListAll() throws IOException {

/**
 * Verifies that deleting a locally present file through the composite directory removes it,
 * and that opening the deleted file afterwards fails with {@link NoSuchFileException}.
 */
public void testDeleteFile() throws IOException {
// Preconditions: the file is visible through the composite directory and one of its block files exists locally.
assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
assertTrue(existsInLocalDirectory(BLOCK_FILE_PRESENT_LOCALLY));
// Delete the file and assert that it is no longer part of the directory
compositeDirectory.deleteFile(FILE_PRESENT_LOCALLY);
assertFalse(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
// NOTE(review): the pre-check for the block file uses existsInLocalDirectory, but this post-check uses
// existsInCompositeDirectory. If that helper is backed by a listing that excludes block files, this
// assertion would pass even when the block file was not deleted - confirm which helper is intended.
assertFalse(existsInCompositeDirectory(BLOCK_FILE_PRESENT_LOCALLY));
// Reading deleted file from directory should result in NoSuchFileException
assertThrows(NoSuchFileException.class, () -> compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT));
}
Expand Down
Loading