Skip to content

Commit 238b9e1

Browse files
authored
Term vector API on stateless search nodes (#129902)
Up to now, real-time (m)term vector API requests were executed on the indexing nodes in serverless. However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that by introducing an intermediate action that lets a search node become up to date with an indexing node, with respect to the term vector API request, before executing the request locally on the search node. The new intermediate action looks for any of the requested document IDs in the shard's LiveVersionMap; if it finds any of them there, the search nodes need to be refreshed in order to capture the new document IDs before searching for them. Relates ES-12112
1 parent e0f4cff commit 238b9e1

File tree

10 files changed

+187
-37
lines changed

10 files changed

+187
-37
lines changed

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mtermvectors/30_routing.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ routing:
77
settings:
88
index:
99
number_of_shards: 5
10-
number_of_replicas: 0
1110

1211
- do:
1312
cluster.health:
14-
wait_for_status: green
13+
wait_for_no_initializing_shards: true
1514

1615
- do:
1716
index:
@@ -52,11 +51,14 @@ requires routing:
5251
settings:
5352
index:
5453
number_of_shards: 5
55-
number_of_replicas: 0
5654
mappings:
5755
_routing:
5856
required: true
5957

58+
- do:
59+
cluster.health:
60+
wait_for_no_initializing_shards: true
61+
6062
- do:
6163
index:
6264
index: test_1

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/20_issue7121.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
index:
88
translog.flush_threshold_size: "512MB"
99
number_of_shards: 1
10-
number_of_replicas: 0
1110
refresh_interval: -1
1211
mappings:
1312
properties:
@@ -17,7 +16,7 @@
1716

1817
- do:
1918
cluster.health:
20-
wait_for_status: green
19+
wait_for_no_initializing_shards: true
2120

2221
- do:
2322
index:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/30_realtime.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@
77
settings:
88
index:
99
refresh_interval: -1
10-
number_of_replicas: 0
1110

1211
- do:
13-
cluster.health:
14-
wait_for_status: green
12+
cluster.health:
13+
wait_for_no_initializing_shards: true
1514

1615
- do:
1716
index:
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
10+
*/
11+
12+
package org.elasticsearch.action.termvectors;
13+
14+
import org.elasticsearch.action.ActionRequestValidationException;
15+
import org.elasticsearch.action.ActionResponse;
16+
import org.elasticsearch.action.ActionType;
17+
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
18+
import org.elasticsearch.common.io.stream.StreamInput;
19+
import org.elasticsearch.common.io.stream.StreamOutput;
20+
21+
import java.io.IOException;
22+
23+
/**
24+
* This action is used in serverless to ensure that documents are searchable on the search tier before processing
25+
* term vector requests. It is an intermediate action that is executed on the indexing node and responds
26+
* with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh
27+
* to ensure the search shards are up to date before returning the no-op.
28+
*/
29+
public class EnsureDocsSearchableAction {
30+
31+
private static final String ACTION_NAME = "internal:index/data/read/eds";
32+
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(ACTION_NAME);
33+
public static final String ENSURE_DOCS_SEARCHABLE_ORIGIN = "ensure_docs_searchable";
34+
35+
public static final class EnsureDocsSearchableRequest extends SingleShardRequest<EnsureDocsSearchableRequest> {
36+
37+
private int shardId; // this is not serialized over the wire, and will be 0 on the other end of the wire.
38+
private String[] docIds;
39+
40+
public EnsureDocsSearchableRequest() {}
41+
42+
public EnsureDocsSearchableRequest(StreamInput in) throws IOException {
43+
super(in);
44+
docIds = in.readStringArray();
45+
}
46+
47+
@Override
48+
public ActionRequestValidationException validate() {
49+
return super.validateNonNullIndex();
50+
}
51+
52+
public EnsureDocsSearchableRequest(String index, int shardId, String[] docIds) {
53+
super(index);
54+
this.shardId = shardId;
55+
this.docIds = docIds;
56+
}
57+
58+
public int shardId() {
59+
return this.shardId;
60+
}
61+
62+
public String[] docIds() {
63+
return docIds;
64+
}
65+
66+
@Override
67+
public void writeTo(StreamOutput out) throws IOException {
68+
super.writeTo(out);
69+
out.writeStringArray(docIds);
70+
}
71+
72+
}
73+
74+
}

server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,21 @@
55
* Public License v 1"; you may not use this file except in compliance with, at
66
* your election, the "Elastic License 2.0", the "GNU Affero General Public
77
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
810
*/
911

1012
package org.elasticsearch.action.termvectors;
1113

14+
import org.elasticsearch.action.ActionListener;
1215
import org.elasticsearch.action.ActionType;
1316
import org.elasticsearch.action.support.ActionFilters;
1417
import org.elasticsearch.action.support.TransportActions;
1518
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
19+
import org.elasticsearch.client.internal.node.NodeClient;
1620
import org.elasticsearch.cluster.ProjectState;
1721
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
1823
import org.elasticsearch.cluster.project.ProjectResolver;
1924
import org.elasticsearch.cluster.routing.ShardIterator;
2025
import org.elasticsearch.cluster.service.ClusterService;
@@ -28,20 +33,26 @@
2833
import org.elasticsearch.threadpool.ThreadPool;
2934
import org.elasticsearch.transport.TransportService;
3035

36+
import java.io.IOException;
37+
import java.util.List;
38+
3139
import static org.elasticsearch.core.Strings.format;
3240

3341
public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction<
3442
MultiTermVectorsShardRequest,
3543
MultiTermVectorsShardResponse> {
3644

45+
private final NodeClient client;
3746
private final IndicesService indicesService;
47+
private final boolean stateless;
3848

3949
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]";
4050
public static final ActionType<MultiTermVectorsShardResponse> TYPE = new ActionType<>(ACTION_NAME);
4151

4252
@Inject
4353
public TransportShardMultiTermsVectorAction(
4454
ClusterService clusterService,
55+
NodeClient client,
4556
TransportService transportService,
4657
IndicesService indicesService,
4758
ThreadPool threadPool,
@@ -60,7 +71,9 @@ public TransportShardMultiTermsVectorAction(
6071
MultiTermVectorsShardRequest::new,
6172
threadPool.executor(ThreadPool.Names.GET)
6273
);
74+
this.client = client;
6375
this.indicesService = indicesService;
76+
this.stateless = DiscoveryNode.isStateless(clusterService.getSettings());
6477
}
6578

6679
@Override
@@ -80,9 +93,44 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
8093

8194
@Override
8295
protected ShardIterator shards(ProjectState project, InternalRequest request) {
83-
ShardIterator shards = clusterService.operationRouting()
96+
ShardIterator iterator = clusterService.operationRouting()
8497
.getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference());
85-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
98+
if (iterator == null) {
99+
// We return an empty iterator to avoid hitting an indexing node in serverless (e.g., if there are no search nodes available).
100+
return new ShardIterator(null, List.of());
101+
}
102+
return ShardIterator.allSearchableShards(iterator);
103+
}
104+
105+
@Override
106+
protected void asyncShardOperation(
107+
MultiTermVectorsShardRequest request,
108+
ShardId shardId,
109+
ActionListener<MultiTermVectorsShardResponse> listener
110+
) throws IOException {
111+
if (stateless) {
112+
final String[] realTimeIds = request.requests.stream()
113+
.filter(r -> r.realtime())
114+
.map(TermVectorsRequest::id)
115+
.toArray(String[]::new);
116+
if (realTimeIds.length > 0) {
117+
final var ensureDocsSearchableRequest = new EnsureDocsSearchableAction.EnsureDocsSearchableRequest(
118+
request.index(),
119+
shardId.id(),
120+
realTimeIds
121+
);
122+
ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
123+
client.executeLocally(
124+
EnsureDocsSearchableAction.TYPE,
125+
ensureDocsSearchableRequest,
126+
listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l))
127+
);
128+
} else {
129+
super.asyncShardOperation(request, shardId, listener);
130+
}
131+
} else {
132+
super.asyncShardOperation(request, shardId, listener);
133+
}
86134
}
87135

