Skip to content

Commit 03f66f2

Browse files
mch2dk2k
authored and committed
[RW Separation] Introduce allocation filter to control placement of search only replicas (opensearch-project#15455)
* Introduce allocation filter to control placement of search only replicas Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Add a new decider rather than updating the existing FilterAllocationDecider Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix license header and description on SearchReplicaAllocationDecider Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Pr feedback. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix class name to pass precommit checks Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Refactor all search replica create/update tests to a single OpenSearchSingleNodeTestCase. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * remove changelog entry Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent ad64a96 commit 03f66f2

File tree

9 files changed

+546
-198
lines changed

9 files changed

+546
-198
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.allocation;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
13+
import org.opensearch.cluster.routing.ShardRouting;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.indices.replication.common.ReplicationType;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
19+
import java.util.List;
20+
import java.util.stream.Collectors;
21+
22+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
23+
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
24+
25+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
26+
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
27+
28+
@Override
29+
protected Settings featureFlagSettings() {
30+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
31+
}
32+
33+
public void testSearchReplicaDedicatedIncludes() {
34+
List<String> nodesIds = internalCluster().startNodes(3);
35+
final String node_0 = nodesIds.get(0);
36+
final String node_1 = nodesIds.get(1);
37+
final String node_2 = nodesIds.get(2);
38+
assertEquals(3, cluster().size());
39+
40+
client().admin()
41+
.cluster()
42+
.prepareUpdateSettings()
43+
.setTransientSettings(
44+
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
45+
)
46+
.execute()
47+
.actionGet();
48+
49+
createIndex(
50+
"test",
51+
Settings.builder()
52+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
53+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
54+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
55+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
56+
.build()
57+
);
58+
ensureGreen("test");
59+
// ensure primary is not on node 0 or 1,
60+
IndexShardRoutingTable routingTable = getRoutingTable();
61+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
62+
63+
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
64+
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;
65+
66+
// set the included nodes to the other open node, search replica should relocate to that node.
67+
client().admin()
68+
.cluster()
69+
.prepareUpdateSettings()
70+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
71+
.execute()
72+
.actionGet();
73+
ensureGreen("test");
74+
75+
routingTable = getRoutingTable();
76+
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
77+
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
78+
}
79+
80+
public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
81+
List<String> nodesIds = internalCluster().startNodes(3);
82+
final String node_0 = nodesIds.get(0);
83+
final String node_1 = nodesIds.get(1);
84+
final String node_2 = nodesIds.get(2);
85+
assertEquals(3, cluster().size());
86+
87+
// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
88+
client().admin()
89+
.cluster()
90+
.prepareUpdateSettings()
91+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
92+
.execute()
93+
.actionGet();
94+
95+
logger.info("--> creating an index with no replicas");
96+
createIndex(
97+
"test",
98+
Settings.builder()
99+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
100+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
101+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)
102+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
103+
.build()
104+
);
105+
ensureYellowAndNoInitializingShards("test");
106+
IndexShardRoutingTable routingTable = getRoutingTable();
107+
assertEquals(2, routingTable.searchOnlyReplicas().size());
108+
List<ShardRouting> assignedSearchShards = routingTable.searchOnlyReplicas()
109+
.stream()
110+
.filter(ShardRouting::assignedToNode)
111+
.collect(Collectors.toList());
112+
assertEquals(1, assignedSearchShards.size());
113+
assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId()));
114+
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
115+
}
116+
117+
private IndexShardRoutingTable getRoutingTable() {
118+
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
119+
return routingTable;
120+
}
121+
122+
private String getNodeName(String id) {
123+
return getClusterState().nodes().get(id).getName();
124+
}
125+
}

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
1919
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
20+
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
2021

2122
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1)
2223
public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase {
@@ -53,4 +54,15 @@ public void testUpdateFeatureFlagDisabled() {
5354
});
5455
assertTrue(settingsException.getMessage().contains("unknown setting"));
5556
}
57+
58+
public void testFilterAllocationSettingNotRegistered() {
59+
expectThrows(SettingsException.class, () -> {
60+
client().admin()
61+
.cluster()
62+
.prepareUpdateSettings()
63+
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node"))
64+
.execute()
65+
.actionGet();
66+
});
67+
}
5668
}

