Skip to content

Commit 9566665

Browse files
committed
Introduce allocation filter to control placement of search only replicas
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent c5d29a9 commit 9566665

File tree

5 files changed

+257
-3
lines changed

5 files changed

+257
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.allocation;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.FeatureFlags;
15+
import org.opensearch.indices.replication.common.ReplicationType;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
18+
import java.util.List;
19+
20+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
21+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
22+
23+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
24+
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
25+
26+
@Override
27+
protected Settings featureFlagSettings() {
28+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
29+
}
30+
31+
public void testSearchReplicaDedicatedIncludes() {
32+
logger.info("--> starting 2 nodes");
33+
List<String> nodesIds = internalCluster().startNodes(3);
34+
final String node_0 = nodesIds.get(0);
35+
final String node_1 = nodesIds.get(1);
36+
final String node_2 = nodesIds.get(2);
37+
assertEquals(3, cluster().size());
38+
39+
client().admin()
40+
.cluster()
41+
.prepareUpdateSettings()
42+
.setTransientSettings(
43+
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
44+
)
45+
.execute()
46+
.actionGet();
47+
48+
logger.info("--> creating an index with no replicas");
49+
createIndex(
50+
"test",
51+
Settings.builder()
52+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
53+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
54+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
55+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
56+
.build()
57+
);
58+
ensureGreen("test");
59+
// ensure primary is not on 1 or 2,
60+
IndexShardRoutingTable routingTable = getRoutingTable();
61+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
62+
63+
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
64+
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;
65+
66+
// set the included nodes to the other open node.
67+
client().admin()
68+
.cluster()
69+
.prepareUpdateSettings()
70+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
71+
.execute()
72+
.actionGet();
73+
ensureGreen("test");
74+
75+
routingTable = getRoutingTable();
76+
logger.info(routingTable);
77+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
78+
logger.info(routingTable.replicaShards());
79+
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
80+
}
81+
82+
private IndexShardRoutingTable getRoutingTable() {
83+
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
84+
return routingTable;
85+
}
86+
87+
private String getNodeName(String id) {
88+
return getClusterState().nodes().get(id).getName();
89+
}
90+
}

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
1919
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
20+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
2021

2122
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
2223
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
@@ -53,4 +54,15 @@ public void testUpdateFeatureFlagDisabled() {
5354
});
5455
assertTrue(settingsException.getMessage().contains("unknown setting"));
5556
}
57+
58+
public void testFilterAllocationSettingNotRegistered() {
59+
expectThrows(SettingsException.class, () -> {
60+
client().admin()
61+
.cluster()
62+
.prepareUpdateSettings()
63+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
64+
.execute()
65+
.actionGet();
66+
});
67+
}
5668
}

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.common.settings.Setting;
4545
import org.opensearch.common.settings.Setting.Property;
4646
import org.opensearch.common.settings.Settings;
47+
import org.opensearch.common.util.FeatureFlags;
4748
import org.opensearch.node.remotestore.RemoteStoreNodeService;
4849

4950
import java.util.Map;
@@ -84,10 +85,12 @@
8485
public class FilterAllocationDecider extends AllocationDecider {
8586

8687
public static final String NAME = "filter";
87-
8888
private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
8989
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
9090
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
91+
92+
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
93+
9194
public static final Setting.AffixSetting<String> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
9295
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".",
9396
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
@@ -100,7 +103,12 @@ public class FilterAllocationDecider extends AllocationDecider {
100103
CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX + ".",
101104
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
102105
);
106+
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
107+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
108+
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
109+
);
103110

111+
private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;
104112
private volatile DiscoveryNodeFilters clusterRequireFilters;
105113
private volatile DiscoveryNodeFilters clusterIncludeFilters;
106114
private volatile DiscoveryNodeFilters clusterExcludeFilters;
@@ -113,7 +121,6 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
113121
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
114122
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
115123
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);
116-
117124
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
118125
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
119126
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
@@ -122,6 +129,15 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting
122129
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
123130
this::setCompatibilityMode
124131
);
132+
133+
if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) {
134+
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
135+
clusterSettings.addAffixMapUpdateConsumer(
136+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
137+
this::setSearchReplicaIncludeFilters,
138+
(a, b) -> {}
139+
);
140+
}
125141
}
126142

127143
private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
@@ -203,6 +219,9 @@ private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, Rou
203219
decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
204220
if (decision != null) return decision;
205221

222+
decision = shouldSearchReplicaShardTypeFilter(shardRouting, node, allocation);
223+
if (decision != null) return decision;
224+
206225
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
207226
}
208227

@@ -294,6 +313,32 @@ private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation alloc
294313
return null;
295314
}
296315

