From 6fd9cd57cba3f90a4822922e1cd92772c5fe63fa Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 30 Aug 2024 13:04:50 -0700 Subject: [PATCH 1/7] Introduce allocation filter to control placement of search only replicas Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../SearchReplicaFilteringAllocationIT.java | 125 ++++++++++++++++++ .../SearchOnlyReplicaFeatureFlagIT.java | 12 ++ .../decider/FilterAllocationDecider.java | 53 +++++++- .../common/settings/ClusterSettings.java | 4 +- .../decider/FilterAllocationDeciderTests.java | 99 ++++++++++++++ 6 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 453f32ee74878..fdcf69f74a6a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) +- [Reader Writer Separation] Add allocation filter for search replicas ([#15455](https://github.com/opensearch-project/OpenSearch/pull/15455)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java new file mode 100644 index 0000000000000..9f2ea871a9b83 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.allocation; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase { + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); + } + + public void testSearchReplicaDedicatedIncludes() { + List nodesIds = internalCluster().startNodes(3); + final String node_0 = nodesIds.get(0); + final String node_1 = nodesIds.get(1); + final String node_2 = nodesIds.get(2); + assertEquals(3, cluster().size()); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0) + ) + .execute() + .actionGet(); + + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureGreen("test"); + // ensure primary is not on node 0 or 1, + IndexShardRoutingTable routingTable = getRoutingTable(); + assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId())); + + String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()); + String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0; + + // set the included nodes to the other open node, search replica should relocate to that node. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode)) + .execute() + .actionGet(); + ensureGreen("test"); + + routingTable = getRoutingTable(); + assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId())); + assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId())); + } + + public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() { + List nodesIds = internalCluster().startNodes(3); + final String node_0 = nodesIds.get(0); + final String node_1 = nodesIds.get(1); + final String node_2 = nodesIds.get(2); + assertEquals(3, cluster().size()); + + // set filter on 1 node and set search replica count to 2 - should leave 1 unassigned + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1)) + .execute() + .actionGet(); + + logger.info("--> creating an index with no replicas"); + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureYellowAndNoInitializingShards("test"); + IndexShardRoutingTable routingTable = getRoutingTable(); + assertEquals(2, routingTable.searchOnlyReplicas().size()); + List assignedSearchShards = routingTable.searchOnlyReplicas() + .stream() + .filter(ShardRouting::assignedToNode) + .collect(Collectors.toList()); + assertEquals(1, assignedSearchShards.size()); + assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId())); + assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count()); + } + + private IndexShardRoutingTable getRoutingTable() { + IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0); + return routingTable; + } + + private String getNodeName(String id) { + return getClusterState().nodes().get(id).getName(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java index e5a05c04fa7ee..f3fc569f1a3fd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java @@ -17,6 +17,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1) public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase { @@ -53,4 +54,15 @@ public void testUpdateFeatureFlagDisabled() { }); assertTrue(settingsException.getMessage().contains("unknown setting")); } + + public void testFilterAllocationSettingNotRegistered() { + expectThrows(SettingsException.class, () -> { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node")) + .execute() + .actionGet(); + }); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index d3200c1bc9d75..a48ba889914b7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -44,6 +44,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.node.remotestore.RemoteStoreNodeService; import java.util.Map; @@ -88,6 +89,8 @@ public class FilterAllocationDecider extends AllocationDecider { private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require"; private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include"; private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude"; + private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include"; + public static final Setting.AffixSetting CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting( CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) @@ -100,7 +103,12 @@ public class FilterAllocationDecider extends AllocationDecider { CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) ); + public static final Setting.AffixSetting SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting( + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".", + key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) + ); + private volatile DiscoveryNodeFilters searchReplicaIncludeFilters; private volatile DiscoveryNodeFilters clusterRequireFilters; private volatile DiscoveryNodeFilters clusterIncludeFilters; private volatile DiscoveryNodeFilters clusterExcludeFilters; @@ -113,7 +121,6 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings); this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings); - clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {}); clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {}); clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {}); @@ -122,6 +129,15 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, this::setCompatibilityMode ); + + if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) { + setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); + clusterSettings.addAffixMapUpdateConsumer( + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING, + this::setSearchReplicaIncludeFilters, + (a, b) -> {} + ); + } } private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) { @@ -203,6 +219,9 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation); if (decision != null) return decision; + decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation); + if (decision != null) return decision; + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); } @@ -294,6 +313,32 @@ private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation alloc return null; } + private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) { + if (searchReplicaIncludeFilters != null) { + final boolean match = searchReplicaIncludeFilters.match(node); + if (match == false && routing.isSearchOnly()) { + return allocation.decision( + Decision.NO, + NAME, + "node does not match shard setting [%s] filters [%s]", + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, + searchReplicaIncludeFilters + ); + } + // filter will only apply to search replicas + if (routing.isSearchOnly() == false && match) { + return allocation.decision( + Decision.NO, + NAME, + "only search replicas can be allocated to node with setting [%s] filters [%s]", + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, + searchReplicaIncludeFilters + ); + } + } + return null; + } + private void setClusterRequireFilters(Map filters) { clusterRequireFilters = DiscoveryNodeFilters.trimTier( DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterRequireFilters, AND, filters) @@ -311,4 +356,10 @@ private void setClusterExcludeFilters(Map filters) { DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterExcludeFilters, OR, filters) ); } + + private void setSearchReplicaIncludeFilters(Map filters) { + searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier( + DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters) + ); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 9a6b3f1118709..fb796b99c6e46 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -805,6 +805,8 @@ public void apply(Settings value, Settings current, Settings previous) { OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ) - ) + ), + List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL), + List.of(FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING) ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index 2e303887e0f1b..af5a58ab04354 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -53,16 +53,21 @@ import org.opensearch.common.settings.Settings; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.snapshots.EmptySnapshotsInfoService; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_RESIZE_SOURCE_NAME; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_RESIZE_SOURCE_UUID; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; +import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; @@ -462,4 +467,98 @@ public void testMixedModeRemoteStoreAllocation() { decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation); assertEquals(decision.toString(), Type.NO, decision.type()); } + + public void testSearchReplicaRoutingDedicatedIncludes() { + FeatureFlagSetter.set(READER_WRITER_SPLIT_EXPERIMENTAL); + // we aren't using a settingsModule here so we need to set feature flag gated setting + Set> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), settings); + Settings initialSettings = Settings.builder() + .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2") + .build(); + + FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings); + AllocationDeciders allocationDeciders = new AllocationDeciders( + Arrays.asList( + filterAllocationDecider, + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new ReplicaAfterPrimaryActiveAllocationDecider() + ) + ); + AllocationService service = new AllocationService( + allocationDeciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY); + RoutingTable routingTable = state.routingTable(); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + allocation.debugDecision(true); + + ShardRouting searchReplica = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + false, + true, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "") + ); + + ShardRouting regularReplica = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + false, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + + ShardRouting primary = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + true, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + + Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate( + searchReplica, + state.getRoutingNodes().node("node2"), + allocation + ); + assertEquals(decision.toString(), Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Type.YES, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + + Settings updatedSettings = Settings.builder() + .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2") + .build(); + clusterSettings.applySettings(updatedSettings); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Type.YES, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Type.NO, decision.type()); + } } From 18e16b3b33c80369690a6fa8e518252b9ec1202d Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 30 Aug 2024 16:24:28 -0700 Subject: [PATCH 2/7] Add a new decider rather than updating the existing FilterAllocationDecider Signed-off-by: Marc Handalian --- .../SearchReplicaFilteringAllocationIT.java | 2 +- .../org/opensearch/cluster/ClusterModule.java | 5 + .../decider/FilterAllocationDecider.java | 53 +----- .../SearchReplicaAllocationDecider.java | 151 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 3 +- .../decider/FilterAllocationDeciderTests.java | 101 +----------- .../SearchReplicaAllocationDeciderTest.java | 133 +++++++++++++++ 7 files changed, 294 insertions(+), 154 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java index 9f2ea871a9b83..5f65d6647f26d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java @@ -20,7 +20,7 @@ import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; -import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase { diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index bb51c42252448..404b5c9882973 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -75,6 +75,7 @@ import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; @@ -85,6 +86,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; import org.opensearch.core.ParseField; @@ -379,6 +381,9 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); + if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) { + addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings)); + } addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index a48ba889914b7..d3200c1bc9d75 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -44,7 +44,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.node.remotestore.RemoteStoreNodeService; import java.util.Map; @@ -89,8 +88,6 @@ public class FilterAllocationDecider extends AllocationDecider { private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require"; private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include"; private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude"; - private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include"; - public static final Setting.AffixSetting CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting( CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".", key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) @@ -103,12 +100,7 @@ public class FilterAllocationDecider extends AllocationDecider { CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".", key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) ); - public static final Setting.AffixSetting SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting( - SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".", - key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) - ); - private volatile DiscoveryNodeFilters searchReplicaIncludeFilters; private volatile DiscoveryNodeFilters clusterRequireFilters; private volatile DiscoveryNodeFilters clusterIncludeFilters; private volatile DiscoveryNodeFilters clusterExcludeFilters; @@ -121,6 +113,7 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings); this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings); + clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {}); clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {}); clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {}); @@ -129,15 +122,6 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, this::setCompatibilityMode ); - - if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) { - setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); - clusterSettings.addAffixMapUpdateConsumer( - SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING, - this::setSearchReplicaIncludeFilters, - (a, b) -> {} - ); - } } private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) { @@ -219,9 +203,6 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation); if (decision != null) return decision; - decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation); - if (decision != null) return decision; - return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); } @@ -313,32 +294,6 @@ private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation alloc return null; } - private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) { - if (searchReplicaIncludeFilters != null) { - final boolean match = searchReplicaIncludeFilters.match(node); - if (match == false && routing.isSearchOnly()) { - return allocation.decision( - Decision.NO, - NAME, - "node does not match shard setting [%s] filters [%s]", - SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, - searchReplicaIncludeFilters - ); - } - // filter will only apply to search replicas - if (routing.isSearchOnly() == false && match) { - return allocation.decision( - Decision.NO, - NAME, - "only search replicas can be allocated to node with setting [%s] filters [%s]", - SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, - searchReplicaIncludeFilters - ); - } - } - return null; - } - private void setClusterRequireFilters(Map filters) { clusterRequireFilters = DiscoveryNodeFilters.trimTier( DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterRequireFilters, AND, filters) @@ -356,10 +311,4 @@ private void setClusterExcludeFilters(Map filters) { DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterExcludeFilters, OR, filters) ); } - - private void setSearchReplicaIncludeFilters(Map filters) { - searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier( - DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters) - ); - } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java new file mode 100644 index 0000000000000..7ec18ac398938 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeFilters; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.remotestore.RemoteStoreNodeService; + +import java.util.Map; + +import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; +import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR; + +/** + * This {@link AllocationDecider} control shard allocation by include and + * exclude filters via dynamic cluster and index routing settings. + *

+ * This filter is used to make explicit decision on which nodes certain shard + * can / should be allocated. The decision if a shard can be allocated, must not + * be allocated or should be allocated is based on either cluster wide dynamic + * settings ({@code cluster.routing.allocation.*}) or index specific dynamic + * settings ({@code index.routing.allocation.*}). All of those settings can be + * changed at runtime via the cluster or the index update settings API. + *

+ * Note: Cluster settings are applied first and will override index specific + * settings such that if a shard can be allocated according to the index routing + * settings it wont be allocated on a node if the cluster specific settings + * would disallow the allocation. Filters are applied in the following order: + *
    + *
  1. {@code required} - filters required allocations. + * If any {@code required} filters are set the allocation is denied if the index is not in the set of {@code required} to allocate + * on the filtered node
  2. + *
  3. {@code include} - filters "allowed" allocations. + * If any {@code include} filters are set the allocation is denied if the index is not in the set of {@code include} filters for + * the filtered node
  4. + *
  5. {@code exclude} - filters "prohibited" allocations. + * If any {@code exclude} filters are set the allocation is denied if the index is in the set of {@code exclude} filters for the + * filtered node
  6. + *
+ * + * @opensearch.internal + */ +public class SearchReplicaAllocationDecider extends AllocationDecider { + + public static final String NAME = "filter"; + private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include"; + public static final Setting.AffixSetting SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting( + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".", + key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) + ); + + private volatile DiscoveryNodeFilters searchReplicaIncludeFilters; + + private volatile RemoteStoreNodeService.Direction migrationDirection; + private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode; + + public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) { + setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); + clusterSettings.addAffixMapUpdateConsumer( + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING, + this::setSearchReplicaIncludeFilters, + (a, b) -> {} + ); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { + Decision decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation); + if (decision != null) return decision; + + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); + } + + private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) { + if (searchReplicaIncludeFilters != null) { + final boolean match = searchReplicaIncludeFilters.match(node); + if (match == false && routing.isSearchOnly()) { + return allocation.decision( + Decision.NO, + NAME, + "node does not match shard setting [%s] filters [%s]", + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, + searchReplicaIncludeFilters + ); + } + // filter will only apply to search replicas + if (routing.isSearchOnly() == false && match) { + return allocation.decision( + Decision.NO, + NAME, + "only search replicas can be allocated to node with setting [%s] filters [%s]", + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, + searchReplicaIncludeFilters + ); + } + } + return null; + } + + private void setSearchReplicaIncludeFilters(Map filters) { + searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier( + DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters) + ); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fb796b99c6e46..24ec67ba87719 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -76,6 +76,7 @@ import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterApplierService; @@ -807,6 +808,6 @@ public void apply(Settings value, Settings current, Settings previous) { ) ), List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL), - List.of(FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING) + List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING) ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index af5a58ab04354..f226c45553d57 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -53,21 +53,16 @@ import org.opensearch.common.settings.Settings; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.snapshots.EmptySnapshotsInfoService; -import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_RESIZE_SOURCE_NAME; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_RESIZE_SOURCE_UUID; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; -import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; @@ -328,7 +323,7 @@ private ClusterState createInitialClusterState(AllocationService service, Settin return createInitialClusterState(service, indexSettings, Settings.EMPTY); } - private ClusterState createInitialClusterState(AllocationService service, Settings idxSettings, Settings clusterSettings) { + static ClusterState createInitialClusterState(AllocationService service, Settings idxSettings, Settings clusterSettings) { Metadata.Builder metadata = Metadata.builder(); metadata.persistentSettings(clusterSettings); final Settings.Builder indexSettings = settings(Version.CURRENT).put(idxSettings); @@ -467,98 +462,4 @@ public void testMixedModeRemoteStoreAllocation() { decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation); assertEquals(decision.toString(), Type.NO, decision.type()); } - - public void testSearchReplicaRoutingDedicatedIncludes() { - FeatureFlagSetter.set(READER_WRITER_SPLIT_EXPERIMENTAL); - // we aren't using a settingsModule here so we need to set feature flag gated setting - Set> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), settings); - Settings initialSettings = Settings.builder() - .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2") - .build(); - - FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings); - AllocationDeciders allocationDeciders = new AllocationDeciders( - Arrays.asList( - filterAllocationDecider, - new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), - new ReplicaAfterPrimaryActiveAllocationDecider() - ) - ); - AllocationService service = new AllocationService( - allocationDeciders, - new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), - EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE - ); - ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY); - RoutingTable routingTable = state.routingTable(); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); - allocation.debugDecision(true); - - ShardRouting searchReplica = ShardRouting.newUnassigned( - routingTable.index("sourceIndex").shard(0).shardId(), - false, - true, - RecoverySource.PeerRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "") - ); - - ShardRouting regularReplica = ShardRouting.newUnassigned( - routingTable.index("sourceIndex").shard(0).shardId(), - false, - false, - RecoverySource.PeerRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") - ); - - ShardRouting primary = ShardRouting.newUnassigned( - routingTable.index("sourceIndex").shard(0).shardId(), - true, - false, - RecoverySource.PeerRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") - ); - - Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate( - searchReplica, - state.getRoutingNodes().node("node2"), - allocation - ); - assertEquals(decision.toString(), Type.YES, decision.type()); - decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); - assertEquals(decision.toString(), Type.YES, decision.type()); - - decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - - decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - - Settings updatedSettings = Settings.builder() - .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2") - .build(); - clusterSettings.applySettings(updatedSettings); - - decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation); - assertEquals(decision.toString(), Type.YES, decision.type()); - decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - - decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); - assertEquals(decision.toString(), Type.YES, decision.type()); - - decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); - assertEquals(decision.toString(), Type.YES, decision.type()); - decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); - assertEquals(decision.toString(), Type.NO, decision.type()); - } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java new file mode 100644 index 0000000000000..e0521e4682130 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.EmptyClusterInfoService; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.snapshots.EmptySnapshotsInfoService; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; + +public class SearchReplicaAllocationDeciderTest extends OpenSearchAllocationTestCase { + + public void testSearchReplicaRoutingDedicatedIncludes() { + // we aren't using a settingsModule here so we need to set feature flag gated setting + Set> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), settings); + Settings initialSettings = Settings.builder() + .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2") + .build(); + + SearchReplicaAllocationDecider filterAllocationDecider = new SearchReplicaAllocationDecider(initialSettings, clusterSettings); + AllocationDeciders allocationDeciders = new AllocationDeciders( + Arrays.asList( + filterAllocationDecider, + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new ReplicaAfterPrimaryActiveAllocationDecider() + ) + ); + AllocationService service = new AllocationService( + allocationDeciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + ClusterState state = FilterAllocationDeciderTests.createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY); + RoutingTable routingTable = state.routingTable(); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + allocation.debugDecision(true); + + ShardRouting searchReplica = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + false, + true, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "") + ); + + ShardRouting regularReplica = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + false, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + + ShardRouting primary = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + true, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + + Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate( + searchReplica, + state.getRoutingNodes().node("node2"), + allocation + ); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + + Settings updatedSettings = Settings.builder() + .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2") + .build(); + clusterSettings.applySettings(updatedSettings); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canRemain(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canRemain(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canRemain(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + } +} From 0855d9f136beac7f3ef2602bdc74dfc9017c155e Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 30 Aug 2024 16:59:05 -0700 Subject: [PATCH 3/7] Fix license header and description on SearchReplicaAllocationDecider Signed-off-by: Marc Handalian --- .../SearchReplicaAllocationDecider.java | 55 ++----------------- 1 file changed, 5 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java index 7ec18ac398938..1f2e6ff47e92b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java @@ -6,30 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.cluster.routing.allocation.decider; import org.opensearch.cluster.node.DiscoveryNode; @@ -49,32 +25,11 @@ import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR; /** - * This {@link AllocationDecider} control shard allocation by include and - * exclude filters via dynamic cluster and index routing settings. - *