server/src/main/java/org/opensearch/cluster/ClusterModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
7676
import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
7777
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
78+
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
7879
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
7980
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
8081
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
@@ -85,6 +86,7 @@
8586
import org.opensearch.common.settings.Setting;
8687
import org.opensearch.common.settings.Setting.Property;
8788
import org.opensearch.common.settings.Settings;
89+
import org.opensearch.common.util.FeatureFlags;
8890
import org.opensearch.common.util.concurrent.ThreadContext;
8991
import org.opensearch.common.util.set.Sets;
9092
import org.opensearch.core.ParseField;
@@ -379,6 +381,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
379381
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
380382
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
381383
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
384+
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) {
385+
addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings));
386+
}
382387
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
383388
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
384389
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation.decider;
10+
11+
import org.opensearch.cluster.node.DiscoveryNode;
12+
import org.opensearch.cluster.node.DiscoveryNodeFilters;
13+
import org.opensearch.cluster.routing.RoutingNode;
14+
import org.opensearch.cluster.routing.ShardRouting;
15+
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
16+
import org.opensearch.common.settings.ClusterSettings;
17+
import org.opensearch.common.settings.Setting;
18+
import org.opensearch.common.settings.Setting.Property;
19+
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.node.remotestore.RemoteStoreNodeService;
21+
22+
import java.util.Map;
23+
24+
import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR;
25+
import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
26+
27+
/**
28+
* This allocation decider is similar to FilterAllocationDecider but provides
29+
* the option to filter specifically for search replicas.
30+
* The filter behaves similar to an include for any defined node attribute.
31+
* A search replica can be allocated to only nodes with one of the specified attributes while
32+
* other shard types will be rejected from nodes with any othe attributes.
33+
* @opensearch.internal
34+
*/
35+
public class SearchReplicaAllocationDecider extends AllocationDecider {
36+
37+
public static final String NAME = "filter";
38+
private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include";
39+
public static final Setting.AffixSetting<String> SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting(
40+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".",
41+
key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope)
42+
);
43+
44+
private volatile DiscoveryNodeFilters searchReplicaIncludeFilters;
45+
46+
private volatile RemoteStoreNodeService.Direction migrationDirection;
47+
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;
48+
49+
public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
50+
setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
51+
clusterSettings.addAffixMapUpdateConsumer(
52+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING,
53+
this::setSearchReplicaIncludeFilters,
54+
(a, b) -> {}
55+
);
56+
}
57+
58+
@Override
59+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
60+
return shouldFilter(shardRouting, node.node(), allocation);
61+
}
62+
63+
@Override
64+
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
65+
return shouldFilter(shardRouting, node.node(), allocation);
66+
}
67+
68+
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
69+
if (searchReplicaIncludeFilters != null) {
70+
final boolean match = searchReplicaIncludeFilters.match(node);
71+
if (match == false && shardRouting.isSearchOnly()) {
72+
return allocation.decision(
73+
Decision.NO,
74+
NAME,
75+
"node does not match shard setting [%s] filters [%s]",
76+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
77+
searchReplicaIncludeFilters
78+
);
79+
}
80+
// filter will only apply to search replicas
81+
if (shardRouting.isSearchOnly() == false && match) {
82+
return allocation.decision(
83+
Decision.NO,
84+
NAME,
85+
"only search replicas can be allocated to node with setting [%s] filters [%s]",
86+
SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX,
87+
searchReplicaIncludeFilters
88+
);
89+
}
90+
}
91+
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
92+
}
93+
94+
private void setSearchReplicaIncludeFilters(Map<String, String> filters) {
95+
searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier(
96+
DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters)
97+
);
98+
}
99+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
7777
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
7878
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
79+
import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider;
7980
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
8081
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
8182
import org.opensearch.cluster.service.ClusterApplierService;
@@ -813,6 +814,8 @@ public void apply(Settings value, Settings current, Settings previous) {
813814
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
814815
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
815816
)
816-
)
817+
),
818+
List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL),
819+
List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING)
817820
);
818821
}

server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,10 @@
137137
import static java.util.Collections.singleton;
138138
import static java.util.Collections.singletonList;
139139
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
140-
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING;
141140
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
142141
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
143142
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
144143
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
145-
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
146144
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
147145
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
148146
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
@@ -156,7 +154,6 @@
156154
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
157155
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
158156
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
159-
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL;
160157
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
161158
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
162159
import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY;
@@ -2507,71 +2504,6 @@ public void testApplyContextWithSettingsOverlap() throws IOException {
25072504
}
25082505
}
25092506

2510-
public void testDefaultSearchReplicasSetting() {
2511-
FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build());
2512-
Settings templateSettings = Settings.EMPTY;
2513-
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
2514-
final Settings.Builder requestSettings = Settings.builder();
2515-
request.settings(requestSettings.build());
2516-
Settings indexSettings = aggregateIndexSettings(
2517-
ClusterState.EMPTY_STATE,
2518-
request,
2519-
templateSettings,
2520-
null,
2521-
Settings.EMPTY,
2522-
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
2523-
randomShardLimitService(),
2524-
Collections.emptySet(),
2525-
clusterSettings
2526-
);
2527-
assertFalse(INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(indexSettings));
2528-
}
2529-
2530-
public void testSearchReplicasValidationWithSegmentReplication() {
2531-
FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build());
2532-
Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
2533-
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
2534-
final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2);
2535-
request.settings(requestSettings.build());
2536-
Settings indexSettings = aggregateIndexSettings(
2537-
ClusterState.EMPTY_STATE,
2538-
request,
2539-
templateSettings,
2540-
null,
2541-
Settings.EMPTY,
2542-
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
2543-
randomShardLimitService(),
2544-
Collections.emptySet(),
2545-
clusterSettings
2546-
);
2547-
assertEquals("2", indexSettings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS));
2548-
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE));
2549-
}
2550-
2551-
public void testSearchReplicasValidationWithDocumentReplication() {
2552-
FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build());
2553-
Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();
2554-
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
2555-
final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2);
2556-
request.settings(requestSettings.build());
2557-
2558-
IllegalArgumentException exception = expectThrows(
2559-
IllegalArgumentException.class,
2560-
() -> aggregateIndexSettings(
2561-
ClusterState.EMPTY_STATE,
2562-
request,
2563-
templateSettings,
2564-
null,
2565-
Settings.EMPTY,
2566-
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
2567-
randomShardLimitService(),
2568-
Collections.emptySet(),
2569-
clusterSettings
2570-
)
2571-
);
2572-
assertEquals("To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", exception.getMessage());
2573-
}
2574-
25752507
private IndexTemplateMetadata addMatchingTemplate(Consumer<IndexTemplateMetadata.Builder> configurator) {
25762508
IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*");
25772509
configurator.accept(builder);

0 commit comments

Comments
 (0)