Skip to content

Commit 82fb7ab

Browse files
authored
Introduce QueryPhaseSearcher extension point (SearchPlugin) (#1931)
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent bd2d935 commit 82fb7ab

File tree

11 files changed

+259
-21
lines changed

11 files changed

+259
-21
lines changed

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@
174174
import org.opensearch.search.SearchService;
175175
import org.opensearch.search.aggregations.support.AggregationUsageService;
176176
import org.opensearch.search.fetch.FetchPhase;
177+
import org.opensearch.search.query.QueryPhase;
177178
import org.opensearch.snapshots.InternalSnapshotsInfoService;
178179
import org.opensearch.snapshots.RestoreService;
179180
import org.opensearch.snapshots.SnapshotShardsService;
@@ -210,6 +211,7 @@
210211
import java.util.Optional;
211212
import java.util.Set;
212213
import java.util.concurrent.CountDownLatch;
214+
import java.util.concurrent.Executor;
213215
import java.util.concurrent.TimeUnit;
214216
import java.util.function.Function;
215217
import java.util.function.UnaryOperator;
@@ -849,9 +851,11 @@ protected Node(
849851
threadPool,
850852
scriptService,
851853
bigArrays,
854+
searchModule.getQueryPhase(),
852855
searchModule.getFetchPhase(),
853856
responseCollectorService,
854-
circuitBreakerService
857+
circuitBreakerService,
858+
searchModule.getIndexSearcherExecutor(threadPool)
855859
);
856860

857861
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
@@ -1407,19 +1411,23 @@ protected SearchService newSearchService(
14071411
ThreadPool threadPool,
14081412
ScriptService scriptService,
14091413
BigArrays bigArrays,
1414+
QueryPhase queryPhase,
14101415
FetchPhase fetchPhase,
14111416
ResponseCollectorService responseCollectorService,
1412-
CircuitBreakerService circuitBreakerService
1417+
CircuitBreakerService circuitBreakerService,
1418+
Executor indexSearcherExecutor
14131419
) {
14141420
return new SearchService(
14151421
clusterService,
14161422
indicesService,
14171423
threadPool,
14181424
scriptService,
14191425
bigArrays,
1426+
queryPhase,
14201427
fetchPhase,
14211428
responseCollectorService,
1422-
circuitBreakerService
1429+
circuitBreakerService,
1430+
indexSearcherExecutor
14231431
);
14241432
}
14251433

server/src/main/java/org/opensearch/plugins/SearchPlugin.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.plugins;
3434

35+
import org.apache.lucene.search.IndexSearcher;
3536
import org.apache.lucene.search.Query;
3637
import org.apache.lucene.search.Sort;
3738
import org.opensearch.common.CheckedFunction;
@@ -40,6 +41,7 @@
4041
import org.opensearch.common.io.stream.StreamInput;
4142
import org.opensearch.common.io.stream.Writeable;
4243
import org.opensearch.common.lucene.search.function.ScoreFunction;
44+
import org.opensearch.common.settings.Settings;
4345
import org.opensearch.common.xcontent.ContextParser;
4446
import org.opensearch.common.xcontent.XContent;
4547
import org.opensearch.common.xcontent.XContentParser;
@@ -61,18 +63,22 @@
6163
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
6264
import org.opensearch.search.fetch.FetchSubPhase;
6365
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
66+
import org.opensearch.search.query.QueryPhaseSearcher;
6467
import org.opensearch.search.rescore.Rescorer;
6568
import org.opensearch.search.rescore.RescorerBuilder;
6669
import org.opensearch.search.sort.SortBuilder;
6770
import org.opensearch.search.sort.SortParser;
6871
import org.opensearch.search.suggest.Suggest;
6972
import org.opensearch.search.suggest.Suggester;
7073
import org.opensearch.search.suggest.SuggestionBuilder;
74+
import org.opensearch.threadpool.ThreadPool;
7175

7276
import java.io.IOException;
7377
import java.util.List;
7478
import java.util.Map;
79+
import java.util.Optional;
7580
import java.util.TreeMap;
81+
import java.util.concurrent.ExecutorService;
7682
import java.util.function.BiFunction;
7783
import java.util.function.Consumer;
7884

@@ -178,6 +184,36 @@ default List<RescorerSpec<?>> getRescorers() {
178184
return emptyList();
179185
}
180186

187+
/**
188+
* The new {@link QueryPhaseSearcher} added by this plugin. At the moment, only one {@link QueryPhaseSearcher} is supported per
189+
* instance, the {@link IllegalStateException} is going to be thrown if more then one plugin tries to register
190+
* {@link QueryPhaseSearcher} implementation.
191+
*/
192+
default Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
193+
return Optional.empty();
194+
}
195+
196+
/**
197+
* The executor service provider (registered through {@link Plugin#getExecutorBuilders(Settings)} to be used at search
198+
* time by {@link IndexSearcher}. The {@link IllegalStateException} is going to be thrown if more then one
199+
* plugin tries to register index searcher executor.
200+
*/
201+
default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
202+
return Optional.empty();
203+
}
204+
205+
/**
206+
* Executor service provider
207+
*/
208+
interface ExecutorServiceProvider {
209+
/**
210+
* Provides an executor service instance
211+
* @param threadPool thread pool
212+
* @return executor service instance
213+
*/
214+
ExecutorService getExecutor(ThreadPool threadPool);
215+
}
216+
181217
/**
182218
* Specification of custom {@link ScoreFunction}.
183219
*/

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import java.util.HashMap;
9595
import java.util.List;
9696
import java.util.Map;
97+
import java.util.concurrent.Executor;
9798
import java.util.function.LongSupplier;
9899