316+
private Decision shouldSearchReplicaShardTypeFilter(ShardRouting routing, DiscoveryNode node, RoutingAllocation allocation) {
317+
if (searchReplicaIncludeFilters != null) {
318+
final boolean match = searchReplicaIncludeFilters.match(node);
319+
if (match == false && routing.isSearchOnly()) {
320+
return allocation.decision(
321+
Decision.NO,
322+
NAME,
323+
"node does not match shard setting [%s] filters [%s]",
324+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
325+
searchReplicaIncludeFilters
326+
);
327+
}
328+
// filter will only apply to search replicas
329+
if (routing.isSearchOnly() == false && match) {
330+
return allocation.decision(
331+
Decision.NO,
332+
NAME,
333+
"only search replicas can be allocated to node with setting [%s] filters [%s]",
334+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
335+
searchReplicaIncludeFilters
336+
);
337+
}
338+
}
339+
return null;
340+
}
341+
297342
private void setClusterRequireFilters(Map<String, String> filters) {
298343
clusterRequireFilters = DiscoveryNodeFilters.trimTier(
299344
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterRequireFilters, AND, filters)
@@ -311,4 +356,10 @@ private void setClusterExcludeFilters(Map<String, String> filters) {
311356
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(clusterExcludeFilters, OR, filters)
312357
);
313358
}
359+
360+
private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
361+
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
362+
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
363+
);
364+
}
314365
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,8 @@ public void apply(Settings value, Settings current, Settings previous) {
804804
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
805805
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
806806
)
807-
)
807+
),
808+
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
809+
List.of(FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
808810
);
809811
}

server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,21 @@
5353
import org.opensearch.common.settings.Settings;
5454
import org.opensearch.node.remotestore.RemoteStoreNodeService;
5555
import org.opensearch.snapshots.EmptySnapshotsInfoService;
56+
import org.opensearch.test.FeatureFlagSetter;
5657
import org.opensearch.test.gateway.TestGatewayAllocator;
5758

5859
import java.util.Arrays;
5960
import java.util.Collections;
61+
import java.util.HashSet;
62+
import java.util.Set;
6063

6164
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_RESIZE_SOURCE_NAME;
6265
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_RESIZE_SOURCE_UUID;
6366
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
6467
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
6568
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
69+
import static org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
70+
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL;
6671
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
6772
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
6873

@@ -462,4 +467,98 @@ public void testMixedModeRemoteStoreAllocation() {
462467
decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation);
463468
assertEquals(decision.toString(), Type.NO, decision.type());
464469
}
470+
471+
public void testSearchReplicaRoutingDedicatedIncludes() {
472+
FeatureFlagSetter.set(READER_WRITER_SPLIT_EXPERIMENTAL);
473+
// we aren't using a settingsModule here so we need to set feature flag gated setting
474+
Set<Setting<?>> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
475+
settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING);
476+
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), settings);
477+
Settings initialSettings = Settings.builder()
478+
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2")
479+
.build();
480+
481+
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings);
482+
AllocationDeciders allocationDeciders = new AllocationDeciders(
483+
Arrays.asList(
484+
filterAllocationDecider,
485+
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
486+
new ReplicaAfterPrimaryActiveAllocationDecider()
487+
)
488+
);
489+
AllocationService service = new AllocationService(
490+
allocationDeciders,
491+
new TestGatewayAllocator(),
492+
new BalancedShardsAllocator(Settings.EMPTY),
493+
EmptyClusterInfoService.INSTANCE,
494+
EmptySnapshotsInfoService.INSTANCE
495+
);
496+
ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
497+
RoutingTable routingTable = state.routingTable();
498+
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
499+
allocation.debugDecision(true);
500+
501+
ShardRouting searchReplica = ShardRouting.newUnassigned(
502+
routingTable.index("sourceIndex").shard(0).shardId(),
503+
false,
504+
true,
505+
RecoverySource.PeerRecoverySource.INSTANCE,
506+
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
507+
);
508+
509+
ShardRouting regularReplica = ShardRouting.newUnassigned(
510+
routingTable.index("sourceIndex").shard(0).shardId(),
511+
false,
512+
false,
513+
RecoverySource.PeerRecoverySource.INSTANCE,
514+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
515+
);
516+
517+
ShardRouting primary = ShardRouting.newUnassigned(
518+
routingTable.index("sourceIndex").shard(0).shardId(),
519+
true,
520+
false,
521+
RecoverySource.PeerRecoverySource.INSTANCE,
522+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
523+
);
524+
525+
Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
526+
searchReplica,
527+
state.getRoutingNodes().node("node2"),
528+
allocation
529+
);
530+
assertEquals(decision.toString(), Type.YES, decision.type());
531+
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
532+
assertEquals(decision.toString(), Type.YES, decision.type());
533+
534+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
535+
assertEquals(decision.toString(), Type.NO, decision.type());
536+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
537+
assertEquals(decision.toString(), Type.NO, decision.type());
538+
539+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
540+
assertEquals(decision.toString(), Type.NO, decision.type());
541+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
542+
assertEquals(decision.toString(), Type.NO, decision.type());
543+
544+
Settings updatedSettings = Settings.builder()
545+
.put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2")
546+
.build();
547+
clusterSettings.applySettings(updatedSettings);
548+
549+
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation);
550+
assertEquals(decision.toString(), Type.YES, decision.type());
551+
decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation);
552+
assertEquals(decision.toString(), Type.NO, decision.type());
553+
554+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation);
555+
assertEquals(decision.toString(), Type.NO, decision.type());
556+
decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation);
557+
assertEquals(decision.toString(), Type.YES, decision.type());
558+
559+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation);
560+
assertEquals(decision.toString(), Type.YES, decision.type());
561+
decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation);
562+
assertEquals(decision.toString(), Type.NO, decision.type());
563+
}
465564
}

0 commit comments

Comments
 (0)