Skip to content

Commit 0173f13

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add early termination support for concurrent segment search
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent b9edb5a commit 0173f13

File tree

6 files changed

+106
-12
lines changed

6 files changed

+106
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
1111
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
1212
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
13+
- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306))
1314

1415
### Dependencies
1516
- Bump `log4j-core` from 2.18.0 to 2.19.0
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search;
10+
11+
import org.opensearch.common.settings.FeatureFlagSettings;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.FeatureFlags;
15+
16+
public class ConcurrentSegmentSearchCancellationIT extends SearchCancellationIT {
17+
@Override
18+
protected Settings featureFlagSettings() {
19+
Settings.Builder featureSettings = Settings.builder();
20+
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
21+
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
22+
}
23+
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
24+
return featureSettings.build();
25+
}
26+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search;
10+
11+
import org.opensearch.common.settings.FeatureFlagSettings;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.FeatureFlags;
15+
16+
public class ConcurrentSegmentSearchTimeoutIT extends SearchTimeoutIT {
17+
18+
@Override
19+
protected Settings featureFlagSettings() {
20+
Settings.Builder featureSettings = Settings.builder();
21+
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
22+
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
23+
}
24+
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
25+
return featureSettings.build();
26+
}
27+
}

server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,7 @@ private void verifyCancellationException(ShardSearchFailure[] failures) {
207207
// failure may happen while executing the search or while sending shard request for next phase.
208208
// Below assertion is handling both the cases
209209
final Throwable topFailureCause = searchFailure.getCause();
210-
assertTrue(
211-
searchFailure.toString(),
212-
topFailureCause instanceof TransportException || topFailureCause instanceof TaskCancelledException
213-
);
210+
assertTrue(searchFailure.toString(), topFailureCause instanceof RuntimeException);
214211
if (topFailureCause instanceof TransportException) {
215212
assertTrue(topFailureCause.getCause() instanceof TaskCancelledException);
216213
}

server/src/internalClusterTest/java/org/opensearch/search/SearchTimeoutIT.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
5252
import static org.opensearch.index.query.QueryBuilders.scriptQuery;
5353
import static org.opensearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME;
54-
import static org.hamcrest.Matchers.equalTo;
5554

5655
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
5756
public class SearchTimeoutIT extends OpenSearchIntegTestCase {
@@ -67,17 +66,37 @@ protected Settings nodeSettings(int nodeOrdinal) {
6766
}
6867

6968
public void testSimpleTimeout() throws Exception {
70-
for (int i = 0; i < 32; i++) {
69+
final int numDocs = 1000;
70+
for (int i = 0; i < numDocs; i++) {
7171
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
7272
}
7373
refresh("test");
7474

7575
SearchResponse searchResponse = client().prepareSearch("test")
76-
.setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
76+
.setTimeout(new TimeValue(1, TimeUnit.MILLISECONDS))
7777
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
7878
.setAllowPartialSearchResults(true)
7979
.get();
80-
assertThat(searchResponse.isTimedOut(), equalTo(true));
80+
assertTrue(searchResponse.isTimedOut());
81+
assertEquals(0, searchResponse.getFailedShards());
82+
assertTrue(numDocs > searchResponse.getHits().getTotalHits().value);
83+
}
84+
85+
public void testSimpleDoesNotTimeout() throws Exception {
86+
final int numDocs = 10;
87+
for (int i = 0; i < numDocs; i++) {
88+
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
89+
}
90+
refresh("test");
91+
92+
SearchResponse searchResponse = client().prepareSearch("test")
93+
.setTimeout(new TimeValue(10000, TimeUnit.SECONDS))
94+
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
95+
.setAllowPartialSearchResults(true)
96+
.get();
97+
assertFalse(searchResponse.isTimedOut());
98+
assertEquals(0, searchResponse.getFailedShards());
99+
assertEquals(numDocs, searchResponse.getHits().getTotalHits().value);
81100
}
82101

83102
public void testPartialResultsIntolerantTimeout() throws Exception {

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import org.opensearch.search.profile.query.ProfileWeight;
7474
import org.opensearch.search.profile.query.QueryProfiler;
7575
import org.opensearch.search.profile.query.QueryTimingType;
76+
import org.opensearch.search.query.QueryPhase;
77+
import org.opensearch.search.query.QueryPhaseExecutionException;
7678
import org.opensearch.search.query.QuerySearchResult;
7779
import org.opensearch.search.sort.FieldSortBuilder;
7880
import org.opensearch.search.sort.MinAndMax;
@@ -310,17 +312,25 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
310312
return;
311313
}
312314

313-
cancellable.checkCancelled();
314-
weight = wrapWeight(weight);
315-
// See please https://github.com/apache/lucene/pull/964
316-
collector.setWeight(weight);
317315
final LeafCollector leafCollector;
318316
try {
317+
cancellable.checkCancelled();
318+
weight = wrapWeight(weight);
319+
// See please https://github.com/apache/lucene/pull/964
320+
collector.setWeight(weight);
319321
leafCollector = collector.getLeafCollector(ctx);
320322
} catch (CollectionTerminatedException e) {
321323
// there is no doc of interest in this reader context
322324
// continue with the following leaf
323325
return;
326+
} catch (QueryPhase.TimeExceededException e) {
327+
// Swallow timeout exception and return
328+
if (searchContext.request().allowPartialSearchResults() == false) {
329+
// Can't rethrow TimeExceededException because not serializable
330+
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
331+
}
332+
searchContext.queryResult().searchTimedOut(true);
333+
return;
324334
}
325335
Bits liveDocs = ctx.reader().getLiveDocs();
326336
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
@@ -332,6 +342,13 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
332342
} catch (CollectionTerminatedException e) {
333343
// collection was terminated prematurely
334344
// continue with the following leaf
345+
} catch (QueryPhase.TimeExceededException e) {
346+
// Swallow timeout exception
347+
if (searchContext.request().allowPartialSearchResults() == false) {
348+
// Can't rethrow TimeExceededException because not serializable
349+
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
350+
}
351+
searchContext.queryResult().searchTimedOut(true);
335352
}
336353
}
337354
} else {
@@ -348,6 +365,13 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
348365
} catch (CollectionTerminatedException e) {
349366
// collection was terminated prematurely
350367
// continue with the following leaf
368+
} catch (QueryPhase.TimeExceededException e) {
369+
// Swallow timeout exception
370+
if (searchContext.request().allowPartialSearchResults() == false) {
371+
// Can't rethrow TimeExceededException because not serializable
372+
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
373+
}
374+
searchContext.queryResult().searchTimedOut(true);
351375
}
352376
}
353377
}

0 commit comments

Comments
 (0)