Skip to content

Commit 3c3457d

Browse files
[RW Separation] Introduce allocation filter to control placement of search only replicas (#15455)
* Introduce allocation filter to control placement of search only replicas Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Add a new decider rather than updating the existing FilterAllocationDecider Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix license header and description on SearchReplicaAllocationDecider Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Pr feedback. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix class name to pass precommit checks Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Refactor all search replica create/update tests to a single OpenSearchSingleNodeTestCase. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * remove changelog entry Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com> (cherry picked from commit b345439) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 153f978 commit 3c3457d

File tree

9 files changed

+546
-133
lines changed

9 files changed

+546
-133
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.allocation;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
13+
import org.opensearch.cluster.routing.ShardRouting;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.indices.replication.common.ReplicationType;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
19+
import java.util.List;
20+
import java.util.stream.Collectors;
21+
22+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
23+
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
24+
25+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
26+
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
27+
28+
@Override
29+
protected Settings featureFlagSettings() {
30+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
31+
}
32+
33+
public void testSearchReplicaDedicatedIncludes() {
34+
List<String> nodesIds = internalCluster().startNodes(3);
35+
final String node_0 = nodesIds.get(0);
36+
final String node_1 = nodesIds.get(1);
37+
final String node_2 = nodesIds.get(2);
38+
assertEquals(3, cluster().size());
39+
40+
client().admin()
41+
.cluster()
42+
.prepareUpdateSettings()
43+
.setTransientSettings(
44+
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
45+
)
46+
.execute()
47+
.actionGet();
48+
49+
createIndex(
50+
"test",
51+
Settings.builder()
52+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
53+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
54+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
55+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
56+
.build()
57+
);
58+
ensureGreen("test");
59+
// ensure primary is not on node 0 or 1,
60+
IndexShardRoutingTable routingTable = getRoutingTable();
61+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
62+
63+
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
64+
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;
65+
66+
// set the included nodes to the other open node, search replica should relocate to that node.
67+
client().admin()
68+
.cluster()
69+
.prepareUpdateSettings()
70+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
71+
.execute()
72+
.actionGet();
73+
ensureGreen("test");
74+
75+
routingTable = getRoutingTable();
76+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
77+
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
78+
}
79+
80+
public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
81+
List<String> nodesIds = internalCluster().startNodes(3);
82+
final String node_0 = nodesIds.get(0);
83+
final String node_1 = nodesIds.get(1);
84+
final String node_2 = nodesIds.get(2);
85+
assertEquals(3, cluster().size());
86+
87+
// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
88+
client().admin()
89+
.cluster()
90+
.prepareUpdateSettings()
91+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
92+
.execute()
93+
.actionGet();
94+
95+
logger.info("--> creating an index with no replicas");
96+
createIndex(
97+
"test",
98+
Settings.builder()
99+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
100+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
101+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
102+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
103+
.build()
104+
);
105+
ensureYellowAndNoInitializingShards("test");
106+
IndexShardRoutingTable routingTable = getRoutingTable();
107+
assertEquals(2, routingTable.searchOnlyReplicas().size());
108+
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
109+
.stream()
110+
.filter(ShardRouting::assignedToNode)
111+
.collect(Collectors.toList());
112+
assertEquals(1, assignedSearchShards.size());
113+
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
114+
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
115+
}
116+
117+
private IndexShardRoutingTable getRoutingTable() {
118+
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
119+
return routingTable;
120+
}
121+
122+
private String getNodeName(String id) {
123+
return getClusterState().nodes().get(id).getName();
124+
}
125+
}

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
1818
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
19+
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
1920

2021
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
2122
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
@@ -52,4 +53,15 @@ public void testUpdateFeatureFlagDisabled() {
5253
});
5354
assertTrue(exception.getMessage().contains("unknown setting"));
5455
}
56+
57+
public void testFilterAllocationSettingNotRegistered() {
58+
expectThrows(SettingsException.class, () -> {
59+
client().admin()
60+
.cluster()
61+
.prepareUpdateSettings()
62+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
63+
.execute()
64+
.actionGet();
65+
});
66+
}
5567
}

