Skip to content

Commit 17b43da

Browse files
author
Sandeep Kumawat
committed
Fix for failing ITs
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent 37f1490 commit 17b43da

File tree

11 files changed

+219
-562
lines changed

11 files changed

+219
-562
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.Collection;
4343
import java.util.List;
4444
import java.util.Map;
45-
import java.util.Objects;
4645
import java.util.Optional;
4746
import java.util.Set;
4847
import java.util.concurrent.CountDownLatch;
@@ -244,12 +243,4 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
244243
return closeable.get();
245244
}
246245
}
247-
248-
protected boolean warmIndexSegmentReplicationEnabled() {
249-
return Objects.equals(
250-
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
251-
IndexModule.DataLocalityType.PARTIAL.name()
252-
);
253-
}
254-
255246
}
Lines changed: 129 additions & 530 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
348348
// to re-fetch any shard blocks from the repository.
349349
if (shard.primary()) {
350350
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false
351-
&& !isPartialIndex(allocation.metadata().getIndexSafe(shard.index()))) {
351+
&& isPartialIndex(allocation.metadata().getIndexSafe(shard.index())) == false) {
352352
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
353353
}
354354
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
370370
ensureOpen();
371371
// Skip flushing for indices with partial locality (warm indices)
372372
// For these indices, we don't need to commit as we will sync from the remote store on re-open
373-
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
373+
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
374374
return;
375375
}
376376
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
@@ -447,7 +447,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
447447
latestSegmentInfos.changed();
448448
}
449449
try {
450-
commitSegmentInfos(latestSegmentInfos);
450+
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
451+
commitSegmentInfos(latestSegmentInfos);
452+
}
451453
} catch (IOException e) {
452454
// mark the store corrupted unless we are closing as result of engine failure.
453455
// in this case Engine#failShard will handle store corruption.

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5110,8 +5110,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
51105110
*/
51115111
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
51125112
boolean syncSegmentSuccess = false;
5113-
// For warm indices, it doesn't make sense to check for local existence of files.
5114-
overrideLocal = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
51155113
long startTimeMs = System.currentTimeMillis();
51165114
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
51175115
logger.trace("Downloading segments from remote segment store");
@@ -5161,7 +5159,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51615159
}
51625160
}
51635161
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5164-
: "There should not be any segments file in the dir";
5162+
|| indexSettings.isStoreLocalityPartial() : "There should not be any segments file in the dir";
51655163
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
51665164
}
51675165
syncSegmentSuccess = true;

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@
3434
import java.util.Arrays;
3535
import java.util.Collection;
3636
import java.util.HashSet;
37+
import java.util.List;
3738
import java.util.Set;
3839
import java.util.stream.Collectors;
40+
import java.util.stream.Stream;
41+
42+
import static org.apache.lucene.index.IndexFileNames.SEGMENTS;
3943

