Skip to content

Commit a7b3d97

Browse files
authored
Apply cluster state metadata and routing table diff when building cluster state from remote (#18256)
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
1 parent 998ae73 commit a7b3d97

File tree

4 files changed

+138
-34
lines changed

4 files changed

+138
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2020
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
2121
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
2222
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817)))
23+
- Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))
2324
- Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250)))
2425
- Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([18270](https://github.com/opensearch-project/OpenSearch/pull18270)))
2526

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1414
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
15+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
1516
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1617
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1718
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
19+
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
1820
import org.opensearch.cluster.ClusterState;
1921
import org.opensearch.cluster.coordination.CoordinationState;
2022
import org.opensearch.cluster.coordination.PersistedStateRegistry;
23+
import org.opensearch.cluster.coordination.PersistedStateStats;
2124
import org.opensearch.cluster.coordination.PublishClusterStateStats;
2225
import org.opensearch.common.blobstore.BlobPath;
2326
import org.opensearch.common.settings.Settings;
@@ -583,6 +586,44 @@ public void testRemotePublicationSettingChangePersistedAfterFullRestart() throws
583586
});
584587
}
585588

589+
public void testPublicationIndexAlias() throws Exception {
590+
// create cluster with multi node (3 master + 2 data)
591+
prepareCluster(3, 2, INDEX_NAME, 1, 2);
592+
ensureStableCluster(5);
593+
ensureGreen(INDEX_NAME);
594+
595+
createIndex("index-1");
596+
createIndex("index-2");
597+
createIndex("index-3");
598+
599+
IndicesAliasesRequest request = new IndicesAliasesRequest(); // <1>
600+
IndicesAliasesRequest.AliasActions remoteIndexAction = new IndicesAliasesRequest.AliasActions(
601+
IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX
602+
).index("index-1");
603+
IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
604+
.index("index-2")
605+
.alias("index-1");
606+
request.addAliasAction(remoteIndexAction);
607+
request.addAliasAction(aliasAction);
608+
609+
assertAcked(client().admin().indices().aliases(request).actionGet());
610+
// assert here that NodeStats.discovery.remote_diff_download.failed_count is 0 for any/all node
611+
NodesStatsResponse nodesStatsResponse = client().admin()
612+
.cluster()
613+
.nodesStats(new NodesStatsRequest().addMetric(DISCOVERY.metricName()))
614+
.actionGet();
615+
for (NodeStats node : nodesStatsResponse.getNodes()) {
616+
List<PersistedStateStats> persistenceStats = node.getDiscoveryStats().getClusterStateStats().getPersistenceStats();
617+
for (PersistedStateStats persistedStateStats : persistenceStats) {
618+
String statsName = persistedStateStats.getStatsName();
619+
if (FULL_DOWNLOAD_STATS.equals(statsName) || DIFF_DOWNLOAD_STATS.equals(statsName)) {
620+
assertEquals(0, persistedStateStats.getFailedCount());
621+
}
622+
}
623+
}
624+
ensureGreen(INDEX_NAME);
625+
}
626+
586627
private void assertDataNodeDownloadStats(NodeStats nodeStats) {
587628
// assert cluster state stats for data node
588629
DiscoveryStats dataNodeDiscoveryStats = nodeStats.getDiscoveryStats();

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 75 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import java.util.concurrent.TimeUnit;
8585
import java.util.concurrent.atomic.AtomicBoolean;
8686
import java.util.concurrent.atomic.AtomicReference;
87+
import java.util.function.Consumer;
8788
import java.util.function.Function;
8889
import java.util.function.LongSupplier;
8990
import java.util.function.Supplier;
@@ -1202,6 +1203,51 @@ ClusterState readClusterStateInParallel(
12021203
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead,
12031204
boolean readIndexRoutingTableDiff,
12041205
boolean includeEphemeral
1206+
) {
1207+
return readClusterStateInParallel(
1208+
previousState,
1209+
manifest,
1210+
clusterUUID,
1211+
localNodeId,
1212+
indicesToRead,
1213+
customToRead,
1214+
readCoordinationMetadata,
1215+
readSettingsMetadata,
1216+
readTransientSettingsMetadata,
1217+
readTemplatesMetadata,
1218+
readDiscoveryNodes,
1219+
readClusterBlocks,
1220+
indicesRoutingToRead,
1221+
readHashesOfConsistentSettings,
1222+
clusterStateCustomToRead,
1223+
readIndexRoutingTableDiff,
1224+
includeEphemeral,
1225+
(metadataBuilder) -> {},
1226+
(routingTable) -> {}
1227+
);
1228+
}
1229+
1230+
// package private for testing
1231+
ClusterState readClusterStateInParallel(
1232+
ClusterState previousState,
1233+
ClusterMetadataManifest manifest,
1234+
String clusterUUID,
1235+
String localNodeId,
1236+
List<UploadedIndexMetadata> indicesToRead,
1237+
Map<String, UploadedMetadataAttribute> customToRead,
1238+
boolean readCoordinationMetadata,
1239+
boolean readSettingsMetadata,
1240+
boolean readTransientSettingsMetadata,
1241+
boolean readTemplatesMetadata,
1242+
boolean readDiscoveryNodes,
1243+
boolean readClusterBlocks,
1244+
List<UploadedIndexMetadata> indicesRoutingToRead,
1245+
boolean readHashesOfConsistentSettings,
1246+
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead,
1247+
boolean readIndexRoutingTableDiff,
1248+
boolean includeEphemeral,
1249+
Consumer<Metadata.Builder> metadataTransformer,
1250+
Consumer<RoutingTable> routingTableTransformer
12051251
) {
12061252
int totalReadTasks = indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata
12071253
? 1
@@ -1467,12 +1513,11 @@ ClusterState readClusterStateInParallel(
14671513
});
14681514

14691515
metadataBuilder.indices(indexMetadataMap);
1516+
metadataTransformer.accept(metadataBuilder);
14701517
if (readDiscoveryNodes) {
14711518
clusterStateBuilder.nodes(discoveryNodesBuilder.get().localNodeId(localNodeId));
14721519
}
14731520

1474-
clusterStateBuilder.metadata(metadataBuilder).version(manifest.getStateVersion()).stateUUID(manifest.getStateUUID());
1475-
14761521
readIndexRoutingTableResults.forEach(
14771522
indexRoutingTable -> indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable)
14781523
);
@@ -1481,8 +1526,12 @@ ClusterState readClusterStateInParallel(
14811526
if (routingTableDiff != null) {
14821527
newRoutingTable = routingTableDiff.apply(previousState.getRoutingTable());
14831528
}
1484-
clusterStateBuilder.routingTable(newRoutingTable);
1529+
routingTableTransformer.accept(newRoutingTable);
14851530

1531+
clusterStateBuilder.metadata(metadataBuilder)
1532+
.routingTable(newRoutingTable)
1533+
.version(manifest.getStateVersion())
1534+
.stateUUID(manifest.getStateUUID());
14861535
return clusterStateBuilder.build();
14871536
}
14881537

