-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Term vector API on stateless search nodes #129902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Term vector API on stateless search nodes #129902
Conversation
59876ce
to
dd19b4c
Compare
dd19b4c
to
ab4e1ed
Compare
Up to now, the (m)term vector API real-time requests were being executed on the indexing nodes of serverless. However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that, by introducing an intermediate action for search nodes to become up-to-date with an indexing node in respect to the term vector API request, before executing it locally on the search node. The new intermediate action searches for any of the requested document IDs in the shard's LiveVersionMap and if it finds any of them there, it means the search nodes need to be refreshed in order to capture the new document IDs before searching for them. Relates ES-12112
82434e3
to
e3b6a4b
Compare
Pinging @elastic/es-search (Team:Search) |
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing) |
.../src/main/java/org/elasticsearch/action/termvectors/TransportEnsureDocsSearchableAction.java
Outdated
Show resolved
Hide resolved
); | ||
); | ||
if (iterator == null) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it will execute on the indexing shard in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it'll execute the request in the receiving node. I think that we should return an empty iterator instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied this from TransportGetAction#shards()
, which does the same to send the requests to the searchable shards. And there's this old discussion which seems to concluded to keep it.
However, I do see that in case it's null, it will be executed locally which may be a node that does not contain the shard (and thus might cause NPE / shard not found) or worst-case be executed on an indexing node (e.g., if all search nodes are down and the proxy sends it to an indexing node) which may try to execute it locally, meaning it may to search later on a possibly hollow indexing shard and get a weird exception (customer may see something like "cannot search a hollow shard").
So all-in-all, I agree we should better return an empty iterator, meaning it will finally give a shard not available exception (which seems better and more usual for end users).
I changed it, but question remains for you whether the same should be done for real-time gets?
ShardId shardId, | ||
ActionListener<MultiTermVectorsShardResponse> listener | ||
) throws IOException { | ||
if (DiscoveryNode.isStateless(clusterService.getSettings())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose we can capture this only once instead of reevaluating for every request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put it on a private variable evaluated on the constructor. Hopefully that's what you meant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I left a few comments
@@ -7,11 +7,6 @@ routing: | |||
settings: | |||
index: | |||
number_of_shards: 5 | |||
number_of_replicas: 0 | |||
|
|||
- do: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this removed intentionally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes because it won't work in Serverless, as there'll be no search shard to execute the new term API. In serverless we force to have a search node and we'd like a search shard.
So this way it works in both stateful and stateless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the test would become flaky now that we don't wait for the cluster to go green, that's why I was asking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To wait for the search shard is definitely up and running, I incorporated the following piece of code
- do:
cluster.health:
wait_for_no_initializing_shards: true
which is copied from #114641 which solved a similar issue to make the tests work in both ES and serverless.
I run it also 10 locally, both core ES and severless, and it succeeds. Feel free to tell me if you have more feedback.
.../src/main/java/org/elasticsearch/action/termvectors/TransportEnsureDocsSearchableAction.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/elasticsearch/action/termvectors/TransportEnsureDocsSearchableAction.java
Outdated
Show resolved
Hide resolved
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE) | ||
: ACTION_NAME + " should only be executed on a stateless indexing node"; | ||
logger.debug("received request with {} docs", request.docIds.length); | ||
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we ok failing the request if the primary moved in the meantime?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so because the previous state of the code also allows for the request to fail in case shard move. Specifically:
- In stateless, the action only goes to the primary shard. If we see for example
TransportTermVectorsAction#asyncShardOperation()
, it accessesindicesService.indexServiceSafe()
which will throw if the primary has moved. And TransportSingleShardAction will then fail the request with NoShardAvailableActionException. - In stateful, the action iterates over all searchable shards, so including the primary and replicas. And each time a failure is met (e.g., if the shard moved), it will try the next shard. But it's possible with 1 primary and 1 replica that maybe both move around the same time, and the TransportSingleShardAction will then fail the request with NoShardAvailableActionException.
are we ok failing the request if the primary moved in the meantime?
So yes requests already could fail. But this PR makes the stateless behavior a bit worse though, because we're doubling the possibilities that a shard is not found (first the search shard may be moved, and then the primary shard may be moved). It does not "break" the premise that the request may fail though. Do you think we should do something more, like a reroute phase, and it should it be only in serverless only or also in stateful (if all shards move)? Maybe it can be an amendment ticket for the future.
.../src/main/java/org/elasticsearch/action/termvectors/TransportEnsureDocsSearchableAction.java
Outdated
Show resolved
Hide resolved
); | ||
); | ||
if (iterator == null) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it'll execute the request in the receiving node. I think that we should return an empty iterator instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -7,11 +7,6 @@ routing: | |||
settings: | |||
index: | |||
number_of_shards: 5 | |||
number_of_replicas: 0 | |||
|
|||
- do: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes because it won't work in Serverless, as there'll be no search shard to execute the new term API. In serverless we force to have a search node and we'd like a search shard.
So this way it works in both stateful and stateless.
.../src/main/java/org/elasticsearch/action/termvectors/TransportEnsureDocsSearchableAction.java
Outdated
Show resolved
Hide resolved
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE) | ||
: ACTION_NAME + " should only be executed on a stateless indexing node"; | ||
logger.debug("received request with {} docs", request.docIds.length); | ||
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so because the previous state of the code also allows for the request to fail in case shard move. Specifically:
- In stateless, the action only goes to the primary shard. If we see for example
TransportTermVectorsAction#asyncShardOperation()
, it accessesindicesService.indexServiceSafe()
which will throw if the primary has moved. And TransportSingleShardAction will then fail the request with NoShardAvailableActionException. - In stateful, the action iterates over all searchable shards, so including the primary and replicas. And each time a failure is met (e.g., if the shard moved), it will try the next shard. But it's possible with 1 primary and 1 replica that maybe both move around the same time, and the TransportSingleShardAction will then fail the request with NoShardAvailableActionException.
are we ok failing the request if the primary moved in the meantime?
So yes requests already could fail. But this PR makes the stateless behavior a bit worse though, because we're doubling the possibilities that a shard is not found (first the search shard may be moved, and then the primary shard may be moved). It does not "break" the premise that the request may fail though. Do you think we should do something more, like a reroute phase, and it should it be only in serverless only or also in stateful (if all shards move)? Maybe it can be an amendment ticket for the future.
ShardId shardId, | ||
ActionListener<MultiTermVectorsShardResponse> listener | ||
) throws IOException { | ||
if (DiscoveryNode.isStateless(clusterService.getSettings())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put it on a private variable evaluated on the constructor. Hopefully that's what you meant.
); | ||
); | ||
if (iterator == null) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied this from TransportGetAction#shards()
, which does the same to send the requests to the searchable shards. And there's this old discussion which seems to concluded to keep it.
However, I do see that in case it's null, it will be executed locally which may be a node that does not contain the shard (and thus might cause NPE / shard not found) or worst-case be executed on an indexing node (e.g., if all search nodes are down and the proxy sends it to an indexing node) which may try to execute it locally, meaning it may to search later on a possibly hollow indexing shard and get a weird exception (customer may see something like "cannot search a hollow shard").
So all-in-all, I agree we should better return an empty iterator, meaning it will finally give a shard not available exception (which seems better and more usual for end users).
I changed it, but question remains for you whether the same should be done for real-time gets?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. My only outstanding comment is around the yaml tests.
@@ -7,11 +7,6 @@ routing: | |||
settings: | |||
index: | |||
number_of_shards: 5 | |||
number_of_replicas: 0 | |||
|
|||
- do: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the test would become flaky now that we don't wait for the cluster to go green, that's why I was asking.
server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @fcofdez ! Please take another look, and see the new origin string.
@@ -7,11 +7,6 @@ routing: | |||
settings: | |||
index: | |||
number_of_shards: 5 | |||
number_of_replicas: 0 | |||
|
|||
- do: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To wait for the search shard is definitely up and running, I incorporated the following piece of code
- do:
cluster.health:
wait_for_no_initializing_shards: true
which is copied from #114641 which solved a similar issue to make the tests work in both ES and serverless.
I run it also 10 locally, both core ES and severless, and it succeeds. Feel free to tell me if you have more feedback.
@@ -132,6 +133,7 @@ public static void switchUserBasedOnActionOriginAndExecute( | |||
case SECURITY_PROFILE_ORIGIN: | |||
securityContext.executeAsInternalUser(InternalUsers.SECURITY_PROFILE_USER, version, consumer); | |||
break; | |||
case ENSURE_DOCS_SEARCHABLE_ORIGIN: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @fcofdez , please review this new addition to the PR. It follows a similar approach as is used in PostWriteRefresh.java to send unpromotable refreshes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not an expert on the auth/security, but this looks reasonable to me.
…king * upstream/main: (100 commits) Term vector API on stateless search nodes (elastic#129902) TEST Fix ThreadPoolMergeSchedulerStressTestIT testMergingFallsBehindAndThenCatchesUp (elastic#131636) Add inference.put_custom rest-api-spec (elastic#131660) ESQL: Fewer serverless docs in tests (elastic#131651) Skip search on indices with INDEX_REFRESH_BLOCK (elastic#129132) Mute org.elasticsearch.indices.cluster.RemoteSearchForceConnectTimeoutIT testTimeoutSetting elastic#131656 [jdk] Resolve EA OpenJDK builds to our JDK archive (elastic#131237) Add optimized path for intermediate values aggregator (elastic#131390) Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline (elastic#131236) Refresh potential lost connections at query start for `_search` (elastic#130463) Add template_id to patterned-text type (elastic#131401) Integrate LIKE/RLIKE LIST with ReplaceStringCasingWithInsensitiveRegexMatch rule (elastic#131531) [ES|QL] Add doc for the COMPLETION command (elastic#131010) ESQL: Add times to topn status (elastic#131555) ESQL: Add asynchronous pre-optimization step for logical plan (elastic#131440) ES|QL: Improve generative tests for FORK [130015] (elastic#131206) Update index mapping update privileges (elastic#130894) ESQL: Added Sample operator NamedWritable to plugin (elastic#131541) update `kibana_system` to grant it access to `.chat-*` system index (elastic#131419) Clarify heap size configuration (elastic#131607) ...
…-tracking * upstream/main: (44 commits) Term vector API on stateless search nodes (elastic#129902) TEST Fix ThreadPoolMergeSchedulerStressTestIT testMergingFallsBehindAndThenCatchesUp (elastic#131636) Add inference.put_custom rest-api-spec (elastic#131660) ESQL: Fewer serverless docs in tests (elastic#131651) Skip search on indices with INDEX_REFRESH_BLOCK (elastic#129132) Mute org.elasticsearch.indices.cluster.RemoteSearchForceConnectTimeoutIT testTimeoutSetting elastic#131656 [jdk] Resolve EA OpenJDK builds to our JDK archive (elastic#131237) Add optimized path for intermediate values aggregator (elastic#131390) Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline (elastic#131236) Refresh potential lost connections at query start for `_search` (elastic#130463) Add template_id to patterned-text type (elastic#131401) Integrate LIKE/RLIKE LIST with ReplaceStringCasingWithInsensitiveRegexMatch rule (elastic#131531) [ES|QL] Add doc for the COMPLETION command (elastic#131010) ESQL: Add times to topn status (elastic#131555) ESQL: Add asynchronous pre-optimization step for logical plan (elastic#131440) ES|QL: Improve generative tests for FORK [130015] (elastic#131206) Update index mapping update privileges (elastic#130894) ESQL: Added Sample operator NamedWritable to plugin (elastic#131541) update `kibana_system` to grant it access to `.chat-*` system index (elastic#131419) Clarify heap size configuration (elastic#131607) ...
Up to now, the (m)term vector API real-time requests were being executed on the indexing nodes of serverless (see #94257). However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that, by introducing an intermediate action for search nodes to become up-to-date with an indexing node in respect to the term vector API request, before executing it locally on the search node.
The new intermediate action searches for any of the requested document IDs in the shard's LiveVersionMap and if it finds any of them there, it means the search nodes need to be refreshed in order to capture the new document IDs before searching for them.
Relates ES-12112