Skip to content

Commit 59876ce

Browse files
committed
Term vector API on stateless search nodes
Up to now, the (m)term vector API real-time requests were being executed on the indexing nodes of serverless. However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that, by introducing an intermediate action for search nodes to become up-to-date with an indexing node in respect to the term vector API request, before executing it locally on the search node. The new intermediate action searches for any of the requested document IDs in the shard's LiveVersionMap and if it finds any of them there, it means the search nodes need to be refreshed in order to capture the new document IDs before searching for them. Relates ES-12112
1 parent b855266 commit 59876ce

File tree

8 files changed

+223
-29
lines changed

8 files changed

+223
-29
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@
206206
import org.elasticsearch.action.synonyms.TransportPutSynonymsAction;
207207
import org.elasticsearch.action.termvectors.MultiTermVectorsAction;
208208
import org.elasticsearch.action.termvectors.TermVectorsAction;
209+
import org.elasticsearch.action.termvectors.TransportEnsureDocsSearchableAction;
209210
import org.elasticsearch.action.termvectors.TransportMultiTermVectorsAction;
210211
import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction;
211212
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
@@ -717,6 +718,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
717718

718719
actions.register(TransportIndexAction.TYPE, TransportIndexAction.class);
719720
actions.register(TransportGetAction.TYPE, TransportGetAction.class);
721+
actions.register(TransportEnsureDocsSearchableAction.TYPE, TransportEnsureDocsSearchableAction.class);
720722
actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
721723
actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class);
722724
actions.register(TransportShardMultiTermsVectorAction.TYPE, TransportShardMultiTermsVectorAction.class);
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
10+
*/
11+
12+
package org.elasticsearch.action.termvectors;
13+
14+
import org.apache.logging.log4j.LogManager;
15+
import org.apache.logging.log4j.Logger;
16+
import org.elasticsearch.action.ActionListener;
17+
import org.elasticsearch.action.ActionResponse;
18+
import org.elasticsearch.action.ActionType;
19+
import org.elasticsearch.action.NoShardAvailableActionException;
20+
import org.elasticsearch.action.support.ActionFilters;
21+
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
22+
import org.elasticsearch.cluster.ProjectState;
23+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
26+
import org.elasticsearch.cluster.project.ProjectResolver;
27+
import org.elasticsearch.cluster.routing.ShardIterator;
28+
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.common.io.stream.Writeable;
30+
import org.elasticsearch.index.IndexService;
31+
import org.elasticsearch.index.mapper.Uid;
32+
import org.elasticsearch.index.shard.IndexShard;
33+
import org.elasticsearch.index.shard.ShardId;
34+
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.injection.guice.Inject;
36+
import org.elasticsearch.threadpool.ThreadPool;
37+
import org.elasticsearch.transport.TransportService;
38+
39+
import java.io.IOException;
40+
import java.util.List;
41+
42+
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction<MultiTermVectorsShardRequest, ActionResponse.Empty> {
43+
44+
private static final Logger logger = LogManager.getLogger(TransportEnsureDocsSearchableAction.class);
45+
private final IndicesService indicesService;
46+
47+
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "/eds";
48+
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(ACTION_NAME);
49+
50+
@Inject
51+
public TransportEnsureDocsSearchableAction(
52+
ClusterService clusterService,
53+
TransportService transportService,
54+
IndicesService indicesService,
55+
ThreadPool threadPool,
56+
ActionFilters actionFilters,
57+
ProjectResolver projectResolver,
58+
IndexNameExpressionResolver indexNameExpressionResolver
59+
) {
60+
super(
61+
ACTION_NAME,
62+
threadPool,
63+
clusterService,
64+
transportService,
65+
actionFilters,
66+
projectResolver,
67+
indexNameExpressionResolver,
68+
MultiTermVectorsShardRequest::new,
69+
threadPool.executor(ThreadPool.Names.GET)
70+
);
71+
this.indicesService = indicesService;
72+
}
73+
74+
@Override
75+
protected boolean isSubAction() {
76+
return true;
77+
}
78+
79+
@Override
80+
protected Writeable.Reader<ActionResponse.Empty> getResponseReader() {
81+
return in -> ActionResponse.Empty.INSTANCE;
82+
}
83+
84+
@Override
85+
protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
86+
return false;
87+
}
88+
89+
@Override
90+
protected ShardIterator shards(ProjectState state, InternalRequest request) {
91+
assert DiscoveryNode.isStateless(clusterService.getSettings()) : "EnsureDocsSearchableAction should only be used in stateless";
92+
final var primaryShard = state.routingTable()
93+
.shardRoutingTable(request.concreteIndex(), request.request().shardId())
94+
.primaryShard();
95+
if (primaryShard.active() == false) {
96+
throw new NoShardAvailableActionException(primaryShard.shardId(), "primary shard is not active");
97+
}
98+
DiscoveryNode node = state.cluster().nodes().get(primaryShard.currentNodeId());
99+
assert node != null;
100+
return new ShardIterator(primaryShard.shardId(), List.of(primaryShard));
101+
}
102+
103+
@Override
104+
protected void asyncShardOperation(MultiTermVectorsShardRequest request, ShardId shardId, ActionListener<ActionResponse.Empty> listener)
105+
throws IOException {
106+
assert DiscoveryNode.isStateless(clusterService.getSettings()) : "EnsureDocsSearchableAction should only be used in stateless";
107+
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE)
108+
: "EnsureDocsSearchableAction should only be executed on a stateless indexing node";
109+
logger.debug("received locally {} with {} sub requests", request, request.locations.size());
110+
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> {
111+
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
112+
final IndexShard indexShard = indexService.getShard(shardId.id());
113+
boolean refreshBeforeReturning = false;
114+
for (int i = 0; i < request.locations.size(); i++) {
115+
TermVectorsRequest termVectorsRequest = request.requests.get(i);
116+
String docId = termVectorsRequest.id();
117+
if (termVectorsRequest.realtime() && docId != null && docId.isEmpty() == false) {
118+
final var docUid = Uid.encodeId(docId);
119+
boolean docInLiveVersionMap = indexShard.withEngine(engine -> engine.isDocumentInLiveVersionMap(docUid));
120+
if (docInLiveVersionMap) {
121+
logger.debug("doc id [{}] (uid [{}]) requires refresh of index shard [{}]", docId, docUid, shardId);
122+
refreshBeforeReturning = true;
123+
break;
124+
}
125+
}
126+
}
127+
if (refreshBeforeReturning) {
128+
logger.debug("refreshing index shard [{}] due to mtv_eds action", shardId);
129+
indexShard.externalRefresh("refresh_mtv_eds", l.map(r -> ActionResponse.Empty.INSTANCE));
130+
} else {
131+
logger.debug("mts_eds action does not require refresh of index shard [{}]", shardId);
132+
l.onResponse(ActionResponse.Empty.INSTANCE);
133+
// TODO: But what if it is already undergoing a refresh? Ensure we wait on the search shards being synchronized on an
134+
// ongoing refresh.
135+
}
136+
}));
137+
}
138+
139+
@Override
140+
protected ActionResponse.Empty shardOperation(MultiTermVectorsShardRequest request, ShardId shardId) {
141+
throw new UnsupportedOperationException();
142+
}
143+
}

