Skip to content

Commit 607f6ae

Browse files
committed
Minor test and nit fixes
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
1 parent 353cbbd commit 607f6ae

File tree

9 files changed

+48
-23
lines changed

9 files changed

+48
-23
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.index.IndexModule;
2424
import org.opensearch.index.query.QueryBuilders;
2525
import org.opensearch.index.shard.IndexShard;
26+
import org.opensearch.index.store.CompositeDirectory;
2627
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
2728
import org.opensearch.index.store.remote.filecache.FileCache;
2829
import org.opensearch.index.store.remote.utils.FileTypeUtils;
@@ -146,6 +147,12 @@ public void testWritableWarmBasic() throws Exception {
146147
// Asserting that after merge all the files from previous gen are no more part of the directory
147148
assertTrue(filesFromPreviousGenStillPresent.isEmpty());
148149

150+
// Asserting that files from previous gen are not present in File Cache as well
151+
filesBeforeMerge.stream()
152+
.filter(file -> !FileTypeUtils.isLockFile(file))
153+
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
154+
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file))));
155+
149156
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
150157
// leaks
151158
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1688,5 +1688,4 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster
16881688
}
16891689

16901690
}
1691-
16921691
}

server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public CloseableFilterIndexOutput(IndexOutput out, String fileName, OnCloseListe
4444

4545
@Override
4646
public void close() throws IOException {
47-
super.close();
4847
if (isClosed.get() == false) {
48+
super.close();
4949
onCloseListener.onClose(fileName);
5050
isClosed.set(true);
5151
}

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.opensearch.common.lucene.store.InputStreamIndexInput;
2222
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
2323
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
24+
import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput;
2425
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
2526
import org.opensearch.index.store.remote.filecache.FileCache;
26-
import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInputImpl;
2727
import org.opensearch.index.store.remote.utils.BlockIOContext;
2828
import org.opensearch.index.store.remote.utils.FileTypeUtils;
2929
import org.opensearch.index.store.remote.utils.TransferManager;
@@ -116,7 +116,7 @@ public void deleteFile(String name) throws IOException {
116116
} else if (Arrays.asList(listAll()).contains(name) == false) {
117117
throw new NoSuchFileException("File " + name + " not found in directory");
118118
} else {
119-
fileCache.remove(localDirectory.getDirectory().resolve(name));
119+
fileCache.remove(getFilePath(name));
120120
}
121121
}
122122

@@ -131,7 +131,7 @@ public long fileLength(String name) throws IOException {
131131
ensureOpen();
132132
logger.trace("Composite Directory[{}]: fileLength() called {}", this::toString, () -> name);
133133
long fileLength;
134-
Path key = localDirectory.getDirectory().resolve(name);
134+
Path key = getFilePath(name);
135135
if (FileTypeUtils.isTempFile(name) || fileCache.get(key) != null) {
136136
try {
137137
fileLength = localDirectory.fileLength(name);
@@ -194,7 +194,7 @@ public void rename(String source, String dest) throws IOException {
194194
ensureOpen();
195195
logger.trace("Composite Directory[{}]: rename() called : source-{}, dest-{}", this::toString, () -> source, () -> dest);
196196
localDirectory.rename(source, dest);
197-
fileCache.remove(localDirectory.getDirectory().resolve(source));
197+
fileCache.remove(getFilePath(source));
198198
cacheFile(dest);
199199
}
200200

@@ -215,7 +215,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
215215
return localDirectory.openInput(name, context);
216216
}
217217
// Return directly from the FileCache (via TransferManager) if complete file is present
218-
Path key = localDirectory.getDirectory().resolve(name);
218+
Path key = getFilePath(name);
219219
CachedIndexInput indexInput = fileCache.get(key);
220220
if (indexInput != null) {
221221
logger.trace("Composite Directory[{}]: Complete file {} found in FileCache", this::toString, () -> name);
@@ -281,8 +281,13 @@ Uncomment the below commented line(to remove the file from cache once uploaded)
281281
this::toString,
282282
() -> file
283283
);
284-
fileCache.decRef(localDirectory.getDirectory().resolve(file));
285-
// fileCache.remove(localDirectory.getDirectory().resolve(fileName));
284+
fileCache.decRef(getFilePath(file));
285+
// fileCache.remove(getFilePath(fileName));
286+
}
287+
288+
// Visibility public since we need it in IT tests
289+
public Path getFilePath(String name) {
290+
return localDirectory.getDirectory().resolve(name);
286291
}
287292

288293
/**
@@ -327,13 +332,13 @@ private String[] getRemoteFiles() throws IOException {
327332
}
328333

329334
private void cacheFile(String name) throws IOException {
330-
Path filePath = localDirectory.getDirectory().resolve(name);
335+
Path filePath = getFilePath(name);
331336
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
332337
// so that it can be evicted after that
333338
// this is just a temporary solution, will pin the file once support for that is added in FileCache
334339
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
335340
// successfully uploaded to Remote
336-
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
341+
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
337342
}
338343

339344
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
* @opensearch.experimental
2222
*/
2323
@ExperimentalApi
24-
public class FullFileCachedIndexInputImpl implements CachedIndexInput {
24+
public class CachedFullFileIndexInput implements CachedIndexInput {
2525
private final FileCache fileCache;
2626
private final Path path;
2727
private final FullFileCachedIndexInput fullFileCachedIndexInput;
@@ -30,7 +30,7 @@ public class FullFileCachedIndexInputImpl implements CachedIndexInput {
3030
/**
3131
* Constructor - takes IndexInput as parameter
3232
*/
33-
public FullFileCachedIndexInputImpl(FileCache fileCache, Path path, IndexInput indexInput) {
33+
public CachedFullFileIndexInput(FileCache fileCache, Path path, IndexInput indexInput) {
3434
this.fileCache = fileCache;
3535
this.path = path;
3636
fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, path, indexInput);

server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public void close() throws IOException {
7171
clones.forEach(indexInput -> {
7272
try {
7373
indexInput.close();
74-
} catch (IOException e) {
75-
throw new RuntimeException(e);
74+
} catch (Exception e) {
75+
logger.trace("Exception while closing clone - {}", e.getMessage());
7676
}
7777
});
7878
try {
@@ -81,6 +81,7 @@ public void close() throws IOException {
8181
logger.trace("FullFileCachedIndexInput already closed");
8282
}
8383
luceneIndexInput = null;
84+
clones.clear();
8485
closed = true;
8586
}
8687
}

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,13 @@ public void testClose() throws IOException {
157157

158158
public void testAfterSyncToRemote() throws IOException {
159159
// File will be present locally until uploaded to Remote
160-
assertTrue(existsInLocalDirectory("_1.cfe"));
161-
compositeDirectory.afterSyncToRemote("_1.cfe");
160+
assertTrue(existsInLocalDirectory(FILE_PRESENT_LOCALLY));
161+
compositeDirectory.afterSyncToRemote(FILE_PRESENT_LOCALLY);
162162
fileCache.prune();
163163
// After uploading to Remote, refCount will be decreased by 1 making it 0 and will be evicted if cache is pruned
164-
assertFalse(existsInLocalDirectory("_1.cfe"));
164+
assertFalse(existsInLocalDirectory(FILE_PRESENT_LOCALLY));
165+
// Asserting file is not present in FileCache
166+
assertNull(fileCache.get(getFilePath(FILE_PRESENT_LOCALLY)));
165167
}
166168

167169
private void addFilesToDirectory(String[] files) throws IOException {

server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void setup() throws IOException {
4646

4747
protected void setupIndexInputAndAddToFileCache() {
4848
fileCachedIndexInput = new FileCachedIndexInput(fileCache, filePath, underlyingIndexInput);
49-
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, fileCachedIndexInput));
49+
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fileCachedIndexInput));
5050
}
5151

5252
public void testClone() throws IOException {

server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.store.remote.filecache;
1010

11+
import org.apache.lucene.store.AlreadyClosedException;
1112
import org.apache.lucene.store.IndexInput;
1213

1314
import java.io.IOException;
@@ -18,16 +19,20 @@ public class FullFileCachedIndexInputTests extends FileCachedIndexInputTests {
1819
@Override
1920
protected void setupIndexInputAndAddToFileCache() {
2021
fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, filePath, underlyingIndexInput);
21-
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, fullFileCachedIndexInput));
22+
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fullFileCachedIndexInput));
2223
}
2324

2425
@Override
2526
public void testClone() throws IOException {
2627
setupIndexInputAndAddToFileCache();
2728

28-
// Since the file ia already in cache and has refCount 1, activeUsage and totalUsage will be same
29+
// Since the file is already in cache and has refCount 1, activeUsage and totalUsage will be same
2930
assertTrue(isActiveAndTotalUsageSame());
3031

32+
// Getting the file cache entry (which wil increase the ref count, hence doing dec ref immediately afterwards)
33+
CachedIndexInput cachedIndexInput = fileCache.get(filePath);
34+
fileCache.decRef(filePath);
35+
3136
// Decrementing the refCount explicitly on the file which will make it inactive (as refCount will drop to 0)
3237
fileCache.decRef(filePath);
3338
assertFalse(isActiveAndTotalUsageSame());
@@ -38,9 +43,15 @@ public void testClone() throws IOException {
3843
FileCachedIndexInput clonedFileCachedIndexInput3 = clonedFileCachedIndexInput2.clone();
3944
assertTrue(isActiveAndTotalUsageSame());
4045

41-
// Closing the parent will close all the clones decreasing the refCount to 0
42-
fullFileCachedIndexInput.close();
46+
// closing the first level clone will close all subsequent level clones and reduce ref count to 0
47+
clonedFileCachedIndexInput1.close();
4348
assertFalse(isActiveAndTotalUsageSame());
49+
50+
fileCache.prune();
51+
52+
// since the file cache entry was evicted the corresponding CachedIndexInput will be closed and will throw exception when trying to
53+
// read the index input
54+
assertThrows(AlreadyClosedException.class, cachedIndexInput::getIndexInput);
4455
}
4556

4657
@Override

0 commit comments

Comments
 (0)