-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Skip search shards with INDEX_REFRESH_BLOCK #129132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 15 commits
2aa74e3
12b6b81
9c705cd
1c75721
1ecc447
cdb4bc1
b7ade2d
cd991c2
5f50d5c
3f86fb8
0edc27c
9de6f06
be37bf6
17706e2
8759a07
bf8a2be
8887609
0f0200a
7689263
598e906
4cdfbd0
76ecade
86c9a5d
7200edc
d72600b
61d40c4
51d5196
24f2770
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.search; | ||
|
||
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; | ||
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; | ||
import org.elasticsearch.action.search.ClosePointInTimeRequest; | ||
import org.elasticsearch.action.search.OpenPointInTimeRequest; | ||
import org.elasticsearch.action.search.SearchRequest; | ||
import org.elasticsearch.action.search.TransportClosePointInTimeAction; | ||
import org.elasticsearch.action.search.TransportOpenPointInTimeAction; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.index.query.QueryBuilders; | ||
import org.elasticsearch.search.builder.PointInTimeBuilder; | ||
import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; | ||
|
||
public class SearchWithIndexBlocksIT extends ESIntegTestCase { | ||
|
||
public void testSearchIndexWithIndexRefreshBlock() { | ||
createIndex("test"); | ||
|
||
var addIndexBlockRequest = new AddIndexBlockRequest(IndexMetadata.APIBlock.REFRESH, "test"); | ||
client().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest).actionGet(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The refresh block should be added automatically to newly created indices as long as they have replicas and the "use refresh block" setting is enabled in the node setting. We should remove the ability to add the refresh block through the Add Index Block API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for taking a look @tlrx! I was hoping to test this change outside of the context of Serverless. But I agree it's not appropriate to add the refresh block to that API for testing purposes only, so I will see if I can construct the scenario in some other way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, I was able to get the setup I was looking for by adding the block directly to cluster state in the tests. |
||
|
||
indexRandom( | ||
true, | ||
prepareIndex("test").setId("1").setSource("field", "value"), | ||
prepareIndex("test").setId("2").setSource("field", "value"), | ||
prepareIndex("test").setId("3").setSource("field", "value"), | ||
prepareIndex("test").setId("4").setSource("field", "value"), | ||
prepareIndex("test").setId("5").setSource("field", "value"), | ||
prepareIndex("test").setId("6").setSource("field", "value") | ||
); | ||
|
||
assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), 0); | ||
} | ||
|
||
public void testSearchMultipleIndicesEachWithAnIndexRefreshBlock() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this could be folded into a single test, where one or more indices are randomly created, most of some with replicas but other without replicas, and then allocate zero or more search shards and check the expected results, finally assigning all search shards and check the results again. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've folded this into a single test with some additional randomization. My goal is to keep the integration tests in the Serverless PR, so I'll add the test scenario you're proposing there. |
||
createIndex("test"); | ||
createIndex("test2"); | ||
|
||
var addIndexBlockRequest = new AddIndexBlockRequest(IndexMetadata.APIBlock.REFRESH, "test", "test2"); | ||
client().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest).actionGet(); | ||
|
||
indexRandom( | ||
true, | ||
prepareIndex("test").setId("1").setSource("field", "value"), | ||
prepareIndex("test").setId("2").setSource("field", "value"), | ||
prepareIndex("test").setId("3").setSource("field", "value"), | ||
prepareIndex("test").setId("4").setSource("field", "value"), | ||
prepareIndex("test").setId("5").setSource("field", "value"), | ||
prepareIndex("test").setId("6").setSource("field", "value"), | ||
prepareIndex("test2").setId("1").setSource("field", "value"), | ||
prepareIndex("test2").setId("2").setSource("field", "value"), | ||
prepareIndex("test2").setId("3").setSource("field", "value"), | ||
prepareIndex("test2").setId("4").setSource("field", "value"), | ||
prepareIndex("test2").setId("5").setSource("field", "value"), | ||
prepareIndex("test2").setId("6").setSource("field", "value") | ||
); | ||
|
||
assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), 0); | ||
} | ||
|
||
public void testSearchMultipleIndicesWithOneIndexRefreshBlock() { | ||
createIndex("test"); | ||
createIndex("test2"); | ||
|
||
// Only block test | ||
var addIndexBlockRequest = new AddIndexBlockRequest(IndexMetadata.APIBlock.REFRESH, "test"); | ||
client().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest).actionGet(); | ||
|
||
indexRandom( | ||
true, | ||
prepareIndex("test").setId("1").setSource("field", "value"), | ||
prepareIndex("test").setId("2").setSource("field", "value"), | ||
prepareIndex("test").setId("3").setSource("field", "value"), | ||
prepareIndex("test").setId("4").setSource("field", "value"), | ||
prepareIndex("test").setId("5").setSource("field", "value"), | ||
prepareIndex("test").setId("6").setSource("field", "value"), | ||
prepareIndex("test2").setId("1").setSource("field", "value"), | ||
prepareIndex("test2").setId("2").setSource("field", "value"), | ||
prepareIndex("test2").setId("3").setSource("field", "value"), | ||
prepareIndex("test2").setId("4").setSource("field", "value"), | ||
prepareIndex("test2").setId("5").setSource("field", "value"), | ||
prepareIndex("test2").setId("6").setSource("field", "value") | ||
); | ||
|
||
// We should get test2 results (not blocked) | ||
assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), 6); | ||
} | ||
|
||
public void testOpenPITWithIndexRefreshBlock() { | ||
createIndex("test"); | ||
|
||
var addIndexBlockRequest = new AddIndexBlockRequest(IndexMetadata.APIBlock.REFRESH, "test"); | ||
client().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest).actionGet(); | ||
|
||
indexRandom( | ||
true, | ||
prepareIndex("test").setId("1").setSource("field", "value"), | ||
prepareIndex("test").setId("2").setSource("field", "value"), | ||
prepareIndex("test").setId("3").setSource("field", "value"), | ||
prepareIndex("test").setId("4").setSource("field", "value"), | ||
prepareIndex("test").setId("5").setSource("field", "value"), | ||
prepareIndex("test").setId("6").setSource("field", "value") | ||
); | ||
|
||
BytesReference pitId = null; | ||
try { | ||
OpenPointInTimeRequest openPITRequest = new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueSeconds(10)) | ||
.allowPartialSearchResults(true); | ||
pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openPITRequest).actionGet().getPointInTimeId(); | ||
SearchRequest searchRequest = new SearchRequest().source( | ||
new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueSeconds(10))) | ||
); | ||
assertHitCount(client().search(searchRequest), 0); | ||
} finally { | ||
if (pitId != null) { | ||
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,6 +148,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, | |
Property.NodeScope | ||
); | ||
|
||
private static final OriginalIndices SKIPPED_INDICES = new OriginalIndices(Strings.EMPTY_ARRAY, IndicesOptions.strictExpandOpen()); | ||
cbuescher marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private final ThreadPool threadPool; | ||
private final ClusterService clusterService; | ||
private final TransportService transportService; | ||
|
@@ -233,6 +235,10 @@ private Map<String, OriginalIndices> buildPerIndexOriginalIndices( | |
for (String index : indices) { | ||
if (hasBlocks) { | ||
blocks.indexBlockedRaiseException(projectState.projectId(), ClusterBlockLevel.READ, index); | ||
if (blocks.hasIndexBlock(projectState.projectId(), index, IndexMetadata.INDEX_REFRESH_BLOCK)) { | ||
res.put(index, SKIPPED_INDICES); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we are doing the filtering in the right place. It is a bit counter intuitive that we would resolve the shards given the indices and the skip some of them. Can we not filter the indices to start with? Maybe that removes the need to use that SKIPPED_INDICES marker thing too :) Do we need to check for this block only in the search API? By the way, something probably needs to happen in ES|QL too around this (does not need to be in this PR, but I am raising the need to track that). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've adjusted my approach based on our discussion (documented in the PR description). Now I ignore blocked indices before resolving shards.
I've added a test to prove that _msearch hits this code path. Are you asking about other 'search-flavored' APIs? I believe this will work for ES|QL, as EsqlSearchShardsAction calls TransportSearchShardAction which calls TransportSearchAction#getLocalShardsIterator. But thanks for bringing that up, I'll make sure there's a follow up task created to test this in ES|QL and make any additional changes necessary. |
||
continue; | ||
} | ||
} | ||
|
||
String[] aliases = indexNameExpressionResolver.allIndexAliases(projectState.metadata(), index, indicesAndAliases); | ||
|
@@ -588,7 +594,7 @@ public void onFailure(Exception e) {} | |
); | ||
} | ||
|
||
static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) { | ||
static void adjustSearchType(SearchRequest searchRequest, boolean oneOrZeroValidShards) { | ||
// if there's a kNN search, always use DFS_QUERY_THEN_FETCH | ||
if (searchRequest.hasKnnSearch()) { | ||
searchRequest.searchType(DFS_QUERY_THEN_FETCH); | ||
|
@@ -603,7 +609,7 @@ static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) { | |
} | ||
|
||
// optimize search type for cases where there is only one shard group to search on | ||
if (singleShard) { | ||
if (oneOrZeroValidShards) { | ||
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard | ||
searchRequest.searchType(QUERY_THEN_FETCH); | ||
} | ||
|
@@ -1304,7 +1310,8 @@ private void executeSearch( | |
|
||
Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, projectState.cluster()); | ||
|
||
adjustSearchType(searchRequest, shardIterators.size() == 1); | ||
boolean oneOrZeroValidShards = shardIterators.size() == 1 || allOrAllButOneSkipped(shardIterators); | ||
cbuescher marked this conversation as resolved.
Show resolved
Hide resolved
|
||
adjustSearchType(searchRequest, oneOrZeroValidShards); | ||
|
||
final DiscoveryNodes nodes = projectState.cluster().nodes(); | ||
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup( | ||
|
@@ -1337,6 +1344,30 @@ private void executeSearch( | |
); | ||
} | ||
|
||
/** | ||
* Determines if all, or all but one, iterators are skipped. | ||
* (At this point, iterators may be marked as skipped due to index-level blocks). | ||
benchaplin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* We expect skipped iterators to be unlikely, so returning fast after we see more | ||
* than one "not skipped" is an intended optimization. | ||
* | ||
* @param searchShardIterators all the shard iterators derived from indices being searched | ||
* @return true if all of them are already skipped, or only one is not skipped | ||
*/ | ||
private boolean allOrAllButOneSkipped(List<SearchShardIterator> searchShardIterators) { | ||
int notSkippedCount = 0; | ||
|
||
for (SearchShardIterator searchShardIterator : searchShardIterators) { | ||
if (searchShardIterator.skip() == false) { | ||
notSkippedCount++; | ||
if (notSkippedCount > 1) { | ||
return false; | ||
} | ||
} | ||
} | ||
|
||
return true; | ||
} | ||
|
||
Executor asyncSearchExecutor(final String[] indices) { | ||
boolean seenSystem = false; | ||
boolean seenCritical = false; | ||
|
@@ -1889,7 +1920,13 @@ List<SearchShardIterator> getLocalShardsIterator( | |
final ShardId shardId = shardRouting.shardId(); | ||
OriginalIndices finalIndices = originalIndices.get(shardId.getIndex().getName()); | ||
assert finalIndices != null; | ||
list[i++] = new SearchShardIterator(clusterAlias, shardId, shardRouting.getShardRoutings(), finalIndices); | ||
list[i++] = new SearchShardIterator( | ||
clusterAlias, | ||
shardId, | ||
shardRouting.getShardRoutings(), | ||
finalIndices, | ||
finalIndices == SKIPPED_INDICES | ||
cbuescher marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
} | ||
// the returned list must support in-place sorting, so this is the most memory efficient we can do here | ||
return Arrays.asList(list); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also want to have ESQL test for this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm tracking that to be a followup task.