88136
@Override

server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,19 @@
55
* Public License v 1"; you may not use this file except in compliance with, at
66
* your election, the "Elastic License 2.0", the "GNU Affero General Public
77
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
810
*/
911

1012
package org.elasticsearch.action.termvectors;
1113

1214
import org.elasticsearch.action.ActionListener;
1315
import org.elasticsearch.action.support.ActionFilters;
1416
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
17+
import org.elasticsearch.client.internal.node.NodeClient;
1518
import org.elasticsearch.cluster.ProjectState;
1619
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
20+
import org.elasticsearch.cluster.node.DiscoveryNode;
1721
import org.elasticsearch.cluster.project.ProjectResolver;
1822
import org.elasticsearch.cluster.routing.ShardIterator;
1923
import org.elasticsearch.cluster.service.ClusterService;
@@ -28,17 +32,21 @@
2832
import org.elasticsearch.transport.TransportService;
2933

3034
import java.io.IOException;
35+
import java.util.List;
3136

3237
/**
3338
* Performs the term vectors operation.
3439
*/
3540
public class TransportTermVectorsAction extends TransportSingleShardAction<TermVectorsRequest, TermVectorsResponse> {
3641

42+
private final NodeClient client;
3743
private final IndicesService indicesService;
44+
private final boolean stateless;
3845

3946
@Inject
4047
public TransportTermVectorsAction(
4148
ClusterService clusterService,
49+
NodeClient client,
4250
TransportService transportService,
4351
IndicesService indicesService,
4452
ThreadPool threadPool,
@@ -57,7 +65,9 @@ public TransportTermVectorsAction(
5765
TermVectorsRequest::new,
5866
threadPool.executor(ThreadPool.Names.GET)
5967
);
68+
this.client = client;
6069
this.indicesService = indicesService;
70+
this.stateless = DiscoveryNode.isStateless(clusterService.getSettings());
6171
}
6272

6373
@Override
@@ -69,21 +79,19 @@ protected ShardIterator shards(ProjectState project, InternalRequest request) {
6979
.getFirst();
7080
}
7181

72-
return operationRouting.useOnlyPromotableShardsForStateless(
73-
operationRouting.getShards(
74-
82+
ShardIterator iterator = clusterService.operationRouting()
83+
.getShards(
7584
project,
76-
7785
request.concreteIndex(),
78-
7986
request.request().id(),
80-
8187
request.request().routing(),
82-
8388
request.request().preference()
84-
85-
)
86-
);
89+
);
90+
if (iterator == null) {
91+
// We return an empty iterator to avoid hitting an indexing node in serverless (e.g., if there are no search nodes available).
92+
return new ShardIterator(null, List.of());
93+
}
94+
return ShardIterator.allSearchableShards(iterator);
8795
}
8896