server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,21 @@
55
* Public License v 1"; you may not use this file except in compliance with, at
66
* your election, the "Elastic License 2.0", the "GNU Affero General Public
77
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
810
*/
911

1012
package org.elasticsearch.action.termvectors;
1113

14+
import org.elasticsearch.action.ActionListener;
1215
import org.elasticsearch.action.ActionType;
1316
import org.elasticsearch.action.support.ActionFilters;
1417
import org.elasticsearch.action.support.TransportActions;
1518
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
19+
import org.elasticsearch.client.internal.node.NodeClient;
1620
import org.elasticsearch.cluster.ProjectState;
1721
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
1823
import org.elasticsearch.cluster.project.ProjectResolver;
1924
import org.elasticsearch.cluster.routing.ShardIterator;
2025
import org.elasticsearch.cluster.service.ClusterService;
@@ -28,12 +33,15 @@
2833
import org.elasticsearch.threadpool.ThreadPool;
2934
import org.elasticsearch.transport.TransportService;
3035

36+
import java.io.IOException;
37+
3138
import static org.elasticsearch.core.Strings.format;
3239

3340
public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction<
3441
MultiTermVectorsShardRequest,
3542
MultiTermVectorsShardResponse> {
3643

44+
private final NodeClient client;
3745
private final IndicesService indicesService;
3846

3947
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]";
@@ -42,6 +50,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
4250
@Inject
4351
public TransportShardMultiTermsVectorAction(
4452
ClusterService clusterService,
53+
NodeClient client,
4554
TransportService transportService,
4655
IndicesService indicesService,
4756
ThreadPool threadPool,
@@ -60,6 +69,7 @@ public TransportShardMultiTermsVectorAction(
6069
MultiTermVectorsShardRequest::new,
6170
threadPool.executor(ThreadPool.Names.GET)
6271
);
72+
this.client = client;
6373
this.indicesService = indicesService;
6474
}
6575

