Skip to content

Commit 08fa4d3

Browse files
committed
Handled default search pipeline for multiple indices
Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
1 parent 84679de commit 08fa4d3

File tree

4 files changed

+213
-40
lines changed

4 files changed

+213
-40
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
2424
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
2525
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
26+
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))
2627

2728
### Dependencies
2829
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ private void executeRequest(
457457
PipelinedRequest searchRequest;
458458
ActionListener<SearchResponse> listener;
459459
try {
460-
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
460+
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver);
461461
listener = searchRequest.transformResponseListener(updatedListener);
462462
} catch (Exception e) {
463463
updatedListener.onFailure(e);

server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.cluster.ClusterState;
2424
import org.opensearch.cluster.ClusterStateApplier;
2525
import org.opensearch.cluster.metadata.IndexMetadata;
26+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
2627
import org.opensearch.cluster.metadata.Metadata;
2728
import org.opensearch.cluster.node.DiscoveryNode;
2829
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -35,6 +36,7 @@
3536
import org.opensearch.common.xcontent.XContentHelper;
3637
import org.opensearch.core.action.ActionListener;
3738
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
39+
import org.opensearch.core.index.Index;
3840
import org.opensearch.core.service.ReportingService;
3941
import org.opensearch.core.xcontent.NamedXContentRegistry;
4042
import org.opensearch.env.Environment;
@@ -360,7 +362,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat
360362
return newState.build();
361363
}
362364

363-
public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
365+
public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) {
364366
Pipeline pipeline = Pipeline.NO_OP_PIPELINE;
365367

366368
if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) {
@@ -390,13 +392,22 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
390392
if (searchRequest.pipeline() != null) {
391393
// Named pipeline specified for the request
392394
pipelineId = searchRequest.pipeline();
393-
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) {
395+
} else if (state != null && searchRequest.indices() != null) {
394396
// Check for index default pipeline
395-
IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]);
396-
if (indexMetadata != null) {
397-
Settings indexSettings = indexMetadata.getSettings();
398-
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
399-
pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
397+
Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest);
398+
for (Index index : concreteIndices) {
399+
IndexMetadata indexMetadata = state.metadata().index(index);
400+
if (indexMetadata != null) {
401+
Settings indexSettings = indexMetadata.getSettings();
402+
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
403+
String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
404+
if (NOOP_PIPELINE_ID.equals(pipelineId)) {
405+
pipelineId = currentPipelineId;
406+
} else if (pipelineId.equals(currentPipelineId) == false) {
407+
pipelineId = NOOP_PIPELINE_ID;
408+
break;
409+
}
410+
}
400411
}
401412
}
402413
}

0 commit comments

Comments
 (0)