Restrict Indexing To Child Streams When Streams Is Enabled #132011


Merged
Commits (18)
c0f1a12
Update BulkRequestModifier to allow wrapping multiple times while pre…
lukewhiting Jul 28, 2025
f8fa32b
Modify BulkResponse to have an equals method and update ingest test's…
lukewhiting Jul 28, 2025
34082ef
Add new StreamType enum along with logic to check if that stream type…
lukewhiting Jul 28, 2025
2457383
Modify IngestService to prevent documents being re-routed into child …
lukewhiting Jul 28, 2025
a2eab2a
Modify TransportAbstractBulkAction to prevent indexing into child str…
lukewhiting Jul 28, 2025
1a861cd
Additional tests for new indexing restrictions
lukewhiting Jul 28, 2025
5faf7fb
Merge branch 'main' into es-11941-streams-logs-bulk-transport-changes
lukewhiting Jul 28, 2025
1c4225b
Apply suggestion from @szybia
lukewhiting Jul 29, 2025
fbfd61b
Apply suggestions from code review
lukewhiting Jul 29, 2025
78cf0ef
Additional PR changes and cleanup
lukewhiting Jul 29, 2025
5e1d615
Additional PR changes to improve performance and readability further
lukewhiting Jul 29, 2025
387b4e3
Update docs/changelog/132011.yaml
lukewhiting Jul 29, 2025
d54b7b9
Added additional documentation on bulk modifier wrap methods
lukewhiting Jul 31, 2025
e5581aa
Merge remote-tracking branch 'origin/es-11941-streams-logs-bulk-trans…
lukewhiting Jul 31, 2025
a7e6f7a
Merge branch 'main' into es-11941-streams-logs-bulk-transport-changes
lukewhiting Jul 31, 2025
3569947
PR Changes
lukewhiting Aug 1, 2025
34e944e
Use of failure store is now wrapped in cluster feature check
lukewhiting Aug 12, 2025
3e82b35
Merge branch 'main' of github.com:elastic/elasticsearch into es-11941…
lukewhiting Aug 12, 2025
5 changes: 5 additions & 0 deletions docs/changelog/132011.yaml
@@ -0,0 +1,5 @@
pr: 132011
summary: Restrict Indexing To Child Streams When Streams Is Enabled
area: Data streams
type: enhancement
issues: []
4 changes: 3 additions & 1 deletion modules/streams/build.gradle
@@ -20,7 +20,7 @@ esplugin {

restResources {
restApi {
-    include '_common', 'streams'
+    include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
}
}

@@ -38,4 +38,6 @@ artifacts {

dependencies {
testImplementation project(path: ':test:test-clusters')
clusterModules project(':modules:ingest-common')
clusterModules project(':modules:reindex')
}
@@ -29,7 +29,12 @@ public static Iterable<Object[]> parameters() throws Exception {
}

@ClassRule
-public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
+public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+    .module("streams")
+    .module("ingest-common")
+    .module("reindex")
+    .feature(FeatureFlag.LOGS_STREAM)
+    .build();

@Override
protected String getTestRestCluster() {
@@ -0,0 +1,156 @@
---
"Check User Can't Write To Substream Directly":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
bulk:
body: |
{ "index": { "_index": "logs.foo" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check User Can't Write To Substream Directly With Single Doc":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
catch: bad_request
index:
index: logs.foo
id: "1"
body:
foo: bar
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check Bulk Index With Reroute Processor To Substream Is Rejected":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "reroute-to-logs-foo"
body:
processors:
- reroute:
destination: "logs.foo"
- do:
indices.create:
index: "bad-index"
body:
settings:
index.default_pipeline: "reroute-to-logs-foo"
- do:
bulk:
body: |
{ "index": { "_index": "bad-index" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" }

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "script-to-logs-foo"
body:
processors:
- script:
source: "ctx._index = 'logs.foo'"
- do:
indices.create:
index: "bad-index-script"
body:
settings:
index.default_pipeline: "script-to-logs-foo"
- do:
bulk:
body: |
{ "index": { "_index": "bad-index-script" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" }

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "reroute-to-logs-foo-success"
body:
processors:
- reroute:
destination: "logs.foo"
- do:
indices.create:
index: "logs"
body:
settings:
index.default_pipeline: "reroute-to-logs-foo-success"
- do:
bulk:
refresh: true
body: |
{ "index": { "_index": "logs" } }
{ "foo": "bar", "baz": "qux" }
- match: { errors: false }
- match: { items.0.index.status: 201 }

- do:
delete_by_query:
index: logs.foo
refresh: true
body:
query:
match:
foo: "bar"
- match: { deleted: 1 }
- match: { total: 1 }

- do:
search:
index: logs.foo
body:
query:
match_all: {}
- match: { hits.total.value: 0 }
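For reference, here is the first rejection case as a Java client would see it (a hypothetical sketch, not code from this PR; the client wiring and variable names are assumed, and only the per-item 400 illegal_argument_exception behavior comes from the tests above):

// Hypothetical sketch: a bulk item that targets a child stream directly
// comes back as a per-item 400 failure rather than a transport-level error.
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("logs.foo").source(Map.of("foo", "bar")));
client.bulk(request, ActionListener.wrap(response -> {
    assert response.hasFailures(); // item 0 fails with illegal_argument_exception
}, e -> { /* transport-level failures would surface here */ }));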
@@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -105,18 +106,52 @@ BulkRequest getBulkRequest() {
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
* service with the results returned from running the remaining write operations.
* <br>
Use this method when you want the ingest time to be taken from the actual {@link BulkResponse}, for example when you are
wrapping a response multiple times and wish to preserve an already calculated ingest time.
*
- @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
* @param actionListener the listener to wrap
* @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
*/
ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
}

/**
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
* service with the results returned from running the remaining write operations.
* <br>
* This variant is used when the ingest time is already known and should be explicitly set in the final response,
* rather than extracted from the {@link BulkResponse}.
*
* @param ingestTookInMillis the ingest time in milliseconds to use in the final response
* @param actionListener the listener to wrap
* @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
*/
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
}

/**
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
* service with the results returned from running the remaining write operations.
*
* @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
* @param actionListener The action listener that expects the final bulk response.
* @return An action listener that combines ingest failure results with the results from writing the remaining documents.
*/
-ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
+private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
+    Function<BulkResponse, Long> ingestTimeProviderFunction,
+    ActionListener<BulkResponse> actionListener
+) {
if (itemResponses.isEmpty()) {
return actionListener.map(
response -> new BulkResponse(
response.getItems(),
-response.getTook().getMillis(),
-ingestTookInMillis,
+response.getTookInMillis(),
+ingestTimeProviderFunction.apply(response),
response.getIncrementalState()
)
);
@@ -143,6 +178,8 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
assertResponsesAreCorrect(bulkResponses, allResponses);
}

var ingestTookInMillis = ingestTimeProviderFunction.apply(response);

return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
});
}
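
A minimal call-site sketch of the two public overloads (hypothetical, not code from this PR; the modifier, listener, and timing variables are assumed): the no-argument variant reads the ingest time back out of the BulkResponse, while the long variant stamps in an externally measured value.

// Sketch only; assumes a BulkRequestModifier built from the incoming request.
BulkRequestModifier modifier = new BulkRequestModifier(bulkRequest);

// Re-wrapping an already wrapped listener: keep the ingest time the response carries.
ActionListener<BulkResponse> rewrapped = modifier.wrapActionListenerIfNeeded(listener);

// First wrap after running ingest: supply the measured ingest time explicitly.
long ingestTookInMillis = TimeValue.nsecToMSec(System.nanoTime() - ingestStartNanos);
ActionListener<BulkResponse> wrapped = modifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);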
@@ -19,7 +19,9 @@
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;

/**
* A response of a bulk execution. Holding a response for each item responding (in order) of the
@@ -166,4 +168,19 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.startArray(ITEMS);
}), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject()));
}

@Override
public boolean equals(Object o) {

Review comment (Member): Are we adding this for completeness' sake or are we using it somewhere? Checking bulk response equality seems like something that could be unintentionally expensive for a large or complicated request.

Reply (Contributor Author): Ahhh, this was needed to fix a unit test that relied on a sameInstance assertion, which became invalid after we started wrapping everything at the higher level. However, it's no longer required with the short-circuit wrapping logic added here: #132011 (comment), so I have reverted the change, as it goes back to being the same instance if wrapped with no modifications.


return o == this
|| (o instanceof BulkResponse that
&& tookInMillis == that.tookInMillis
&& ingestTookInMillis == that.ingestTookInMillis
&& Arrays.equals(responses, that.responses)
&& Objects.equals(incrementalState, that.incrementalState));
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState);
}
}
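
Per the reply above, a listener wrapped with no dropped or failed items now short-circuits back to the original instance. A hypothetical test-style sketch of that contract (assumed from the discussion, not code from this PR):

// Assumed: no items were marked dropped or failed in the modifier.
BulkRequestModifier modifier = new BulkRequestModifier(bulkRequest);
ActionListener<BulkResponse> wrapped = modifier.wrapActionListenerIfNeeded(listener);
assert wrapped == listener; // same instance, so sameInstance assertions hold without equals()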
@@ -31,6 +31,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.streams.StreamType;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Releasable;
@@ -396,8 +397,35 @@ private void applyPipelinesAndDoInternalExecute(
ActionListener<BulkResponse> listener
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();
-if (applyPipelines(task, bulkRequest, executor, listener) == false) {
-    doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);

// Validate child stream writes before processing pipelines
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);

for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
for (int i = 0; i < bulkRequest.requests.size(); i++) {

Review comment (Member): You're iterating using the locally available bulk request, but you're dereferencing the bulk request from the request modifier to get the documents. This assumes that the request modifier will never make any changes to its internal state. I think we can avoid that kind of snag if we iterate over the request items and check each stream type per document, instead of iterating over the request items multiple times for each eventual stream type.

Review comment (Member): I looked around where we use the bulk request modifier in other places, and it's actually an iterable itself. In the IngestService we just iterate over it like a regular iterable and maintain a slot counter separately. Might read more clearly if we do that here too.

Reply (Contributor Author): Have switched this to use the bulk modifier as an iterator and moved the stream types to be the inner iterator, so best of both worlds :-)


DocWriteRequest<?> req = bulkRequestModifier.bulkRequest.requests.get(i);

if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) {
IllegalArgumentException e = new IllegalArgumentException(
"Direct writes to child streams are prohibited. Index directly into the ["
+ streamType.getStreamName()
+ "] stream instead"
);
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
if (Boolean.TRUE.equals(failureStoreEnabled)) {
bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
} else {
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
}
}
}
}

var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);

if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
}
}
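
Per the review thread above, the merged version reportedly iterates the modifier itself. A sketch of that shape (assumed from the comments, not the exact merged code):

// Iterate the modifier once (it is itself iterable, per the review discussion),
// checking every enabled stream type per document and tracking the slot manually.
int slot = 0;
for (DocWriteRequest<?> req : bulkRequestModifier) {
    for (StreamType streamType : enabledStreamTypes) {
        if (req instanceof IndexRequest ir
            && streamType.matchesStreamPrefix(req.index())
            && ir.isPipelineResolved() == false) {
            // mark this slot failed, or route it to the failure store, as in the loop above
        }
    }
    slot++;
}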

@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.streams;

import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

public enum StreamType {

LOGS("logs");

private final String streamName;

StreamType(String streamName) {
this.streamName = streamName;
}

public String getStreamName() {
return streamName;
}

public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
return switch (this) {
case LOGS -> metadata.isLogsEnabled();
};
}

public boolean matchesStreamPrefix(String indexName) {
if (indexName == null) {
return false;
}
return indexName.startsWith(streamName + ".");
}

public static Set<StreamType> getEnabledStreamTypesForProject(ProjectMetadata projectMetadata) {
return Arrays.stream(values())
.filter(t -> t.streamTypeIsEnabled(projectMetadata))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
}

}
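
A small usage sketch for the new enum (hypothetical wiring; the enum API comes from the diff above, the surrounding variables are assumed):

// Resolve which managed stream types are enabled for the project, then test index names.
ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
for (StreamType type : StreamType.getEnabledStreamTypesForProject(project)) {
    // "logs.foo" is a child of the managed [logs] stream, so direct writes are rejected.
    boolean isChildStream = type.matchesStreamPrefix("logs.foo"); // true when LOGS is enabled
}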