-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Redirect failed ingest node operations to a failure store when available #103481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Redirect failed ingest node operations to a failure store when available #103481
Conversation
Hi @jbaiera, I've created a changelog YAML for you. |
// TODO: Should this be a harder backstop than an assert statement? | ||
assert ia.isDataStreamRelated() | ||
: "Attempting to write a document to a failure store but the targeted index is not a data stream"; | ||
// Resolve write index and get parent data stream to handle the case of dealing with an alias | ||
String defaultWriteIndexName = ia.getWriteIndex().getName(); | ||
DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream(); | ||
// TODO: Should this be a harder backstop than an assert statement? | ||
assert dataStream.getFailureIndices().size() > 0 | ||
: "Attempting to write a document to a failure store but the target data stream does not have one enabled"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These assertions: Do we want them to be stronger checks that will trigger at runtime? I can imagine things going poorly if a failure document ends up somewhere it shouldn't
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... I suppose this could happen if we weren't consistent with passing in the same Metadata
everywhere, and the index got removed from the metadata. That would definitely be a fun though, and since if that were the case and asserts weren't enabled we'd just fail in the next line with an index-out-of-bounds exception, I think it'd be better to make this a real check so we can at least have a useful message. What do you think?
…g failure store documents.
9b5791b
to
cccef0a
Compare
Pinging @elastic/es-data-management (Team:Data Management) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this Jimmy, I left some comments
server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
… possible. Pull the fail and store method into it for consistency sake
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for adding this Jimmy! I left two minor comments about potentially adding some unit tests, but otherwise looks good.
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
* @param targetIndexName the index that the document was targeting at the time of failure. | ||
* @param e the failure encountered. | ||
*/ | ||
public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you go with an separate listener, it would really cool if captured the targetIndexName
on construction rather than requiring it be passed in here -- but I understand if that's not possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be tricky, or I'm not fully understanding the suggestion. The target index name in this case is the index that we were targeting at the start of an ingest pipeline execution. Each recursive call to executePipelines
has a chance that this value can change, mostly in the event of either a reroute processor or a processor updating the index target. We need to recapture the target index name on each top level pipeline execution.
I'm going to take a crack at the listener refactor though. If it's a bit much I'll file a followup issue/PR for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I attempted to unify all the composed logic in the service into one listener - it didn't go as cleanly as I had originally hoped. There are still a number of edge cases that will need to be solved and so I've shelved the refactor for now. Instead I'm going with a small change to lowers the repetition.
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
Apologies, meant for docs PR |
This PR updates the ingest service to detect if a failed ingest document was bound for a data stream configured with a failure store, and in that event, restores the document to its original state, transforms it with its failure information, and redirects it to the failure store for the data stream it was originally targeting.
Example run with a default pipeline and data stream: