From c0f1a125ef976fa8ff9db1955c929830baa614ed Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 13:59:14 +0100 Subject: [PATCH 01/14] Update BulkRequestModifier to allow wrapping multiple times while preserving ingest time taken --- .../action/bulk/BulkRequestModifier.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index dda5294b2c0a2..76a0bc36469a3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -101,22 +102,33 @@ BulkRequest getBulkRequest() { } } + ActionListener wrapActionListenerIfNeeded(ActionListener actionListener) { + return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener); + } + + ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { + return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener); + } + /** * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest * service with the results returned from running the remaining write operations. * - * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result. + * @param ingestTimeProviderFunction A function to provide the ingest time taken for this response * @param actionListener The action listener that expects the final bulk response. * @return An action listener that combines ingest failure results with the results from writing the remaining documents. */ - ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { + private ActionListener doWrapActionListenerIfNeeded( + Function ingestTimeProviderFunction, + ActionListener actionListener + ) { if (itemResponses.isEmpty()) { return actionListener.map( response -> new BulkResponse( response.getItems(), - response.getTook().getMillis(), - ingestTookInMillis, + response.getTookInMillis(), + ingestTimeProviderFunction.apply(response), response.getIncrementalState() ) ); @@ -143,6 +155,8 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, assertResponsesAreCorrect(bulkResponses, allResponses); } + var ingestTookInMillis = ingestTimeProviderFunction.apply(response); + return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState()); }); } From f8fa32bd4342e23c02d59fb1cf5f58bc945d07e0 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 14:00:29 +0100 Subject: [PATCH 02/14] Modify BulkResponse to have an equals method and update ingest test's to not depend on same instance assertions This prevents issues when wrapping responses during ingest --- .../action/bulk/BulkResponse.java | 19 +++++++++++++++++++ .../bulk/TransportBulkActionIngestTests.java | 5 +++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index 567b433d94daf..a3d11e302694b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -19,7 +19,9 @@ import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; +import java.util.Objects; /** * A response of a bulk execution. Holding a response for each item responding (in order) of the @@ -166,4 +168,21 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.startArray(ITEMS); }), Iterators.forArray(responses), Iterators.single((builder, p) -> builder.endArray().endObject())); } + + @Override + public boolean equals(Object o) { + if (o instanceof BulkResponse that) { + return tookInMillis == that.tookInMillis + && ingestTookInMillis == that.ingestTookInMillis + && Arrays.equals(responses, that.responses) + && Objects.equals(incrementalState, that.incrementalState); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState); + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index a54cd08c3738a..ce9b4628e99dc 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -80,6 +80,7 @@ import java.util.function.Function; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -544,11 +545,11 @@ public void testIngestForward() throws Exception { indexRequest.source(Collections.emptyMap()); indexRequest.setPipeline("testpipeline"); bulkRequest.add(indexRequest); - BulkResponse bulkResponse = mock(BulkResponse.class); + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[0], 1234); AtomicBoolean responseCalled = new AtomicBoolean(false); ActionListener listener = ActionTestUtils.assertNoFailureListener(response -> { responseCalled.set(true); - assertSame(bulkResponse, response); + assertThat(response, equalTo(bulkResponse)); }); ActionTestUtils.execute(action, null, bulkRequest, listener); From 34082ef8ef1b089e6bd74542c3eea5168c9353a0 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 14:03:01 +0100 Subject: [PATCH 03/14] Add new StreamType enum along with logic to check if that stream type is enabled in the cluster --- .../common/streams/StreamType.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/common/streams/StreamType.java diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java new file mode 100644 index 0000000000000..2bfe3596cf281 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.streams; + +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; + +public enum StreamType { + + LOGS("logs"); + + private final String streamName; + + StreamType(String streamName) { + this.streamName = streamName; + } + + public String getStreamName() { + return streamName; + } + + public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) { + StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + return switch (this) { + case LOGS -> metadata.isLogsEnabled(); + }; + } + +} From 245738367e38f82be9cb6563b56f906fbb48707b Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 14:01:23 +0100 Subject: [PATCH 04/14] Modify IngestService to prevent documents being re-routed into child streams --- .../elasticsearch/ingest/IngestService.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 47ecdaf904801..4ed01da5b1b15 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -54,6 +54,7 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -1238,6 +1239,30 @@ private void executePipelines( return; // document failed! } + for (StreamType streamType : StreamType.values()) { + if (streamType.streamTypeIsEnabled(project)) { + if (newIndex.startsWith(streamType.getStreamName() + ".") + && ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) { + exceptionHandler.accept( + new IngestPipelineException( + pipelineId, + new IllegalArgumentException( + format( + "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " + + "this document from index [%s] to index [%s]. Reroute history: %s", + pipelineId, + originalIndex, + newIndex, + String.join(" -> ", ingestDocument.getIndexHistory()) + ) + ) + ) + ); + return; // document failed! + } + } + } + // add the index to the document's index history, and check for cycles in the visited indices boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false; if (cycle) { From a2eab2a10ea38620686c98128b9aef80463be5da Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 14:01:51 +0100 Subject: [PATCH 05/14] Modify TransportAbstractBulkAction to prevent indexing into child streams --- .../bulk/TransportAbstractBulkAction.java | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index f003cd3fc107d..ccbd7079fe821 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; @@ -44,12 +45,16 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable @@ -396,8 +401,40 @@ private void applyPipelinesAndDoInternalExecute( ActionListener listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); - if (applyPipelines(task, bulkRequest, executor, listener) == false) { - doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); + + // Validate child stream writes before processing pipelines + ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); + Set enabledStreamTypes = Arrays.stream(StreamType.values()) + .filter(t -> t.streamTypeIsEnabled(projectMetadata)) + .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); + + BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); + + for (StreamType streamType : enabledStreamTypes) { + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest req = bulkRequestModifier.bulkRequest.requests.get(i); + String prefix = streamType.getStreamName() + "."; + + if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) { + IllegalArgumentException e = new IllegalArgumentException( + "Direct writes to child streams are prohibited. Index directly into the [" + + streamType.getStreamName() + + "] stream instead" + ); + Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); + if (failureStore != null && failureStore) { + bulkRequestModifier.markItemForFailureStore(i, req.index(), e); + } else { + bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); + } + } + } + } + + var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); + + if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { + doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); } } From 1a861cd47e23408a97b6160f31084527348fd509 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 28 Jul 2025 14:02:33 +0100 Subject: [PATCH 06/14] Additional tests for new indexing restrictions --- modules/streams/build.gradle | 4 +- .../streams/StreamsYamlTestSuiteIT.java | 7 +- .../logs/20_substream_restrictions.yml | 156 ++++++++++++++++++ 3 files changed, 165 insertions(+), 2 deletions(-) create mode 100644 modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index fd56a627026b6..24b651da291af 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams' + include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search" } } @@ -38,4 +38,6 @@ artifacts { dependencies { testImplementation project(path: ':test:test-clusters') + clusterModules project(':modules:ingest-common') + clusterModules project(':modules:reindex') } diff --git a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java index 9d5a1033faf57..e01594fe51c76 100644 --- a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java +++ b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java @@ -29,7 +29,12 @@ public static Iterable parameters() throws Exception { } @ClassRule - public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build(); + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("streams") + .module("ingest-common") + .module("reindex") + .feature(FeatureFlag.LOGS_STREAM) + .build(); @Override protected String getTestRestCluster() { diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml new file mode 100644 index 0000000000000..7fb76496fab4c --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -0,0 +1,156 @@ +--- +"Check User Can't Write To Substream Directly": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + bulk: + body: | + { "index": { "_index": "logs.foo" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" } + +--- +"Check User Can't Write To Substream Directly With Single Doc": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + catch: bad_request + index: + index: logs.foo + id: "1" + body: + foo: bar + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" } + +--- +"Check Bulk Index With Reroute Processor To Substream Is Rejected": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "reroute-to-logs-foo" + body: + processors: + - reroute: + destination: "logs.foo" + - do: + indices.create: + index: "bad-index" + body: + settings: + index.default_pipeline: "reroute-to-logs-foo" + - do: + bulk: + body: | + { "index": { "_index": "bad-index" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" } + +--- +"Check Bulk Index With Script Processor To Substream Is Rejected": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "script-to-logs-foo" + body: + processors: + - script: + source: "ctx._index = 'logs.foo'" + - do: + indices.create: + index: "bad-index-script" + body: + settings: + index.default_pipeline: "script-to-logs-foo" + - do: + bulk: + body: | + { "index": { "_index": "bad-index-script" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" } + +--- +"Check Delete By Query Directly On Substream After Reroute Succeeds": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "reroute-to-logs-foo-success" + body: + processors: + - reroute: + destination: "logs.foo" + - do: + indices.create: + index: "logs" + body: + settings: + index.default_pipeline: "reroute-to-logs-foo-success" + - do: + bulk: + refresh: true + body: | + { "index": { "_index": "logs" } } + { "foo": "bar", "baz": "qux" } + - match: { errors: false } + - match: { items.0.index.status: 201 } + + - do: + delete_by_query: + index: logs.foo + refresh: true + body: + query: + match: + foo: "bar" + - match: { deleted: 1 } + - match: { total: 1 } + + - do: + search: + index: logs.foo + body: + query: + match_all: {} + - match: { hits.total.value: 0 } From 1c4225bba4b2a45060663e1f02cccca1bbcd495e Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 29 Jul 2025 09:41:41 +0100 Subject: [PATCH 07/14] Apply suggestion from @szybia Co-authored-by: Szymon Bialkowski --- .../elasticsearch/action/bulk/BulkResponse.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index a3d11e302694b..ce09c42ca2d3a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -171,14 +171,12 @@ public Iterator toXContentChunked(ToXContent.Params params @Override public boolean equals(Object o) { - if (o instanceof BulkResponse that) { - return tookInMillis == that.tookInMillis - && ingestTookInMillis == that.ingestTookInMillis - && Arrays.equals(responses, that.responses) - && Objects.equals(incrementalState, that.incrementalState); - } else { - return false; - } + return o == this || (o instanceof BulkResponse that + && tookInMillis == that.tookInMillis + && ingestTookInMillis == that.ingestTookInMillis + && Arrays.equals(responses, that.responses) + && Objects.equals(incrementalState, that.incrementalState)); +} } @Override From fbfd61bd24b2905f422f14764e33a581d2fcaa8e Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 29 Jul 2025 09:46:47 +0100 Subject: [PATCH 08/14] Apply suggestions from code review Co-authored-by: Szymon Bialkowski --- .../action/bulk/TransportAbstractBulkAction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index ccbd7079fe821..d5445e266e109 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -413,7 +413,7 @@ private void applyPipelinesAndDoInternalExecute( for (StreamType streamType : enabledStreamTypes) { for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest req = bulkRequestModifier.bulkRequest.requests.get(i); - String prefix = streamType.getStreamName() + "."; + String StreamTypePrefix = streamType.getStreamName() + "."; if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) { IllegalArgumentException e = new IllegalArgumentException( @@ -421,8 +421,8 @@ private void applyPipelinesAndDoInternalExecute( + streamType.getStreamName() + "] stream instead" ); - Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); - if (failureStore != null && failureStore) { + Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); + if (Boolean.TRUE.equals(failureStore)) { bulkRequestModifier.markItemForFailureStore(i, req.index(), e); } else { bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); From 78cf0ef7d2e94d2d5d586e6c823d2f21eeb8d27a Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 29 Jul 2025 10:01:23 +0100 Subject: [PATCH 09/14] Additional PR changes and cleanup --- .../org/elasticsearch/action/bulk/BulkResponse.java | 12 ++++++------ .../action/bulk/TransportAbstractBulkAction.java | 5 ++--- .../org/elasticsearch/common/streams/StreamType.java | 4 ++++ .../java/org/elasticsearch/ingest/IngestService.java | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index ce09c42ca2d3a..658b365e5510c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -171,12 +171,12 @@ public Iterator toXContentChunked(ToXContent.Params params @Override public boolean equals(Object o) { - return o == this || (o instanceof BulkResponse that - && tookInMillis == that.tookInMillis - && ingestTookInMillis == that.ingestTookInMillis - && Arrays.equals(responses, that.responses) - && Objects.equals(incrementalState, that.incrementalState)); -} + return o == this + || (o instanceof BulkResponse that + && tookInMillis == that.tookInMillis + && ingestTookInMillis == that.ingestTookInMillis + && Arrays.equals(responses, that.responses) + && Objects.equals(incrementalState, that.incrementalState)); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index d5445e266e109..c216c25a2bb2d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -413,16 +413,15 @@ private void applyPipelinesAndDoInternalExecute( for (StreamType streamType : enabledStreamTypes) { for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest req = bulkRequestModifier.bulkRequest.requests.get(i); - String StreamTypePrefix = streamType.getStreamName() + "."; - if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) { + if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) { IllegalArgumentException e = new IllegalArgumentException( "Direct writes to child streams are prohibited. Index directly into the [" + streamType.getStreamName() + "] stream instead" ); Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); - if (Boolean.TRUE.equals(failureStore)) { + if (Boolean.TRUE.equals(failureStoreEnabled)) { bulkRequestModifier.markItemForFailureStore(i, req.index(), e); } else { bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java index 2bfe3596cf281..c2c92715ebadf 100644 --- a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java @@ -33,4 +33,8 @@ public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) { }; } + public boolean matchesStreamPrefix(String indexName) { + return indexName.startsWith(streamName + "."); + } + } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4ed01da5b1b15..bbf30d6c6a650 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1241,7 +1241,7 @@ private void executePipelines( for (StreamType streamType : StreamType.values()) { if (streamType.streamTypeIsEnabled(project)) { - if (newIndex.startsWith(streamType.getStreamName() + ".") + if (streamType.matchesStreamPrefix(newIndex) && ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) { exceptionHandler.accept( new IngestPipelineException( From 5e1d61576862fa2665bc587d41045ff6411849a0 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 29 Jul 2025 10:25:25 +0100 Subject: [PATCH 10/14] Additional PR changes to improve performance and readability further --- .../bulk/TransportAbstractBulkAction.java | 10 +----- .../common/streams/StreamType.java | 14 ++++++++ .../elasticsearch/ingest/IngestService.java | 36 +++++++++---------- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index c216c25a2bb2d..e2147249594a6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -45,16 +45,12 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Arrays; -import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; -import java.util.stream.Collectors; /** * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable @@ -404,13 +400,9 @@ private void applyPipelinesAndDoInternalExecute( // Validate child stream writes before processing pipelines ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); - Set enabledStreamTypes = Arrays.stream(StreamType.values()) - .filter(t -> t.streamTypeIsEnabled(projectMetadata)) - .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); - BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); - for (StreamType streamType : enabledStreamTypes) { + for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest req = bulkRequestModifier.bulkRequest.requests.get(i); diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java index c2c92715ebadf..f660163414801 100644 --- a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java @@ -12,6 +12,11 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.StreamsMetadata; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Set; +import java.util.stream.Collectors; + public enum StreamType { LOGS("logs"); @@ -34,7 +39,16 @@ public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) { } public boolean matchesStreamPrefix(String indexName) { + if (indexName == null) { + return false; + } return indexName.startsWith(streamName + "."); } + public static Set getEnabledStreamTypesForProject(ProjectMetadata projectMetadata) { + return Arrays.stream(values()) + .filter(t -> t.streamTypeIsEnabled(projectMetadata)) + .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); + } + } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index bbf30d6c6a650..ff1d02943dc66 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1239,27 +1239,25 @@ private void executePipelines( return; // document failed! } - for (StreamType streamType : StreamType.values()) { - if (streamType.streamTypeIsEnabled(project)) { - if (streamType.matchesStreamPrefix(newIndex) - && ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) { - exceptionHandler.accept( - new IngestPipelineException( - pipelineId, - new IllegalArgumentException( - format( - "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " - + "this document from index [%s] to index [%s]. Reroute history: %s", - pipelineId, - originalIndex, - newIndex, - String.join(" -> ", ingestDocument.getIndexHistory()) - ) + for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(project)) { + if (streamType.matchesStreamPrefix(newIndex) + && ingestDocument.getIndexHistory().contains(streamType.getStreamName()) == false) { + exceptionHandler.accept( + new IngestPipelineException( + pipelineId, + new IllegalArgumentException( + format( + "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " + + "this document from index [%s] to index [%s]. Reroute history: %s", + pipelineId, + originalIndex, + newIndex, + String.join(" -> ", ingestDocument.getIndexHistory()) ) ) - ); - return; // document failed! - } + ) + ); + return; // document failed! } } From 387b4e3dbdc232993d710a2d7667a89e6b85d4d2 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 29 Jul 2025 11:20:57 +0100 Subject: [PATCH 11/14] Update docs/changelog/132011.yaml --- docs/changelog/132011.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/132011.yaml diff --git a/docs/changelog/132011.yaml b/docs/changelog/132011.yaml new file mode 100644 index 0000000000000..6eed71f4ec27b --- /dev/null +++ b/docs/changelog/132011.yaml @@ -0,0 +1,5 @@ +pr: 132011 +summary: Restrict Indexing To Child Streams When Streams Is Enabled +area: Data streams +type: enhancement +issues: [] From d54b7b9d463a4ec98ddde4671c7247bc4c6e4b9f Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 31 Jul 2025 09:24:40 +0100 Subject: [PATCH 12/14] Added additional documentation on bulk modifier wrap methods --- .../action/bulk/BulkRequestModifier.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index 76a0bc36469a3..f2d5218d42519 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -102,10 +102,33 @@ BulkRequest getBulkRequest() { } } + /** + * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the + * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest + * service with the results returned from running the remaining write operations. + *
+ * Use this method when you want the ingest time to be taken from the actual {@link BulkResponse} such as if you are wrapping + * a response multiple times and wish to preserve an already calculated ingest time. + * + * @param actionListener the listener to wrap + * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed + */ ActionListener wrapActionListenerIfNeeded(ActionListener actionListener) { return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener); } + /** + * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the + * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest + * service with the results returned from running the remaining write operations. + *
+ * This variant is used when the ingest time is already known and should be explicitly set in the final response, + * rather than extracted from the {@link BulkResponse}. + * + * @param ingestTookInMillis the ingest time in milliseconds to use in the final response + * @param actionListener the listener to wrap + * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed + */ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener); } From 35699472a5e2cc0060cbd3a89d2f7e8e2c519182 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Fri, 1 Aug 2025 10:09:36 +0100 Subject: [PATCH 13/14] PR Changes --- .../logs/20_substream_restrictions.yml | 4 +- .../action/bulk/BulkRequestModifier.java | 70 ++++++++++--------- .../action/bulk/BulkResponse.java | 17 ----- .../bulk/TransportAbstractBulkAction.java | 12 ++-- .../elasticsearch/ingest/IngestService.java | 7 +- .../bulk/TransportBulkActionIngestTests.java | 5 +- 6 files changed, 53 insertions(+), 62 deletions(-) diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml index 7fb76496fab4c..621985985295a 100644 --- a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -69,7 +69,7 @@ - match: { errors: true } - match: { items.0.index.status: 400 } - match: { items.0.index.error.type: "illegal_argument_exception" } - - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" } + - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" } --- "Check Bulk Index With Script Processor To Substream Is Rejected": @@ -102,7 +102,7 @@ - match: { errors: true } - match: { items.0.index.status: 400 } - match: { items.0.index.error.type: "illegal_argument_exception" } - - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" } + - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" } --- "Check Delete By Query Directly On Substream After Reroute Succeeds": diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index f2d5218d42519..65b71d4b0b40b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -114,7 +114,11 @@ BulkRequest getBulkRequest() { * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed */ ActionListener wrapActionListenerIfNeeded(ActionListener actionListener) { - return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener); + if (itemResponses.isEmpty()) { + return actionListener; + } else { + return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener); + } } /** @@ -130,7 +134,18 @@ ActionListener wrapActionListenerIfNeeded(ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { - return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener); + if (itemResponses.isEmpty()) { + return actionListener.map( + response -> new BulkResponse( + response.getItems(), + response.getTookInMillis(), + ingestTookInMillis, + response.getIncrementalState() + ) + ); + } else { + return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener); + } } /** @@ -146,43 +161,32 @@ private ActionListener doWrapActionListenerIfNeeded( Function ingestTimeProviderFunction, ActionListener actionListener ) { - if (itemResponses.isEmpty()) { - return actionListener.map( - response -> new BulkResponse( - response.getItems(), - response.getTookInMillis(), - ingestTimeProviderFunction.apply(response), - response.getIncrementalState() - ) - ); - } else { - return actionListener.map(response -> { - // these items are the responses from the subsequent bulk request, their 'slots' - // are not correct for this response we're building - final BulkItemResponse[] bulkResponses = response.getItems(); + return actionListener.map(response -> { + // these items are the responses from the subsequent bulk request, their 'slots' + // are not correct for this response we're building + final BulkItemResponse[] bulkResponses = response.getItems(); - final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()]; + final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()]; - // the item responses are from the original request, so their slots are correct. - // these are the responses for requests that failed early and were not passed on to the subsequent bulk. - for (BulkItemResponse item : itemResponses) { - allResponses[item.getItemId()] = item; - } + // the item responses are from the original request, so their slots are correct. + // these are the responses for requests that failed early and were not passed on to the subsequent bulk. + for (BulkItemResponse item : itemResponses) { + allResponses[item.getItemId()] = item; + } - // use the original slots for the responses from the bulk - for (int i = 0; i < bulkResponses.length; i++) { - allResponses[originalSlots.get(i)] = bulkResponses[i]; - } + // use the original slots for the responses from the bulk + for (int i = 0; i < bulkResponses.length; i++) { + allResponses[originalSlots.get(i)] = bulkResponses[i]; + } - if (Assertions.ENABLED) { - assertResponsesAreCorrect(bulkResponses, allResponses); - } + if (Assertions.ENABLED) { + assertResponsesAreCorrect(bulkResponses, allResponses); + } - var ingestTookInMillis = ingestTimeProviderFunction.apply(response); + var ingestTookInMillis = ingestTimeProviderFunction.apply(response); - return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState()); - }); - } + return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState()); + }); } private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index 658b365e5510c..567b433d94daf 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -19,9 +19,7 @@ import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; -import java.util.Arrays; import java.util.Iterator; -import java.util.Objects; /** * A response of a bulk execution. Holding a response for each item responding (in order) of the @@ -168,19 +166,4 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.startArray(ITEMS); }), Iterators.forArray(responses), Iterators.single((builder, p) -> builder.endArray().endObject())); } - - @Override - public boolean equals(Object o) { - return o == this - || (o instanceof BulkResponse that - && tookInMillis == that.tookInMillis - && ingestTookInMillis == that.ingestTookInMillis - && Arrays.equals(responses, that.responses) - && Objects.equals(incrementalState, that.incrementalState)); - } - - @Override - public int hashCode() { - return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState); - } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index e2147249594a6..9147c3e49d6b0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -402,10 +402,13 @@ private void applyPipelinesAndDoInternalExecute( ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); - for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest req = bulkRequestModifier.bulkRequest.requests.get(i); + DocWriteRequest req; + int i = -1; + while (bulkRequestModifier.hasNext()) { + req = bulkRequestModifier.next(); + i++; + for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) { IllegalArgumentException e = new IllegalArgumentException( "Direct writes to child streams are prohibited. Index directly into the [" @@ -416,8 +419,9 @@ private void applyPipelinesAndDoInternalExecute( if (Boolean.TRUE.equals(failureStoreEnabled)) { bulkRequestModifier.markItemForFailureStore(i, req.index(), e); } else { - bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); + bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); } + break; } } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ff1d02943dc66..dc363e590d143 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1247,12 +1247,13 @@ private void executePipelines( pipelineId, new IllegalArgumentException( format( - "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " - + "this document from index [%s] to index [%s]. Reroute history: %s", + "Pipeline [%s] can't change the target index (from [%s] to [%s] child stream [%s]) " + + "History: [%s]", pipelineId, originalIndex, + streamType.getStreamName(), newIndex, - String.join(" -> ", ingestDocument.getIndexHistory()) + String.join(", ", ingestDocument.getIndexHistory()) ) ) ) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index ce9b4628e99dc..a54cd08c3738a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -80,7 +80,6 @@ import java.util.function.Function; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -545,11 +544,11 @@ public void testIngestForward() throws Exception { indexRequest.source(Collections.emptyMap()); indexRequest.setPipeline("testpipeline"); bulkRequest.add(indexRequest); - BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[0], 1234); + BulkResponse bulkResponse = mock(BulkResponse.class); AtomicBoolean responseCalled = new AtomicBoolean(false); ActionListener listener = ActionTestUtils.assertNoFailureListener(response -> { responseCalled.set(true); - assertThat(response, equalTo(bulkResponse)); + assertSame(bulkResponse, response); }); ActionTestUtils.execute(action, null, bulkRequest, listener); From 34e944ea107684b47538a78456e1c40cefb0efba Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 12 Aug 2025 14:41:32 +0100 Subject: [PATCH 14/14] Use of failure store is now wrapped in cluster feature check --- .../bulk/TransportAbstractBulkAction.java | 19 ++++++++++++++++--- .../action/bulk/TransportBulkAction.java | 5 ++--- .../bulk/TransportSimulateBulkAction.java | 7 +++++-- .../TransportSimulateBulkActionTests.java | 8 +++++++- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 9147c3e49d6b0..12a583251516a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; @@ -36,6 +37,7 @@ import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; @@ -70,6 +72,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction protected final Executor coordinationExecutor; protected final Executor systemCoordinationExecutor; private final ActionType bulkAction; + protected final FeatureService featureService; public TransportAbstractBulkAction( ActionType action, @@ -82,7 +85,8 @@ public TransportAbstractBulkAction( IndexingPressure indexingPressure, SystemIndices systemIndices, ProjectResolver projectResolver, - LongSupplier relativeTimeNanosProvider + LongSupplier relativeTimeNanosProvider, + FeatureService featureService ) { super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.threadPool = threadPool; @@ -94,6 +98,7 @@ public TransportAbstractBulkAction( this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION); this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); this.ingestForwarder = new IngestActionForwarder(transportService); + this.featureService = featureService; clusterService.addStateApplier(this.ingestForwarder); this.relativeTimeNanosProvider = relativeTimeNanosProvider; this.bulkAction = action; @@ -416,11 +421,19 @@ private void applyPipelinesAndDoInternalExecute( + "] stream instead" ); Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); - if (Boolean.TRUE.equals(failureStoreEnabled)) { - bulkRequestModifier.markItemForFailureStore(i, req.index(), e); + + if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) { + if (Boolean.TRUE.equals(failureStoreEnabled)) { + bulkRequestModifier.markItemForFailureStore(i, req.index(), e); + } else if (Boolean.FALSE.equals(failureStoreEnabled)) { + bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); + } else { + bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); + } } else { bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); } + break; } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9a79137361260..7e443e055cc90 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -86,7 +86,6 @@ public class TransportBulkAction extends TransportAbstractBulkAction { private final OriginSettingClient rolloverClient; private final FailureStoreMetrics failureStoreMetrics; private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; - private final FeatureService featureService; @Inject public TransportBulkAction( @@ -187,7 +186,8 @@ public TransportBulkAction( indexingPressure, systemIndices, projectResolver, - relativeTimeProvider + relativeTimeProvider, + featureService ); this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; Objects.requireNonNull(relativeTimeProvider); @@ -195,7 +195,6 @@ public TransportBulkAction( this.indexNameExpressionResolver = indexNameExpressionResolver; this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN); this.failureStoreMetrics = failureStoreMetrics; - this.featureService = featureService; } public static ActionListener unwrappingSingleItemBulkResponse( diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java index 85e9f2c5084de..c3de47cfc5b2a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.index.IndexSettingProviders; @@ -102,7 +103,8 @@ public TransportSimulateBulkAction( ProjectResolver projectResolver, IndicesService indicesService, NamedXContentRegistry xContentRegistry, - IndexSettingProviders indexSettingProviders + IndexSettingProviders indexSettingProviders, + FeatureService featureService ) { super( SimulateBulkAction.INSTANCE, @@ -115,7 +117,8 @@ public TransportSimulateBulkAction( indexingPressure, systemIndices, projectResolver, - threadPool::relativeTimeInNanos + threadPool::relativeTimeInNanos, + featureService ); this.indicesService = indicesService; this.xContentRegistry = xContentRegistry; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java index eab44f1c56b16..78d6ddfc16b9f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.ProjectId; @@ -30,6 +31,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexSettingProviders; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; @@ -79,6 +81,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase { private ClusterService clusterService; private TestThreadPool threadPool; private IndicesService indicesService; + private FeatureService mockFeatureService; private TestTransportSimulateBulkAction bulkAction; @@ -96,7 +99,8 @@ class TestTransportSimulateBulkAction extends TransportSimulateBulkAction { TestProjectResolvers.DEFAULT_PROJECT_ONLY, indicesService, NamedXContentRegistry.EMPTY, - new IndexSettingProviders(Set.of()) + new IndexSettingProviders(Set.of()), + mockFeatureService ); } } @@ -126,6 +130,8 @@ public void setUp() throws Exception { transportService.acceptIncomingRequests(); indicesService = mock(IndicesService.class); bulkAction = new TestTransportSimulateBulkAction(); + mockFeatureService = mock(FeatureService.class); + when(mockFeatureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)).thenReturn(false); } @After