
Restrict Indexing To Child Streams When Streams Is Enabled #132011


Open

lukewhiting wants to merge 16 commits into main from es-11941-streams-logs-bulk-transport-changes

Conversation

lukewhiting (Contributor)

This PR prevents indexing into child streams when streams mode is enabled.

Here, a child stream is defined as any index matching logs.*. The restrictions apply both to direct indexing via the index and bulk APIs and to indirect indexing attempts via pipelines that use reroute, script, or other processors to change the target index or routing.

Deletes from these child streams are still permitted, but updates such as _update_by_query are prevented.

Example

Input

  • logs redirects to logs.abc.def via a reroute processor on the default pipeline
  • bad-index redirects to logs.abc via a script processor changing ctx._index
PUT {{host}}/_bulk
Content-Type: application/json

{ "create":{"_index": "logs" } } 
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg1.jpg HTTP/1.0\" 200 24736" }
{ "create":{"_index": "logs.abc" } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }
{ "create":{"_index": "bad-index" } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }

Output

{
  "errors": true,
  "took": 200,
  "ingest_took": 0,
  "items": [
    {
      "create": {
        "_index": "logs.abc.def",
        "_id": "wmsjUZgBpF-FKxj59Ma4",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "create": {
        "_index": "logs.abc",
        "_id": "auto-generated",
        "status": 400,
        "failure_store": "not_enabled",
        "error": {
          "type": "illegal_argument_exception",
          "reason": "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead"
        }
      }
    },
    {
      "create": {
        "_index": "logs.abc",
        "_id": "auto-generated",
        "status": 400,
        "error": {
          "type": "illegal_argument_exception",
          "reason": "Pipelines can't re-route documents to child streams, but pipeline [pipeline1] tried to reroute this document from index [bad-index] to index [logs.abc]. Reroute history: bad-index"
        }
      }
    }
  ]
}

Fixes ES-11941

@elasticsearchmachine added the Team:Data Management (Meta label for data/management team) label on Jul 28, 2025
@elasticsearchmachine (Collaborator)

Hi @lukewhiting, I've created a changelog YAML for you.

@elasticsearchmachine (Collaborator)

Pinging @elastic/es-data-management (Team:Data Management)

@lukewhiting (Contributor, Author)

Requested review from @jbaiera as this touches the failure store


@lukewhiting force-pushed the es-11941-streams-logs-bulk-transport-changes branch from 5936fd2 to 1a861cd on July 28, 2025
@szybia (Contributor) left a comment:

I guess I don't have a lot of higher-level context to approve, but the code LGTM!

I left a few small suggestions and questions for learning.

lukewhiting and others added 3 commits July 29, 2025 09:41
Co-authored-by: Szymon Bialkowski <szybia@tuta.io>
Co-authored-by: Szymon Bialkowski <szybia@tuta.io>
@lukewhiting requested a review from Copilot on July 29, 2025
@Copilot (Copilot AI) left a comment:

Pull Request Overview

This PR implements restrictions on direct indexing to child streams when streams mode is enabled, specifically preventing writes to indices matching the logs.* pattern while allowing operations through the parent logs stream.

  • Adds validation logic to prevent direct writes to child streams via bulk operations and single document indexing
  • Introduces pipeline-level validation to prevent rerouting documents to child streams through ingest processors
  • Allows delete operations on child streams while blocking create/update operations (a rough sketch of this check follows)

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Changed files:
  • StreamType.java: New enum defining stream types, with validation methods for enabled streams and child-stream matching
  • TransportAbstractBulkAction.java: Adds pre-pipeline validation to reject direct writes to child streams
  • IngestService.java: Implements pipeline validation to prevent rerouting documents to child streams
  • BulkRequestModifier.java: Refactors listener-wrapping methods to support flexible ingest-time calculation
  • BulkResponse.java: Adds equals and hashCode methods for proper response comparison
  • TransportBulkActionIngestTests.java: Updates a test to use a concrete BulkResponse instead of a mock for equality testing
  • 20_substream_restrictions.yml: Comprehensive integration tests covering various restriction scenarios
  • StreamsYamlTestSuiteIT.java: Adds required modules for integration testing
  • build.gradle: Includes additional REST API endpoints and test dependencies

@szybia (Contributor) left a comment:

lgtm!

But someone more experienced should give the final approval 🚀


@@ -1238,6 +1239,28 @@ private void executePipelines(
return; // document failed!
}

for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(project)) {
    if (streamType.matchesStreamPrefix(newIndex)
        && ingestDocument.getIndexHistory().contains(streamType.getStreamName()) == false) {
Member:

I think this would let me through if I rerouted from, say, logs to logs.abc.def. That is, it allows me to write to any descendant stream, not just a direct child stream. I assume that's OK, right? The real goal is to prevent things from outside the stream writing to child streams.

@lukewhiting (Contributor, Author):

Correct, but I don't think it's in scope to enforce the hierarchy in ES, at least not at this stage.

@masseyke (Member) left a comment:
I left a couple of comments, but LGTM.

@jbaiera (Member) left a comment:
Left some comments. There are a couple of easy-to-miss things that might need to be addressed for the failure store, and I left a few small questions and suggestions, but otherwise it's looking good. Marking as approved for once the important things are addressed.

BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);

for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
Member:

You're iterating using the locally available bulk request, but you're dereferencing the bulk request from the request modifier to get the documents. This assumes the request modifier will never change its internal state. I think we can avoid that kind of snag if we iterate over the request items once and check each stream type per document, instead of iterating over the request items multiple times, once per enabled stream type.

Member:

I looked around at where we use BulkRequestModifier in other places, and it's actually an Iterable itself. In the IngestService we just iterate over it like a regular iterable and maintain a slot counter separately. It might read more clearly if we do that here too.

@lukewhiting (Contributor, Author):

I've switched this to use the bulk modifier as an iterator and moved the stream types to be the inner loop, so best of both worlds :-)

+ streamType.getStreamName()
+ "] stream instead"
);
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
Member:

