Skip to content

Commit 82689a2

Browse files
rajiv-kvakolarkunnu
authored andcommitted
create publication repos during join task execution (opensearch-project#16383)
* create publication repos during join task Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
1 parent de5a5d7 commit 82689a2

File tree

4 files changed

+272
-15
lines changed

4 files changed

+272
-15
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.remotemigration.MigrationBaseTestCase;
1515
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
1616
import org.opensearch.repositories.blobstore.BlobStoreRepository;
17+
import org.opensearch.repositories.fs.FsRepository;
1718
import org.opensearch.repositories.fs.ReloadableFsRepository;
1819
import org.opensearch.test.InternalSettingsPlugin;
1920
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() {
9798
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
9899
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
99100
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
100-
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
101+
.put(routingTableRepoTypeAttributeKey, FsRepository.TYPE)
101102
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
102103
return builder;
103104
}
104105

105106
public Settings.Builder remoteWithRoutingTableNodeSetting() {
106107
// Remote Cluster with Routing table
108+
107109
return Settings.builder()
108110
.put(
109-
buildRemoteStoreNodeAttributes(
111+
remoteStoreClusterSettings(
110112
REPOSITORY_NAME,
111113
segmentRepoPath,
114+
ReloadableFsRepository.TYPE,
112115
REPOSITORY_2_NAME,
113116
translogRepoPath,
117+
ReloadableFsRepository.TYPE,
114118
REPOSITORY_NAME,
115119
segmentRepoPath,
116-
false
120+
ReloadableFsRepository.TYPE
117121
)
118122
)
119123
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.cluster.metadata.IndexMetadata;
4343
import org.opensearch.cluster.metadata.Metadata;
4444
import org.opensearch.cluster.metadata.RepositoriesMetadata;
45+
import org.opensearch.cluster.metadata.RepositoryMetadata;
4546
import org.opensearch.cluster.node.DiscoveryNode;
4647
import org.opensearch.cluster.node.DiscoveryNodes;
4748
import org.opensearch.cluster.routing.RerouteService;
@@ -57,6 +58,7 @@
5758
import java.util.Collection;
5859
import java.util.Collections;
5960
import java.util.HashMap;
61+
import java.util.LinkedHashMap;
6062
import java.util.List;
6163
import java.util.Locale;
6264
import java.util.Map;
@@ -185,11 +187,30 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
185187
// for every set of node join task which we can optimize to not compute if cluster state already has
186188
// repository information.
187189
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
188-
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
189-
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
190-
dn,
191-
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
192-
);
190+
Optional<DiscoveryNode> remotePublicationDN = currentNodes.getNodes()
191+
.values()
192+
.stream()
193+
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
194+
.findFirst();
195+
RepositoriesMetadata existingRepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE);
196+
Map<String, RepositoryMetadata> repositories = new LinkedHashMap<>();
197+
if (existingRepositoriesMetadata != null) {
198+
existingRepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
199+
}
200+
if (remoteDN.isPresent()) {
201+
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
202+
remoteDN.get(),
203+
existingRepositoriesMetadata
204+
);
205+
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
206+
}
207+
if (remotePublicationDN.isPresent()) {
208+
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
209+
remotePublicationDN.get(),
210+
existingRepositoriesMetadata
211+
);
212+
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
213+
}
193214

194215
assert nodesBuilder.isLocalNodeElectedClusterManager();
195216

@@ -219,15 +240,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
219240
ensureNodeCommissioned(node, currentState.metadata());
220241
nodesBuilder.add(node);
221242

222-
if (remoteDN.isEmpty() && node.isRemoteStoreNode()) {
243+
if ((remoteDN.isEmpty() && node.isRemoteStoreNode())
244+
|| (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) {
223245
// This is hit only on cases where we encounter first remote node
224246
logger.info("Updating system repository now for remote store");
225-
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
247+
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
226248
node,
227-
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
249+
existingRepositoriesMetadata
228250
);
251+
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
229252
}
230-
231253
nodesChanged = true;
232254
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
233255
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
@@ -241,7 +263,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
241263
}
242264
results.success(joinTask);
243265
}
244-
266+
RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values()));
245267
if (nodesChanged) {
246268
rerouteService.reroute(
247269
"post-join reroute",

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) {
168168
* node repository metadata an exception will be thrown and the node will not be allowed to join the cluster.
169169
*/
170170
public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) {
171-
if (joiningNode.isRemoteStoreNode()) {
171+
if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) {
172172
List<RepositoryMetadata> updatedRepositoryMetadataList = new ArrayList<>();
173173
List<RepositoryMetadata> newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata()
174174
.repositories();

0 commit comments

Comments
 (0)