diff --git a/docs/changelog/132011.yaml b/docs/changelog/132011.yaml
new file mode 100644
index 0000000000000..6eed71f4ec27b
--- /dev/null
+++ b/docs/changelog/132011.yaml
@@ -0,0 +1,5 @@
+pr: 132011
+summary: Restrict Indexing To Child Streams When Streams Is Enabled
+area: Data streams
+type: enhancement
+issues: []
diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle
index fd56a627026b6..24b651da291af 100644
--- a/modules/streams/build.gradle
+++ b/modules/streams/build.gradle
@@ -20,7 +20,7 @@ esplugin {
 
 restResources {
   restApi {
-    include '_common', 'streams'
+    include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
   }
 }
 
@@ -38,4 +38,6 @@ artifacts {
 
 dependencies {
   testImplementation project(path: ':test:test-clusters')
+  clusterModules project(':modules:ingest-common')
+  clusterModules project(':modules:reindex')
 }
diff --git a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java
index 9d5a1033faf57..e01594fe51c76 100644
--- a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java
+++ b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java
@@ -29,7 +29,12 @@ public static Iterable<Object[]> parameters() throws Exception {
     }
 
     @ClassRule
-    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .module("streams")
+        .module("ingest-common")
+        .module("reindex")
+        .feature(FeatureFlag.LOGS_STREAM)
+        .build();
 
     @Override
     protected String getTestRestCluster() {
diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml
new file mode 100644
index 0000000000000..621985985295a
--- /dev/null
+++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml
@@ -0,0 +1,156 @@
+---
+"Check User Can't Write To Substream Directly":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "logs.foo" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }
+
+---
+"Check User Can't Write To Substream Directly With Single Doc":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      catch: bad_request
+      index:
+        index: logs.foo
+        id: "1"
+        body:
+          foo: bar
+  - match: { error.type: "illegal_argument_exception" }
+  - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }
+
+---
+"Check Bulk Index With Reroute Processor To Substream Is Rejected":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "reroute-to-logs-foo"
+        body:
+          processors:
+            - reroute:
+                destination: "logs.foo"
+  - do:
+      indices.create:
+        index: "bad-index"
+        body:
+          settings:
+            index.default_pipeline: "reroute-to-logs-foo"
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "bad-index" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" }
+
+---
+"Check Bulk Index With Script Processor To Substream Is Rejected":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "script-to-logs-foo"
+        body:
+          processors:
+            - script:
+                source: "ctx._index = 'logs.foo'"
+  - do:
+      indices.create:
+        index: "bad-index-script"
+        body:
+          settings:
+            index.default_pipeline: "script-to-logs-foo"
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "bad-index-script" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" }
+
+---
+"Check Delete By Query Directly On Substream After Reroute Succeeds":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "reroute-to-logs-foo-success"
+        body:
+          processors:
+            - reroute:
+                destination: "logs.foo"
+  - do:
+      indices.create:
+        index: "logs"
+        body:
+          settings:
+            index.default_pipeline: "reroute-to-logs-foo-success"
+  - do:
+      bulk:
+        refresh: true
+        body: |
+          { "index": { "_index": "logs" } }
+          { "foo": "bar", "baz": "qux" }
+  - match: { errors: false }
+  - match: { items.0.index.status: 201 }
+
+  - do:
+      delete_by_query:
+        index: logs.foo
+        refresh: true
+        body:
+          query:
+            match:
+              foo: "bar"
+  - match: { deleted: 1 }
+  - match: { total: 1 }
+
+  - do:
+      search:
+        index: logs.foo
+        body:
+          query:
+            match_all: {}
+  - match: { hits.total.value: 0 }
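The tests above pin down the client-visible contract: with the `logs` stream enabled, a write addressed to a child stream such as `logs.foo` fails (per item for `bulk`, per request for `index`), while the same document sent to the parent `logs` stream is accepted. A minimal sketch of the two request shapes using the server's own request classes — illustrative only, not part of the change, and the class name is hypothetical:

```java
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentType;

class ChildStreamWriteSketch {
    // Rejected per item with illegal_argument_exception: "logs.foo" is a child stream.
    static BulkRequest direct() {
        return new BulkRequest().add(new IndexRequest("logs.foo").source(XContentType.JSON, "foo", "bar"));
    }

    // Accepted: the parent stream is the supported entry point; a reroute processor
    // on "logs" (as in the last test above) may then forward the document to "logs.foo".
    static BulkRequest viaParent() {
        return new BulkRequest().add(new IndexRequest("logs").source(XContentType.JSON, "foo", "bar"));
    }
}
```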
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java
index dda5294b2c0a2..65b71d4b0b40b 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java
@@ -30,6 +30,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -105,47 +106,87 @@ BulkRequest getBulkRequest() {
      * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
      * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
      * service with the results returned from running the remaining write operations.
+     * <p>
+     * Use this method when you want the ingest time to be taken from the actual {@link BulkResponse} such as if you are wrapping
+     * a response multiple times and wish to preserve an already calculated ingest time.
      *
-     * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
-     * @param actionListener The action listener that expects the final bulk response.
-     * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+     * @param actionListener the listener to wrap
+     * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
+     */
+    ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
+        if (itemResponses.isEmpty()) {
+            return actionListener;
+        } else {
+            return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
+        }
+    }
+
+    /**
+     * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+     * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+     * service with the results returned from running the remaining write operations.
+     * <p>
+     * This variant is used when the ingest time is already known and should be explicitly set in the final response,
+     * rather than extracted from the {@link BulkResponse}.
+     *
+     * @param ingestTookInMillis the ingest time in milliseconds to use in the final response
+     * @param actionListener the listener to wrap
+     * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
      */
     ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
         if (itemResponses.isEmpty()) {
             return actionListener.map(
                 response -> new BulkResponse(
                     response.getItems(),
-                    response.getTook().getMillis(),
+                    response.getTookInMillis(),
                     ingestTookInMillis,
                     response.getIncrementalState()
                 )
             );
         } else {
-            return actionListener.map(response -> {
-                // these items are the responses from the subsequent bulk request, their 'slots'
-                // are not correct for this response we're building
-                final BulkItemResponse[] bulkResponses = response.getItems();
+            return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
+        }
+    }
+
+    /**
+     * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+     * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+     * service with the results returned from running the remaining write operations.
+     *
+     * @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
+     * @param actionListener The action listener that expects the final bulk response.
+     * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+     */
+    private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
+        Function<BulkResponse, Long> ingestTimeProviderFunction,
+        ActionListener<BulkResponse> actionListener
+    ) {
+        return actionListener.map(response -> {
+            // these items are the responses from the subsequent bulk request, their 'slots'
+            // are not correct for this response we're building
+            final BulkItemResponse[] bulkResponses = response.getItems();
 
-                final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
+            final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
 
-                // the item responses are from the original request, so their slots are correct.
-                // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
-                for (BulkItemResponse item : itemResponses) {
-                    allResponses[item.getItemId()] = item;
-                }
+            // the item responses are from the original request, so their slots are correct.
+            // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
+            for (BulkItemResponse item : itemResponses) {
+                allResponses[item.getItemId()] = item;
+            }
 
-                // use the original slots for the responses from the bulk
-                for (int i = 0; i < bulkResponses.length; i++) {
-                    allResponses[originalSlots.get(i)] = bulkResponses[i];
-                }
+            // use the original slots for the responses from the bulk
+            for (int i = 0; i < bulkResponses.length; i++) {
+                allResponses[originalSlots.get(i)] = bulkResponses[i];
+            }
 
-                if (Assertions.ENABLED) {
-                    assertResponsesAreCorrect(bulkResponses, allResponses);
-                }
+            if (Assertions.ENABLED) {
+                assertResponsesAreCorrect(bulkResponses, allResponses);
+            }
 
-                return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
-            });
-        }
+            var ingestTookInMillis = ingestTimeProviderFunction.apply(response);
+
+            return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
+        });
     }
 
     private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) {
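The `BulkRequestModifier` change is a refactor plus one new entry point: both public `wrapActionListenerIfNeeded` variants now delegate to the private `doWrapActionListenerIfNeeded`, differing only in the `Function<BulkResponse, Long>` that supplies the ingest time for the merged response. A sketch of when each variant applies — `modifier` and `listener` are assumed to be in scope, and the wrapper class is hypothetical:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;

// Hypothetical caller; BulkRequestModifier is package-private to org.elasticsearch.action.bulk.
class WrapVariantsSketch {
    static void sketch(BulkRequestModifier modifier, ActionListener<BulkResponse> listener) {
        // New no-arg variant: the ingest time is read from the BulkResponse itself
        // (BulkResponse::getIngestTookInMillis), so a value computed by an earlier
        // wrapping survives when the listener is wrapped again.
        ActionListener<BulkResponse> preserved = modifier.wrapActionListenerIfNeeded(listener);

        // Pre-existing variant: the ingest time is pinned to a caller-measured value.
        ActionListener<BulkResponse> pinned = modifier.wrapActionListenerIfNeeded(42L, listener);
    }
}
```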
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java
index f003cd3fc107d..12a583251516a 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java
@@ -26,15 +26,18 @@
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.ComponentTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.ingest.IngestService;
@@ -69,6 +72,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
     protected final Executor coordinationExecutor;
     protected final Executor systemCoordinationExecutor;
     private final ActionType<BulkResponse> bulkAction;
+    protected final FeatureService featureService;
 
     public TransportAbstractBulkAction(
         ActionType<BulkResponse> action,
@@ -81,7 +85,8 @@ public TransportAbstractBulkAction(
         IndexingPressure indexingPressure,
         SystemIndices systemIndices,
         ProjectResolver projectResolver,
-        LongSupplier relativeTimeNanosProvider
+        LongSupplier relativeTimeNanosProvider,
+        FeatureService featureService
     ) {
         super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
         this.threadPool = threadPool;
@@ -93,6 +98,7 @@ public TransportAbstractBulkAction(
         this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
         this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
         this.ingestForwarder = new IngestActionForwarder(transportService);
+        this.featureService = featureService;
         clusterService.addStateApplier(this.ingestForwarder);
         this.relativeTimeNanosProvider = relativeTimeNanosProvider;
         this.bulkAction = action;
@@ -396,8 +402,47 @@ private void applyPipelinesAndDoInternalExecute(
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
-        if (applyPipelines(task, bulkRequest, executor, listener) == false) {
-            doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
+
+        // Validate child stream writes before processing pipelines
+        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
+        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
+
+        DocWriteRequest<?> req;
+        int i = -1;
+        while (bulkRequestModifier.hasNext()) {
+            req = bulkRequestModifier.next();
+            i++;
+
+            for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
+                if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) {
+                    IllegalArgumentException e = new IllegalArgumentException(
+                        "Direct writes to child streams are prohibited. Index directly into the ["
+                            + streamType.getStreamName()
+                            + "] stream instead"
+                    );
+                    Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
+
+                    if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) {
+                        if (Boolean.TRUE.equals(failureStoreEnabled)) {
+                            bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
+                        } else if (Boolean.FALSE.equals(failureStoreEnabled)) {
+                            bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
+                        } else {
+                            bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
+                        }
+                    } else {
+                        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
+                    }
+
+                    break;
+                }
+            }
+        }
+
+        var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
+
+        if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
+            doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
         }
     }
 
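Two details of the validation loop above are easy to miss. First, the guard only fires while `isPipelineResolved()` is still false, so a document that re-enters the bulk path after ingest is not re-checked here — pipeline-driven retargeting is presumably left to the `IngestService` check later in this diff. Second, a rejected item is only diverted to the failure store when the cluster has the failure-store feature and the target index has it enabled; in every other case the item fails in place. The per-item predicate, distilled into a sketch (simplified, not the diff's exact code; the class name is hypothetical):

```java
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.streams.StreamType;

class ChildStreamGuardSketch {
    // True only for a plain index request that names a child stream, e.g. "logs.foo",
    // and has not yet been routed through ingest pipelines.
    static boolean isDirectChildStreamWrite(DocWriteRequest<?> req, StreamType streamType) {
        return req instanceof IndexRequest ir
            && streamType.matchesStreamPrefix(req.index())
            && ir.isPipelineResolved() == false;
    }
}
```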
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 9a79137361260..7e443e055cc90 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -86,7 +86,6 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
     private final OriginSettingClient rolloverClient;
     private final FailureStoreMetrics failureStoreMetrics;
     private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
-    private final FeatureService featureService;
 
     @Inject
     public TransportBulkAction(
@@ -187,7 +186,8 @@ public TransportBulkAction(
             indexingPressure,
             systemIndices,
             projectResolver,
-            relativeTimeProvider
+            relativeTimeProvider,
+            featureService
         );
         this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
         Objects.requireNonNull(relativeTimeProvider);
@@ -195,7 +195,6 @@ public TransportBulkAction(
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
         this.failureStoreMetrics = failureStoreMetrics;
-        this.featureService = featureService;
     }
 
     public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> unwrappingSingleItemBulkResponse(
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
index 7da12e05925af..338ab8f1e7b14 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
@@ -36,6 +36,7 @@
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.index.IndexSettingProvider;
 import org.elasticsearch.index.IndexSettingProviders;
@@ -102,7 +103,8 @@ public TransportSimulateBulkAction(
         ProjectResolver projectResolver,
         IndicesService indicesService,
         NamedXContentRegistry xContentRegistry,
-        IndexSettingProviders indexSettingProviders
+        IndexSettingProviders indexSettingProviders,
+        FeatureService featureService
     ) {
         super(
             SimulateBulkAction.INSTANCE,
@@ -115,7 +117,8 @@ public TransportSimulateBulkAction(
             indexingPressure,
             systemIndices,
             projectResolver,
-            threadPool::relativeTimeInNanos
+            threadPool::relativeTimeInNanos,
+            featureService
         );
         this.indicesService = indicesService;
         this.xContentRegistry = xContentRegistry;
diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java
new file mode 100644
index 0000000000000..f660163414801
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.streams;
+
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.metadata.StreamsMetadata;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public enum StreamType {
+
+    LOGS("logs");
+
+    private final String streamName;
+
+    StreamType(String streamName) {
+        this.streamName = streamName;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
+        StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
+        return switch (this) {
+            case LOGS -> metadata.isLogsEnabled();
+        };
+    }
+
+    public boolean matchesStreamPrefix(String indexName) {
+        if (indexName == null) {
+            return false;
+        }
+        return indexName.startsWith(streamName + ".");
+    }
+
+    public static Set<StreamType> getEnabledStreamTypesForProject(ProjectMetadata projectMetadata) {
+        return Arrays.stream(values())
+            .filter(t -> t.streamTypeIsEnabled(projectMetadata))
+            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+    }
+
+}
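The prefix check in `matchesStreamPrefix` is deliberately exact: only names under `<stream>.` count as child streams, so the parent stream itself and indices that merely share leading characters are unaffected. Assuming the enum compiles as shown (the harness class is hypothetical):

```java
import org.elasticsearch.common.streams.StreamType;

class StreamPrefixSketch {
    public static void main(String[] args) {
        assert StreamType.LOGS.matchesStreamPrefix("logs.foo");              // child stream: direct writes blocked
        assert StreamType.LOGS.matchesStreamPrefix("logs") == false;         // the parent stream stays writable
        assert StreamType.LOGS.matchesStreamPrefix("logs-archive") == false; // only the "logs." dot prefix counts
        assert StreamType.LOGS.matchesStreamPrefix(null) == false;           // null-safe for requests without an index
    }
}
```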
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 1ef03325da778..ea8e9d3843296 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -57,6 +57,7 @@
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -1282,6 +1283,29 @@ private void executePipelines(
                     return; // document failed!
                 }
 
+                for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(project)) {
+                    if (streamType.matchesStreamPrefix(newIndex)
+                        && ingestDocument.getIndexHistory().contains(streamType.getStreamName()) == false) {
+                        exceptionHandler.accept(
+                            new IngestPipelineException(
+                                pipelineId,
+                                new IllegalArgumentException(
+                                    format(
+                                        "Pipeline [%s] can't change the target index (from [%s] to [%s] child stream [%s]) "
+                                            + "History: [%s]",
+                                        pipelineId,
+                                        originalIndex,
+                                        streamType.getStreamName(),
+                                        newIndex,
+                                        String.join(", ", ingestDocument.getIndexHistory())
+                                    )
+                                )
+                            )
+                        );
+                        return; // document failed!
+                    }
+                }
+
                 // add the index to the document's index history, and check for cycles in the visited indices
                 boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
                 if (cycle) {
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
index 8f9e134b98a8e..ca275d284f7e3 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
@@ -17,6 +17,7 @@
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.ProjectId;
@@ -30,6 +31,7 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexSettingProviders;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
@@ -79,6 +81,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
     private ClusterService clusterService;
     private TestThreadPool threadPool;
    private IndicesService indicesService;
+    private FeatureService mockFeatureService;
 
     private TestTransportSimulateBulkAction bulkAction;
 
@@ -96,7 +99,8 @@ class TestTransportSimulateBulkAction extends TransportSimulateBulkAction {
                 TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                 indicesService,
                 NamedXContentRegistry.EMPTY,
-                new IndexSettingProviders(Set.of())
+                new IndexSettingProviders(Set.of()),
+                mockFeatureService
             );
         }
     }
@@ -126,6 +130,8 @@ public void setUp() throws Exception {
         transportService.acceptIncomingRequests();
         indicesService = mock(IndicesService.class);
+        mockFeatureService = mock(FeatureService.class);
+        when(mockFeatureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)).thenReturn(false);
         bulkAction = new TestTransportSimulateBulkAction();
     }
 
     @After
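Note that in the test `setUp()` above, the mock `FeatureService` is stubbed before `TestTransportSimulateBulkAction` is constructed, since the constructor captures the field's value at that point; creating the mock afterwards would hand the action a null reference.

Taken together, the two server-side checks draw a single line: a document can only land in `logs.foo` by travelling through `logs`, while documents already stored there stay fully addressable for reads and deletes (the delete-by-query test above). The `IngestService` rule reduces to an index-history test — restated as a sketch with the hunk's names pulled into a hypothetical helper:

```java
import org.elasticsearch.common.streams.StreamType;
import org.elasticsearch.ingest.IngestDocument;

class RerouteRuleSketch {
    // A pipeline may retarget a document to a child stream only if the document
    // already passed through the parent stream, e.g. rerouted from "logs" to "logs.foo";
    // any other history fails the document with IngestPipelineException.
    static boolean rerouteAllowed(StreamType streamType, String newIndex, IngestDocument doc) {
        return streamType.matchesStreamPrefix(newIndex) == false
            || doc.getIndexHistory().contains(streamType.getStreamName());
    }
}
```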