4044
/**
4145
* Composite Directory will contain both local and remote directory
@@ -74,22 +78,47 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
7478
);
7579
}
7680

81+
/**
82+
* Returns names of all files stored in local directory
83+
* @throws IOException in case of I/O error
84+
*/
85+
private String[] listLocalFiles() throws IOException {
86+
ensureOpen();
87+
logger.trace("Composite Directory[{}]: listLocalOnly() called", this::toString);
88+
return localDirectory.listAll();
89+
}
90+
91+
/**
92+
* Returns a list of names of all block files stored in the local directory for a given file,
93+
* including the original file itself if present.
94+
*
95+
* @param fileName The name of the file to search for, along with its associated block files.
96+
* @return A list of file names, including the original file (if present) and all its block files.
97+
* @throws IOException in case of I/O error while listing files.
98+
*/
99+
private List<String> listBlockFiles(String fileName) throws IOException {
100+
return Stream.of(listLocalFiles())
101+
.filter(file -> file.equals(fileName) || file.startsWith(fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER))
102+
.collect(Collectors.toList());
103+
}
104+
77105
/**
78106
* Returns names of all files stored in this directory in sorted order
79107
* Does not include locally stored block files (having _block_ in their names) and files pending deletion
80108
*
81109
* @throws IOException in case of I/O error
82110
*/
111+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17527
83112
@Override
84113
public String[] listAll() throws IOException {
85114
ensureOpen();
86115
logger.trace("Composite Directory[{}]: listAll() called", this::toString);
87116
String[] localFiles = localDirectory.listAll();
88117
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
89-
// String[] remoteFiles = getRemoteFiles();
90-
// allFiles.addAll(Arrays.asList(remoteFiles));
118+
String[] remoteFiles = getRemoteFiles();
119+
allFiles.addAll(Arrays.asList(remoteFiles));
91120
logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
92-
// logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
121+
logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
93122
Set<String> nonBlockLuceneFiles = allFiles.stream()
94123
.filter(file -> !FileTypeUtils.isBlockFile(file))
95124
.collect(Collectors.toUnmodifiableSet());
@@ -105,6 +134,7 @@ public String[] listAll() throws IOException {
105134
* Currently deleting only from the local directory; files in remote should not be deleted here, as that is taken care of by the garbage collection logic of the remote directory
106135
* @param name the name of an existing file.
107136
* @throws IOException in case of I/O error
137+
* @throws NoSuchFileException when file does not exist in the directory
108138
*/
109139
@Override
110140
public void deleteFile(String name) throws IOException {
@@ -113,12 +143,22 @@ public void deleteFile(String name) throws IOException {
113143
if (FileTypeUtils.isTempFile(name)) {
114144
localDirectory.deleteFile(name);
115145
} else if (Arrays.asList(listAll()).contains(name) == false) {
116-
logger.debug("The file [{}] does not exist", name);
117-
// we should not fail here as localDirectory might not contain this file.
118-
// throw new NoSuchFileException("File " + name + " not found in directory");
146+
throw new NoSuchFileException("File " + name + " not found in directory");
119147
} else {
120-
localDirectory.deleteFile(name);
121-
fileCache.remove(getFilePath(name));
148+
List<String> blockFiles = listBlockFiles(name);
149+
if (blockFiles.isEmpty()) {
150+
logger.debug("The file [{}] or its block files do not exist in local directory", name);
151+
// we should not throw an exception in this case as localDirectory might not contain this file.
152+
} else {
153+
for (String blockFile : blockFiles) {
154+
if (fileCache.get(getFilePath(blockFile)) == null) {
155+
logger.debug("The file [{}] exists in local but not part of FileCache, deleting it from local", blockFile);
156+
localDirectory.deleteFile(blockFile);
157+
} else {
158+
fileCache.remove(getFilePath(blockFile));
159+
}
160+
}
161+
}
122162
}
123163
}
124164

@@ -257,6 +297,15 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
257297
public void close() throws IOException {
258298
ensureOpen();
259299
logger.trace("Composite Directory[{}]: close() called", this::toString);
300+
String[] localFiles = listLocalFiles();
301+
for (String localFile : localFiles) {
302+
// Delete the segments_N file that gets created with ref count 1 when the index was created. Ideally this should not get created on
303+
// replica
304+
if (localFile.startsWith(SEGMENTS)) {
305+
fileCache.remove(getFilePath(localFile));
306+
}
307+
}
308+
fileCache.prune();
260309
localDirectory.close();
261310
}
262311

server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
@ExperimentalApi
1919
public class FileTypeUtils {
2020

21+
public static String BLOCK_FILE_IDENTIFIER = "_block_";
22+
2123
public static boolean isTempFile(String name) {
2224
return name.endsWith(".tmp");
2325
}
2426

2527
public static boolean isBlockFile(String name) {
26-
return name.contains("_block_");
28+
return name.contains(BLOCK_FILE_IDENTIFIER);
2729
}
2830

2931
public static boolean isExtraFSFile(String name) {

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,14 +310,19 @@ public CacheStats stats() {
310310
public void logCurrentState() {
311311
lock.lock();
312312
try {
313-
String allFiles = "\n";
313+
final StringBuilder allFiles = new StringBuilder("\n");
314314
for (Map.Entry<K, Node<K, V>> entry : data.entrySet()) {
315315
String path = entry.getKey().toString();
316316
String file = path.substring(path.lastIndexOf('/'));
317-
allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n";
317+
allFiles.append(file)
318+
.append(" [RefCount: ")
319+
.append(entry.getValue().refCount)
320+
.append(" , Weight: ")
321+
.append(entry.getValue().weight)
322+
.append(" ]\n");
318323
}
319-
if (allFiles.equals("\n") == false) {
320-
logger.debug("Cache entries : {}", allFiles);
324+
if (allFiles.length() > 1) {
325+
logger.trace(() -> "Cache entries : " + allFiles);
321326
}
322327
} finally {
323328
lock.unlock();

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ public void logCurrentState() {
191191
int i = 0;
192192
for (RefCountedCache<K, V> cache : table) {
193193
if (cache.size() > 0) {
194-
logger.trace("SegmentedCache " + i);
194+
final int segmentIndex = i;
195+
logger.trace(() -> "SegmentedCache " + segmentIndex);
195196
((LRUCache<K, V>) cache).logCurrentState();
196197
}
197198
i++;

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,8 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
202202
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
203203

204204
// Return an empty list for warm indices. In this case, replica shards don't require downloading files from remote storage
205-
// as they can will sync all files from remote in case failure.
206-
if (indexShard.indexSettings().isStoreLocalityPartial() == true) {
205+
// as replicas will sync all files from remote in case of failure.
206+
if (indexShard.indexSettings().isStoreLocalityPartial()) {
207207
return Collections.emptyList();
208208
}
209209
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,16 @@ public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTest
3939
private FSDirectory localDirectory;
4040
private CompositeDirectory compositeDirectory;
4141

42-
private final static String[] LOCAL_FILES = new String[] { "_1.cfe", "_2.cfe", "_0.cfe_block_7", "_0.cfs_block_7", "temp_file.tmp" };
42+
private final static String[] LOCAL_FILES = new String[] {
43+
"_1.cfe",
44+
"_1.cfe_block_0",
45+
"_1.cfe_block_1",
46+
"_2.cfe",
47+
"_0.cfe_block_7",
48+
"_0.cfs_block_7",
49+
"temp_file.tmp" };
4350
private final static String FILE_PRESENT_LOCALLY = "_1.cfe";
51+
private final static String BLOCK_FILE_PRESENT_LOCALLY = "_1.cfe_block_0";
4452
private final static String FILE_PRESENT_IN_REMOTE_ONLY = "_0.si";
4553
private final static String NON_EXISTENT_FILE = "non_existent_file";
4654
private final static String NEW_FILE = "new_file";
@@ -67,9 +75,11 @@ public void testListAll() throws IOException {
6775

6876
public void testDeleteFile() throws IOException {
6977
assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
78+
assertTrue(existsInLocalDirectory(BLOCK_FILE_PRESENT_LOCALLY));
7079
// Delete the file and assert that it is no longer a part of the directory
7180
compositeDirectory.deleteFile(FILE_PRESENT_LOCALLY);
7281
assertFalse(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
82+
assertFalse(existsInCompositeDirectory(BLOCK_FILE_PRESENT_LOCALLY));
7383
// Reading deleted file from directory should result in NoSuchFileException
7484
assertThrows(NoSuchFileException.class, () -> compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT));
7585
}

0 commit comments

Comments
 (0)