-
Notifications
You must be signed in to change notification settings - Fork 25.3k
[Streams] Propagate the ingest pipeline access pattern flag to the ingest document #130488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Streams] Propagate the ingest pipeline access pattern flag to the ingest document #130488
Conversation
…ent when executing
Pinging @elastic/es-data-management (Team:Data Management) |
/** | ||
* @return The access pattern for any currently executing pipelines, or null if no pipelines are in progress for this doc | ||
*/ | ||
public IngestPipelineFieldAccessPattern getCurrentAccessPattern() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intentional that this method isn't used above (and seems to only be used in the test)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used above yet. I'm currently using it in a couple places in the next set of changes I'm working on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces tracking of each pipeline’s access pattern within nested pipeline executions by maintaining a stack on the ingest document. Key changes include:
- Add
accessPatternStack
field and related push/pop logic inIngestDocument
- Update
Pipeline
class to allow mocking by removingfinal
- Introduce a comprehensive nested-access-pattern test in
IngestDocumentTests
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java | Added accessPatternStack , push/pop logic, and getCurrentAccessPattern() |
server/src/main/java/org/elasticsearch/ingest/Pipeline.java | Removed final modifier from Pipeline for testability |
server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java | Added testNestedAccessPatternPropagation with Mockito captures |
Comments suppressed due to low confidence (2)
server/src/main/java/org/elasticsearch/ingest/Pipeline.java:28
- [nitpick] Removing
final
fromPipeline
makes it extendable, which may not be intended for core pipeline behavior; verify that this aligns with the project's API design.
public class Pipeline {
server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java:1273
- The test uses
is(empty())
butis
is not statically imported; addimport static org.hamcrest.Matchers.is;
to avoid a compilation error.
assertThat(document.getPipelineStack(), is(empty()));
assert previousAccessPattern == accessPatternStack.peek() | ||
: "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected [" | ||
+ previousAccessPattern | ||
+ "] but found [" | ||
+ accessPatternStack.peek() | ||
+ "]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Relying on a Java assert
means this check is skipped unless assertions are enabled; consider throwing an explicit exception or using a proper validation mechanism to ensure consistency in all environments.
assert previousAccessPattern == accessPatternStack.peek() | |
: "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected [" | |
+ previousAccessPattern | |
+ "] but found [" | |
+ accessPatternStack.peek() | |
+ "]"; | |
if (previousAccessPattern != accessPatternStack.peek()) { | |
throw new IllegalStateException( | |
"Cleared access pattern from nested pipeline and found inconsistent stack state. Expected [" | |
+ previousAccessPattern | |
+ "] but found [" | |
+ accessPatternStack.peek() | |
+ "]" | |
); | |
} |
Copilot uses AI. Check for mistakes.
pipeline.execute(this, (result, e) -> { | ||
executedPipelines.remove(pipeline.getId()); | ||
accessPatternStack.poll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using poll()
ignores the returned element and silently handles an empty stack; consider using pop()
or removeFirst()
for clearer intent and to fail-fast on unexpected underflow.
accessPatternStack.poll(); | |
accessPatternStack.pop(); |
Copilot uses AI. Check for mistakes.
* involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more | ||
* complicated, we're just copying this over here since it does no harm. | ||
* The executedPipelines and accessPatternStack fields are clearly execution-centric rather than data centric. | ||
* Despite what the comment above says, we're copying it here anyway. THe reason is that this constructor is only called from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a typo in the Javadoc: THe
should be The
.
* Despite what the comment above says, we're copying it here anyway. THe reason is that this constructor is only called from | |
* Despite what the comment above says, we're copying it here anyway. The reason is that this constructor is only called from |
Copilot uses AI. Check for mistakes.
Adds the ability for an ingest document to capture the pipeline's access pattern flag when executing the pipeline on the document. The document stores the access pattern flag in a stack to allow for holding and restoring the access pattern flag when running nested pipelines.