99100
final class DefaultSearchContext extends SearchContext {
@@ -177,7 +178,8 @@ final class DefaultSearchContext extends SearchContext {
177178
FetchPhase fetchPhase,
178179
boolean lowLevelCancellation,
179180
Version minNodeVersion,
180-
boolean validate
181+
boolean validate,
182+
Executor executor
181183
) throws IOException {
182184
this.readerContext = readerContext;
183185
this.request = request;
@@ -198,7 +200,8 @@ final class DefaultSearchContext extends SearchContext {
198200
engineSearcher.getSimilarity(),
199201
engineSearcher.getQueryCache(),
200202
engineSearcher.getQueryCachingPolicy(),
201-
lowLevelCancellation
203+
lowLevelCancellation,
204+
executor
202205
);
203206
this.relativeTimeSupplier = relativeTimeSupplier;
204207
this.timeout = timeout;

server/src/main/java/org/opensearch/search/SearchModule.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.apache.lucene.search.BooleanQuery;
3636
import org.opensearch.common.NamedRegistry;
37+
import org.opensearch.common.Nullable;
3738
import org.opensearch.common.ParseField;
3839
import org.opensearch.common.geo.GeoShapeType;
3940
import org.opensearch.common.geo.ShapesAvailability;
@@ -273,6 +274,8 @@
273274
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
274275
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
275276
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
277+
import org.opensearch.search.query.QueryPhase;
278+
import org.opensearch.search.query.QueryPhaseSearcher;
276279
import org.opensearch.search.rescore.QueryRescorerBuilder;
277280
import org.opensearch.search.rescore.RescorerBuilder;
278281
import org.opensearch.search.sort.FieldSortBuilder;
@@ -293,11 +296,14 @@
293296
import org.opensearch.search.suggest.phrase.StupidBackoff;
294297
import org.opensearch.search.suggest.term.TermSuggestion;
295298
import org.opensearch.search.suggest.term.TermSuggestionBuilder;
299+
import org.opensearch.threadpool.ThreadPool;
296300

297301
import java.util.ArrayList;
298302
import java.util.Arrays;
299303
import java.util.List;
300304
import java.util.Map;
305+
import java.util.Optional;
306+
import java.util.concurrent.ExecutorService;
301307
import java.util.function.Consumer;
302308
import java.util.function.Function;
303309

@@ -329,6 +335,8 @@ public class SearchModule {
329335
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
330336
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
331337
private final ValuesSourceRegistry valuesSourceRegistry;
338+
private final QueryPhaseSearcher queryPhaseSearcher;
339+
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;
332340

333341
/**
334342
* Constructs a new SearchModule object
@@ -355,6 +363,8 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
355363
registerSearchExts(plugins);
356364
registerShapes();
357365
registerIntervalsSourceProviders();
366+
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
367+
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
358368
namedWriteables.addAll(SortValue.namedWriteables());
359369
}
360370

@@ -1282,7 +1292,49 @@ private void registerSort(SortSpec<?> spec) {
12821292
);
12831293
}
12841294

1295+
private QueryPhaseSearcher registerQueryPhaseSearcher(List<SearchPlugin> plugins) {
1296+
QueryPhaseSearcher searcher = null;
1297+
1298+
for (SearchPlugin plugin : plugins) {
1299+
final Optional<QueryPhaseSearcher> searcherOpt = plugin.getQueryPhaseSearcher();
1300+
1301+
if (searcher == null) {
1302+
searcher = searcherOpt.orElse(null);
1303+
} else if (searcherOpt.isPresent()) {
1304+
throw new IllegalStateException("Only one QueryPhaseSearcher is allowed, but more than one are provided by the plugins");
1305+
}
1306+
}
1307+
1308+
return searcher;
1309+
}
1310+
1311+
private SearchPlugin.ExecutorServiceProvider registerIndexSearcherExecutorProvider(List<SearchPlugin> plugins) {
1312+
SearchPlugin.ExecutorServiceProvider provider = null;
1313+
1314+
for (SearchPlugin plugin : plugins) {
1315+
final Optional<SearchPlugin.ExecutorServiceProvider> providerOpt = plugin.getIndexSearcherExecutorProvider();
1316+
1317+
if (provider == null) {
1318+
provider = providerOpt.orElse(null);
1319+
} else if (providerOpt.isPresent()) {
1320+
throw new IllegalStateException(
1321+
"The index searcher executor is already assigned but more than one are provided by the plugins"
1322+
);
1323+
}
1324+
}
1325+
1326+
return provider;
1327+
}
1328+
12851329
public FetchPhase getFetchPhase() {
12861330
return new FetchPhase(fetchSubPhases);
12871331
}
1332+
1333+
public QueryPhase getQueryPhase() {
1334+
return (queryPhaseSearcher == null) ? new QueryPhase() : new QueryPhase(queryPhaseSearcher);
1335+
}
1336+
1337+
public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
1338+
return (indexSearcherExecutorProvider == null) ? null : indexSearcherExecutorProvider.getExecutor(pool);
1339+
}
12881340
}

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,16 +256,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
256256

257257
private final AtomicInteger openScrollContexts = new AtomicInteger();
258258
private final String sessionId = UUIDs.randomBase64UUID();
259+
private final Executor indexSearcherExecutor;
259260

260261
public SearchService(
261262
ClusterService clusterService,
262263
IndicesService indicesService,
263264
ThreadPool threadPool,
264265
ScriptService scriptService,
265266
BigArrays bigArrays,
267+
QueryPhase queryPhase,
266268
FetchPhase fetchPhase,
267269
ResponseCollectorService responseCollectorService,
268-
CircuitBreakerService circuitBreakerService
270+
CircuitBreakerService circuitBreakerService,
271+
Executor indexSearcherExecutor
269272
) {
270273
Settings settings = clusterService.getSettings();
271274
this.threadPool = threadPool;
@@ -274,13 +277,14 @@ public SearchService(
274277
this.scriptService = scriptService;
275278
this.responseCollectorService = responseCollectorService;
276279
this.bigArrays = bigArrays;
277-
this.queryPhase = new QueryPhase();
280+
this.queryPhase = queryPhase;
278281
this.fetchPhase = fetchPhase;
279282
this.multiBucketConsumerService = new MultiBucketConsumerService(
280283
clusterService,
281284
settings,
282285
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
283286
);
287+
this.indexSearcherExecutor = indexSearcherExecutor;
284288

285289
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
286290
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
@@ -884,7 +888,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
884888
fetchPhase,
885889
lowLevelCancellation,
886890
clusterService.state().nodes().getMinNodeVersion(),
887-
validate
891+
validate,
892+
indexSearcherExecutor
888893
);
889894
// we clone the query shard context here just for rewriting otherwise we
890895
// might end up with incorrect state since we are using now() or script services

0 commit comments

Comments
 (0)