Skip to content

Commit 3d0846e

Browse files
authored
[Backport 2.17] Add prefix support to hashed prefix & infix path types on remote store (#15669)
--------- Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent bf946d9 commit 3d0846e

File tree

37 files changed

+646
-99
lines changed

37 files changed

+646
-99
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4343
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
4444
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))
4545
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
46+
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
4647
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
4748
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
4849
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.core.util.FileSystemUtils;
2020
import org.opensearch.index.remote.RemoteSegmentStats;
2121
import org.opensearch.index.translog.RemoteTranslogStats;
22+
import org.opensearch.indices.RemoteStoreSettings;
2223
import org.opensearch.test.InternalTestCluster;
2324
import org.opensearch.test.OpenSearchIntegTestCase;
2425

@@ -67,14 +68,16 @@ public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
6768
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
6869
}
6970

71+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
7072
assertBusy(() -> {
7173
String shardPath = getShardLevelBlobPath(
7274
client(),
7375
indexName,
7476
new BlobPath(),
7577
String.valueOf(shardRouting.getId()),
7678
SEGMENTS,
77-
DATA
79+
DATA,
80+
segmentsPathFixedPrefix
7881
).buildAsString();
7982
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
8083
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,13 +443,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
443443

444444
void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
445445
Client client = client();
446-
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
446+
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
447+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
448+
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix).buildAsString();
447449
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
448-
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
450+
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix).buildAsString();
449451
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
450-
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
452+
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA, segmentsPathFixedPrefix).buildAsString();
451453
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
452-
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
454+
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix).buildAsString();
453455
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);
454456

455457
try (

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,16 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
205205
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
206206
int numberOfIterations = randomIntBetween(5, 15);
207207
indexData(numberOfIterations, true, INDEX_NAME);
208-
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
208+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
209+
String shardPath = getShardLevelBlobPath(
210+
client(),
211+
INDEX_NAME,
212+
BlobPath.cleanPath(),
213+
"0",
214+
SEGMENTS,
215+
METADATA,
216+
segmentsPathFixedPrefix
217+
).buildAsString();
209218
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
210219
;
211220
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
@@ -236,7 +245,16 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
236245
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
237246
int numberOfIterations = randomIntBetween(5, 15);
238247
indexData(numberOfIterations, false, INDEX_NAME);
239-
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
248+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
249+
String shardPath = getShardLevelBlobPath(
250+
client(),
251+
INDEX_NAME,
252+
BlobPath.cleanPath(),
253+
"0",
254+
SEGMENTS,
255+
METADATA,
256+
segmentsPathFixedPrefix
257+
).buildAsString();
240258
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
241259
int actualFileCount = getFileCount(indexPath);
242260
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
@@ -247,11 +265,19 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
247265
Settings.Builder settings = Settings.builder()
248266
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
249267
internalCluster().startNode(settings);
250-
268+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
251269
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
252270
int numberOfIterations = randomIntBetween(5, 15);
253271
indexData(numberOfIterations, true, INDEX_NAME);
254-
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
272+
String shardPath = getShardLevelBlobPath(
273+
client(),
274+
INDEX_NAME,
275+
BlobPath.cleanPath(),
276+
"0",
277+
SEGMENTS,
278+
METADATA,
279+
segmentsPathFixedPrefix
280+
).buildAsString();
255281
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
256282
int actualFileCount = getFileCount(indexPath);
257283
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
@@ -271,7 +297,16 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
271297
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
272298
int numberOfIterations = randomIntBetween(12, 18);
273299
indexData(numberOfIterations, true, INDEX_NAME);
274-
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
300+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
301+
String shardPath = getShardLevelBlobPath(
302+
client(),
303+
INDEX_NAME,
304+
BlobPath.cleanPath(),
305+
"0",
306+
SEGMENTS,
307+
METADATA,
308+
segmentsPathFixedPrefix
309+
).buildAsString();
275310
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
276311
;
277312
int actualFileCount = getFileCount(indexPath);
@@ -604,8 +639,10 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
604639
indexBulk(INDEX_NAME, 50);
605640
flushAndRefresh(INDEX_NAME);
606641

642+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
607643
// 3. Delete data from remote segment store
608-
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
644+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
645+
.buildAsString();
609646
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);
610647

611648
try (Stream<Path> files = Files.list(segmentDataPath)) {
@@ -844,7 +881,16 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
844881
.get()
845882
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
846883

847-
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString();
884+
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
885+
String shardPath = getShardLevelBlobPath(
886+
client(),
887+
INDEX_NAME,
888+
BlobPath.cleanPath(),
889+
"0",
890+
TRANSLOG,
891+
METADATA,
892+
translogPathFixedPrefix
893+
).buildAsString();
848894
Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath);
849895

850896
try (Stream<Path> files = Files.list(translogMetaDataPath)) {

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
1414
import org.opensearch.common.blobstore.BlobPath;
1515
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.indices.RemoteStoreSettings;
1617
import org.opensearch.test.OpenSearchIntegTestCase;
1718

1819
import java.nio.file.Path;
@@ -50,7 +51,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
5051

5152
String indexName = response.getShards()[0].getShardRouting().index().getName();
5253
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
53-
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
54+
55+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
56+
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
57+
.buildAsString();
5458
Path segmentDataRepoPath = location.resolve(shardPath);
5559
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);
5660

server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,13 +325,15 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
325325

