Skip to content

Commit 37f1490

Browse files
author
Sandeep Kumawat
committed
Writable warm replica relocation/recovery
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent e7ac072 commit 37f1490

File tree

11 files changed

+2133
-14
lines changed

11 files changed

+2133
-14
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,14 @@
4242
import java.util.Collection;
4343
import java.util.List;
4444
import java.util.Map;
45+
import java.util.Objects;
4546
import java.util.Optional;
4647
import java.util.Set;
4748
import java.util.concurrent.CountDownLatch;
4849
import java.util.concurrent.TimeUnit;
4950
import java.util.stream.Collectors;
5051

5152
import static java.util.Arrays.asList;
52-
import static org.opensearch.test.OpenSearchIntegTestCase.client;
53-
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
5453
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
5554

5655
public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,12 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
245244
return closeable.get();
246245
}
247246
}
247+
248+
protected boolean warmIndexSegmentReplicationEnabled() {
249+
return Objects.equals(
250+
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
251+
IndexModule.DataLocalityType.PARTIAL.name()
252+
);
253+
}
254+
248255
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java

Lines changed: 2087 additions & 0 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
import java.nio.file.Path;
1818

19-
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
20-
2119
/**
2220
* This class runs Segment Replication Integ test suite with remote store enabled.
2321
*/

server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
242242
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
243243
} else {
244244
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
245-
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
245+
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource
246+
|| recoverySource instanceof RecoverySource.ExistingStoreRecoverySource) : recoverySource;
246247
allocationId = updates.initializedPrimary.allocationId().getId();
247248
}
248249
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.Queue;
3434
import java.util.Set;
3535

36+
import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;
37+
3638
/**
3739
* A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations
3840
* for remote shards within the cluster.
@@ -345,7 +347,8 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
345347
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
346348
// to re-fetch any shard blocks from the repository.
347349
if (shard.primary()) {
348-
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false) {
350+
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false
351+
&& !isPartialIndex(allocation.metadata().getIndexSafe(shard.index()))) {
349352
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
350353
}
351354
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,11 @@ public boolean shouldPeriodicallyFlush() {
368368
@Override
369369
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
370370
ensureOpen();
371+
// Skip flushing for indices with partial locality (warm indices)
372+
// For these indices, we don't need to commit as we will sync from the remote store on re-open
373+
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
374+
return;
375+
}
371376
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
372377
try (final ReleasableLock lock = readLock.acquire()) {
373378
ensureOpen();

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5110,6 +5110,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
51105110
*/
51115111
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
51125112
boolean syncSegmentSuccess = false;
5113+
// For warm indices, it doesn't make sense to check for local existence of files.
5114+
overrideLocal = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
51135115
long startTimeMs = System.currentTimeMillis();
51145116
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
51155117
logger.trace("Downloading segments from remote segment store");
@@ -5141,7 +5143,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51415143
} else {
51425144
storeDirectory = store.directory();
51435145
}
5144-
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5146+
if (indexSettings.isStoreLocalityPartial() == false) {
5147+
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5148+
}
51455149

51465150
if (remoteSegmentMetadata != null) {
51475151
final SegmentInfos infosSnapshot = store.buildSegmentInfos(

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ public String[] listAll() throws IOException {
8686
logger.trace("Composite Directory[{}]: listAll() called", this::toString);
8787
String[] localFiles = localDirectory.listAll();
8888
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
89-
String[] remoteFiles = getRemoteFiles();
90-
allFiles.addAll(Arrays.asList(remoteFiles));
89+
// String[] remoteFiles = getRemoteFiles();
90+
// allFiles.addAll(Arrays.asList(remoteFiles));
9191
logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
92-
logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
92+
// logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
9393
Set<String> nonBlockLuceneFiles = allFiles.stream()
9494
.filter(file -> !FileTypeUtils.isBlockFile(file))
9595
.collect(Collectors.toUnmodifiableSet());
@@ -113,8 +113,11 @@ public void deleteFile(String name) throws IOException {
113113
if (FileTypeUtils.isTempFile(name)) {
114114
localDirectory.deleteFile(name);
115115
} else if (Arrays.asList(listAll()).contains(name) == false) {
116-
throw new NoSuchFileException("File " + name + " not found in directory");
116+
logger.debug("The file [{}] does not exist", name);
117+
// we should not fail here as localDirectory might not contain this file.
118+
// throw new NoSuchFileException("File " + name + " not found in directory");
117119
} else {
120+
localDirectory.deleteFile(name);
118121
fileCache.remove(getFilePath(name));
119122
}
120123
}

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,9 @@ public void logCurrentState() {
316316
String file = path.substring(path.lastIndexOf('/'));
317317
allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n";
318318
}
319-
logger.trace("Cache entries : " + allFiles);
319+
if (allFiles.equals("\n") == false) {
320+
logger.debug("Cache entries : {}", allFiles);
321+
}
320322
} finally {
321323
lock.unlock();
322324
}

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,10 @@ public CacheStats stats() {
190190
public void logCurrentState() {
191191
int i = 0;
192192
for (RefCountedCache<K, V> cache : table) {
193-
logger.trace("SegmentedCache " + i);
194-
((LRUCache<K, V>) cache).logCurrentState();
193+
if (cache.size() > 0) {
194+
logger.trace("SegmentedCache " + i);
195+
((LRUCache<K, V>) cache).logCurrentState();
196+
}
195197
i++;
196198
}
197199
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import java.io.IOException;
3838
import java.io.UncheckedIOException;
39+
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.Locale;
4142
import java.util.Set;
@@ -199,6 +200,12 @@ public void startReplication(ActionListener<Void> listener) {
199200
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
200201
cancellableThreads.checkForCancel();
201202
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
203+
204+
// Return an empty list for warm indices. In this case, replica shards don't need to download files from remote storage
205+
// as they will sync all files from remote in case of failure.
206+
if (indexShard.indexSettings().isStoreLocalityPartial() == true) {
207+
return Collections.emptyList();
208+
}
202209
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
203210
// local files
204211
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());

0 commit comments

Comments
 (0)