- * This filter is used to make explicit decision on which nodes certain shard - * can / should be allocated. The decision if a shard can be allocated, must not - * be allocated or should be allocated is based on either cluster wide dynamic - * settings ({@code cluster.routing.allocation.*}) or index specific dynamic - * settings ({@code index.routing.allocation.*}). All of those settings can be - * changed at runtime via the cluster or the index update settings API. - *

- * Note: Cluster settings are applied first and will override index specific - * settings such that if a shard can be allocated according to the index routing - * settings it wont be allocated on a node if the cluster specific settings - * would disallow the allocation. Filters are applied in the following order: - *
    - *
  1. {@code required} - filters required allocations. - * If any {@code required} filters are set the allocation is denied if the index is not in the set of {@code required} to allocate - * on the filtered node
  2. - *
  3. {@code include} - filters "allowed" allocations. - * If any {@code include} filters are set the allocation is denied if the index is not in the set of {@code include} filters for - * the filtered node
  4. - *
  5. {@code exclude} - filters "prohibited" allocations. - * If any {@code exclude} filters are set the allocation is denied if the index is in the set of {@code exclude} filters for the - * filtered node
  6. - *
- * + * This allocation decider is similar to FilterAllocationDecider but provides + * the option to filter specifically for search replicas. + * The filter behaves similar to an include for any defined node attribute. + * A search replica can be allocated to only nodes with one of the specified attributes while + * other shard types will be rejected from nodes with any othe attributes. * @opensearch.internal */ public class SearchReplicaAllocationDecider extends AllocationDecider { From 46fdb9bfbe0cd7e263b4ed3e511a6dc9503c3e91 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 30 Aug 2024 17:01:41 -0700 Subject: [PATCH 4/7] Pr feedback. Signed-off-by: Marc Handalian --- .../decider/SearchReplicaAllocationDecider.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java index 1f2e6ff47e92b..955c396bee4da 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java @@ -66,16 +66,9 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl } private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { - Decision decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation); - if (decision != null) return decision; - - return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); - } - - private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) { if (searchReplicaIncludeFilters != null) { final boolean match = searchReplicaIncludeFilters.match(node); - if (match == false && routing.isSearchOnly()) { + if (match == false && shardRouting.isSearchOnly()) { return allocation.decision( Decision.NO, NAME, @@ -85,7 +78,7 @@ private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, Discov ); } // filter will only apply to search replicas - if (routing.isSearchOnly() == false && match) { + if (shardRouting.isSearchOnly() == false && match) { return allocation.decision( Decision.NO, NAME, @@ -95,7 +88,7 @@ private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, Discov ); } } - return null; + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); } private void setSearchReplicaIncludeFilters(Map filters) { From ec112a0c73399924b80ed4f20a520aaaf68c91d3 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 30 Aug 2024 17:10:43 -0700 Subject: [PATCH 5/7] Fix class name to pass precommit checks Signed-off-by: Marc Handalian --- .../indices/settings/SearchOnlyReplicaFeatureFlagIT.java | 2 +- ...eciderTest.java => SearchReplicaAllocationDeciderTests.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename server/src/test/java/org/opensearch/cluster/routing/allocation/decider/{SearchReplicaAllocationDeciderTest.java => SearchReplicaAllocationDeciderTests.java} (98%) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java index f3fc569f1a3fd..ef18cff7e5b29 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java @@ -17,7 +17,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; -import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1) public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java similarity index 98% rename from server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java rename to server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java index e0521e4682130..8d4f4cdee26cc 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTest.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java @@ -30,7 +30,7 @@ import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; -public class SearchReplicaAllocationDeciderTest extends OpenSearchAllocationTestCase { +public class SearchReplicaAllocationDeciderTests extends OpenSearchAllocationTestCase { public void testSearchReplicaRoutingDedicatedIncludes() { // we aren't using a settingsModule here so we need to set feature flag gated setting From 02977be573d00446fbc4801dd69c3480518600fd Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sat, 31 Aug 2024 14:44:16 -0700 Subject: [PATCH 6/7] Refactor all search replica create/update tests to a single OpenSearchSingleNodeTestCase. Signed-off-by: Marc Handalian --- .../org/opensearch/cluster/ClusterModule.java | 2 +- .../MetadataCreateIndexServiceTests.java | 68 ---- .../metadata/SearchOnlyReplicaTests.java | 295 ++++++++++-------- 3 files changed, 168 insertions(+), 197 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 404b5c9882973..d9bb87a517927 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -381,7 +381,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); - if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) { + if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) { addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings)); } addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings)); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 410c8a3d41940..67ab6bed07294 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -137,12 +137,10 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING; -import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; @@ -156,7 +154,6 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases; -import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY; @@ -2501,71 +2498,6 @@ public void testApplyContextWithSettingsOverlap() throws IOException { } } - public void testDefaultSearchReplicasSetting() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); - Settings templateSettings = Settings.EMPTY; - request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); - final Settings.Builder requestSettings = Settings.builder(); - request.settings(requestSettings.build()); - Settings indexSettings = aggregateIndexSettings( - ClusterState.EMPTY_STATE, - request, - templateSettings, - null, - Settings.EMPTY, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - randomShardLimitService(), - Collections.emptySet(), - clusterSettings - ); - assertFalse(INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(indexSettings)); - } - - public void testSearchReplicasValidationWithSegmentReplication() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); - Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); - final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2); - request.settings(requestSettings.build()); - Settings indexSettings = aggregateIndexSettings( - ClusterState.EMPTY_STATE, - request, - templateSettings, - null, - Settings.EMPTY, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - randomShardLimitService(), - Collections.emptySet(), - clusterSettings - ); - assertEquals("2", indexSettings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS)); - assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); - } - - public void testSearchReplicasValidationWithDocumentReplication() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); - Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); - request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); - final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2); - request.settings(requestSettings.build()); - - IllegalArgumentException exception = expectThrows( - IllegalArgumentException.class, - () -> aggregateIndexSettings( - ClusterState.EMPTY_STATE, - request, - templateSettings, - null, - Settings.EMPTY, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - randomShardLimitService(), - Collections.emptySet(), - clusterSettings - ) - ); - assertEquals("To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", exception.getMessage()); - } - private IndexTemplateMetadata addMatchingTemplate(Consumer configurator) { IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*"); configurator.accept(builder); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java index b1dd397c97218..3d11193a07884 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java @@ -24,9 +24,11 @@ import org.opensearch.indices.ShardLimitValidator; import org.opensearch.indices.cluster.ClusterStateChanges; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.util.ArrayList; import java.util.Collections; @@ -40,157 +42,194 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; -public class SearchOnlyReplicaTests extends OpenSearchTestCase { +public class SearchOnlyReplicaTests extends OpenSearchSingleNodeTestCase { - public void testUpdateSearchReplicaCount() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private ThreadPool threadPool; + + @Before + public void setUp() throws Exception { + super.setUp(); + this.threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + terminate(threadPool); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .build(); + } + + public void testCreateWithDefaultSearchReplicasSetting() { final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + ClusterState state = createIndexWithSettings(cluster, Settings.builder().build()); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(1, indexShardRoutingTable.replicaShards().size()); + assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(1, indexShardRoutingTable.writerReplicas().size()); + } - try { - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); - // node for search replicas - we'll start with 1 and add another - for (int i = 0; i < 2; i++) { - allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); - } - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - - CreateIndexRequest request = new CreateIndexRequest( - "index", + public void testSearchReplicasValidationWithDocumentReplication() { + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + RuntimeException exception = expectThrows( + RuntimeException.class, + () -> createIndexWithSettings( + cluster, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); - assertEquals(1, indexShardRoutingTable.replicaShards().size()); - assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); - assertEquals(0, indexShardRoutingTable.writerReplicas().size()); - - // add another replica - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) - ); - rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); - assertEquals(2, indexShardRoutingTable.replicaShards().size()); - assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); - assertEquals(0, indexShardRoutingTable.writerReplicas().size()); - - // remove all replicas - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) - ); - rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); - assertEquals(0, indexShardRoutingTable.replicaShards().size()); - assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); - assertEquals(0, indexShardRoutingTable.writerReplicas().size()); - } finally { - terminate(threadPool); - } + ) + ); + assertEquals( + "To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", + exception.getCause().getMessage() + ); } - public void testUpdateSearchReplicasOverShardLimit() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + public void testUpdateSearchReplicaCount() { final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - try { - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); + ClusterState state = createIndexWithSettings( + cluster, + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(1, indexShardRoutingTable.replicaShards().size()); + assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + + // add another replica + state = cluster.updateSettings( + state, + new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) + ); + rerouteUntilActive(state, cluster); + indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(2, indexShardRoutingTable.replicaShards().size()); + assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + + // remove all replicas + state = cluster.updateSettings( + state, + new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) + ); + rerouteUntilActive(state, cluster); + indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(0, indexShardRoutingTable.replicaShards().size()); + assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + } + private ClusterState createIndexWithSettings(ClusterStateChanges cluster, Settings settings) { + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + // node for search replicas - we'll start with 1 and add another + for (int i = 0; i < 2; i++) { allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + } + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + CreateIndexRequest request = new CreateIndexRequest("index", settings).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + return state; + } - CreateIndexRequest request = new CreateIndexRequest( - "index", - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - - // add another replica - ClusterState finalState = state; - Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - expectThrows( - RuntimeException.class, - () -> cluster.updateSettings( - finalState, - new UpdateSettingsRequest("index").settings( - Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() - ) - ) - ); + public void testUpdateSearchReplicasOverShardLimit() { + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - } finally { - terminate(threadPool); - } + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + + // add another replica + ClusterState finalState = state; + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); + expectThrows( + RuntimeException.class, + () -> cluster.updateSettings( + finalState, + new UpdateSettingsRequest("index").settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() + ) + ) + ); } public void testUpdateSearchReplicasOnDocrepCluster() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - try { - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); - - allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); - - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - - CreateIndexRequest request = new CreateIndexRequest( - "index", - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) - .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - - // add another replica - ClusterState finalState = state; - Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - expectThrows( - RuntimeException.class, - () -> cluster.updateSettings( - finalState, - new UpdateSettingsRequest("index").settings( - Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() - ) + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + + // add another replica + ClusterState finalState = state; + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); + expectThrows( + RuntimeException.class, + () -> cluster.updateSettings( + finalState, + new UpdateSettingsRequest("index").settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() ) - ); - } finally { - terminate(threadPool); - } + ) + ); + } private static void rerouteUntilActive(ClusterState state, ClusterStateChanges cluster) { From 1efafc07b6648c5bde9f8228fe17601ac29915d1 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 4 Sep 2024 11:10:44 -0700 Subject: [PATCH 7/7] remove changelog entry Signed-off-by: Marc Handalian --- CHANGELOG.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc9d5bd7a496d..71868de93812e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709)) - [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028)) +- [Workload Management] Add Update QueryGroup API Logic ([#14775](https://github.com/opensearch-project/OpenSearch/pull/14775)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) - Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774)) - Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) @@ -38,17 +39,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) - Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494)) - Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290)) -- Add fieldType to AbstractQueryBuilder and FieldSortBuilder ([#15328](https://github.com/opensearch-project/OpenSearch/pull/15328))) - [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) - [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) +- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363)) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) - [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))) - Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409)) - Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557)) - Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568)) - [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218)) -- [Reader Writer Separation] Add allocation filter for search replicas ([#15455](https://github.com/opensearch-project/OpenSearch/pull/15455)) +- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010)) +- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508)) +- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131)) +- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) @@ -102,6 +106,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix split response processor not included in allowlist ([#15393](https://github.com/opensearch-project/OpenSearch/pull/15393)) - Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394)) - Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069)) +- Fix terms query on wildcard field returns nothing ([#15607](https://github.com/opensearch-project/OpenSearch/pull/15607)) ### Security