Redirect failed ingest node operations to a failure store when available #103481

Merged: 39 commits, merged Feb 5, 2024
Changes from 15 commits

Commits (39)
b369c41
Add the original metadata to an ingest document
jbaiera Nov 2, 2023
b316177
Add additional handler for IngestService which allows for interceptin…
jbaiera Nov 9, 2023
b7b0e3c
Add logic for resolving if an index belongs to a data stream with a f…
jbaiera Dec 4, 2023
3dd68bb
Do not route failures unless they target a data stream
jbaiera Dec 4, 2023
2a47437
Convert errors into a failure document
jbaiera Dec 13, 2023
ee7cb89
Mark index request to be written to failure store
jbaiera Dec 14, 2023
300b001
Actually redirect failure doc
jbaiera Dec 14, 2023
d45ff24
Fix infinite loop when creating fresh index requests for failures in …
jbaiera Dec 14, 2023
f1782b1
Update docs/changelog/103481.yaml
jbaiera Dec 14, 2023
7cd959a
Refactor calls to ingest service to use new execute method
jbaiera Jan 8, 2024
947afd4
Ingest Service changes tested
jbaiera Jan 9, 2024
4a8e083
Add redirection test to TransportBulkActionIngestTests
jbaiera Jan 10, 2024
3648364
Add rest tests
jbaiera Jan 10, 2024
cccef0a
precommit
jbaiera Jan 10, 2024
59f250b
Comment cleanup
jbaiera Jan 10, 2024
0af09ee
Add javadoc to FailureStoreDocument, convert it to static utility
jbaiera Jan 12, 2024
4a5b8e6
Refactor IngestPipelinesExecutionResult to use static constants where…
jbaiera Jan 12, 2024
523ae5a
Replace kept with shouldKeep
jbaiera Jan 12, 2024
8e66e25
Add executeBulkRequest javadoc
jbaiera Jan 16, 2024
2f741b4
Simplify exception names
jbaiera Jan 16, 2024
786e0d5
Use cached timestamp in failure store detection.
jbaiera Jan 16, 2024
bfb4343
Add javadoc to bulk action methods
jbaiera Jan 17, 2024
c86f26f
Suppress the more niche exception
jbaiera Jan 17, 2024
1263146
Fully null safe index requests for marking items for failure store.
jbaiera Jan 17, 2024
1b82e25
Merge branch 'main' into data-stream-ingest-failure-redirect
jbaiera Jan 29, 2024
a2ab635
Harden the checks in IndexRequest
jbaiera Jan 29, 2024
13f311a
precommit
jbaiera Jan 29, 2024
b93cc93
Merge branch 'main' into data-stream-ingest-failure-redirect
jbaiera Jan 30, 2024
b78932c
Small doc updates related to double count bug.
jbaiera Jan 30, 2024
18617ef
Fix a mistake from the pr comment updates
jbaiera Jan 30, 2024
a680f5c
Persist the metadata in the ingest service instead of on ingest docum…
jbaiera Jan 31, 2024
46ec425
Revert adding the original metadata to an ingest document
jbaiera Jan 31, 2024
c60fd8a
Rework the exception handling logic into a consumer so that it isn't …
jbaiera Feb 1, 2024
5142491
Add dev time assertion in markItemForFailureStore
jbaiera Feb 1, 2024
1950dfc
Add unit tests for checking data streams for failure store compatibil…
jbaiera Feb 2, 2024
febae29
spotless
jbaiera Feb 2, 2024
8af1cf5
Merge branch 'main' into data-stream-ingest-failure-redirect
jbaiera Feb 2, 2024
01d4ea6
Re-apply changes to surprise refactoring
jbaiera Feb 2, 2024
c83fd50
spotless please
jbaiera Feb 2, 2024
5 changes: 5 additions & 0 deletions docs/changelog/103481.yaml
@@ -0,0 +1,5 @@
pr: 103481
summary: Redirect failed ingest node operations to a failure store when available
area: Data streams
type: feature
issues: []
@@ -0,0 +1,110 @@
---
teardown:
- do:
indices.delete_data_stream:
name: logs-foobar
ignore: 404

- do:
indices.delete:
index: .fs-logs-foobar-*
ignore: 404

- do:
indices.delete_index_template:
name: generic_logs_template
ignore: 404

- do:
ingest.delete_pipeline:
id: "failing_pipeline"
ignore: 404

---
"Redirect ingest failure in data stream to failure store":
- skip:
version: " - 8.12.99"
reason: "data stream failure stores only redirect ingest failures in 8.13+"
features: [allowed_warnings, contains]

- do:
ingest.put_pipeline:
id: "failing_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"message" : "error_message"
}
}
]
}
- match: { acknowledged: true }

- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
indices.put_index_template:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "failing_pipeline"

- do:
index:
index: logs-foobar
refresh: true
body:
'@timestamp': '2020-12-12'
foo: bar

- do:
indices.get_data_stream:
name: logs-foobar
- match: { data_streams.0.name: logs-foobar }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { data_streams.0.failure_store: true }
- length: { data_streams.0.failure_indices: 1 }
- match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
search:
index: logs-foobar
body: { query: { match_all: {} } }
- length: { hits.hits: 0 }

- do:
search:
index: .fs-logs-foobar-*
- length: { hits.hits: 1 }
- match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- exists: hits.hits.0._source.@timestamp
- not_exists: hits.hits.0._source.foo
- not_exists: hits.hits.0._source.document.id
- match: { hits.hits.0._source.document.index: 'logs-foobar' }
- match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
- match: { hits.hits.0._source.document.source.foo: 'bar' }
- match: { hits.hits.0._source.error.type: 'fail_processor_exception' }
- match: { hits.hits.0._source.error.message: 'error_message' }
- contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' }

- do:
indices.delete_data_stream:
name: logs-foobar
- is_true: acknowledged

- do:
indices.delete:
index: .fs-logs-foobar-*
- is_true: acknowledged
@@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

public class FailureStoreDocument {

private final IndexRequest source;
private final Exception exception;
private final String targetIndexName;
private final Supplier<Long> timeSupplier;

public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName) {
this(source, exception, targetIndexName, System::currentTimeMillis);
}

public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName, Supplier<Long> timeSupplier) {
this.source = Objects.requireNonNull(source, "source must not be null");
this.exception = Objects.requireNonNull(exception, "exception must not be null");
this.targetIndexName = Objects.requireNonNull(targetIndexName, "targetIndexName must not be null");
this.timeSupplier = Objects.requireNonNull(timeSupplier, "timeSupplier must not be null");
}

public IndexRequest convert() throws IOException {
return new IndexRequest().index(targetIndexName)
.source(createSource())
.opType(DocWriteRequest.OpType.CREATE)
.setWriteToFailureStore(true);
}

private XContentBuilder createSource() throws IOException {
Throwable unwrapped = ExceptionsHelper.unwrapCause(exception);
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
{
builder.timeField("@timestamp", timeSupplier.get());
builder.startObject("document");
{
if (source.id() != null) {
builder.field("id", source.id());
}
if (source.routing() != null) {
builder.field("routing", source.routing());
}
builder.field("index", source.index());
// Unmapped source field
builder.startObject("source");
{
builder.mapContents(source.sourceAsMap());
}
builder.endObject();
}
builder.endObject();
builder.startObject("error");
{
builder.field("type", ElasticsearchException.getExceptionName(unwrapped));
builder.field("message", unwrapped.getMessage());
builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped));
// Further fields not yet tracked (Need to expose via specific exceptions)
// - pipeline
// - pipeline_trace
// - processor
}
builder.endObject();
}
builder.endObject();
return builder;
}
}
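For reference, a document produced by `convert()` above would look roughly like the sketch below. The field values are illustrative only (borrowed from the REST test in this PR, not from a real failure); `document.id` and `document.routing` are emitted only when present on the original request:

```json
{
  "@timestamp": "2024-01-16T12:00:00.000Z",
  "document": {
    "index": "logs-foobar",
    "source": {
      "@timestamp": "2020-12-12",
      "foo": "bar"
    }
  },
  "error": {
    "type": "fail_processor_exception",
    "message": "error_message",
    "stack_trace": "org.elasticsearch.ingest.common.FailProcessorException: error_message ..."
  }
}
```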
@@ -42,11 +42,13 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@@ -71,13 +73,15 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
@@ -336,7 +340,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, executorName, l);
processBulkIndexIngestRequest(task, bulkRequest, executorName, metadata, l);
} else {
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
}
@@ -907,6 +911,7 @@ private void processBulkIndexIngestRequest(
Task task,
BulkRequest original,
String executorName,
Metadata metadata,
ActionListener<BulkResponse> listener
) {
final long ingestStartTimeInNanos = System.nanoTime();
@@ -915,6 +920,8 @@
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
(indexName) -> shouldStoreFailure(indexName, metadata, System.currentTimeMillis()),
bulkRequestModifier::markItemForFailureStore,
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
@@ -962,6 +969,60 @@ public boolean isForceExecution() {
);
}

static boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) {
return DataStream.isFailureStoreEnabled()
&& resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or(
() -> resolveFailureStoreFromTemplate(indexName, metadata)
).orElse(false);
}

private static Optional<Boolean> resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) {
if (indexName == null) {
return Optional.empty();
}

// Get index abstraction, resolving date math if it exists
IndexAbstraction indexAbstraction = metadata.getIndicesLookup()
.get(IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis));

// We only store failures if the failure is being written to a data stream,
// not when directly writing to backing indices/failure stores
if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) {
return Optional.empty();
}

// Locate the write index for the abstraction, and check if it has a data stream associated with it.
// This handles alias resolution as well as data stream resolution.
Index writeIndex = indexAbstraction.getWriteIndex();
assert writeIndex != null : "Could not resolve write index for resource [" + indexName + "]";
IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName());
DataStream targetDataStream = writeAbstraction.getParentDataStream();

// We will store the failure if the write target belongs to a data stream with a failure store.
return Optional.of(targetDataStream != null && targetDataStream.isFailureStore());
}

private static Optional<Boolean> resolveFailureStoreFromTemplate(String indexName, Metadata metadata) {
if (indexName == null) {
return Optional.empty();
}

// Check to see if the index name matches any templates such that an index would have been attributed
// We don't check v1 templates at all because failure stores can only exist on data streams via a v2 template
String template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false);
if (template != null) {
// Check if this is a data stream template or if it is just a normal index.
ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(template);
if (composableIndexTemplate.getDataStreamTemplate() != null) {
// Check if the data stream has the failure store enabled
return Optional.of(composableIndexTemplate.getDataStreamTemplate().hasFailureStore());
}
}

// Could not locate a failure store via template
return Optional.empty();
}
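The two-step resolution above (consult concrete cluster metadata first, then fall back to a matching v2 index template) can be sketched in isolation. The maps below are hypothetical stand-ins for cluster state, not Elasticsearch APIs; only the `Optional.or(...).orElse(false)` chaining mirrors the real code:

```java
import java.util.Map;
import java.util.Optional;

public class FailureStoreResolution {
    // Stand-in for existing data streams: name -> failure store enabled
    static final Map<String, Boolean> DATA_STREAMS = Map.of("logs-foobar", true);
    // Stand-in for v2 templates: index pattern prefix -> data stream template has failure store
    static final Map<String, Boolean> TEMPLATES = Map.of("logs-", true, "metrics-", false);

    // Mirrors resolveFailureStoreFromMetadata: empty when the index is not a known data stream
    static Optional<Boolean> fromMetadata(String indexName) {
        return Optional.ofNullable(DATA_STREAMS.get(indexName));
    }

    // Mirrors resolveFailureStoreFromTemplate: empty when no template pattern matches
    static Optional<Boolean> fromTemplate(String indexName) {
        return TEMPLATES.entrySet().stream()
            .filter(e -> indexName.startsWith(e.getKey()))
            .map(Map.Entry::getValue)
            .findFirst();
    }

    // Metadata wins if present; otherwise the template answers; otherwise default to false
    static boolean shouldStoreFailure(String indexName) {
        return fromMetadata(indexName).or(() -> fromTemplate(indexName)).orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(shouldStoreFailure("logs-foobar"));   // existing data stream -> true
        System.out.println(shouldStoreFailure("logs-new"));      // template match only -> true
        System.out.println(shouldStoreFailure("unknown-index")); // no match at all -> false
    }
}
```

The template fallback matters for the very first document sent to a not-yet-created data stream: there is no metadata entry yet, so only the template can say whether failures should be redirected.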

static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

final BulkRequest bulkRequest;
@@ -1060,5 +1121,35 @@ synchronized void markItemAsFailed(int slot, Exception e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e);
itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure));
}

public void markItemForFailureStore(int slot, String targetIndexName, Exception e) {
Contributor:

If you go with a separate listener, it would be really cool if it captured the targetIndexName on construction rather than requiring it to be passed in here, but I understand if that's not possible.

Member Author:

This might be tricky, or I'm not fully understanding the suggestion. The target index name in this case is the index that we were targeting at the start of an ingest pipeline execution. Each recursive call to executePipelines has a chance that this value can change, mostly in the event of either a reroute processor or a processor updating the index target. We need to recapture the target index name on each top-level pipeline execution.

I'm going to take a crack at the listener refactor though. If it's a bit much I'll file a followup issue/PR for it.

Member Author:

I attempted to unify all the composed logic in the service into one listener. It didn't go as cleanly as I had originally hoped; there are still a number of edge cases that would need to be solved, so I've shelved the refactor for now. Instead I'm going with a small change that lowers the repetition.

if (DataStream.isFailureStoreEnabled() == false) {
markItemAsFailed(slot, e);
} else {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
try {
IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert();
// This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to
// the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again.
errorDocument.isPipelineResolved(true);
errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME);
errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
bulkRequest.requests.set(slot, errorDocument);
} catch (IOException ex) {
// This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it if
// we need the info later.
ex.addSuppressed(e);
logger.debug(
() -> "Encountered exception while attempting to redirect a failed ingest operation: index ["
+ targetIndexName
+ "], source: ["
+ indexRequest.source().utf8ToString()
+ "]",
ex
);
markItemAsFailed(slot, ex);
}
}
}
}
}