Skip to content

Commit defbd60

Browse files
authored
Handle default search pipeline for multiple indices (opensearch-project#13276)
Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
1 parent 4ee984f commit defbd60

File tree

4 files changed

+291
-41
lines changed

4 files changed

+291
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
3232
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
3333
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))
34+
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))
3435

3536
### Dependencies
3637
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ private void executeRequest(
457457
PipelinedRequest searchRequest;
458458
ActionListener<SearchResponse> listener;
459459
try {
460-
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
460+
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver);
461461
listener = searchRequest.transformResponseListener(updatedListener);
462462
} catch (Exception e) {
463463
updatedListener.onFailure(e);

server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.cluster.ClusterState;
2424
import org.opensearch.cluster.ClusterStateApplier;
2525
import org.opensearch.cluster.metadata.IndexMetadata;
26+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
2627
import org.opensearch.cluster.metadata.Metadata;
2728
import org.opensearch.cluster.node.DiscoveryNode;
2829
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -35,10 +36,12 @@
3536
import org.opensearch.common.xcontent.XContentHelper;
3637
import org.opensearch.core.action.ActionListener;
3738
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
39+
import org.opensearch.core.index.Index;
3840
import org.opensearch.core.service.ReportingService;
3941
import org.opensearch.core.xcontent.NamedXContentRegistry;
4042
import org.opensearch.env.Environment;
4143
import org.opensearch.gateway.GatewayService;
44+
import org.opensearch.index.IndexNotFoundException;
4245
import org.opensearch.index.IndexSettings;
4346
import org.opensearch.index.analysis.AnalysisRegistry;
4447
import org.opensearch.ingest.ConfigurationUtils;
@@ -62,6 +65,8 @@
6265
/**
6366
* The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines
6467
* against requests and responses.
68+
*
69+
* @opensearch.internal
6570
*/
6671
public class SearchPipelineService implements ClusterStateApplier, ReportingService<SearchPipelineInfo> {
6772

@@ -360,7 +365,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat
360365
return newState.build();
361366
}
362367

363-
public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
368+
public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) {
364369
Pipeline pipeline = Pipeline.NO_OP_PIPELINE;
365370

366371
if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) {
@@ -390,14 +395,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
390395
if (searchRequest.pipeline() != null) {
391396
// Named pipeline specified for the request
392397
pipelineId = searchRequest.pipeline();
393-
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) {
394-
// Check for index default pipeline
395-
IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]);
396-
if (indexMetadata != null) {
397-
Settings indexSettings = indexMetadata.getSettings();
398-
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
399-
pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
398+
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length != 0) {
399+
try {
400+
// Check for index default pipeline
401+
Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest);
402+
for (Index index : concreteIndices) {
403+
IndexMetadata indexMetadata = state.metadata().index(index);
404+
if (indexMetadata != null) {
405+
Settings indexSettings = indexMetadata.getSettings();
406+
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
407+
String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
408+
if (NOOP_PIPELINE_ID.equals(pipelineId)) {
409+
pipelineId = currentPipelineId;
410+
} else if (!pipelineId.equals(currentPipelineId)) {
411+
pipelineId = NOOP_PIPELINE_ID;
412+
break;
413+
}
414+
}
415+
}
400416
}
417+
} catch (IndexNotFoundException e) {
418+
logger.debug("Default pipeline not applied for {}", (Object) searchRequest.indices());
401419
}
402420
}
403421
if (NOOP_PIPELINE_ID.equals(pipelineId) == false) {

0 commit comments

Comments
 (0)