Skip to content

Commit a2370bb

Browse files
ashking94shiv0408
authored andcommitted
Refactor remote store flow to support any path type (opensearch-project#12822)
Signed-off-by: Ashish Singh <ssashish@amazon.com> Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 717b964 commit a2370bb

20 files changed

+418
-110
lines changed

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@
8888
import org.opensearch.index.mapper.MapperService;
8989
import org.opensearch.index.mapper.MapperService.MergeReason;
9090
import org.opensearch.index.query.QueryShardContext;
91-
import org.opensearch.index.remote.RemoteStorePathResolver;
9291
import org.opensearch.index.remote.RemoteStorePathType;
92+
import org.opensearch.index.remote.RemoteStorePathTypeResolver;
9393
import org.opensearch.index.shard.IndexSettingProvider;
9494
import org.opensearch.index.translog.Translog;
9595
import org.opensearch.indices.IndexCreationException;
@@ -113,6 +113,7 @@
113113
import java.util.List;
114114
import java.util.Locale;
115115
import java.util.Map;
116+
import java.util.Objects;
116117
import java.util.Optional;
117118
import java.util.Set;
118119
import java.util.concurrent.atomic.AtomicInteger;
@@ -170,7 +171,7 @@ public class MetadataCreateIndexService {
170171
private AwarenessReplicaBalance awarenessReplicaBalance;
171172

172173
@Nullable
173-
private final RemoteStorePathResolver remoteStorePathResolver;
174+
private final RemoteStorePathTypeResolver remoteStorePathTypeResolver;
174175

175176
public MetadataCreateIndexService(
176177
final Settings settings,
@@ -203,8 +204,8 @@ public MetadataCreateIndexService(
203204

204205
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
205206
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
206-
remoteStorePathResolver = isRemoteDataAttributePresent(settings)
207-
? new RemoteStorePathResolver(clusterService.getClusterSettings())
207+
remoteStorePathTypeResolver = isRemoteDataAttributePresent(settings)
208+
? new RemoteStorePathTypeResolver(clusterService.getClusterSettings())
208209
: null;
209210
}
210211

@@ -553,7 +554,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
553554
tmpImdBuilder.setRoutingNumShards(routingNumShards);
554555
tmpImdBuilder.settings(indexSettings);
555556
tmpImdBuilder.system(isSystem);
556-
addRemoteCustomData(tmpImdBuilder);
557+
addRemoteStorePathTypeInCustomData(tmpImdBuilder, true);
557558

558559
// Set up everything, now locally create the index to see that things are ok, and apply
559560
IndexMetadata tempMetadata = tmpImdBuilder.build();
@@ -562,17 +563,24 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
562563
return tempMetadata;
563564
}
564565

565-
public void addRemoteCustomData(IndexMetadata.Builder tmpImdBuilder) {
566-
if (remoteStorePathResolver != null) {
566+
/**
567+
* Adds the remote store path type information in custom data of index metadata.
568+
*
569+
* @param tmpImdBuilder index metadata builder.
570+
* @param assertNullOldType flag to verify that the old remote store path type is null
571+
*/
572+
public void addRemoteStorePathTypeInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
573+
if (remoteStorePathTypeResolver != null) {
567574
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
568575
// in the remote store custom data map.
569576
Map<String, String> existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
570577
Map<String, String> remoteCustomData = existingRemoteCustomData == null
571578
? new HashMap<>()
572579
: new HashMap<>(existingRemoteCustomData);
573580
// Determine the path type for use using the remoteStorePathResolver.
574-
String newPathType = remoteStorePathResolver.resolveType().toString();
581+
String newPathType = remoteStorePathTypeResolver.getType().toString();
575582
String oldPathType = remoteCustomData.put(RemoteStorePathType.NAME, newPathType);
583+
assert !assertNullOldType || Objects.isNull(oldPathType);
576584
logger.trace(() -> new ParameterizedMessage("Added new path type {}, replaced old path type {}", newPathType, oldPathType));
577585
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
578586
}

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,8 @@ public synchronized IndexShard createShard(
507507
remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
508508
RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
509509
this.indexSettings.getUUID(),
510-
shardId
510+
shardId,
511+
this.indexSettings.getRemoteStorePathType()
511512
);
512513
}
513514
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.opensearch.core.common.unit.ByteSizeUnit;
4949
import org.opensearch.core.common.unit.ByteSizeValue;
5050
import org.opensearch.core.index.Index;
51+
import org.opensearch.index.remote.RemoteStorePathType;
5152
import org.opensearch.index.translog.Translog;
5253
import org.opensearch.indices.replication.common.ReplicationType;
5354
import org.opensearch.ingest.IngestService;
@@ -59,6 +60,7 @@
5960
import java.util.Collections;
6061
import java.util.List;
6162
import java.util.Locale;
63+
import java.util.Map;
6264
import java.util.Optional;
6365
import java.util.concurrent.TimeUnit;
6466
import java.util.function.Consumer;
@@ -1905,4 +1907,11 @@ public double getDocIdFuzzySetFalsePositiveProbability() {
19051907
public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePositiveProbability) {
19061908
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
19071909
}
1910+
1911+
public RemoteStorePathType getRemoteStorePathType() {
1912+
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
1913+
return remoteCustomData != null && remoteCustomData.containsKey(RemoteStorePathType.NAME)
1914+
? RemoteStorePathType.parseString(remoteCustomData.get(RemoteStorePathType.NAME))
1915+
: RemoteStorePathType.FIXED;
1916+
}
19081917
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.remote;
10+
11+
import java.util.Set;
12+
13+
import static org.opensearch.index.remote.RemoteStoreDataEnums.DataType.DATA;
14+
import static org.opensearch.index.remote.RemoteStoreDataEnums.DataType.METADATA;
15+
16+
/**
17+
* This class contains the different enums related to remote store data categories and types.
18+
*
19+
* @opensearch.internal
20+
*/
21+
public class RemoteStoreDataEnums {
22+
23+
/**
24+
* Categories of the data in Remote store.
25+
*/
26+
public enum DataCategory {
27+
SEGMENTS("segments", Set.of(DataType.values())),
28+
TRANSLOG("translog", Set.of(DATA, METADATA));
29+
30+
private final String name;
31+
private final Set<DataType> supportedDataTypes;
32+
33+
DataCategory(String name, Set<DataType> supportedDataTypes) {
34+
this.name = name;
35+
this.supportedDataTypes = supportedDataTypes;
36+
}
37+
38+
public boolean isSupportedDataType(DataType dataType) {
39+
return supportedDataTypes.contains(dataType);
40+
}
41+
42+
public String getName() {
43+
return name;
44+
}
45+
}
46+
47+
/**
48+
* Types of data in remote store.
49+
*/
50+
public enum DataType {
51+
DATA("data"),
52+
METADATA("metadata"),
53+
LOCK_FILES("lock_files");
54+
55+
private final String name;
56+
57+
DataType(String name) {
58+
this.name = name;
59+
}
60+
61+
public String getName() {
62+
return name;
63+
}
64+
}
65+
}

server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
package org.opensearch.index.remote;
1010

11+
import org.opensearch.common.blobstore.BlobPath;
12+
import org.opensearch.index.remote.RemoteStoreDataEnums.DataCategory;
13+
import org.opensearch.index.remote.RemoteStoreDataEnums.DataType;
14+
1115
import java.util.Locale;
1216

1317
/**
@@ -18,13 +22,46 @@
1822
*/
1923
public enum RemoteStorePathType {
2024

21-
FIXED,
22-
HASHED_PREFIX;
25+
FIXED {
26+
@Override
27+
public BlobPath generatePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType) {
28+
return basePath.add(indexUUID).add(shardId).add(dataCategory).add(dataType);
29+
}
30+
},
31+
HASHED_PREFIX {
32+
@Override
33+
public BlobPath generatePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType) {
34+
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
35+
// throw new UnsupportedOperationException("Not implemented"); --> Not using this for unblocking couple of tests.
36+
return basePath.add(indexUUID).add(shardId).add(dataCategory).add(dataType);
37+
}
38+
};
39+
40+
/**
41+
* @param basePath base path of the underlying blob store repository
42+
* @param indexUUID of the index
43+
* @param shardId shard id
44+
* @param dataCategory is either translog or segment
45+
* @param dataType can be one of data, metadata or lock_files.
46+
* @return the blob path for the underlying remote store path type.
47+
*/
48+
public BlobPath path(BlobPath basePath, String indexUUID, String shardId, DataCategory dataCategory, DataType dataType) {
49+
assert dataCategory.isSupportedDataType(dataType) : "category:"
50+
+ dataCategory
51+
+ " type:"
52+
+ dataType
53+
+ " are not supported together";
54+
return generatePath(basePath, indexUUID, shardId, dataCategory.getName(), dataType.getName());
55+
}
56+
57+
abstract BlobPath generatePath(BlobPath basePath, String indexUUID, String shardId, String dataCategory, String dataType);
2358

2459
public static RemoteStorePathType parseString(String remoteStoreBlobPathType) {
2560
try {
2661
return RemoteStorePathType.valueOf(remoteStoreBlobPathType.toUpperCase(Locale.ROOT));
27-
} catch (IllegalArgumentException e) {
62+
} catch (IllegalArgumentException | NullPointerException e) {
63+
// IllegalArgumentException is thrown when the input does not match any enum name
64+
// NullPointerException is thrown when the input is null
2865
throw new IllegalArgumentException("Could not parse RemoteStorePathType for [" + remoteStoreBlobPathType + "]");
2966
}
3067
}

server/src/main/java/org/opensearch/index/remote/RemoteStorePathResolver.java renamed to server/src/main/java/org/opensearch/index/remote/RemoteStorePathTypeResolver.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@
1616
*
1717
* @opensearch.internal
1818
*/
19-
public class RemoteStorePathResolver {
19+
public class RemoteStorePathTypeResolver {
2020

21-
private final ClusterSettings clusterSettings;
21+
private volatile RemoteStorePathType type;
2222

23-
public RemoteStorePathResolver(ClusterSettings clusterSettings) {
24-
this.clusterSettings = clusterSettings;
23+
public RemoteStorePathTypeResolver(ClusterSettings clusterSettings) {
24+
type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
25+
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, this::setType);
2526
}
2627

27-
public RemoteStorePathType resolveType() {
28-
return clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
28+
public RemoteStorePathType getType() {
29+
return type;
30+
}
31+
32+
public void setType(RemoteStorePathType type) {
33+
this.type = type;
2934
}
3035
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4932,7 +4932,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
49324932
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
49334933
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
49344934
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
4935-
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool());
4935+
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathType());
49364936
}
49374937

49384938
/*
@@ -4949,7 +4949,14 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
49494949
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
49504950
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
49514951
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
4952-
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger);
4952+
RemoteFsTranslog.download(
4953+
repository,
4954+
shardId,
4955+
getThreadPool(),
4956+
shardPath().resolveTranslog(),
4957+
indexSettings.getRemoteStorePathType(),
4958+
logger
4959+
);
49534960
}
49544961

49554962
/**

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.index.engine.Engine;
5959
import org.opensearch.index.engine.EngineException;
6060
import org.opensearch.index.mapper.MapperService;
61+
import org.opensearch.index.remote.RemoteStorePathType;
6162
import org.opensearch.index.seqno.SequenceNumbers;
6263
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
6364
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
@@ -409,7 +410,8 @@ void recoverFromSnapshotAndRemoteStore(
409410
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
410411
remoteStoreRepository,
411412
indexUUID,
412-
shardId
413+
shardId,
414+
RemoteStorePathType.FIXED // TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
413415
);
414416
sourceRemoteDirectory.initializeToSpecificCommit(
415417
primaryTerm,

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
3232
import org.opensearch.core.action.ActionListener;
3333
import org.opensearch.core.index.shard.ShardId;
34+
import org.opensearch.index.remote.RemoteStorePathType;
3435
import org.opensearch.index.remote.RemoteStoreUtils;
3536
import org.opensearch.index.store.lockmanager.FileLockInfo;
3637
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
@@ -897,13 +898,15 @@ public static void remoteDirectoryCleanup(
897898
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
898899
String remoteStoreRepoForIndex,
899900
String indexUUID,
900-
ShardId shardId
901+
ShardId shardId,
902+
RemoteStorePathType pathType
901903
) {
902904
try {
903905
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
904906
remoteStoreRepoForIndex,
905907
indexUUID,
906-
shardId
908+
shardId,
909+
pathType
907910
);
908911
remoteSegmentStoreDirectory.deleteStaleSegments(0);
909912
remoteSegmentStoreDirectory.deleteIfEmpty();

0 commit comments

Comments
 (0)