Skip to content

Commit 3ce8970

Browse files
author
Sandeep Kumawat
committed
Fix for failure IT's
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent 37f1490 commit 3ce8970

File tree

7 files changed

+159
-546
lines changed

7 files changed

+159
-546
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.Collection;
4343
import java.util.List;
4444
import java.util.Map;
45-
import java.util.Objects;
4645
import java.util.Optional;
4746
import java.util.Set;
4847
import java.util.concurrent.CountDownLatch;
@@ -244,12 +243,4 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
244243
return closeable.get();
245244
}
246245
}
247-
248-
protected boolean warmIndexSegmentReplicationEnabled() {
249-
return Objects.equals(
250-
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
251-
IndexModule.DataLocalityType.PARTIAL.name()
252-
);
253-
}
254-
255246
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java

Lines changed: 136 additions & 523 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
370370
ensureOpen();
371371
// Skip flushing for indices with partial locality (warm indices)
372372
// For these indices, we don't need to commit as we will sync from the remote store on re-open
373-
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
373+
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
374374
return;
375375
}
376376
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
@@ -447,7 +447,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
447447
latestSegmentInfos.changed();
448448
}
449449
try {
450-
commitSegmentInfos(latestSegmentInfos);
450+
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
451+
commitSegmentInfos(latestSegmentInfos);
452+
}
451453
} catch (IOException e) {
452454
// mark the store corrupted unless we are closing as result of engine failure.
453455
// in this case Engine#failShard will handle store corruption.

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5160,8 +5160,10 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51605160
store.deleteQuiet(file);
51615161
}
51625162
}
5163-
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5164-
: "There should not be any segments file in the dir";
5163+
if (indexSettings.isStoreLocalityPartial() == false) {
5164+
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5165+
: "There should not be any segments file in the dir";
5166+
}
51655167
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
51665168
}
51675169
syncSegmentSuccess = true;

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,16 +80,14 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
8080
*
8181
* @throws IOException in case of I/O error
8282
*/
83+
// TODO: Revisit listAll() implementation, Check if we should include the remote files as well.
8384
@Override
8485
public String[] listAll() throws IOException {
8586
ensureOpen();
8687
logger.trace("Composite Directory[{}]: listAll() called", this::toString);
8788
String[] localFiles = localDirectory.listAll();
8889
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
89-
// String[] remoteFiles = getRemoteFiles();
90-
// allFiles.addAll(Arrays.asList(remoteFiles));
91-
logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
92-
// logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
90+
logger.trace("listAll Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
9391
Set<String> nonBlockLuceneFiles = allFiles.stream()
9492
.filter(file -> !FileTypeUtils.isBlockFile(file))
9593
.collect(Collectors.toUnmodifiableSet());
@@ -113,12 +111,17 @@ public void deleteFile(String name) throws IOException {
113111
if (FileTypeUtils.isTempFile(name)) {
114112
localDirectory.deleteFile(name);
115113
} else if (Arrays.asList(listAll()).contains(name) == false) {
116-
logger.debug("The file [{}] does not exist", name);
117-
// we should not fail here as localDirectory might not contain this file.
118-
// throw new NoSuchFileException("File " + name + " not found in directory");
114+
logger.debug("The file [{}] does not exist in local directory", name);
115+
// we should not throw exception in this case as localDirectory might not contain this file.
119116
} else {
120-
localDirectory.deleteFile(name);
121-
fileCache.remove(getFilePath(name));
117+
// It is possible that filecache doesn't have the file, but localdirectory contains the file. We will delete it from the
118+
// localDirectory.
119+
if (fileCache.get(getFilePath(name)) == null) {
120+
logger.info("The file [{}] exist in local but not part of FileCache, deleting it from local", name);
121+
localDirectory.deleteFile(name);
122+
} else {
123+
fileCache.remove(getFilePath(name));
124+
}
122125
}
123126
}
124127

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ public void logCurrentState() {
193193
if (cache.size() > 0) {
194194
logger.trace("SegmentedCache " + i);
195195
((LRUCache<K, V>) cache).logCurrentState();
196+
} else {
197+
logger.trace("SegmentedCache is empty");
196198
}
197199
i++;
198200
}

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void setup() throws IOException {
6161

6262
public void testListAll() throws IOException {
6363
String[] actualFileNames = compositeDirectory.listAll();
64-
String[] expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1", "temp_file.tmp" };
64+
String[] expectedFileNames = new String[] { "_1.cfe", "_2.cfe", "temp_file.tmp" };
6565
assertArrayEquals(expectedFileNames, actualFileNames);
6666
}
6767

0 commit comments

Comments
 (0)