@@ -80,9 +90,29 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
8090

8191
@Override
8292
protected ShardIterator shards(ProjectState project, InternalRequest request) {
83-
ShardIterator shards = clusterService.operationRouting()
93+
ShardIterator iterator = clusterService.operationRouting()
8494
.getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference());
85-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
95+
if (iterator == null) {
96+
return null;
97+
}
98+
return ShardIterator.allSearchableShards(iterator);
99+
}
100+
101+
@Override
102+
protected void asyncShardOperation(
103+
MultiTermVectorsShardRequest request,
104+
ShardId shardId,
105+
ActionListener<MultiTermVectorsShardResponse> listener
106+
) throws IOException {
107+
boolean ensureDocsSearchable = DiscoveryNode.isStateless(clusterService.getSettings())
108+
&& request.requests.stream().anyMatch(r -> r.realtime() && r.id() != null && r.id().isEmpty() == false);
109+
if (ensureDocsSearchable) {
110+
client.executeLocally(TransportEnsureDocsSearchableAction.TYPE, request, listener.delegateFailureAndWrap((l, ignored) -> {
111+
super.asyncShardOperation(request, shardId, l);
112+
}));
113+
} else {
114+
super.asyncShardOperation(request, shardId, listener);
115+
}
86116
}
87117

88118
@Override

server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,19 @@
55
* Public License v 1"; you may not use this file except in compliance with, at
66
* your election, the "Elastic License 2.0", the "GNU Affero General Public
77
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
810
*/
911

1012
package org.elasticsearch.action.termvectors;
1113

1214
import org.elasticsearch.action.ActionListener;
1315
import org.elasticsearch.action.support.ActionFilters;
1416
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
17+
import org.elasticsearch.client.internal.node.NodeClient;
1518
import org.elasticsearch.cluster.ProjectState;
1619
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
20+
import org.elasticsearch.cluster.node.DiscoveryNode;
1721
import org.elasticsearch.cluster.project.ProjectResolver;
1822
import org.elasticsearch.cluster.routing.ShardIterator;
1923
import org.elasticsearch.cluster.service.ClusterService;
@@ -34,11 +38,13 @@
3438
*/
3539
public class TransportTermVectorsAction extends TransportSingleShardAction<TermVectorsRequest, TermVectorsResponse> {
3640

41+
private final NodeClient client;
3742
private final IndicesService indicesService;
3843

3944
@Inject
4045
public TransportTermVectorsAction(
4146
ClusterService clusterService,
47+
NodeClient client,
4248
TransportService transportService,
4349
IndicesService indicesService,
4450
ThreadPool threadPool,
@@ -57,6 +63,7 @@ public TransportTermVectorsAction(
5763
TermVectorsRequest::new,
5864
threadPool.executor(ThreadPool.Names.GET)
5965
);
66+
this.client = client;
6067
this.indicesService = indicesService;
6168
}
6269

@@ -69,21 +76,18 @@ protected ShardIterator shards(ProjectState project, InternalRequest request) {
6976
.getFirst();
7077
}
7178

72-
return operationRouting.useOnlyPromotableShardsForStateless(
73-
operationRouting.getShards(
74-
79+
ShardIterator iterator = clusterService.operationRouting()
80+
.getShards(
7581
project,
76-
7782
request.concreteIndex(),
78-
7983
request.request().id(),
80-
8184
request.request().routing(),
82-
8385
request.request().preference()
84-
85-
)
86-
);
86+
);
87+
if (iterator == null) {
88+
return null;
89+
}
90+
return ShardIterator.allSearchableShards(iterator);
8791
}
8892

