-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Writable Warm] Composite Directory implementation and integrating it with FileCache #12782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
sohami
merged 22 commits into
opensearch-project:main
from
rayshrey:composite-directory-poc
Jun 20, 2024
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
3c6ad60
Composite Directory POC
rayshrey 30ddaea
Refactor TransferManager interface to RemoteStoreFileTrackerAdapter
rayshrey a7361d2
Implement block level fetch for Composite Directory
rayshrey 54beed2
Removed CACHE state from FileTracker
rayshrey 87772f0
Fixes after latest pull
rayshrey 8fb76d1
Add new setting for warm, remove store type setting, FileTracker and …
rayshrey 932ad92
Modify TransferManager - replace BlobContainer with Functional Interf…
rayshrey 3e74318
Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockComposi…
rayshrey dca1a07
Modify constructors to avoid breaking public api contract and code re…
rayshrey 723aba0
Add experimental annotations for newly created classes and review com…
rayshrey 959f90c
Use ref count as a temporary measure to prevent file from eviction un…
rayshrey 2632738
Remove method level locks
rayshrey ba34798
Handle tmp file deletion
rayshrey 85003ee
Nit fixes
rayshrey de1895e
Handle delete and close in Composite Directory, log current state of …
rayshrey f9d880b
Refactor usages of WRITEABLE_REMOTE_INDEX_SETTING to TIERED_REMOTE_IN…
rayshrey e1b18ad
Add tests for FileCachedIndexInput and review comment fixes
rayshrey 5e008db
Add additional IT for feature flag disabled
rayshrey c1e6b18
Move setting for Partial Locality type behind Feature Flag, fix bug f…
rayshrey 74f9c29
Minor test and nit fixes
rayshrey 7fce3d3
Add javadocs for FullFileCachedIndexInput
rayshrey 901849b
Minor precommit fixes
rayshrey File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
160 changes: 160 additions & 0 deletions
160
server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore; | ||
|
||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | ||
|
||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.FilterDirectory; | ||
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.opensearch.action.admin.indices.get.GetIndexRequest; | ||
import org.opensearch.action.admin.indices.get.GetIndexResponse; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.settings.SettingsException; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.index.IndexModule; | ||
import org.opensearch.index.query.QueryBuilders; | ||
import org.opensearch.index.shard.IndexShard; | ||
import org.opensearch.index.store.CompositeDirectory; | ||
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; | ||
import org.opensearch.index.store.remote.filecache.FileCache; | ||
import org.opensearch.index.store.remote.utils.FileTypeUtils; | ||
import org.opensearch.indices.IndicesService; | ||
import org.opensearch.node.Node; | ||
import org.opensearch.test.InternalTestCluster; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; | ||
|
||
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) | ||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) | ||
// Uncomment the below line to enable trace level logs for this test for better debugging | ||
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE") | ||
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase { | ||
|
||
protected static final String INDEX_NAME = "test-idx-1"; | ||
protected static final int NUM_DOCS_IN_BULK = 1000; | ||
|
||
/* | ||
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) | ||
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory | ||
*/ | ||
@Override | ||
protected boolean addMockIndexStorePlugin() { | ||
return false; | ||
} | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
Settings.Builder featureSettings = Settings.builder(); | ||
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); | ||
return featureSettings.build(); | ||
} | ||
|
||
public void testWritableWarmFeatureFlagDisabled() { | ||
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build(); | ||
InternalTestCluster internalTestCluster = internalCluster(); | ||
internalTestCluster.startClusterManagerOnlyNode(clusterSettings); | ||
internalTestCluster.startDataOnlyNode(clusterSettings); | ||
|
||
Settings indexSettings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) | ||
.build(); | ||
|
||
try { | ||
prepareCreate(INDEX_NAME).setSettings(indexSettings).get(); | ||
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled"); | ||
} catch (SettingsException ex) { | ||
assertEquals( | ||
"unknown setting [" | ||
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey() | ||
+ "] please check that any required plugins are installed, or check the " | ||
+ "breaking changes documentation for removed settings", | ||
ex.getMessage() | ||
); | ||
} | ||
} | ||
|
||
public void testWritableWarmBasic() throws Exception { | ||
InternalTestCluster internalTestCluster = internalCluster(); | ||
internalTestCluster.startClusterManagerOnlyNode(); | ||
internalTestCluster.startDataOnlyNode(); | ||
Settings settings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) | ||
.build(); | ||
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get()); | ||
|
||
// Verify from the cluster settings if the data locality is partial | ||
GetIndexResponse getIndexResponse = client().admin() | ||
.indices() | ||
.getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true)) | ||
.get(); | ||
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME); | ||
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); | ||
|
||
// Ingesting some docs | ||
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK); | ||
flushAndRefresh(INDEX_NAME); | ||
|
||
// ensuring cluster is green after performing force-merge | ||
ensureGreen(); | ||
|
||
SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); | ||
// Asserting that search returns same number of docs as ingested | ||
assertHitCount(searchResponse, NUM_DOCS_IN_BULK); | ||
|
||
// Ingesting docs again before force merge | ||
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK); | ||
flushAndRefresh(INDEX_NAME); | ||
|
||
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache(); | ||
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class) | ||
.indexService(resolveIndex(INDEX_NAME)) | ||
.getShardOrNull(0); | ||
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate()); | ||
|
||
// Force merging the index | ||
Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll())); | ||
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get(); | ||
flushAndRefresh(INDEX_NAME); | ||
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll())); | ||
|
||
Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream() | ||
.filter(filesAfterMerge::contains) | ||
.filter(file -> !FileTypeUtils.isLockFile(file)) | ||
.filter(file -> !FileTypeUtils.isSegmentsFile(file)) | ||
.collect(Collectors.toUnmodifiableSet()); | ||
|
||
// Asserting that after merge all the files from previous gen are no more part of the directory | ||
assertTrue(filesFromPreviousGenStillPresent.isEmpty()); | ||
|
||
// Asserting that files from previous gen are not present in File Cache as well | ||
filesBeforeMerge.stream() | ||
.filter(file -> !FileTypeUtils.isLockFile(file)) | ||
.filter(file -> !FileTypeUtils.isSegmentsFile(file)) | ||
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file)))); | ||
|
||
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file | ||
// leaks | ||
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); | ||
fileCache.prune(); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.