Skip to content

Commit 63fbd0b

Browse files
authored
Add back primary shard preference for queries (#7375)
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
1 parent 3613881 commit 63fbd0b

File tree

24 files changed

+257
-29
lines changed

24 files changed

+257
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8484
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
8585
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
8686
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
87+
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
8788

8889
### Dependencies
8990
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0

server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
394394
assertNotNull(mapper.mappers().getMapper("field2"));
395395
});
396396

397-
assertBusy(() -> assertTrue(client().prepareGet("index", "2").get().isExists()));
397+
assertBusy(() -> assertTrue(client().prepareGet("index", "2").setPreference("_primary").get().isExists()));
398398

399399
// The mappings have not been propagated to the replica yet; as a consequence, the document could not be indexed
400400
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled

server/src/internalClusterTest/java/org/opensearch/search/basic/SearchWhileCreatingIndexIT.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,13 @@ private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas)
8888
ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();
8989
while (status != ClusterHealthStatus.GREEN) {
9090
// first, verify that normal search works
91-
SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).get();
91+
SearchResponse searchResponse = client().prepareSearch("test")
92+
.setPreference("_primary")
93+
.setQuery(QueryBuilders.termQuery("field", "test"))
94+
.execute()
95+
.actionGet();
9296
assertHitCount(searchResponse, 1);
97+
9398
Client client = client();
9499
searchResponse = client.prepareSearch("test")
95100
.setPreference(preference + Integer.toString(counter++))

server/src/internalClusterTest/java/org/opensearch/search/fetch/subphase/MatchedQueriesIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ public void testMatchedWithShould() throws Exception {
328328
.should(queryStringQuery("dolor").queryName("dolor"))
329329
.should(queryStringQuery("elit").queryName("elit"))
330330
)
331+
.setPreference("_primary")
331332
.get();
332333

333334
assertHitCount(searchResponse, 2L);

server/src/internalClusterTest/java/org/opensearch/search/functionscore/RandomScoreFunctionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void testConsistentHitsWithSameSeed() throws Exception {
117117
for (int o = 0; o < outerIters; o++) {
118118
final int seed = randomInt();
119119
String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!!
120-
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
120+
// the random preference should not start with '_' (reserved for known preference types, e.g. _shards, _primary)
121121
while (preference.startsWith("_")) {
122122
preference = randomRealisticUnicodeOfLengthBetween(1, 10);
123123
}

server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ public void testStopOneNodePreferenceWithRedState() throws IOException {
8989
internalCluster().stopRandomDataNode();
9090
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get();
9191
String[] preferences = new String[] {
92+
"_primary",
9293
"_local",
94+
"_primary_first",
9395
"_prefer_nodes:somenode",
9496
"_prefer_nodes:server2",
9597
"_prefer_nodes:somenode,server2" };
@@ -140,13 +142,32 @@ public void testSimplePreference() {
140142
client().prepareIndex("test").setSource("field1", "value1").get();
141143
refresh();
142144

143-
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
145+
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
146+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
147+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
148+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
149+
150+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
151+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
152+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
144153
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
145154

146-
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").get();
155+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
156+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
157+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
158+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
159+
160+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
161+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
162+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
163+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
164+
165+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
166+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
167+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
147168
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
148169

149-
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").get();
170+
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
150171
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
151172
}
152173

server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,15 @@ public void testProfileMatchesRegular() throws Exception {
148148
.setProfile(false)
149149
.addSort("id.keyword", SortOrder.ASC)
150150
.setSearchType(SearchType.QUERY_THEN_FETCH)
151+
.setPreference("_primary")
151152
.setRequestCache(false);
152153

153154
SearchRequestBuilder profile = client().prepareSearch("test")
154155
.setQuery(q)
155156
.setProfile(true)
156157
.addSort("id.keyword", SortOrder.ASC)
157158
.setSearchType(SearchType.QUERY_THEN_FETCH)
159+
.setPreference("_primary")
158160
.setRequestCache(false);
159161

160162
MultiSearchResponse.Item[] responses = client().prepareMultiSearch().add(vanilla).add(profile).get().getResponses();

server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void testSearchRandomPreference() throws InterruptedException, ExecutionE
9999
int iters = scaledRandomIntBetween(10, 20);
100100
for (int i = 0; i < iters; i++) {
101101
String randomPreference = randomUnicodeOfLengthBetween(0, 4);
102-
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
102+
// randomPreference should not start with '_' (reserved for known preference types, e.g. _shards, _primary)
103103
while (randomPreference.startsWith("_")) {
104104
randomPreference = randomUnicodeOfLengthBetween(0, 4);
105105
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ public ClusterSearchShardsRequest routing(String... routings) {
152152

153153
/**
154154
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
155-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
155+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
156+
* or a custom value, which guarantees that the same order
156157
* will be used across different requests.
157158
*/
158159
public ClusterSearchShardsRequest preference(String preference) {

server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequestBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public ClusterSearchShardsRequestBuilder setRouting(String... routing) {
7676

7777
/**
7878
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
79-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
79+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
80+
* or a custom value, which guarantees that the same order
8081
* will be used across different requests.
8182
*/
8283
public ClusterSearchShardsRequestBuilder setPreference(String preference) {

server/src/main/java/org/opensearch/action/get/GetRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ public GetRequest routing(String routing) {
150150

151151
/**
152152
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
153-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
153+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
154+
* or a custom value, which guarantees that the same order
154155
* will be used across different requests.
155156
*/
156157
public GetRequest preference(String preference) {

server/src/main/java/org/opensearch/action/get/GetRequestBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public GetRequestBuilder setRouting(String routing) {
7373

7474
/**
7575
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
76-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
76+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
77+
* or a custom value, which guarantees that the same order
7778
* will be used across different requests.
7879
*/
7980
public GetRequestBuilder setPreference(String preference) {

server/src/main/java/org/opensearch/action/get/MultiGetRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,8 @@ public ActionRequestValidationException validate() {
316316

317317
/**
318318
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
319-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
319+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
320+
* or a custom value, which guarantees that the same order
320321
* will be used across different requests.
321322
*/
322323
public MultiGetRequest preference(String preference) {

server/src/main/java/org/opensearch/action/get/MultiGetShardRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public int shardId() {
9494

9595
/**
9696
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
97-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
97+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
98+
* or a custom value, which guarantees that the same order
9899
* will be used across different requests.
99100
*/
100101
public MultiGetShardRequest preference(String preference) {

server/src/main/java/org/opensearch/action/search/SearchRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,8 @@ public SearchRequest routing(String... routings) {
432432

433433
/**
434434
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
435-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
435+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
436+
* or a custom value, which guarantees that the same order
436437
* will be used across different requests.
437438
*/
438439
public SearchRequest preference(String preference) {

server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ public SearchRequestBuilder setRouting(String... routing) {
151151

152152
/**
153153
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
154-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
154+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
155+
* or a custom value, which guarantees that the same order
155156
* will be used across different requests.
156157
*/
157158
public SearchRequestBuilder setPreference(String preference) {

server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsShardRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public int shardId() {
8686

8787
/**
8888
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
89-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
89+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
90+
* or a custom value, which guarantees that the same order
9091
* will be used across different requests.
9192
*/
9293
public MultiTermVectorsShardRequest preference(String preference) {

server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,8 @@ public String preference() {
336336

337337
/**
338338
* Sets the preference to execute the search. Defaults to randomize across
339-
* shards. Can be set to {@code _local} to prefer local shards or a custom value,
340-
* which guarantees that the same order will be used across different
339+
* shards. Can be set to {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
340+
* or a custom value, which guarantees that the same order will be used across different
341341
* requests.
342342
*/
343343
public TermVectorsRequest preference(String preference) {

server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequestBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ public TermVectorsRequestBuilder setRouting(String routing) {
9696

9797
/**
9898
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
99-
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
99+
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
100+
* or a custom value, which guarantees that the same order
100101
* will be used across different requests.
101102
*/
102103
public TermVectorsRequestBuilder setPreference(String preference) {

server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.HashMap;
5555
import java.util.HashSet;
5656
import java.util.Iterator;
57+
import java.util.LinkedList;
5758
import java.util.List;
5859
import java.util.Locale;
5960
import java.util.Map;
@@ -574,6 +575,96 @@ public ShardIterator primaryShardIt() {
574575
return new PlainShardIterator(shardId, Collections.emptyList());
575576
}
576577

578+
/**
579+
* Returns true if this shard has a primary that is neither active nor initializing
580+
*/
581+
private boolean noPrimariesActive() {
582+
return this.primary != null && !this.primary.active() && !this.primary.initializing();
583+
}
584+
585+
/**
586+
* Returns an iterator over only the primary shard, or an empty iterator when the primary is neither active nor initializing.
587+
*/
588+
public ShardIterator primaryActiveInitializingShardIt() {
589+
if (noPrimariesActive()) {
590+
return new PlainShardIterator(shardId, Collections.emptyList());
591+
}
592+
return primaryShardIt();
593+
}
594+
595+
/**
596+
* Returns an ordered iterator over the active shards with the primary (if active) first, followed by all initializing shards.
597+
*/
598+
public ShardIterator primaryFirstActiveInitializingShardsIt() {
599+
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
600+
// fill it in a randomized fashion
601+
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
602+
ordered.add(shardRouting);
603+
if (shardRouting.primary()) {
604+
// swap: this is the primary shard, so move it to the first position
605+
ordered.set(ordered.size() - 1, ordered.get(0));
606+
ordered.set(0, shardRouting);
607+
}
608+
}
609+
// no need to worry about putting the primary first among initializing shards; that state is temporary
610+
if (!allInitializingShards.isEmpty()) {
611+
ordered.addAll(allInitializingShards);
612+
}
613+
return new PlainShardIterator(shardId, ordered);
614+
}
615+
616+
/**
617+
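* Returns an iterator over replica shards, active replicas before initializing ones; empty when the primary is neither active nor initializing.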
618+
*/
619+
public ShardIterator replicaActiveInitializingShardIt() {
620+
// If the primaries are unassigned, return an empty list (there aren't
621+
// any replicas to query anyway)
622+
if (noPrimariesActive()) {
623+
return new PlainShardIterator(shardId, Collections.emptyList());
624+
}
625+
626+
LinkedList<ShardRouting> ordered = new LinkedList<>();
627+
for (ShardRouting replica : shuffler.shuffle(replicas)) {
628+
if (replica.active()) {
629+
ordered.addFirst(replica);
630+
} else if (replica.initializing()) {
631+
ordered.addLast(replica);
632+
}
633+
}
634+
return new PlainShardIterator(shardId, ordered);
635+
}
636+
637+
/**
638+
* Returns an ordered iterator over active replica shards, followed by the primary shard and then all initializing shards.
639+
*/
640+
public ShardIterator replicaFirstActiveInitializingShardsIt() {
641+
// If the primaries are unassigned, return an empty list (there aren't
642+
// any replicas to query anyway)
643+
if (noPrimariesActive()) {
644+
return new PlainShardIterator(shardId, Collections.emptyList());
645+
}
646+
647+
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
648+
// fill it in a randomized fashion with the active replicas
649+
for (ShardRouting replica : shuffler.shuffle(replicas)) {
650+
if (replica.active()) {
651+
ordered.add(replica);
652+
}
653+
}
654+
655+
// Add the primary shard
656+
ordered.add(primary);
657+
658+
// Add initializing shards last
659+
if (!allInitializingShards.isEmpty()) {
660+
ordered.addAll(allInitializingShards);
661+
}
662+
return new PlainShardIterator(shardId, ordered);
663+
}
664+
665+
/**
666+
* Returns an iterator on active and initializing shards residing on the provided nodeId.
667+
*/
577668
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
578669
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
579670
int seed = shuffler.nextSeed();

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,14 @@ private ShardIterator preferenceActiveShardIterator(
342342
return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
343343
case LOCAL:
344344
return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
345+
case PRIMARY:
346+
return indexShard.primaryActiveInitializingShardIt();
347+
case REPLICA:
348+
return indexShard.replicaActiveInitializingShardIt();
349+
case PRIMARY_FIRST:
350+
return indexShard.primaryFirstActiveInitializingShardsIt();
351+
case REPLICA_FIRST:
352+
return indexShard.replicaFirstActiveInitializingShardsIt();
345353
case ONLY_LOCAL:
346354
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
347355
case ONLY_NODES:

server/src/main/java/org/opensearch/cluster/routing/Preference.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,26 @@ public enum Preference {
5353
*/
5454
LOCAL("_local"),
5555

56+
/**
57+
* Route to primary shards
58+
*/
59+
PRIMARY("_primary"),
60+
61+
/**
62+
* Route to replica shards
63+
*/
64+
REPLICA("_replica"),
65+
66+
/**
67+
* Route to primary shards first
68+
*/
69+
PRIMARY_FIRST("_primary_first"),
70+
71+
/**
72+
* Route to replica shards first
73+
*/
74+
REPLICA_FIRST("_replica_first"),
75+
5676
/**
5777
* Route to the local shard only
5878
*/
@@ -92,6 +112,16 @@ public static Preference parse(String preference) {
92112
return PREFER_NODES;
93113
case "_local":
94114
return LOCAL;
115+
case "_primary":
116+
return PRIMARY;
117+
case "_replica":
118+
return REPLICA;
119+
case "_primary_first":
120+
case "_primaryFirst":
121+
return PRIMARY_FIRST;
122+
case "_replica_first":
123+
case "_replicaFirst":
124+
return REPLICA_FIRST;
95125
case "_only_local":
96126
case "_onlyLocal":
97127
return ONLY_LOCAL;

0 commit comments

Comments
 (0)