server/src/main/java/org/opensearch/cluster/ClusterModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
7575
import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
7676
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
77+
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
7778
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
7879
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
7980
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
@@ -84,6 +85,7 @@
8485
import org.opensearch.common.settings.Setting;
8586
import org.opensearch.common.settings.Setting.Property;
8687
import org.opensearch.common.settings.Settings;
88+
import org.opensearch.common.util.FeatureFlags;
8789
import org.opensearch.common.util.concurrent.ThreadContext;
8890
import org.opensearch.common.util.set.Sets;
8991
import org.opensearch.core.ParseField;
@@ -376,6 +378,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
376378
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
377379
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
378380
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
381+
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
382+
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
383+
}
379384
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
380385
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
381386
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation.decider;
10+
11+
import org.opensearch.cluster.node.DiscoveryNode;
12+
import org.opensearch.cluster.node.DiscoveryNodeFilters;
13+
import org.opensearch.cluster.routing.RoutingNode;
14+
import org.opensearch.cluster.routing.ShardRouting;
15+
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
16+
import org.opensearch.common.settings.ClusterSettings;
17+
import org.opensearch.common.settings.Setting;
18+
import org.opensearch.common.settings.Setting.Property;
19+
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.node.remotestore.RemoteStoreNodeService;
21+
22+
import java.util.Map;
23+
24+
import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
25+
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
26+
27+
/**
28+
* This allocation decider is similar to FilterAllocationDecider but provides
29+
* the option to filter specifically for search replicas.
30+
* The filter behaves similar to an include for any defined node attribute.
31+
* A search replica can be allocated to only nodes with one of the specified attributes while
32+
* other shard types will be rejected from nodes with any othe attributes.
33+
* @opensearch.internal
34+
*/
35+
public class SearchReplicaAllocationDecider extends AllocationDecider {
36+
37+
public static final String NAME = "filter";
38+
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
39+
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
40+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
41+
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
42+
);
43+
44+
private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;
45+
46+
private volatile RemoteStoreNodeService.Direction migrationDirection;
47+
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;
48+
49+
public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
50+
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
51+
clusterSettings.addAffixMapUpdateConsumer(
52+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
53+
this::setSearchReplicaIncludeFilters,
54+
(a, b) -> {}
55+
);
56+
}
57+
58+
@Override
59+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
60+
return shouldFilter(shardRouting, node.node(), allocation);
61+
}
62+
63+
@Override
64+
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
65+
return shouldFilter(shardRouting, node.node(), allocation);
66+
}
67+
68+
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
69+
if (searchReplicaIncludeFilters != null) {
70+
final boolean match = searchReplicaIncludeFilters.match(node);
71+
if (match == false && shardRouting.isSearchOnly()) {
72+
return allocation.decision(
73+
Decision.NO,
74+
NAME,
75+
"node does not match shard setting [%s] filters [%s]",
76+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
77+
searchReplicaIncludeFilters
78+
);
79+
}
80+
// filter will only apply to search replicas
81+
if (shardRouting.isSearchOnly() == false && match) {
82+
return allocation.decision(
83+
Decision.NO,
84+
NAME,
85+
"only search replicas can be allocated to node with setting [%s] filters [%s]",
86+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
87+
searchReplicaIncludeFilters
88+
);
89+
}
90+
}
91+
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
92+
}
93+
94+
private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
95+
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
96+
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
97+
);
98+
}
99+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
7878
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
7979
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
80+
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
8081
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
8182
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
8283
import org.opensearch.cluster.service.ClusterApplierService;
@@ -816,6 +817,8 @@ public void apply(Settings value, Settings current, Settings previous) {
816817
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
817818
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
818819
)
819-
)
820+
),
821+
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
822+
List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
820823
);
821824
}

server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,10 @@
138138
import static java.util.Collections.singleton;
139139
import static java.util.Collections.singletonList;
140140
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
141-
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING;
142141
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
143142
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
144143
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
145144
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
146-
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
147145
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
148146
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
149147
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
@@ -157,7 +155,6 @@
157155
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
158156
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
159157
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
160-
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL;
161158
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
162159
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
163160
import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY;

0 commit comments

Comments
 (0)