Skip to content

Commit 264cc15

Browse files
committed
Introduce allocation filter to control placement of search only replicas
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent 6e563e5 commit 264cc15

File tree

7 files changed

+250
-4
lines changed

7 files changed

+250
-4
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.allocation;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.FeatureFlags;
15+
import org.opensearch.indices.replication.common.ReplicationType;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
18+
import java.util.List;
19+
20+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
21+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
22+
23+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
24+
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
25+
26+
@Override
27+
protected Settings featureFlagSettings() {
28+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
29+
}
30+
31+
public void testSearchReplicaDedicatedIncludes() {
32+
logger.info("--> starting 2 nodes");
33+
List<String> nodesIds = internalCluster().startNodes(3);
34+
final String node_0 = nodesIds.get(0);
35+
final String node_1 = nodesIds.get(1);
36+
final String node_2 = nodesIds.get(2);
37+
assertEquals(3, cluster().size());
38+
39+
client().admin()
40+
.cluster()
41+
.prepareUpdateSettings()
42+
.setTransientSettings(
43+
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
44+
)
45+
.execute()
46+
.actionGet();
47+
48+
logger.info("--> creating an index with no replicas");
49+
createIndex(
50+
"test",
51+
Settings.builder()
52+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
53+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
54+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
55+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
56+
.build()
57+
);
58+
ensureGreen("test");
59+
// ensure primary is not on 1 or 2,
60+
IndexShardRoutingTable routingTable = getRoutingTable();
61+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
62+
63+
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
64+
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;
65+
66+
// set the included nodes to the other open node.
67+
client().admin()
68+
.cluster()
69+
.prepareUpdateSettings()
70+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
71+
.execute()
72+
.actionGet();
73+
ensureGreen("test");
74+
75+
routingTable = getRoutingTable();
76+
logger.info(routingTable);
77+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
78+
logger.info(routingTable.replicaShards());
79+
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
80+
}
81+
82+
private IndexShardRoutingTable getRoutingTable() {
83+
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
84+
return routingTable;
85+
}
86+
87+
private String getNodeName(String id) {
88+
return getClusterState().nodes().get(id).getName();
89+
}
90+
}

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
1919
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
2020
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
21+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
2122

2223
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
2324
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
@@ -66,4 +67,15 @@ public void testUpdateFeatureFlagDisabled() {
6667
});
6768
assertTrue(settingsException.getMessage().contains("unknown setting"));
6869
}
70+
71+
public void testFilterAllocationSettingNotRegistered() {
72+
expectThrows(SettingsException.class, () -> {
73+
client().admin()
74+
.cluster()
75+
.prepareUpdateSettings()
76+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
77+
.execute()
78+
.actionGet();
79+
});
80+
}
6981
}

server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public List<ShardRouting> getShards() {
212212
}
213213

214214
/**
215-
* Returns a {@link List} of the search only shards in the RoutingTable
215+
* Returns a {@link List} of the search only replicas in the RoutingTable
216216
*
217217
* @return a {@link List} of shards
218218
*/

server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ private ShardRouting initializeTargetRelocatingShard() {
154154
relocatingNodeId,
155155
currentNodeId,
156156
primary,
157+
searchOnly,
157158
ShardRoutingState.INITIALIZING,
158159
PeerRecoverySource.INSTANCE,
159160
unassignedInfo,

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.common.settings.Setting;
4545
import org.opensearch.common.settings.Setting.Property;
4646
import org.opensearch.common.settings.Settings;
47+
import org.opensearch.common.util.FeatureFlags;
4748
import org.opensearch.node.remotestore.RemoteStoreNodeService;
4849

4950
import java.util.Map;
@@ -84,10 +85,12 @@
8485
public class FilterAllocationDecider extends AllocationDecider {
8586

8687
public static final String NAME = "filter";
87-
8888
private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
8989
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
9090
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
91+
92+
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
93+
9194
public static final Setting.AffixSetting<String> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
9295
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".",
9396
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
@@ -100,7 +103,12 @@ public class FilterAllocationDecider extends AllocationDecider {
100103
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".",
101104
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
102105
);
106+
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
107+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
108+
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
109+
);
103110

111+
private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;
104112
private volatile DiscoveryNodeFilters clusterRequireFilters;
105113
private volatile DiscoveryNodeFilters clusterIncludeFilters;
106114
private volatile DiscoveryNodeFilters clusterExcludeFilters;
@@ -113,7 +121,6 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
113121
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
114122
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
115123
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);
116-
117124
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
118125
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
119126
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
@@ -122,6 +129,15 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
122129
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
123130
this::setCompatibilityMode
124131
);
132+
133+
if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) {
134+
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
135+
clusterSettings.addAffixMapUpdateConsumer(
136+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
137+
this::setSearchReplicaIncludeFilters,
138+
(a, b) -> {}
139+
);
140+
}
125141
}
126142

127143
private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
@@ -203,6 +219,9 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou
203219
decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
204220
if (decision != null) return decision;
205221