8997
@Override
@@ -103,7 +111,22 @@ protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId,
103111
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
104112
IndexShard indexShard = indexService.getShard(shardId.id());
105113
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
106-
super.asyncShardOperation(request, shardId, listener);
114+
if (stateless) {
115+
// Ensure that the document is searchable before we execute the term vectors request
116+
final var ensureDocsSearchableRequest = new EnsureDocsSearchableAction.EnsureDocsSearchableRequest(
117+
request.index(),
118+
shardId.id(),
119+
new String[] { request.id() }
120+
);
121+
ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
122+
client.executeLocally(
123+
EnsureDocsSearchableAction.TYPE,
124+
ensureDocsSearchableRequest,
125+
listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l))
126+
);
127+
} else {
128+
super.asyncShardOperation(request, shardId, listener);
129+
}
107130
} else {
108131
indexShard.ensureShardSearchActive(b -> {
109132
try {

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.cluster.metadata.ProjectMetadata;
15-
import org.elasticsearch.cluster.node.DiscoveryNode;
1615
import org.elasticsearch.cluster.node.DiscoveryNodes;
1716
import org.elasticsearch.common.Strings;
1817
import org.elasticsearch.common.settings.ClusterSettings;
@@ -42,11 +41,9 @@ public class OperationRouting {
4241
);
4342

4443
private boolean useAdaptiveReplicaSelection;
45-
private final boolean isStateless;
4644

4745
@SuppressWarnings("this-escape")
4846
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
49-
this.isStateless = DiscoveryNode.isStateless(settings);
5047
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
5148
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
5249
}
@@ -78,19 +75,6 @@ public ShardIterator getShards(ProjectState projectState, String index, int shar
7875
return preferenceActiveShardIterator(indexShard, nodes.getLocalNodeId(), nodes, preference, null, null);
7976
}
8077

81-
public ShardIterator useOnlyPromotableShardsForStateless(ShardIterator shards) {
82-
// If it is stateless, only route promotable shards. This is a temporary workaround until a more cohesive solution can be
83-
// implemented for search shards.
84-
if (isStateless && shards != null) {
85-
return new ShardIterator(
86-
shards.shardId(),
87-
shards.getShardRoutings().stream().filter(ShardRouting::isPromotableToPrimary).collect(Collectors.toList())
88-
);
89-
} else {
90-
return shards;
91-
}
92-
}
93-
9478
public List<ShardIterator> searchShards(
9579
ProjectState projectState,
9680
String[] concreteIndices,

0 commit comments

Comments
 (0)