Skip to content

Commit 59fa1fa

Browse files
authored
Add early termination support for concurrent segment search (#8306) (#8537)
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent e0cf064 commit 59fa1fa

File tree

14 files changed

+188
-49
lines changed

14 files changed

+188
-49
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545))
2828
- Create concept of persistent ThreadContext headers that are unstashable ([#8291]()https://github.com/opensearch-project/OpenSearch/pull/8291)
2929
- Enable Partial Flat Object ([#7997](https://github.com/opensearch-project/OpenSearch/pull/7997))
30+
- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306))
3031

3132
### Dependencies
3233
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search;
10+
11+
import org.opensearch.common.settings.FeatureFlagSettings;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.FeatureFlags;
15+
16+
public class ConcurrentSegmentSearchCancellationIT extends SearchCancellationIT {
17+
@Override
18+
protected Settings featureFlagSettings() {
19+
Settings.Builder featureSettings = Settings.builder();
20+
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
21+
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
22+
}
23+
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
24+
return featureSettings.build();
25+
}
26+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search;
10+
11+
import org.opensearch.common.settings.FeatureFlagSettings;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.FeatureFlags;
15+
16+
public class ConcurrentSegmentSearchTimeoutIT extends SearchTimeoutIT {
17+
18+
@Override
19+
protected Settings featureFlagSettings() {
20+
Settings.Builder featureSettings = Settings.builder();
21+
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
22+
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
23+
}
24+
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
25+
return featureSettings.build();
26+
}
27+
}

server/src/internalClusterTest/java/org/opensearch/search/SearchTimeoutIT.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
5252
import static org.opensearch.index.query.QueryBuilders.scriptQuery;
5353
import static org.opensearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME;
54-
import static org.hamcrest.Matchers.equalTo;
5554

5655
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
5756
public class SearchTimeoutIT extends OpenSearchIntegTestCase {
@@ -67,17 +66,37 @@ protected Settings nodeSettings(int nodeOrdinal) {
6766
}
6867

6968
public void testSimpleTimeout() throws Exception {
70-
for (int i = 0; i < 32; i++) {
69+
final int numDocs = 1000;
70+
for (int i = 0; i < numDocs; i++) {
7171
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
7272
}
7373
refresh("test");
7474

7575
SearchResponse searchResponse = client().prepareSearch("test")
76-
.setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
76+
.setTimeout(new TimeValue(5, TimeUnit.MILLISECONDS))
7777
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
7878
.setAllowPartialSearchResults(true)
7979
.get();
80-
assertThat(searchResponse.isTimedOut(), equalTo(true));
80+
assertTrue(searchResponse.isTimedOut());
81+
assertEquals(0, searchResponse.getFailedShards());
82+
assertTrue(numDocs > searchResponse.getHits().getTotalHits().value);
83+
}
84+
85+
public void testSimpleDoesNotTimeout() throws Exception {
86+
final int numDocs = 10;
87+
for (int i = 0; i < numDocs; i++) {
88+
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
89+
}
90+
refresh("test");
91+
92+
SearchResponse searchResponse = client().prepareSearch("test")
93+
.setTimeout(new TimeValue(10000, TimeUnit.SECONDS))
94+
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
95+
.setAllowPartialSearchResults(true)
96+
.get();
97+
assertFalse(searchResponse.isTimedOut());
98+
assertEquals(0, searchResponse.getFailedShards());
99+
assertEquals(numDocs, searchResponse.getHits().getTotalHits().value);
81100
}
82101

83102
public void testPartialResultsIntolerantTimeout() throws Exception {
@@ -91,7 +110,7 @@ public void testPartialResultsIntolerantTimeout() throws Exception {
91110
.setAllowPartialSearchResults(false) // this line causes timeouts to report failures
92111
.get()
93112
);
94-
assertTrue(ex.toString().contains("Time exceeded"));
113+
assertTrue(ex.toString().contains("QueryPhaseExecutionException[Time exceeded]"));
95114
}
96115

97116
public static class ScriptedTimeoutPlugin extends MockScriptPlugin {

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.opensearch.search.profile.query.ProfileWeight;
7474
import org.opensearch.search.profile.query.QueryProfiler;
7575
import org.opensearch.search.profile.query.QueryTimingType;
76+
import org.opensearch.search.query.QueryPhase;
7677
import org.opensearch.search.query.QuerySearchResult;
7778
import org.opensearch.search.sort.FieldSortBuilder;
7879
import org.opensearch.search.sort.MinAndMax;
@@ -103,26 +104,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
103104
private MutableQueryTimeout cancellable;
104105
private SearchContext searchContext;
105106

106-
public ContextIndexSearcher(
107-
IndexReader reader,
108-
Similarity similarity,
109-
QueryCache queryCache,
110-
QueryCachingPolicy queryCachingPolicy,
111-
boolean wrapWithExitableDirectoryReader,
112-
Executor executor
113-
) throws IOException {
114-
this(
115-
reader,
116-
similarity,
117-
queryCache,
118-
queryCachingPolicy,
119-
new MutableQueryTimeout(),
120-
wrapWithExitableDirectoryReader,
121-
executor,
122-
null
123-
);
124-
}
125-
126107
public ContextIndexSearcher(
127108
IndexReader reader,
128109
Similarity similarity,
@@ -310,18 +291,22 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
310291
return;
311292
}
312293

313-
cancellable.checkCancelled();
314-
weight = wrapWeight(weight);
315-
// See please https://github.com/apache/lucene/pull/964
316-
collector.setWeight(weight);
317294
final LeafCollector leafCollector;
318295
try {
296+
cancellable.checkCancelled();
297+
weight = wrapWeight(weight);
298+
// See please https://github.com/apache/lucene/pull/964
299+
collector.setWeight(weight);
319300
leafCollector = collector.getLeafCollector(ctx);
320301
} catch (CollectionTerminatedException e) {
321302
// there is no doc of interest in this reader context
322303
// continue with the following leaf
323304
return;
305+
} catch (QueryPhase.TimeExceededException e) {
306+
searchContext.setSearchTimedOut(true);
307+
return;
324308
}
309+
// catch early terminated exception and rethrow?
325310
Bits liveDocs = ctx.reader().getLiveDocs();
326311
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
327312
if (liveDocsBitSet == null) {
@@ -332,6 +317,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
332317
} catch (CollectionTerminatedException e) {
333318
// collection was terminated prematurely
334319
// continue with the following leaf
320+
} catch (QueryPhase.TimeExceededException e) {
321+
searchContext.setSearchTimedOut(true);
322+
return;
335323
}
336324
}
337325
} else {
@@ -348,6 +336,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
348336
} catch (CollectionTerminatedException e) {
349337
// collection was terminated prematurely
350338
// continue with the following leaf
339+
} catch (QueryPhase.TimeExceededException e) {
340+
searchContext.setSearchTimedOut(true);
341+
return;
351342
}
352343
}
353344
}
@@ -492,7 +483,7 @@ private boolean canMatch(LeafReaderContext ctx) throws IOException {
492483
}
493484