222+
decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation);
223+
if (decision != null) return decision;
224+
206225
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
207226
}
208227

@@ -294,6 +313,32 @@ private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation alloc
294313
return null;
295314
}
296315

316+
private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) {
317+
if (searchReplicaIncludeFilters != null) {
318+
final boolean match = searchReplicaIncludeFilters.match(node);
319+
if (match == false && routing.isSearchOnly()) {
320+
return allocation.decision(
321+
Decision.NO,
322+
NAME,
323+
"node does not match shard setting [%s] filters [%s]",
324+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
325+
searchReplicaIncludeFilters
326+
);
327+
}
328+
// filter will only apply to search replicas
329+
if (routing.isSearchOnly() == false && match) {
330+
return allocation.decision(
331+
Decision.NO,
332+
NAME,
333+
"only search replicas can be allocated to node with setting [%s] filters [%s]",
334+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
335+
searchReplicaIncludeFilters
336+
);
337+
}
338+
}
339+
return null;
340+
}
341+
297342
private void setClusterRequireFilters(Map<String, String> filters) {
298343
clusterRequireFilters = DiscoveryNodeFilters.trimTier(
299344
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterRequireFilters, AND, filters)
@@ -311,4 +356,10 @@ private void setClusterExcludeFilters(Map<String, String> filters) {
311356
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterExcludeFilters, OR, filters)
312357
);
313358
}
359+
360+
private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
361+
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
362+
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
363+
);
364+
}
314365
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,8 @@ public void apply(Settings value, Settings current, Settings previous) {
804804
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
805805
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
806806
)
807-
)
807+
),
808+
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
809+
List.of(FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
808810
);
809811
}

server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,4 +462,94 @@ public void testMixedModeRemoteStoreAllocation() {
462462
decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation);
463463
assertEquals(decision.toString(), Type.NO, decision.type());
464464
}
465+
466+
public void testSearchReplicaRoutingDedicatedIncludes() {
467+
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
468+
Settings initialSettings = Settings.builder()
469+
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2")
470+
.build();
471+
472+
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings);
473+
AllocationDeciders allocationDeciders = new AllocationDeciders(
474+
Arrays.asList(
475+
filterAllocationDecider,
476+
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
477+
new ReplicaAfterPrimaryActiveAllocationDecider()
478+
)
479+
);
480+
AllocationService service = new AllocationService(
481+
allocationDeciders,
482+
new TestGatewayAllocator(),
483+
new BalancedShardsAllocator(Settings.EMPTY),
484+
EmptyClusterInfoService.INSTANCE,
485+
EmptySnapshotsInfoService.INSTANCE
486+
);
487+
ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
488+
RoutingTable routingTable = state.routingTable();
489+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
490+
allocation.debugDecision(true);
491+
492+
ShardRouting searchReplica = ShardRouting.newUnassigned(
493+
routingTable.index("sourceIndex").shard(0).shardId(),
494+
false,
495+
true,
496+
RecoverySource.PeerRecoverySource.INSTANCE,
497+
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
498+
);
499+
500+
ShardRouting regularReplica = ShardRouting.newUnassigned(
501+
routingTable.index("sourceIndex").shard(0).shardId(),
502+
false,
503+
false,
504+
RecoverySource.PeerRecoverySource.INSTANCE,
505+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
506+
);
507+
508+
ShardRouting primary = ShardRouting.newUnassigned(
509+
routingTable.index("sourceIndex").shard(0).shardId(),
510+
true,
511+
false,
512+
RecoverySource.PeerRecoverySource.INSTANCE,
513+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
514+
);
515+
516+
Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
517+
searchReplica,
518+
state.getRoutingNodes().node("node2"),
519+
allocation
520+
);
521+
assertEquals(decision.toString(), Type.YES, decision.type());
522+
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
523+
assertEquals(decision.toString(), Type.YES, decision.type());
524+
525+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
526+
assertEquals(decision.toString(), Type.NO, decision.type());
527+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
528+
assertEquals(decision.toString(), Type.NO, decision.type());
529+
530+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
531+
assertEquals(decision.toString(), Type.NO, decision.type());
532+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
533+
assertEquals(decision.toString(), Type.NO, decision.type());
534+
535+
Settings updatedSettings = Settings.builder()
536+
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2")
537+
.build();
538+
clusterSettings.applySettings(updatedSettings);
539+
540+
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation);
541+
assertEquals(decision.toString(), Type.YES, decision.type());
542+
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
543+
assertEquals(decision.toString(), Type.NO, decision.type());
544+
545+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
546+
assertEquals(decision.toString(), Type.NO, decision.type());
547+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
548+
assertEquals(decision.toString(), Type.YES, decision.type());
549+
550+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
551+
assertEquals(decision.toString(), Type.YES, decision.type());
552+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
553+
assertEquals(decision.toString(), Type.NO, decision.type());
554+
}
465555
}

0 commit comments

Comments
 (0)