Skip to content

Commit 45e354f

Browse files
committed
Introduce allocation filter to control placement of search only replicas
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 1e9fdb4 commit 45e354f

File tree

6 files changed

+292
-2
lines changed

6 files changed

+292
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3737
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
3838
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
3939
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
40+
- [Reader Writer Separation] Add allocation filter for search replicas ([#15455](https://github.com/opensearch-project/OpenSearch/pull/15455))
4041

4142
### Dependencies
4243
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.allocation;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
13+
import org.opensearch.cluster.routing.ShardRouting;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.indices.replication.common.ReplicationType;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
19+
import java.util.List;
20+
import java.util.stream.Collectors;
21+
22+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
23+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
24+
25+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
26+
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
27+
28+
@Override
29+
protected Settings featureFlagSettings() {
30+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
31+
}
32+
33+
public void testSearchReplicaDedicatedIncludes() {
34+
List<String> nodesIds = internalCluster().startNodes(3);
35+
final String node_0 = nodesIds.get(0);
36+
final String node_1 = nodesIds.get(1);
37+
final String node_2 = nodesIds.get(2);
38+
assertEquals(3, cluster().size());
39+
40+
client().admin()
41+
.cluster()
42+
.prepareUpdateSettings()
43+
.setTransientSettings(
44+
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
45+
)
46+
.execute()
47+
.actionGet();
48+
49+
createIndex(
50+
"test",
51+
Settings.builder()
52+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
53+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
54+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
55+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
56+
.build()
57+
);
58+
ensureGreen("test");
59+
// ensure primary is not on node 0 or 1,
60+
IndexShardRoutingTable routingTable = getRoutingTable();
61+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
62+
63+
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
64+
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;
65+
66+
// set the included nodes to the other open node, search replica should relocate to that node.
67+
client().admin()
68+
.cluster()
69+
.prepareUpdateSettings()
70+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
71+
.execute()
72+
.actionGet();
73+
ensureGreen("test");
74+
75+
routingTable = getRoutingTable();
76+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
77+
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
78+
}
79+
80+
public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
81+
List<String> nodesIds = internalCluster().startNodes(3);
82+
final String node_0 = nodesIds.get(0);
83+
final String node_1 = nodesIds.get(1);
84+
final String node_2 = nodesIds.get(2);
85+
assertEquals(3, cluster().size());
86+
87+
// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
88+
client().admin()
89+
.cluster()
90+
.prepareUpdateSettings()
91+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
92+
.execute()
93+
.actionGet();
94+
95+
logger.info("--> creating an index with no replicas");
96+
createIndex(
97+
"test",
98+
Settings.builder()
99+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
100+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
101+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
102+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
103+
.build()
104+
);
105+
ensureYellowAndNoInitializingShards("test");
106+
IndexShardRoutingTable routingTable = getRoutingTable();
107+
assertEquals(2, routingTable.searchOnlyReplicas().size());
108+
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
109+
.stream()
110+
.filter(ShardRouting::assignedToNode)
111+
.collect(Collectors.toList());
112+
assertEquals(1, assignedSearchShards.size());
113+
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
114+
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
115+
}
116+
117+
private IndexShardRoutingTable getRoutingTable() {
118+
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
119+
return routingTable;
120+
}
121+
122+
private String getNodeName(String id) {
123+
return getClusterState().nodes().get(id).getName();
124+
}
125+
}

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
1919
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
20+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
2021

2122
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
2223
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
@@ -53,4 +54,15 @@ public void testUpdateFeatureFlagDisabled() {
5354
});
5455
assertTrue(settingsException.getMessage().contains("unknown setting"));
5556
}
57+
58+
public void testFilterAllocationSettingNotRegistered() {
59+
expectThrows(SettingsException.class, () -> {
60+
client().admin()
61+
.cluster()
62+
.prepareUpdateSettings()
63+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
64+
.execute()
65+
.actionGet();
66+
});
67+
}
5668
}

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.common.settings.Setting;
4545
import org.opensearch.common.settings.Setting.Property;
4646
import org.opensearch.common.settings.Settings;
47+
import org.opensearch.common.util.FeatureFlags;
4748
import org.opensearch.node.remotestore.RemoteStoreNodeService;
4849

4950
import java.util.Map;
@@ -88,6 +89,8 @@ public class FilterAllocationDecider extends AllocationDecider {
8889
private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
8990
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
9091
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
92+
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
93+
9194
public static final Setting.AffixSetting<String> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
9295
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".",
9396
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
@@ -100,7 +103,12 @@ public class FilterAllocationDecider extends AllocationDecider {
100103
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".",
101104
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
102105
);
106+
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
107+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
108+
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
109+
);
103110

111+
private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;
104112
private volatile DiscoveryNodeFilters clusterRequireFilters;
105113
private volatile DiscoveryNodeFilters clusterIncludeFilters;
106114
private volatile DiscoveryNodeFilters clusterExcludeFilters;
@@ -113,7 +121,6 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
113121
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
114122
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
115123
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);
116-
117124
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
118125
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
119126
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
@@ -122,6 +129,15 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
122129
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
123130
this::setCompatibilityMode
124131
);
132+
133+
if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) {
134+
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
135+
clusterSettings.addAffixMapUpdateConsumer(
136+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
137+
this::setSearchReplicaIncludeFilters,
138+
(a, b) -> {}
139+
);
140+
}
125141
}
126142

127143
private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
@@ -203,6 +219,9 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou
203219
decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
204220
if (decision != null) return decision;
205221

222+
decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation);
223+
if (decision != null) return decision;
224+
206225
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
207226
}
208227

@@ -294,6 +313,32 @@ private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation alloc
294313
return null;
295314
}
296315

316+
private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) {
317+
if (searchReplicaIncludeFilters != null) {
318+
final boolean match = searchReplicaIncludeFilters.match(node);
319+
if (match == false && routing.isSearchOnly()) {
320+
return allocation.decision(
321+
Decision.NO,
322+
NAME,
323+
"node does not match shard setting [%s] filters [%s]",
324+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
325+
searchReplicaIncludeFilters
326+
);
327+
}
328+
// filter will only apply to search replicas
329+
if (routing.isSearchOnly() == false && match) {
330+
return allocation.decision(
331+
Decision.NO,
332+
NAME,
333+
"only search replicas can be allocated to node with setting [%s] filters [%s]",
334+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
335+
searchReplicaIncludeFilters
336+
);
337+
}
338+
}
339+
return null;
340+
}
341+
297342
private void setClusterRequireFilters(Map<String, String> filters) {
298343
clusterRequireFilters = DiscoveryNodeFilters.trimTier(
299344
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterRequireFilters, AND, filters)
@@ -311,4 +356,10 @@ private void setClusterExcludeFilters(Map<String, String> filters) {
311356
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterExcludeFilters, OR, filters)
312357
);
313358
}
359+
360+
private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
361+
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
362+
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
363+
);
364+
}
314365
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,8 @@ public void apply(Settings value, Settings current, Settings previous) {
805805
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
806806
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
807807
)
808-
)
808+
),
809+
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
810+
List.of(FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
809811
);
810812
}

0 commit comments

Comments
 (0)