494485
private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
495-
if (searchContext != null && searchContext.request() != null && searchContext.request().source() != null) {
486+
if (searchContext.request() != null && searchContext.request().source() != null) {
496487
// Only applied on primary sort field and primary search_after.
497488
FieldSortBuilder primarySortField = FieldSortBuilder.getPrimaryFieldSortOrNull(searchContext.request().source());
498489
if (primarySortField != null) {
@@ -512,7 +503,7 @@ private boolean shouldReverseLeafReaderContexts() {
512503
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
513504
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
514505
// reader order here.
515-
if (searchContext != null && searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
506+
if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
516507
// Only reverse order for asc order sort queries
517508
if (searchContext.sort() != null
518509
&& searchContext.sort().sort != null

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public abstract class SearchContext implements Releasable {
9898
private final AtomicBoolean closed = new AtomicBoolean(false);
9999
private InnerHitsContext innerHitsContext;
100100

101+
private volatile boolean searchTimedOut;
102+
101103
protected SearchContext() {}
102104

103105
public abstract void setTask(SearchShardTask task);
@@ -106,6 +108,14 @@ protected SearchContext() {}
106108

107109
public abstract boolean isCancelled();
108110

111+
public boolean isSearchTimedOut() {
112+
return this.searchTimedOut;
113+
}
114+
115+
public void setSearchTimedOut(boolean searchTimedOut) {
116+
this.searchTimedOut = searchTimedOut;
117+
}
118+
109119
@Override
110120
public final void close() {
111121
if (closed.compareAndSet(false, true)) {

server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import org.opensearch.search.internal.SearchContext;
2020
import org.opensearch.search.profile.query.ProfileCollectorManager;
2121
import org.opensearch.search.query.QueryPhase.DefaultQueryPhaseSearcher;
22-
import org.opensearch.search.query.QueryPhase.TimeExceededException;
2322

2423
import java.io.IOException;
2524
import java.util.LinkedList;
25+
import java.util.concurrent.ExecutionException;
2626

2727
import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;
2828

@@ -80,12 +80,12 @@ private static boolean searchWithCollectorManager(
8080
try {
8181
final ReduceableSearchResult result = searcher.search(query, collectorManager);
8282
result.reduce(queryResult);
83-
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
84-
queryResult.terminatedEarly(true);
85-
} catch (TimeExceededException e) {
83+
} catch (RuntimeException re) {
84+
rethrowCauseIfPossible(re, searchContext);
85+
}
86+
if (searchContext.isSearchTimedOut()) {
8687
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
8788
if (searchContext.request().allowPartialSearchResults() == false) {
88-
// Can't rethrow TimeExceededException because not serializable
8989
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
9090
}
9191
queryResult.searchTimedOut(true);
@@ -101,4 +101,26 @@ private static boolean searchWithCollectorManager(
101101
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
102102
return aggregationProcessor;
103103
}
104+
105+
private static <T extends Exception> void rethrowCauseIfPossible(RuntimeException re, SearchContext searchContext) throws T {
106+
// Rethrow exception if cause is null
107+
if (re.getCause() == null) {
108+
throw re;
109+
}
110+
111+
// Unwrap the RuntimeException and ExecutionException from Lucene concurrent search method and rethrow
112+
if (re.getCause() instanceof ExecutionException || re.getCause() instanceof InterruptedException) {
113+
Throwable t = re.getCause();
114+
if (t.getCause() != null) {
115+
throw (T) t.getCause();
116+
}
117+
}
118+
119+
// Rethrow any unexpected exception types
120+
throw new QueryPhaseExecutionException(
121+
searchContext.shardTarget(),
122+
"Failed to execute concurrent segment search thread",
123+
re.getCause()
124+
);
125+
}
104126
}

server/src/main/java/org/opensearch/search/query/QueryPhase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,10 @@ private static boolean searchWithCollector(
354354
searcher.search(query, queryCollector);
355355
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
356356
queryResult.terminatedEarly(true);
357-
} catch (TimeExceededException e) {
357+
}
358+
if (searchContext.isSearchTimedOut()) {
358359
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
359360
if (searchContext.request().allowPartialSearchResults() == false) {
360-
// Can't rethrow TimeExceededException because not serializable
361361
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
362362
}
363363
queryResult.searchTimedOut(true);

server/src/test/java/org/opensearch/search/SearchCancellationTests.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import org.apache.lucene.util.automaton.CompiledAutomaton;
5050
import org.apache.lucene.util.automaton.RegExp;
5151
import org.opensearch.common.util.io.IOUtils;
52+
import org.opensearch.index.shard.IndexShard;
5253
import org.opensearch.search.internal.ContextIndexSearcher;
54+
import org.opensearch.search.internal.SearchContext;
5355
import org.opensearch.tasks.TaskCancelledException;
5456
import org.opensearch.test.OpenSearchTestCase;
5557
import org.junit.AfterClass;
@@ -59,6 +61,8 @@
5961
import java.util.concurrent.atomic.AtomicBoolean;
6062

6163
import static org.hamcrest.Matchers.equalTo;
64+
import static org.mockito.Mockito.mock;
65+
import static org.mockito.Mockito.when;
6266

6367
public class SearchCancellationTests extends OpenSearchTestCase {
6468

@@ -109,7 +113,8 @@ public void testAddingCancellationActions() throws IOException {
109113
IndexSearcher.getDefaultQueryCache(),
110114
IndexSearcher.getDefaultQueryCachingPolicy(),
111115
true,
112-
null
116+
null,
117+
mock(SearchContext.class)
113118
);
114119
NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null));
115120
assertEquals("cancellation runnable should not be null", npe.getMessage());
@@ -123,13 +128,17 @@ public void testAddingCancellationActions() throws IOException {
123128
public void testCancellableCollector() throws IOException {
124129
TotalHitCountCollector collector1 = new TotalHitCountCollector();
125130
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
131+
SearchContext searchContext = mock(SearchContext.class);
132+
IndexShard indexShard = mock(IndexShard.class);
133+
when(searchContext.indexShard()).thenReturn(indexShard);
126134
ContextIndexSearcher searcher = new ContextIndexSearcher(
127135
reader,
128136
IndexSearcher.getDefaultSimilarity(),
129137
IndexSearcher.getDefaultQueryCache(),
130138
IndexSearcher.getDefaultQueryCachingPolicy(),
131139
true,
132-
null
140+
null,
141+
searchContext
133142
);
134143

135144
searcher.search(new MatchAllDocsQuery(), collector1);
@@ -157,7 +166,8 @@ public void testExitableDirectoryReader() throws IOException {
157166
IndexSearcher.getDefaultQueryCache(),
158167
IndexSearcher.getDefaultQueryCachingPolicy(),
159168
true,
160-
null
169+
null,
170+
mock(SearchContext.class)
161171
);
162172
searcher.addQueryCancellation(cancellation);
163173
CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton());

0 commit comments

Comments
 (0)