@@ -1638,41 +1687,40 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
16381687
manifest.getDiffManifest() != null
16391688
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
16401689
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
1641-
includeEphemeral
1690+
includeEphemeral,
1691+
(metadataBuilder) -> {
1692+
// remove the deleted indices from the metadata
1693+
for (String index : diff.getIndicesDeleted()) {
1694+
metadataBuilder.remove(index);
1695+
}
1696+
// remove the deleted metadata customs from the metadata
1697+
if (diff.getCustomMetadataDeleted() != null) {
1698+
for (String customType : diff.getCustomMetadataDeleted()) {
1699+
metadataBuilder.removeCustom(customType);
1700+
}
1701+
}
1702+
},
1703+
(routingTable) -> {
1704+
Map<String, IndexRoutingTable> indexRoutingTables = routingTable.getIndicesRouting();
1705+
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
1706+
for (String indexName : diff.getIndicesRoutingDeleted()) {
1707+
indexRoutingTables.remove(indexName);
1708+
}
1709+
}
1710+
}
16421711
);
16431712
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
1644-
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
1645-
// remove the deleted indices from the metadata
1646-
for (String index : diff.getIndicesDeleted()) {
1647-
metadataBuilder.remove(index);
1648-
}
1649-
// remove the deleted metadata customs from the metadata
1650-
if (diff.getCustomMetadataDeleted() != null) {
1651-
for (String customType : diff.getCustomMetadataDeleted()) {
1652-
metadataBuilder.removeCustom(customType);
1653-
}
1654-
}
1655-
16561713
// remove the deleted cluster state customs from the metadata
16571714
if (diff.getClusterStateCustomDeleted() != null) {
16581715
for (String customType : diff.getClusterStateCustomDeleted()) {
16591716
clusterStateBuilder.removeCustom(customType);
16601717
}
16611718
}
16621719

