Skip to content

Commit b33979a

Browse files
navneet1vmsfrohandrross
authored
Adding the SearchPhaseResultsProcessor interface in Search Pipeline (opensearch-project#7283)
* Initial code for adding the SearchPhaseInjectorProcessor interface in Search Pipeline Signed-off-by: Navneet Verma <navneev@amazon.com> * Pass PipelinedRequest to SearchAsyncActions We should resolve a search pipeline once at the start of a search request and then propagate that pipeline through the async actions. When completing a search phase, we will then use that pipeline to inject behavior (if applicable). Signed-off-by: Michael Froh <froh@amazon.com> * Renamed SearchPhaseInjectorProcessor to SearchPhaseResultsProcessor and fixed the comments Signed-off-by: Navneet Verma <navneev@amazon.com> * Make PipelinedSearchRequest extend SearchRequest Rather than wrapping a SearchRequest in a PipelinedSearchRequest, changes are less intrusive if we say that a PipelinedSearchRequest "is a" SearchRequest. Signed-off-by: Michael Froh <froh@amazon.com> * Revert code change from merge conflict Signed-off-by: Michael Froh <froh@amazon.com> * Updated the changelog with more appropriate wording for the change. Signed-off-by: Navneet Verma <navneev@amazon.com> * Fixed Typos in the code Signed-off-by: Navneet Verma <navneev@amazon.com> * Fixing comments relating to return of SearchPhaseResults from processor Signed-off-by: Navneet Verma <navneev@amazon.com> * Moved SearchPhaseName enum in separate class and fixed comments. Signed-off-by: Navneet Verma <navneev@amazon.com> * Resolve remaining merge conflict Signed-off-by: Michael Froh <froh@amazon.com> --------- Signed-off-by: Navneet Verma <navneev@amazon.com> Signed-off-by: Michael Froh <froh@amazon.com> Co-authored-by: Michael Froh <froh@amazon.com> Co-authored-by: Andrew Ross <andrross@amazon.com>
1 parent 86f955c commit b33979a

22 files changed

+402
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8080

8181
## [Unreleased 2.x]
8282
### Added
83+
- [SearchPipeline] Add new search pipeline processor type, SearchPhaseResultsProcessor, that can modify the result of one search phase before starting the next phase ([#7283](https://github.com/opensearch-project/OpenSearch/pull/7283))
8384
- Add task cancellation monitoring service ([#7642](https://github.com/opensearch-project/OpenSearch/pull/7642))
8485
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
8586
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ teardown:
3939
{
4040
"script" : {
4141
"lang" : "painless",
42-
"source" : "ctx._source['size'] += 10; ctx._source['from'] -= 1; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
42+
"source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
4343
}
4444
}
4545
]

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.opensearch.search.internal.InternalSearchResponse;
5858
import org.opensearch.search.internal.SearchContext;
5959
import org.opensearch.search.internal.ShardSearchRequest;
60+
import org.opensearch.search.pipeline.PipelinedRequest;
6061
import org.opensearch.transport.Transport;
6162

6263
import java.util.ArrayDeque;
@@ -696,7 +697,11 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
696697
* @see #onShardResult(SearchPhaseResult, SearchShardIterator)
697698
*/
698699
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
699-
executeNextPhase(this, getNextPhase(results, this));
700+
final SearchPhase nextPhase = getNextPhase(results, this);
701+
if (request instanceof PipelinedRequest && nextPhase != null) {
702+
((PipelinedRequest) request).transformSearchPhaseResults(results, this, this.getName(), nextPhase.getName());
703+
}
704+
executeNextPhase(this, nextPhase);
700705
}
701706

702707
@Override

server/src/main/java/org/opensearch/action/search/ArraySearchPhaseResults.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ boolean hasResult(int shardIndex) {
6666
}
6767

6868
@Override
69-
AtomicArray<Result> getAtomicArray() {
69+
public AtomicArray<Result> getAtomicArray() {
7070
return results;
7171
}
7272
}

server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
9494
) {
9595
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
9696
super(
97-
"can_match",
97+
SearchPhaseName.CAN_MATCH.getName(),
9898
logger,
9999
searchTransportService,
100100
nodeIdToConnection,

server/src/main/java/org/opensearch/action/search/DfsQueryPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ final class DfsQueryPhase extends SearchPhase {
6969
Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
7070
SearchPhaseContext context
7171
) {
72-
super("dfs_query");
72+
super(SearchPhaseName.DFS_QUERY.getName());
7373
this.progressListener = context.getTask().getProgressListener();
7474
this.queryResult = queryResult;
7575
this.searchResults = searchResults;

server/src/main/java/org/opensearch/action/search/ExpandSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ final class ExpandSearchPhase extends SearchPhase {
6262
private final AtomicArray<SearchPhaseResult> queryResults;
6363

6464
ExpandSearchPhase(SearchPhaseContext context, InternalSearchResponse searchResponse, AtomicArray<SearchPhaseResult> queryResults) {
65-
super("expand");
65+
super(SearchPhaseName.EXPAND.getName());
6666
this.context = context;
6767
this.searchResponse = searchResponse;
6868
this.queryResults = queryResults;

server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ final class FetchSearchPhase extends SearchPhase {
9292
SearchPhaseContext context,
9393
BiFunction<InternalSearchResponse, AtomicArray<SearchPhaseResult>, SearchPhase> nextPhaseFactory
9494
) {
95-
super("fetch");
95+
super(SearchPhaseName.FETCH.getName());
9696
if (context.getNumShards() != resultConsumer.getNumShards()) {
9797
throw new IllegalStateException(
9898
"number of shards must match the length of the query results but doesn't:"

server/src/main/java/org/opensearch/action/search/SearchPhase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.common.CheckedRunnable;
3535

3636
import java.io.IOException;
37+
import java.util.Locale;
3738
import java.util.Objects;
3839

3940
/**
@@ -54,4 +55,13 @@ protected SearchPhase(String name) {
5455
public String getName() {
5556
return name;
5657
}
58+
59+
/**
60+
* Returns the SearchPhase name as {@link SearchPhaseName}. Throws {@link IllegalArgumentException} if the name is not defined
61+
* in {@link SearchPhaseName}
62+
* @return {@link SearchPhaseName}
63+
*/
64+
public SearchPhaseName getSearchPhaseName() {
65+
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
66+
}
5767
}

server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
*
5151
* @opensearch.internal
5252
*/
53-
interface SearchPhaseContext extends Executor {
53+
public interface SearchPhaseContext extends Executor {
5454
// TODO maybe we can make this concrete later - for now we just implement this in the base class for all initial phases
5555

5656
/**
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
/**
12+
* Enum for different Search Phases in OpenSearch
13+
* @opensearch.internal
14+
*/
15+
public enum SearchPhaseName {
16+
QUERY("query"),
17+
FETCH("fetch"),
18+
DFS_QUERY("dfs_query"),
19+
EXPAND("expand"),
20+
CAN_MATCH("can_match");
21+
22+
private final String name;
23+
24+
SearchPhaseName(final String name) {
25+
this.name = name;
26+
}
27+
28+
public String getName() {
29+
return name;
30+
}
31+
}

server/src/main/java/org/opensearch/action/search/SearchPhaseResults.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
*
4343
* @opensearch.internal
4444
*/
45-
abstract class SearchPhaseResults<Result extends SearchPhaseResult> {
45+
public abstract class SearchPhaseResults<Result extends SearchPhaseResult> {
4646
private final int numShards;
4747

4848
SearchPhaseResults(int numShards) {
@@ -75,7 +75,13 @@ final int getNumShards() {
7575

7676
void consumeShardFailure(int shardIndex) {}
7777

78-
AtomicArray<Result> getAtomicArray() {
78+
/**
79+
* Returns an {@link AtomicArray} of {@link Result}, which are nothing but the SearchPhaseResults
80+
* for shards. The {@link Result} are of type {@link SearchPhaseResult}
81+
*
82+
* @return an {@link AtomicArray} of {@link Result}
83+
*/
84+
public AtomicArray<Result> getAtomicArray() {
7985
throw new UnsupportedOperationException();
8086
}
8187

server/src/main/java/org/opensearch/action/search/SearchScrollAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ protected SearchPhase sendResponsePhase(
266266
SearchPhaseController.ReducedQueryPhase queryPhase,
267267
final AtomicArray<? extends SearchPhaseResult> fetchResults
268268
) {
269-
return new SearchPhase("fetch") {
269+
return new SearchPhase(SearchPhaseName.FETCH.getName()) {
270270
@Override
271271
public void run() throws IOException {
272272
sendResponse(queryPhase, fetchResults);

server/src/main/java/org/opensearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ protected void executeInitialPhase(
9292

9393
@Override
9494
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
95-
return new SearchPhase("fetch") {
95+
return new SearchPhase(SearchPhaseName.FETCH.getName()) {
9696
@Override
9797
public void run() {
9898
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,13 +390,12 @@ private void executeRequest(
390390
relativeStartNanos,
391391
System::nanoTime
392392
);
393-
SearchRequest searchRequest;
393+
PipelinedRequest searchRequest;
394394
ActionListener<SearchResponse> listener;
395395
try {
396-
PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
397-
searchRequest = pipelinedRequest.transformedRequest();
396+
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
398397
listener = ActionListener.wrap(
399-
r -> originalListener.onResponse(pipelinedRequest.transformResponse(r)),
398+
r -> originalListener.onResponse(searchRequest.transformResponse(r)),
400399
originalListener::onFailure
401400
);
402401
} catch (Exception e) {

server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.plugins;
1010

1111
import org.opensearch.search.pipeline.Processor;
12+
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
1213
import org.opensearch.search.pipeline.SearchRequestProcessor;
1314
import org.opensearch.search.pipeline.SearchResponseProcessor;
1415

@@ -42,4 +43,15 @@ default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcess
4243
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
4344
return Collections.emptyMap();
4445
}
46+
47+
/**
48+
* Returns additional search pipeline search phase results processor types added by this plugin.
49+
*
50+
* The key of the returned {@link Map} is the unique name for the processor which is specified
51+
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
52+
* to create the processor from a given pipeline configuration.
53+
*/
54+
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Processor.Parameters parameters) {
55+
return Collections.emptyMap();
56+
}
4557
}

server/src/main/java/org/opensearch/search/pipeline/Pipeline.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88

99
package org.opensearch.search.pipeline;
1010

11+
import org.opensearch.action.search.SearchPhaseContext;
12+
import org.opensearch.action.search.SearchPhaseResults;
1113
import org.opensearch.action.search.SearchRequest;
1214
import org.opensearch.action.search.SearchResponse;
1315
import org.opensearch.common.Nullable;
1416
import org.opensearch.common.io.stream.BytesStreamOutput;
1517
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
1618
import org.opensearch.common.io.stream.NamedWriteableRegistry;
1719
import org.opensearch.common.io.stream.StreamInput;
20+
import org.opensearch.search.SearchPhaseResult;
1821

1922
import java.util.Collections;
2023
import java.util.List;
@@ -28,6 +31,7 @@ class Pipeline {
2831

2932
public static final String REQUEST_PROCESSORS_KEY = "request_processors";
3033
public static final String RESPONSE_PROCESSORS_KEY = "response_processors";
34+
public static final String PHASE_PROCESSORS_KEY = "phase_results_processors";
3135
private final String id;
3236
private final String description;
3337
private final Integer version;
@@ -36,7 +40,7 @@ class Pipeline {
3640
// Then these can be CompoundProcessors instead of lists.
3741
private final List<SearchRequestProcessor> searchRequestProcessors;
3842
private final List<SearchResponseProcessor> searchResponseProcessors;
39-
43+
private final List<SearchPhaseResultsProcessor> searchPhaseResultsProcessors;
4044
private final NamedWriteableRegistry namedWriteableRegistry;
4145
private final LongSupplier relativeTimeSupplier;
4246

@@ -46,6 +50,7 @@ class Pipeline {
4650
@Nullable Integer version,
4751
List<SearchRequestProcessor> requestProcessors,
4852
List<SearchResponseProcessor> responseProcessors,
53+
List<SearchPhaseResultsProcessor> phaseResultsProcessors,
4954
NamedWriteableRegistry namedWriteableRegistry,
5055
LongSupplier relativeTimeSupplier
5156
) {
@@ -54,6 +59,7 @@ class Pipeline {
5459
this.version = version;
5560
this.searchRequestProcessors = Collections.unmodifiableList(requestProcessors);
5661
this.searchResponseProcessors = Collections.unmodifiableList(responseProcessors);
62+
this.searchPhaseResultsProcessors = Collections.unmodifiableList(phaseResultsProcessors);
5763
this.namedWriteableRegistry = namedWriteableRegistry;
5864
this.relativeTimeSupplier = relativeTimeSupplier;
5965
}
@@ -78,6 +84,10 @@ List<SearchResponseProcessor> getSearchResponseProcessors() {
7884
return searchResponseProcessors;
7985
}
8086

87+
List<SearchPhaseResultsProcessor> getSearchPhaseResultsProcessors() {
88+
return searchPhaseResultsProcessors;
89+
}
90+
8191
protected void beforeTransformRequest() {}
8292

8393
protected void afterTransformRequest(long timeInNanos) {}
@@ -168,14 +178,33 @@ SearchResponse transformResponse(SearchRequest request, SearchResponse response)
168178
return response;
169179
}
170180

181+
<Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
182+
SearchPhaseResults<Result> searchPhaseResult,
183+
SearchPhaseContext context,
184+
String currentPhase,
185+
String nextPhase
186+
) throws SearchPipelineProcessingException {
187+
188+
try {
189+
for (SearchPhaseResultsProcessor searchPhaseResultsProcessor : searchPhaseResultsProcessors) {
190+
if (currentPhase.equals(searchPhaseResultsProcessor.getBeforePhase().getName())
191+
&& nextPhase.equals(searchPhaseResultsProcessor.getAfterPhase().getName())) {
192+
searchPhaseResultsProcessor.process(searchPhaseResult, context);
193+
}
194+
}
195+
} catch (RuntimeException e) {
196+
throw new SearchPipelineProcessingException(e);
197+
}
198+
}
199+
171200
static final Pipeline NO_OP_PIPELINE = new Pipeline(
172201
SearchPipelineService.NOOP_PIPELINE_ID,
173202
"Pipeline that does not transform anything",
174203
0,
175204
Collections.emptyList(),
176205
Collections.emptyList(),
206+
Collections.emptyList(),
177207
null,
178208
() -> 0L
179209
);
180-
181210
}

server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,22 @@ class PipelineWithMetrics extends Pipeline {
4343
Integer version,
4444
List<SearchRequestProcessor> requestProcessors,
4545
List<SearchResponseProcessor> responseProcessors,
46+
List<SearchPhaseResultsProcessor> phaseResultsProcessors,
4647
NamedWriteableRegistry namedWriteableRegistry,
4748
OperationMetrics totalRequestMetrics,
4849
OperationMetrics totalResponseMetrics,
4950
LongSupplier relativeTimeSupplier
5051
) {
51-
super(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry, relativeTimeSupplier);
52+
super(
53+
id,
54+
description,
55+
version,
56+
requestProcessors,
57+
responseProcessors,
58+
phaseResultsProcessors,
59+
namedWriteableRegistry,
60+
relativeTimeSupplier
61+
);
5262
this.totalRequestMetrics = totalRequestMetrics;
5363
this.totalResponseMetrics = totalResponseMetrics;
5464
for (Processor requestProcessor : getSearchRequestProcessors()) {
@@ -64,6 +74,7 @@ static PipelineWithMetrics create(
6474
Map<String, Object> config,
6575
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
6676
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
77+
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseResultsProcessorFactories,
6778
NamedWriteableRegistry namedWriteableRegistry,
6879
OperationMetrics totalRequestProcessingMetrics,
6980
OperationMetrics totalResponseProcessingMetrics
@@ -79,6 +90,16 @@ static PipelineWithMetrics create(
7990
RESPONSE_PROCESSORS_KEY
8091
);
8192
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
93+
List<Map<String, Object>> phaseResultsProcessorConfigs = ConfigurationUtils.readOptionalList(
94+
null,
95+
null,
96+
config,
97+
PHASE_PROCESSORS_KEY
98+
);
99+
List<SearchPhaseResultsProcessor> phaseResultsProcessors = readProcessors(
100+
phaseResultsProcessorFactories,
101+
phaseResultsProcessorConfigs
102+
);
82103
if (config.isEmpty() == false) {
83104
throw new OpenSearchParseException(
84105
"pipeline ["
@@ -93,6 +114,7 @@ static PipelineWithMetrics create(
93114
version,
94115
requestProcessors,
95116
responseProcessors,
117+
phaseResultsProcessors,
96118
namedWriteableRegistry,
97119
totalRequestProcessingMetrics,
98120
totalResponseProcessingMetrics,

0 commit comments

Comments
 (0)