Restrict Indexing To Child Streams When Streams Is Enabled #132011
Conversation
Hi @lukewhiting, I've created a changelog YAML for you.
Pinging @elastic/es-data-management (Team:Data Management)
Requested review from @jbaiera as this touches the failure store |
Commits:
- …serving ingest time taken
- … to not depend on same instance assertions. This prevents issues when wrapping responses during ingest.
- … is enabled in the cluster

Force-pushed from 5936fd2 to 1a861cd
I guess I don't have a lot of higher-level context to approve, but the code lgtm!
Left a few small suggestions and questions for learning.
Co-authored-by: Szymon Bialkowski <szybia@tuta.io>
Pull Request Overview
This PR implements restrictions on direct indexing to child streams when streams mode is enabled, specifically preventing writes to indices matching the `logs.*` pattern while allowing operations through the parent `logs` stream.
- Adds validation logic to prevent direct writes to child streams via bulk operations and single document indexing
- Introduces pipeline-level validation to prevent rerouting documents to child streams through ingest processors
- Allows delete operations on child streams while blocking create/update operations
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `StreamType.java` | New enum defining stream types with validation methods for enabled streams and child stream matching |
| `TransportAbstractBulkAction.java` | Adds pre-pipeline validation to reject direct writes to child streams |
| `IngestService.java` | Implements pipeline validation to prevent rerouting documents to child streams |
| `BulkRequestModifier.java` | Refactors listener wrapping methods to support flexible ingest time calculation |
| `BulkResponse.java` | Adds equals and hashCode methods for proper response comparison |
| `TransportBulkActionIngestTests.java` | Updates test to use concrete BulkResponse instead of mock for equality testing |
| `20_substream_restrictions.yml` | Comprehensive integration tests covering various restriction scenarios |
| `StreamsYamlTestSuiteIT.java` | Adds required modules for integration testing |
| `build.gradle` | Includes additional REST API endpoints and test dependencies |
lgtm! But someone more experienced should have the final approval 🚀
```diff
@@ -1238,6 +1239,28 @@ private void executePipelines(
             return; // document failed!
         }

+        for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(project)) {
+            if (streamType.matchesStreamPrefix(newIndex)
+                && ingestDocument.getIndexHistory().contains(streamType.getStreamName()) == false) {
```
I think this would let me through if I rerouted from, say, `logs` to `logs.abc.def`. That is, it allows me to write to any descendant stream, not just a direct child stream. I assume that's OK, right? The real goal is to prevent things from outside of the stream writing to child streams.
Correct, but yeah, I don't think it's in scope to enforce the hierarchy in ES, at least not at this stage.
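To make the allowed/blocked cases concrete, here is a minimal standalone sketch of the check being discussed (simplified stand-ins for the real `StreamType` methods, hard-coding the `logs` stream):

```java
import java.util.List;

class ChildStreamCheckSketch {
    // Simplified stand-in for StreamType.matchesStreamPrefix(...)
    static boolean matchesStreamPrefix(String index) {
        return index.startsWith("logs.");
    }

    // Blocked only if the target is a child stream AND the document never
    // passed through the parent "logs" stream.
    static boolean blocked(String newIndex, List<String> indexHistory) {
        return matchesStreamPrefix(newIndex) && indexHistory.contains("logs") == false;
    }

    public static void main(String[] args) {
        // logs -> logs.abc.def: history contains "logs", so any descendant is allowed
        System.out.println(blocked("logs.abc.def", List.of("logs")));  // false
        // bad-index -> logs.abc: history has no "logs", so this is rejected
        System.out.println(blocked("logs.abc", List.of("bad-index"))); // true
    }
}
```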
I left a couple of comments, but LGTM.
…port-changes' into es-11941-streams-logs-bulk-transport-changes
Left some comments. There are a couple of easy-to-miss things that might need to be addressed for the failure store, and I left a few small questions and suggestions, but otherwise it's looking good. Marking as approved for once the important things are addressed.
```java
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);

for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
```
You're iterating using the locally available bulk request, but you're dereferencing the bulk request from the request modifier to get the documents. This assumes that the request modifier will never make any changes to its internal state. I think we can avoid that kind of snag if we iterate over the request items and check each stream type per document instead of iterating over the request items multiple times for each eventual stream type.
I looked around where we use bulk request modifier in other places and it's actually an iterable itself. In the IngestService we just iterate over it like a regular iterable, and maintain a slot counter separately. Might read more clearly if we do that here too.
Have switched this to use the bulk modifier as an iterator and moved the stream types to be the inner loop, so best of both worlds :-)
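For reference, a rough sketch of that shape (not the actual PR code; iteration assumes the `BulkRequestModifier` Iterator-style API mentioned above, and the loop body is a hypothetical placeholder):

```java
// Visit each document exactly once, keeping a slot counter, with the
// enabled stream types as the inner loop.
int slot = -1;
while (bulkRequestModifier.hasNext()) {
    DocWriteRequest<?> req = bulkRequestModifier.next();
    slot++;
    for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
        if (streamType.matchesStreamPrefix(req.index())) {
            // hypothetical: fail this slot or mark it for the failure store
        }
    }
}
```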
```java
            + streamType.getStreamName()
            + "] stream instead"
    );
    Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
```
Another thing that may need to be checked here is whether or not the failure store feature is present on every node. We check that in `IngestService.wrapResolverWithFeatureCheck` and in `TransportBulkAction.executeBulk`.
I'm not 100% sure what you mean here, but I think this isn't needed, as at this level we are just marking stuff for the failure store, and anything marked at this stage will still go through those checks in `TransportBulkAction`?
Those checks later on are only made when deciding if a failed document should be sent to the failure store. Once a document has been marked for failure store (like what we're doing here) we don't actually run the check anymore.
> I'm not 100% sure what you mean here

I would check into those linked methods to see how they make use of the feature service to ensure every node in the cluster knows what a failure store is before trying to mark a document to be sent to it.
```java
if (Boolean.TRUE.equals(failureStoreEnabled)) {
    bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
} else {
    bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
```
Is the failure store status of NOT_ENABLED always correct here? A null value in the failureStoreEnabled variable means the document "doesn't correspond to a data stream", in which case the status should be NA. I recognize that streams might make that impossible, so if that is the case, let's add an assert statement here to make sure.
I'm not sure we can actually make a definitive determination here... For now I have switched it to "Unknown", which I think is a better option?
> I'm not sure we can actually make a definitive determination here.

Looking at the doc for the `resolveFailureStore` method:

> return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store, or if it matches a template that has a data stream failure store enabled, or if it matches a data stream template with no failure store option specified and the name matches the cluster setting to enable the failure store. Returns false if the index name corresponds to a data stream, but it doesn't have the failure store enabled by one of those conditions. Returns null when it doesn't correspond to a data stream.
Based on that I think the logic should be:

```
if (feature enabled on all nodes):
    if (resolveFailureStore returned true):
        mark for failure store
    else if (resolveFailureStore returned false):
        fail document - mark as NOT_ENABLED status
    else if (resolveFailureStore returned null):
        fail document - mark as NA status
else:
    fail document - mark as NA status
```
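In Java that branching might look like the sketch below (the feature-presence flag and the exact "NA" constant are assumptions; the marker methods are the ones quoted above):

```java
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
if (failureStoreFeaturePresentOnAllNodes) { // assumed: feature check per wrapResolverWithFeatureCheck
    if (Boolean.TRUE.equals(failureStoreEnabled)) {
        bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
    } else if (Boolean.FALSE.equals(failureStoreEnabled)) {
        // data stream exists, but its failure store is not enabled
        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
    } else {
        // null: index doesn't correspond to a data stream -> "NA"
        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); // assumed constant for "NA"
    }
} else {
    bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); // assumed constant for "NA"
}
```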
```diff
-    response.getTook().getMillis(),
-    ingestTookInMillis,
+    response.getTookInMillis(),
+    ingestTimeProviderFunction.apply(response),
```
Nit: and this comment is easily something we can ignore for the sake of readability, but if we use the variant where the time provider function just passes along the original time, we're effectively wrapping the listener just to reconstruct the same response. I wonder if there's a way we could refactor this to avoid that. Though if it's messy, maybe we just move on with our lives without doing so.
So I have refactored this to have short-circuit logic in each of the overloaded methods. This means we return early if no items are modified.
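Roughly this shape, as a sketch (the "nothing modified" check and the merge helper are hypothetical names; only the wrapping method comes from the diff):

```java
// Short-circuit: if no bulk items were modified, hand back the original
// listener so the BulkResponse passes through unchanged (same instance).
ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
    if (failedSlots.isEmpty()) { // hypothetical "nothing modified" check
        return actionListener;
    }
    return ActionListener.wrap(
        response -> actionListener.onResponse(mergeFailuresInto(response)), // hypothetical merge helper
        actionListener::onFailure
    );
}
```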
```diff
@@ -166,4 +168,19 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
         return builder.startArray(ITEMS);
     }), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject()));
 }

+    @Override
+    public boolean equals(Object o) {
```
Are we adding this for completeness' sake, or are we using it somewhere? Checking bulk response equality seems like something that could be unintentionally expensive for a large or complicated request.
Ahhh, this was needed to fix a unit test which relied on a `sameInstance` assertion that became invalid after we started wrapping everything at the higher level. However, it's now no longer required with the short-circuit wrapping logic added here: #132011 (comment), so I have reverted the change, as it goes back to being the same instance if wrapped with no modifications.
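i.e., something like the Hamcrest assertion below holds again (a sketch; the variable names are illustrative):

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.sameInstance;

// With no modified items, the wrapped listener hands back the original
// response object, so the instance-identity assertion is valid again.
assertThat(receivedResponse, sameInstance(originalResponse));
```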
"Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " | ||
+ "this document from index [%s] to index [%s]. Reroute history: %s", |
Nit: the term "reroute" reads to me as referring to the reroute processor, but I think you can get here via any method of changing the index name. Also, maybe we should elaborate on what we mean by child stream.
A rough suggestion:
```suggestion
"pipeline [%s] can't change the target index (from [%s] to [%s] child stream [%s]) for document [%s]. History: [%s]",
```
Or along those lines? e.g. (from [my-index-name] to [logs] child stream [logs.nginx.prod])
Updated the message, although I omitted the document, as this error is rendered in line with the document ID / slot.
```java
    pipelineId,
    originalIndex,
    newIndex,
    String.join(" -> ", ingestDocument.getIndexHistory())
```
Nit: I like the style of the arrow separator, but I think a comma separated list is more aligned with our log message style and perhaps a tad easier to parse if ever needed.
String.join(" -> ", ingestDocument.getIndexHistory()) | |
String.join(", ", ingestDocument.getIndexHistory()) |
Switched. On a personal level I much prefer the `->` for readability, but you're right, consistency is more important here :-)
```java
exceptionHandler.accept(
    new IngestPipelineException(
        pipelineId,
        new IllegalArgumentException(
```
The other exceptions like this are all `IllegalStateException`; should we follow suit?
I don't think so... `IllegalStateException` would cause a 500 code, which isn't reflective of the situation in this case. The user has made an error that is correctable (index into the parent, not the child), so I think `IllegalArgumentException` and the 400 code it returns is a better option.
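For reference, a small sketch of the status mapping being described (using `ExceptionsHelper.status`; the outputs reflect the reasoning above):

```java
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.rest.RestStatus;

public class StatusMappingSketch {
    public static void main(String[] args) {
        RestStatus iae = ExceptionsHelper.status(new IllegalArgumentException("index into the parent instead"));
        RestStatus ise = ExceptionsHelper.status(new IllegalStateException("boom"));
        System.out.println(iae); // BAD_REQUEST (400): a correctable client error
        System.out.println(ise); // INTERNAL_SERVER_ERROR (500): not reflective of this situation
    }
}
```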
This PR prevents indexing into child streams when streams mode is enabled.

In this case, we define a child stream as any index matching `logs.*`. Restrictions apply both to direct indexing via a put to the index or bulk, and to indirect indexing attempts via pipelines using reroute, script, or other processors that change the target index or routing. Deletes are still permitted on these child streams, but updates such as `_update_by_query` will be prevented.

Example
Input
- `logs` redirects to `logs.abc.def` via a reroute processor on the default pipeline
- `bad-index` redirects to `logs.abc` via a script processor changing `ctx._index`
Output

Based on the restrictions above, the first document is expected to be indexed successfully (the reroute originates from the parent `logs` stream), while the second is expected to be rejected with a 400 validation error.
Fixes ES-11941