
Redirect failed ingest node operations to a failure store when available #103481

Merged: 39 commits merged into elastic:main from data-stream-ingest-failure-redirect on Feb 5, 2024

Conversation

@jbaiera (Member) commented on Dec 14, 2023

This PR updates the ingest service to detect when a failed ingest document was bound for a data stream configured with a failure store. In that event, the service restores the document to its original state, annotates it with its failure information, and redirects it to the failure store of the data stream it was originally targeting.

Example run with a default pipeline and data stream:

PUT _ingest/pipeline/testpipeline
{
  "processors": [
    {
      "fail": {
        "message": "This test pipeline fails for all documents"
      }
    }
  ]
}

PUT _index_template/my_data_stream_template
{
  "index_patterns" : ["my_data_stream*"], 
  "data_stream": {
    "failure_store": true
  },
  "priority" : 1,
  "template": {
    "settings" : {
      "number_of_shards" : 1, 
      "index.default_pipeline": "testpipeline"
    }
  }
}

POST my_data_stream_1/_doc
{
  "key": "value",
  "@timestamp": "2023-12-14T12:00:00Z"
}
>>>
{
  "_index": ".fs-my_data_stream_1-2023.12.14-000001",
  "_id": "8es0aowBHIk1gE8HEwcs",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

POST .fs-my_data_stream_1-2023.12.14-000001/_search
>>>
{
  "took": 47,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": ".fs-my_data_stream_1-2023.12.14-000001",
        "_id": "8es0aowBHIk1gE8HEwcs",
        "_score": 1,
        "_source": {
          "@timestamp": "2023-12-14T21:20:46.566Z",
          "document": {
            "index": "my_data_stream_1",
            "source": {
              "@timestamp": "2023-12-14T12:00:00Z",
              "key": "value"
            }
          },
          "error": {
            "type": "fail_processor_exception",
            "message": "This test pipeline fails for all documents",
            "stack_trace": "org.elasticsearch.ingest.common.FailProcessorException: This test pipeline fails for all documents\n\tat org.elasticsearch.ingest.common@8.12.0-SNAPSHOT/org.elasticsearch.ingest.common.FailProcessor.execute(FailProcessor.java:41)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:165)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:141)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:129)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:867)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.IngestService.executePipeline(IngestService.java:1020)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.IngestService.executePipelines(IngestService.java:879)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.ingest.IngestService$1.doRun(IngestService.java:765)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:33)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:983)\n\tat org.elasticsearch.server@8.12.0-SNAPSHOT/org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n"
          }
        }
      }
    ]
  }
}
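
The failure store index name above (.fs-my_data_stream_1-2023.12.14-000001) is generated, so rather than guessing it, the parent data stream can be inspected first. A minimal sketch, assuming the failure store feature reports its indices in the GET data stream response:

GET _data_stream/my_data_stream_1

With the failure store enabled, the response should list the .fs-* failure index for the stream alongside its regular .ds-* backing indices.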

@elasticsearchmachine (Collaborator) commented: Hi @jbaiera, I've created a changelog YAML for you.

Comment on lines 799 to 823
// TODO: Should this be a harder backstop than an assert statement?
assert ia.isDataStreamRelated()
: "Attempting to write a document to a failure store but the targeted index is not a data stream";
// Resolve write index and get parent data stream to handle the case of dealing with an alias
String defaultWriteIndexName = ia.getWriteIndex().getName();
DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream();
// TODO: Should this be a harder backstop than an assert statement?
assert dataStream.getFailureIndices().size() > 0
: "Attempting to write a document to a failure store but the target data stream does not have one enabled";
@jbaiera (Member, Author): These assertions: do we want them to be stronger checks that will trigger at runtime? I can imagine things going poorly if a failure document ends up somewhere it shouldn't.

Reply (Member): Hmm... I suppose this could happen if we weren't consistent with passing in the same Metadata everywhere and the index got removed from the metadata. That would definitely be a fun one, though. Since in that case, with asserts disabled, we'd just fail on the next line with an index-out-of-bounds exception, I think it'd be better to make this a real check so we can at least have a useful message. What do you think?
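
For illustration, a minimal sketch of what that hardened check could look like, based on the diff hunk quoted above (targetIndexName stands in for however the index abstraction was looked up, and the exception type and messages here are assumptions, not necessarily the merged code):

IndexAbstraction ia = metadata.getIndicesLookup().get(targetIndexName);
if (ia == null || ia.isDataStreamRelated() == false) {
    // Real runtime check instead of an assert, so a misrouted failure document
    // produces a useful message rather than a downstream NPE or bounds error.
    throw new IllegalStateException(
        "Attempting to write a document to a failure store but the targeted index ["
            + targetIndexName + "] is not a data stream"
    );
}
// Resolve the write index and its parent data stream to handle aliases
String defaultWriteIndexName = ia.getWriteIndex().getName();
DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream();
if (dataStream == null || dataStream.getFailureIndices().isEmpty()) {
    throw new IllegalStateException(
        "Attempting to write a document to a failure store but the target data stream ["
            + defaultWriteIndexName + "] does not have one enabled"
    );
}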

@jbaiera jbaiera requested a review from dakrone January 9, 2024 18:40
@jbaiera jbaiera force-pushed the data-stream-ingest-failure-redirect branch from 9b5791b to cccef0a on January 10, 2024 19:48
@jbaiera jbaiera marked this pull request as ready for review January 10, 2024 19:49
@elasticsearchmachine added the Team:Data Management label (meta label for data/management team) on Jan 10, 2024
@elasticsearchmachine (Collaborator) commented: Pinging @elastic/es-data-management (Team:Data Management)

@dakrone (Member) left a comment: Thanks for working on this Jimmy, I left some comments.

@jbaiera jbaiera requested a review from dakrone January 29, 2024 19:41
@dakrone (Member) left a comment: LGTM, thanks for adding this Jimmy! I left two minor comments about potentially adding some unit tests, but otherwise looks good.

* @param targetIndexName the index that the document was targeting at the time of failure.
* @param e the failure encountered.
*/
public void markItemForFailureStore(int slot, String targetIndexName, Exception e) {
Contributor: If you go with a separate listener, it would be really cool if it captured the targetIndexName on construction rather than requiring it to be passed in here -- but I understand if that's not possible.

@jbaiera (Member, Author): This might be tricky, or I'm not fully understanding the suggestion. The target index name in this case is the index that we were targeting at the start of an ingest pipeline execution. Each recursive call to executePipelines can change this value, mostly via a reroute processor or a processor updating the index target, so we need to recapture the target index name on each top-level pipeline execution.

I'm going to take a crack at the listener refactor though. If it's a bit much I'll file a follow-up issue/PR for it.

@jbaiera (Member, Author): I attempted to unify all the composed logic in the service into one listener - it didn't go as cleanly as I had originally hoped. There are still a number of edge cases that would need to be solved, so I've shelved the refactor for now. Instead I'm going with a small change that lowers the repetition.
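
To make the shape of that discussion concrete, here is a minimal hypothetical sketch of the listener idea (the type and method names below are illustrative, not the actual IngestService API). The key point is that the target index is resolved lazily at failure time rather than captured at construction:

import java.util.function.Supplier;

// Hypothetical sketch, not the actual Elasticsearch types: a per-slot listener
// that defers target-index resolution instead of capturing it at construction.
final class FailureStoreRedirectListener {
    interface ItemFailureSink {
        void markItemForFailureStore(int slot, String targetIndexName, Exception e);
    }

    private final int slot;
    private final Supplier<String> currentTargetIndex;
    private final ItemFailureSink sink;

    FailureStoreRedirectListener(int slot, Supplier<String> currentTargetIndex, ItemFailureSink sink) {
        this.slot = slot;
        this.currentTargetIndex = currentTargetIndex;
        this.sink = sink;
    }

    void onFailure(Exception e) {
        // Resolve the target index lazily: a reroute processor (or a processor
        // that rewrites the index target) can change it on each top-level
        // pipeline execution, so a value captured at construction could be
        // stale by the time the failure fires.
        sink.markItemForFailureStore(slot, currentTargetIndex.get(), e);
    }
}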

@jbaiera jbaiera merged commit 9d3a645 into elastic:main Feb 5, 2024
@jbaiera jbaiera deleted the data-stream-ingest-failure-redirect branch February 5, 2024 19:37
@bvader commented May 19, 2025: Apologies, meant for docs PR

Labels: :Data Management/Data streams (data streams and their lifecycles), >feature, Team:Data Management (meta label for data/management team), v8.13.0