1663-
HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(
1664-
updatedClusterState.getRoutingTable().getIndicesRouting()
1665-
);
1666-
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
1667-
for (String indexName : diff.getIndicesRoutingDeleted()) {
1668-
indexRoutingTables.remove(indexName);
1669-
}
1670-
}
1671-
16721720
ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
16731721
.version(manifest.getStateVersion())
1674-
.metadata(metadataBuilder)
1675-
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
1722+
.metadata(updatedClusterState.metadata())
1723+
.routingTable(updatedClusterState.routingTable())
16761724
.build();
16771725
if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
16781726
&& manifest.getClusterStateChecksum() != null) {

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3512,7 +3512,9 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException {
35123512
anyBoolean(),
35133513
eq(emptyMap()),
35143514
anyBoolean(),
3515-
anyBoolean()
3515+
anyBoolean(),
3516+
any(),
3517+
any()
35163518
);
35173519
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);
35183520

@@ -3554,7 +3556,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio
35543556
anyBoolean(),
35553557
eq(emptyMap()),
35563558
anyBoolean(),
3557-
anyBoolean()
3559+
anyBoolean(),
3560+
any(),
3561+
any()
35583562
);
35593563
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);
35603564

@@ -3596,7 +3600,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I
35963600
anyBoolean(),
35973601
eq(emptyMap()),
35983602
anyBoolean(),
3599-
anyBoolean()
3603+
anyBoolean(),
3604+
any(),
3605+
any()
36003606
);
36013607
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);
36023608
verify(mockService, times(1)).validateClusterStateFromChecksum(
@@ -3637,7 +3643,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I
36373643
anyBoolean(),
36383644
eq(emptyMap()),
36393645
anyBoolean(),
3640-
anyBoolean()
3646+
anyBoolean(),
3647+
any(),
3648+
any()
36413649
);
36423650
doReturn(clusterState).when(mockService)
36433651
.readClusterStateInParallel(
@@ -3657,7 +3665,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I
36573665
eq(true),
36583666
eq(manifest.getClusterStateCustomMap()),
36593667
eq(false),
3660-
eq(true)
3668+
eq(true),
3669+
any(),
3670+
any()
36613671
);
36623672

36633673
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);
@@ -3699,7 +3709,9 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio
36993709
anyBoolean(),
37003710
eq(emptyMap()),
37013711
anyBoolean(),
3702-
anyBoolean()
3712+
anyBoolean(),
3713+
any(),
3714+
any()
37033715
);
37043716
doReturn(clusterState).when(mockService)
37053717
.readClusterStateInParallel(
@@ -3719,7 +3731,9 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio
37193731
eq(true),
37203732
eq(manifest.getClusterStateCustomMap()),
37213733
eq(false),
3722-
eq(true)
3734+
eq(true),
3735+
any(),
3736+
any()
37233737
);
37243738

37253739
expectThrows(IllegalStateException.class, () -> mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID));

0 commit comments

Comments
 (0)