8993
@Override
@@ -103,7 +107,25 @@ protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId,
103107
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
104108
IndexShard indexShard = indexService.getShard(shardId.id());
105109
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
106-
super.asyncShardOperation(request, shardId, listener);
110+
boolean ensureDocSearchable = DiscoveryNode.isStateless(clusterService.getSettings())
111+
&& request.id() != null
112+
&& request.id().isEmpty() == false;
113+
if (ensureDocSearchable) {
114+
// Ensure that the document is searchable before we execute the term vectors request
115+
MultiTermVectorsShardRequest ensureDocSearchableRequest = new MultiTermVectorsShardRequest(request.index(), shardId.id());
116+
ensureDocSearchableRequest.add(0, request);
117+
ensureDocSearchableRequest.preference(request.preference());
118+
ensureDocSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
119+
client.executeLocally(
120+
TransportEnsureDocsSearchableAction.TYPE,
121+
ensureDocSearchableRequest,
122+
listener.delegateFailureAndWrap((l, ignored) -> {
123+
super.asyncShardOperation(request, shardId, l);
124+
})
125+
);
126+
} else {
127+
super.asyncShardOperation(request, shardId, listener);
128+
}
107129
} else {
108130
indexShard.ensureShardSearchActive(b -> {
109131
try {

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.cluster.metadata.ProjectMetadata;
15-
import org.elasticsearch.cluster.node.DiscoveryNode;
1615
import org.elasticsearch.cluster.node.DiscoveryNodes;
1716
import org.elasticsearch.common.Strings;
1817
import org.elasticsearch.common.settings.ClusterSettings;
@@ -42,11 +41,9 @@ public class OperationRouting {
4241
);
4342

4443
private boolean useAdaptiveReplicaSelection;
45-
private final boolean isStateless;
4644

4745
@SuppressWarnings("this-escape")
4846
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
49-
this.isStateless = DiscoveryNode.isStateless(settings);
5047
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
5148
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
5249
}
@@ -78,19 +75,6 @@ public ShardIterator getShards(ProjectState projectState, String index, int shar
7875
return preferenceActiveShardIterator(indexShard, nodes.getLocalNodeId(), nodes, preference, null, null);
7976
}
8077

81-
public ShardIterator useOnlyPromotableShardsForStateless(ShardIterator shards) {
82-
// If it is stateless, only route promotable shards. This is a temporary workaround until a more cohesive solution can be
83-
// implemented for search shards.
84-
if (isStateless && shards != null) {
85-
return new ShardIterator(
86-
shards.shardId(),
87-
shards.getShardRoutings().stream().filter(ShardRouting::isPromotableToPrimary).collect(Collectors.toList())
88-
);
89-
} else {
90-
return shards;
91-
}
92-
}
93-
9478
public List<ShardIterator> searchShards(
9579
ProjectState projectState,
9680
String[] concreteIndices,

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,10 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher, boo
902902
}
903903
}
904904

905+
public boolean isDocumentInLiveVersionMap(BytesRef uid) {
906+
return false;
907+
}
908+
905909
public abstract GetResult get(
906910
Get get,
907911
MappingLookup mappingLookup,

0 commit comments

Comments
 (0)