Another thing that may need to be checked here is whether or not the failure store feature is present on every node. We check that in IngestService.wrapResolverWithFeatureCheck and in TransportBulkAction.executeBulk.

@lukewhiting (Contributor, Author):

I'm not 100% sure what you mean here, but I think this isn't needed, as at this level we are just marking items for the failure store, and anything marked at this stage will still go through those checks in TransportBulkAction?

Member:

Those checks later on are only made when deciding if a failed document should be sent to the failure store. Once a document has been marked for failure store (like what we're doing here) we don't actually run the check anymore.

I'm not 100% sure what you mean here

I would look into those linked methods to see how they make use of the feature service to ensure every node in the cluster knows what a failure store is before trying to mark a document to be sent to it.

if (Boolean.TRUE.equals(failureStoreEnabled)) {
    bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
} else {
    bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
Member:

Is the failure store status of NOT_ENABLED always correct here? A null value in the failureStoreEnabled variable means the document "doesn't correspond to a data stream", in which case the status should be NA. I recognize that streams might make that impossible; if that is the case, let's add an assert statement here to make sure.

@lukewhiting (Contributor, Author):

I'm not sure we can actually make a definitive determination here... For now I have switched it to "Unknown", which I think is a better option?

Member:

I'm not sure we can actually make a definitive determination here.

Looking at the doc for the resolveFailureStore method:

return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store, or if it matches a template that has a data stream failure store enabled, or if it matches a data stream template with no failure store option specified and the name matches the cluster setting to enable the failure store. Returns false if the index name corresponds to a data stream, but it doesn't have the failure store enabled by one of those conditions. Returns null when it doesn't correspond to a data stream.

Based on that I think the logic should be:

if (feature enabled on all nodes):
    if (resolveFailureStore returned true):
        mark for failure store
    else if (resolveFailureStore returned false):
        fail document - mark as NOT_ENABLED status
    else if (resolveFailureStore returned null):
        fail document - mark as NA status
else:
    fail document - mark as NA status

- response.getTook().getMillis(),
- ingestTookInMillis,
+ response.getTookInMillis(),
+ ingestTimeProviderFunction.apply(response),
Member:

Nit (and this is easily something we can ignore for the sake of readability): if we use the variant where the time provider function just passes along the original time, we're effectively wrapping the listener just to reconstruct the same response. I wonder if there's a way we could refactor this to avoid that. Though if it's messy, maybe we just move on with our lives without doing so.

@lukewhiting (Contributor, Author):

So I have refactored this to have short-circuit logic in each of the overloaded methods. This means we return early, with the original unwrapped listener, if no items are modified.

@@ -166,4 +168,19 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.startArray(ITEMS);
}), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject()));
}

@Override
public boolean equals(Object o) {
Member:

Are we adding this for completeness' sake, or are we using it somewhere? Checking bulk response equality seems like something that could be unintentionally expensive for a large or complicated request.

@lukewhiting (Contributor, Author):

Ahh, this was needed to fix a unit test that relied on a sameInstance assertion, which became invalid after we started wrapping everything at the higher level. However, it's no longer required with the short-circuit wrapping logic added in #132011 (comment), so I have reverted the change, as the response goes back to being the same instance when wrapped with no modifications.

Comment on lines 1250 to 1251
"Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute "
+ "this document from index [%s] to index [%s]. Reroute history: %s",
@jbaiera (Member), Aug 1, 2025:

Nit: the term "reroute" reads to me as meaning the reroute processor, but I think you can get here via any method of changing the index name. Also, maybe we should elaborate on what we mean by "child stream".

A rough suggestion like

Suggested change:
- "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute "
-     + "this document from index [%s] to index [%s]. Reroute history: %s",
+ "pipeline [%s] can't change the target index (from [%s] to [%s] child stream [%s]) for document [%s]. History: [%s]",

Or along those lines? e.g. (from [my-index-name] to [logs] child stream [logs.nginx.prod])

@lukewhiting (Contributor, Author):

Updated the message, although I omitted the document ID, as this error is rendered inline with the document ID / slot.

pipelineId,
originalIndex,
newIndex,
String.join(" -> ", ingestDocument.getIndexHistory())
Member:

Nit: I like the style of the arrow separator, but I think a comma-separated list is more aligned with our log message style, and perhaps a tad easier to parse if ever needed.

Suggested change:
- String.join(" -> ", ingestDocument.getIndexHistory())
+ String.join(", ", ingestDocument.getIndexHistory())

@lukewhiting (Contributor, Author):

Switched. On a personal level I much prefer the -> for readability, but you're right, consistency is more important here :-)

exceptionHandler.accept(
    new IngestPipelineException(
        pipelineId,
        new IllegalArgumentException(
Member:

The other exceptions like this are all IllegalStateException; should we follow suit?

@lukewhiting (Contributor, Author):

I don't think so... IllegalStateException would cause a 500 status code, which isn't reflective of the situation here. The user has made a correctable error (index into the parent, not the child), so I think IllegalArgumentException, and the 400 code it returns, is the better option.

Labels
:Data Management/Data streams (Data streams and their lifecycles), >enhancement, Team:Data Management (Meta label for data/management team), v9.2.0