Skip to content

Commit 4268d7b

Browse files
Writable warm replica replication/recovery (#17390)
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com> (cherry picked from commit cb869c0) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent efde476 commit 4268d7b

File tree

13 files changed

+1773
-17
lines changed

13 files changed

+1773
-17
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import java.util.stream.Collectors;
5050

5151
import static java.util.Arrays.asList;
52-
import static org.opensearch.test.OpenSearchIntegTestCase.client;
53-
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
5452
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
5553

5654
public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java

Lines changed: 1665 additions & 0 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
import java.nio.file.Path;
1818

19-
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
20-
2119
/**
2220
* This class runs Segment Replication Integ test suite with remote store enabled.
2321
*/

server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
242242
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
243243
} else {
244244
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
245-
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
245+
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource
246+
|| recoverySource instanceof RecoverySource.ExistingStoreRecoverySource) : recoverySource;
246247
allocationId = updates.initializedPrimary.allocationId().getId();
247248
}
248249
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.Queue;
3434
import java.util.Set;
3535

36+
import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;
37+
3638
/**
3739
* A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations
3840
* for remote shards within the cluster.
@@ -345,7 +347,8 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
345347
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
346348
// to re-fetch any shard blocks from the repository.
347349
if (shard.primary()) {
348-
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false) {
350+
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false
351+
&& isPartialIndex(allocation.metadata().getIndexSafe(shard.index())) == false) {
349352
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
350353
}
351354
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,11 @@ public boolean shouldPeriodicallyFlush() {
405405
@Override
406406
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
407407
ensureOpen();
408+
// Skip flushing for indices with partial locality (warm indices)
409+
// For these indices, we don't need to commit as we will sync from the remote store on re-open
410+
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
411+
return;
412+
}
408413
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
409414
try (final ReleasableLock lock = readLock.acquire()) {
410415
ensureOpen();
@@ -494,7 +499,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
494499
latestSegmentInfos.changed();
495500
}
496501
try {
497-
commitSegmentInfos(latestSegmentInfos);
502+
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
503+
commitSegmentInfos(latestSegmentInfos);
504+
}
498505
} catch (IOException e) {
499506
// mark the store corrupted unless we are closing as result of engine failure.
500507
// in this case Engine#failShard will handle store corruption.

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5148,7 +5148,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51485148
} else {
51495149
storeDirectory = store.directory();
51505150
}
5151-
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5151+
if (indexSettings.isStoreLocalityPartial() == false) {
5152+
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5153+
}
51525154

51535155
if (remoteSegmentMetadata != null) {
51545156
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5164,7 +5166,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51645166
}
51655167
}
51665168
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5167-
: "There should not be any segments file in the dir";
5169+
|| indexSettings.isStoreLocalityPartial() : "There should not be any segments file in the dir";
51685170
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
51695171
}
51705172
syncSegmentSuccess = true;

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,12 @@
3535
import java.util.Arrays;
3636
import java.util.Collection;
3737
import java.util.HashSet;
38+
import java.util.List;
3839
import java.util.Set;
3940
import java.util.stream.Collectors;
41+
import java.util.stream.Stream;
42+
43+
import static org.apache.lucene.index.IndexFileNames.SEGMENTS;
4044

4145
/**
4246
* Composite Directory will contain both local and remote directory
@@ -75,12 +79,37 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
7579
);
7680
}
7781

82+
/**
83+
* Returns names of all files stored in local directory
84+
* @throws IOException in case of I/O error
85+
*/
86+
private String[] listLocalFiles() throws IOException {
87+
ensureOpen();
88+
logger.trace("Composite Directory[{}]: listLocalOnly() called", this::toString);
89+
return localDirectory.listAll();
90+
}
91+
92+
/**
93+
* Returns a list of names of all block files stored in the local directory for a given file,
94+
* including the original file itself if present.
95+
*
96+
* @param fileName The name of the file to search for, along with its associated block files.
97+
* @return A list of file names, including the original file (if present) and all its block files.
98+
* @throws IOException in case of I/O error while listing files.
99+
*/
100+
private List<String> listBlockFiles(String fileName) throws IOException {
101+
return Stream.of(listLocalFiles())
102+
.filter(file -> file.equals(fileName) || file.startsWith(fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER))
103+
.collect(Collectors.toList());
104+
}
105+
78106
/**
79107
* Returns names of all files stored in this directory in sorted order
80108
* Does not include locally stored block files (having _block_ in their names) and files pending deletion
81109
*
82110
* @throws IOException in case of I/O error
83111
*/
112+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17527
84113
@Override
85114
public String[] listAll() throws IOException {
86115
ensureOpen();
@@ -106,6 +135,7 @@ public String[] listAll() throws IOException {
106135
* Currently deletes only from the local directory; files in the remote directory are not deleted here, as that is taken care of by the garbage collection logic of the remote directory
107136
* @param name the name of an existing file.
108137
* @throws IOException in case of I/O error
138+
* @throws NoSuchFileException when file does not exist in the directory
109139
*/
110140
@Override
111141
public void deleteFile(String name) throws IOException {
@@ -116,7 +146,21 @@ public void deleteFile(String name) throws IOException {
116146
} else if (Arrays.asList(listAll()).contains(name) == false) {
117147
throw new NoSuchFileException("File " + name + " not found in directory");
118148
} else {
119-
fileCache.remove(getFilePath(name));
149+
List<String> blockFiles = listBlockFiles(name);
150+
if (blockFiles.isEmpty()) {
151+
// Remove this condition when this issue is addressed.
152+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17526
153+
logger.debug("The file [{}] or its block files do not exist in local directory", name);
154+
} else {
155+
for (String blockFile : blockFiles) {
156+
if (fileCache.get(getFilePath(blockFile)) == null) {
157+
logger.debug("The file [{}] exists in local but not part of FileCache, deleting it from local", blockFile);
158+
localDirectory.deleteFile(blockFile);
159+
} else {
160+
fileCache.remove(getFilePath(blockFile));
161+
}
162+
}
163+
}
120164
}
121165
}
122166

@@ -255,6 +299,15 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
255299
public void close() throws IOException {
256300
ensureOpen();
257301
logger.trace("Composite Directory[{}]: close() called", this::toString);
302+
String[] localFiles = listLocalFiles();
303+
for (String localFile : localFiles) {
304+
// Delete segments_N file with ref count 1 created during index creation on replica shards
305+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17534
306+
if (localFile.startsWith(SEGMENTS)) {
307+
fileCache.remove(getFilePath(localFile));
308+
}
309+
}
310+
fileCache.prune();
258311
localDirectory.close();
259312
}
260313

server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
@ExperimentalApi
1919
public class FileTypeUtils {
2020

21+
public static String BLOCK_FILE_IDENTIFIER = "_block_";
22+
2123
public static boolean isTempFile(String name) {
2224
return name.endsWith(".tmp");
2325
}
2426

2527
public static boolean isBlockFile(String name) {
26-
return name.contains("_block_");
28+
return name.contains(BLOCK_FILE_IDENTIFIER);
2729
}
2830

2931
public static boolean isExtraFSFile(String name) {

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,13 +310,20 @@ public CacheStats stats() {
310310
public void logCurrentState() {
311311
lock.lock();
312312
try {
313-
String allFiles = "\n";
313+
final StringBuilder allFiles = new StringBuilder("\n");
314314
for (Map.Entry<K, Node<K, V>> entry : data.entrySet()) {
315315
String path = entry.getKey().toString();
316316
String file = path.substring(path.lastIndexOf('/'));
317-
allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n";
317+
allFiles.append(file)
318+
.append(" [RefCount: ")
319+
.append(entry.getValue().refCount)
320+
.append(" , Weight: ")
321+
.append(entry.getValue().weight)
322+
.append(" ]\n");
323+
}
324+
if (allFiles.length() > 1) {
325+
logger.trace(() -> "Cache entries : " + allFiles);
318326
}
319-
logger.trace("Cache entries : " + allFiles);
320327
} finally {
321328
lock.unlock();
322329
}

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,11 @@ public CacheStats stats() {
190190
public void logCurrentState() {
191191
int i = 0;
192192
for (RefCountedCache<K, V> cache : table) {
193-
logger.trace("SegmentedCache " + i);
194-
((LRUCache<K, V>) cache).logCurrentState();
193+
if (cache.size() > 0) {
194+
final int segmentIndex = i;
195+
logger.trace(() -> "SegmentedCache " + segmentIndex);
196+
((LRUCache<K, V>) cache).logCurrentState();
197+
}
195198
i++;
196199
}
197200
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import java.io.IOException;
3838
import java.io.UncheckedIOException;
39+
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.Locale;
4142
import java.util.Set;
@@ -199,6 +200,12 @@ public void startReplication(ActionListener<Void> listener) {
199200
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
200201
cancellableThreads.checkForCancel();
201202
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
203+
204+
// Return an empty list for warm indices. In this case, replica shards don't require downloading files from remote storage
205+
// as replicas will sync all files from remote in case of failure.
206+
if (indexShard.indexSettings().isStoreLocalityPartial()) {
207+
return Collections.emptyList();
208+
}
202209
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
203210
// local files
204211
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,16 @@ public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTest
3939
private FSDirectory localDirectory;
4040
private CompositeDirectory compositeDirectory;
4141

42-
private final static String[] LOCAL_FILES = new String[] { "_1.cfe", "_2.cfe", "_0.cfe_block_7", "_0.cfs_block_7", "temp_file.tmp" };
42+
private final static String[] LOCAL_FILES = new String[] {
43+
"_1.cfe",
44+
"_1.cfe_block_0",
45+
"_1.cfe_block_1",
46+
"_2.cfe",
47+
"_0.cfe_block_7",
48+
"_0.cfs_block_7",
49+
"temp_file.tmp" };
4350
private final static String FILE_PRESENT_LOCALLY = "_1.cfe";
51+
private final static String BLOCK_FILE_PRESENT_LOCALLY = "_1.cfe_block_0";
4452
private final static String FILE_PRESENT_IN_REMOTE_ONLY = "_0.si";
4553
private final static String NON_EXISTENT_FILE = "non_existent_file";
4654
private final static String NEW_FILE = "new_file";
@@ -67,9 +75,11 @@ public void testListAll() throws IOException {
6775

6876
public void testDeleteFile() throws IOException {
6977
assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
78+
assertTrue(existsInLocalDirectory(BLOCK_FILE_PRESENT_LOCALLY));
7079
// Delete the file and assert that it is no longer a part of the directory
7180
compositeDirectory.deleteFile(FILE_PRESENT_LOCALLY);
7281
assertFalse(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
82+
assertFalse(existsInCompositeDirectory(BLOCK_FILE_PRESENT_LOCALLY));
7383
// Reading deleted file from directory should result in NoSuchFileException
7484
assertThrows(NoSuchFileException.class, () -> compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT));
7585
}

0 commit comments

Comments
 (0)