326326
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
327327
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
328+
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
328329
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
329330
client(),
330331
remoteStoreEnabledIndexName,
331332
remoteStoreRepository.basePath(),
332333
"0",
333334
SEGMENTS,
334-
LOCK_FILES
335+
LOCK_FILES,
336+
segmentsPathFixedPrefix
335337
);
336338
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
337339
String[] lockFiles;

server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.core.action.ActionListener;
5656
import org.opensearch.core.common.io.stream.StreamInput;
5757
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
58+
import org.opensearch.indices.RemoteStoreSettings;
5859
import org.opensearch.repositories.RepositoriesService;
5960
import org.opensearch.repositories.Repository;
6061
import org.opensearch.repositories.RepositoryCleanupResult;
@@ -113,7 +114,8 @@ public TransportCleanupRepositoryAction(
113114
SnapshotsService snapshotsService,
114115
ThreadPool threadPool,
115116
ActionFilters actionFilters,
116-
IndexNameExpressionResolver indexNameExpressionResolver
117+
IndexNameExpressionResolver indexNameExpressionResolver,
118+
RemoteStoreSettings remoteStoreSettings
117119
) {
118120
super(
119121
CleanupRepositoryAction.NAME,
@@ -126,7 +128,10 @@ public TransportCleanupRepositoryAction(
126128
);
127129
this.repositoriesService = repositoriesService;
128130
this.snapshotsService = snapshotsService;
129-
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
131+
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(
132+
() -> repositoriesService,
133+
remoteStoreSettings.getSegmentsPathFixedPrefix()
134+
);
130135
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
131136
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
132137
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,8 @@ public void apply(Settings value, Settings current, Settings previous) {
771771
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
772772
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL,
773773
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED,
774+
RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX,
775+
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX,
774776

775777
// Composite index settings
776778
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,

server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
2121
import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput;
2222
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
23+
import org.opensearch.indices.RemoteStoreSettings;
2324

2425
import java.io.IOException;
2526
import java.util.Collections;
@@ -68,6 +69,7 @@ public class RemoteIndexPath implements ToXContentFragment {
6869
private final Iterable<String> basePath;
6970
private final PathType pathType;
7071
private final PathHashAlgorithm pathHashAlgorithm;
72+
private final RemoteStoreSettings remoteStoreSettings;
7173

7274
/**
7375
* This keeps the map of paths that would be present in the content of the index path file. For eg - It is possible
@@ -82,7 +84,8 @@ public RemoteIndexPath(
8284
Iterable<String> basePath,
8385
PathType pathType,
8486
PathHashAlgorithm pathHashAlgorithm,
85-
Map<DataCategory, List<DataType>> pathCreationMap
87+
Map<DataCategory, List<DataType>> pathCreationMap,
88+
RemoteStoreSettings remoteStoreSettings
8689
) {
8790
if (Objects.isNull(pathCreationMap)
8891
|| Objects.isNull(pathType)
@@ -119,6 +122,7 @@ public RemoteIndexPath(
119122
this.pathType = pathType;
120123
this.pathHashAlgorithm = pathHashAlgorithm;
121124
this.pathCreationMap = pathCreationMap;
125+
this.remoteStoreSettings = remoteStoreSettings;
122126
}
123127

124128
@Override
@@ -148,6 +152,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
148152
.shardId(Integer.toString(shardNo))
149153
.dataCategory(dataCategory)
150154
.dataType(type)
155+
.fixedPrefix(
156+
dataCategory == TRANSLOG
157+
? remoteStoreSettings.getTranslogPathFixedPrefix()
158+
: remoteStoreSettings.getSegmentsPathFixedPrefix()
159+
)
151160
.build();
152161
builder.value(pathType.path(pathInput, pathHashAlgorithm).buildAsString());
153162
}

server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.gateway.remote.IndexMetadataUploadListener;
2626
import org.opensearch.gateway.remote.RemoteStateTransferException;
2727
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
28+
import org.opensearch.indices.RemoteStoreSettings;
2829
import org.opensearch.node.Node;
2930
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
3031
import org.opensearch.repositories.RepositoriesService;
@@ -79,6 +80,7 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {
7980
private final Settings settings;
8081
private final boolean isRemoteDataAttributePresent;
8182
private final boolean isTranslogSegmentRepoSame;
83+
private final RemoteStoreSettings remoteStoreSettings;
8284
private final Supplier<RepositoriesService> repositoriesService;
8385
private volatile TimeValue metadataUploadTimeout;
8486

@@ -89,7 +91,8 @@ public RemoteIndexPathUploader(
8991
ThreadPool threadPool,
9092
Settings settings,
9193
Supplier<RepositoriesService> repositoriesService,
92-
ClusterSettings clusterSettings
94+
ClusterSettings clusterSettings,
95+
RemoteStoreSettings remoteStoreSettings
9396
) {
9497
super(threadPool, ThreadPool.Names.GENERIC);
9598
this.settings = Objects.requireNonNull(settings);
@@ -100,6 +103,7 @@ public RemoteIndexPathUploader(
100103
Objects.requireNonNull(clusterSettings);
101104
metadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
102105
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setMetadataUploadTimeout);
106+
this.remoteStoreSettings = remoteStoreSettings;
103107
}
104108

105109
@Override
@@ -208,7 +212,8 @@ private void writePathToRemoteStore(
208212
basePath,
209213
pathType,
210214
hashAlgorithm,
211-
pathCreationMap
215+
pathCreationMap,
216+
remoteStoreSettings
212217
);
213218
String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion());
214219
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(remoteIndexPath, blobContainer, fileName, actionListener);

0 commit comments

Comments
 (0)