Redirect failed ingest node operations to a failure store when available #103481
Changes from 15 commits
@@ -0,0 +1,5 @@
```yaml
pr: 103481
summary: Redirect failed ingest node operations to a failure store when available
area: Data streams
type: feature
issues: []
```
@@ -0,0 +1,110 @@
```yaml
---
teardown:
  - do:
      indices.delete_data_stream:
        name: logs-foobar
        ignore: 404

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
        ignore: 404

  - do:
      indices.delete_index_template:
        name: generic_logs_template
        ignore: 404

  - do:
      ingest.delete_pipeline:
        id: "failing_pipeline"
        ignore: 404

---
"Redirect ingest failure in data stream to failure store":
  - skip:
      version: " - 8.12.99"
      reason: "data stream failure stores only redirect ingest failures in 8.13+"
      features: [allowed_warnings, contains]

  - do:
      ingest.put_pipeline:
        id: "failing_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "fail" : {
                  "message" : "error_message"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      allowed_warnings:
        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
      indices.put_index_template:
        name: generic_logs_template
        body:
          index_patterns: logs-*
          data_stream:
            failure_store: true
          template:
            settings:
              number_of_shards: 1
              number_of_replicas: 1
              index:
                default_pipeline: "failing_pipeline"

  - do:
      index:
        index: logs-foobar
        refresh: true
        body:
          '@timestamp': '2020-12-12'
          foo: bar

  - do:
      indices.get_data_stream:
        name: logs-foobar
  - match: { data_streams.0.name: logs-foobar }
  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
  - length: { data_streams.0.indices: 1 }
  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
  - match: { data_streams.0.failure_store: true }
  - length: { data_streams.0.failure_indices: 1 }
  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

  - do:
      search:
        index: logs-foobar
        body: { query: { match_all: {} } }
  - length: { hits.hits: 0 }

  - do:
      search:
        index: .fs-logs-foobar-*
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
  - exists: hits.hits.0._source.@timestamp
  - not_exists: hits.hits.0._source.foo
  - not_exists: hits.hits.0._source.document.id
  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
  - match: { hits.hits.0._source.document.source.foo: 'bar' }
  - match: { hits.hits.0._source.error.type: 'fail_processor_exception' }
  - match: { hits.hits.0._source.error.message: 'error_message' }
  - contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' }

  - do:
      indices.delete_data_stream:
        name: logs-foobar
  - is_true: acknowledged

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
  - is_true: acknowledged
```
@@ -0,0 +1,85 @@
```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Wraps a failed indexing operation and converts it into a new index request targeting
 * a data stream's failure store, capturing the original document along with details of
 * the exception that caused the failure.
 */
public class FailureStoreDocument {

    private final IndexRequest source;
    private final Exception exception;
    private final String targetIndexName;
    private final Supplier<Long> timeSupplier;

    public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName) {
        this(source, exception, targetIndexName, System::currentTimeMillis);
    }

    public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName, Supplier<Long> timeSupplier) {
        this.source = Objects.requireNonNull(source, "source must not be null");
        this.exception = Objects.requireNonNull(exception, "exception must not be null");
        this.targetIndexName = Objects.requireNonNull(targetIndexName, "targetIndexName must not be null");
        this.timeSupplier = Objects.requireNonNull(timeSupplier, "timeSupplier must not be null");
    }

    public IndexRequest convert() throws IOException {
        return new IndexRequest().index(targetIndexName)
            .source(createSource())
            .opType(DocWriteRequest.OpType.CREATE)
            .setWriteToFailureStore(true);
    }

    private XContentBuilder createSource() throws IOException {
        Throwable unwrapped = ExceptionsHelper.unwrapCause(exception);
        XContentBuilder builder = JsonXContent.contentBuilder();
        builder.startObject();
        {
            builder.timeField("@timestamp", timeSupplier.get());
            builder.startObject("document");
            {
                if (source.id() != null) {
                    builder.field("id", source.id());
                }
                if (source.routing() != null) {
                    builder.field("routing", source.routing());
                }
                builder.field("index", source.index());
                // Unmapped source field
                builder.startObject("source");
                {
                    builder.mapContents(source.sourceAsMap());
                }
                builder.endObject();
            }
            builder.endObject();
            builder.startObject("error");
            {
                builder.field("type", ElasticsearchException.getExceptionName(unwrapped));
                builder.field("message", unwrapped.getMessage());
                builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped));
                // Further fields not yet tracked (need to expose via specific exceptions)
                // - pipeline
                // - pipeline_trace
                // - processor
            }
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }
}
```
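As a usage illustration (not part of the PR diff): a minimal sketch that converts a hypothetical failed write into a failure-store request. Only the `FailureStoreDocument` API shown above is assumed; the index name, source, and exception are made up, and the commented output shape follows `createSource()` and the YAML test's assertions.

```java
import java.io.IOException;

import org.elasticsearch.action.bulk.FailureStoreDocument;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentType;

class FailureStoreDocumentSketch {
    static IndexRequest redirect() throws IOException {
        // Hypothetical write that failed during ingest.
        IndexRequest failed = new IndexRequest("logs-foobar")
            .source("{\"@timestamp\":\"2020-12-12\",\"foo\":\"bar\"}", XContentType.JSON);
        Exception cause = new IllegalArgumentException("error_message");

        // Yields a CREATE request flagged for the failure store whose source is roughly:
        // { "@timestamp": <redirect time>,
        //   "document": { "index": "logs-foobar",
        //                 "source": { "@timestamp": "2020-12-12", "foo": "bar" } },
        //   "error": { "type": "illegal_argument_exception",
        //              "message": "error_message",
        //              "stack_trace": "java.lang.IllegalArgumentException: error_message..." } }
        return new FailureStoreDocument(failed, cause, "logs-foobar").convert();
    }
}
```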
```diff
@@ -42,11 +42,13 @@
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;

@@ -71,13 +73,15 @@
 import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.TransportService;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.TimeUnit;
```
```diff
@@ -336,7 +340,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName,
             assert arePipelinesResolved : bulkRequest;
         }
         if (clusterService.localNode().isIngestNode()) {
-            processBulkIndexIngestRequest(task, bulkRequest, executorName, l);
+            processBulkIndexIngestRequest(task, bulkRequest, executorName, metadata, l);
         } else {
             ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
         }
```
```diff
@@ -907,6 +911,7 @@ private void processBulkIndexIngestRequest(
         Task task,
         BulkRequest original,
         String executorName,
+        Metadata metadata,
         ActionListener<BulkResponse> listener
     ) {
         final long ingestStartTimeInNanos = System.nanoTime();
```
```diff
@@ -915,6 +920,8 @@ private void processBulkIndexIngestRequest(
             original.numberOfActions(),
             () -> bulkRequestModifier,
             bulkRequestModifier::markItemAsDropped,
+            (indexName) -> shouldStoreFailure(indexName, metadata, System.currentTimeMillis()),
+            bulkRequestModifier::markItemForFailureStore,
             bulkRequestModifier::markItemAsFailed,
             (originalThread, exception) -> {
                 if (exception != null) {
```
```diff
@@ -962,6 +969,60 @@ public boolean isForceExecution() {
         );
     }

+    static boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) {
+        return DataStream.isFailureStoreEnabled()
+            && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or(
+                () -> resolveFailureStoreFromTemplate(indexName, metadata)
+            ).orElse(false);
+    }
+
+    private static Optional<Boolean> resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) {
+        if (indexName == null) {
+            return Optional.empty();
+        }
+
+        // Get index abstraction, resolving date math if it exists
+        IndexAbstraction indexAbstraction = metadata.getIndicesLookup()
+            .get(IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis));
+
+        // We only store failures if the failure is being written to a data stream,
+        // not when directly writing to backing indices/failure stores
+        if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) {
+            return Optional.empty();
+        }
+
+        // Locate the write index for the abstraction, and check if it has a data stream associated with it.
+        // This handles alias resolution as well as data stream resolution.
+        Index writeIndex = indexAbstraction.getWriteIndex();
+        assert writeIndex != null : "Could not resolve write index for resource [" + indexName + "]";
+        IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName());
+        DataStream targetDataStream = writeAbstraction.getParentDataStream();
+
+        // We will store the failure if the write target belongs to a data stream with a failure store.
+        return Optional.of(targetDataStream != null && targetDataStream.isFailureStore());
+    }
+
+    private static Optional<Boolean> resolveFailureStoreFromTemplate(String indexName, Metadata metadata) {
+        if (indexName == null) {
+            return Optional.empty();
+        }
+
+        // Check to see if the index name matches any templates such that an index would have been attributed
+        // We don't check v1 templates at all because failure stores can only exist on data streams via a v2 template
+        String template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false);
+        if (template != null) {
+            // Check if this is a data stream template or if it is just a normal index.
+            ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(template);
+            if (composableIndexTemplate.getDataStreamTemplate() != null) {
+                // Check if the data stream has the failure store enabled
+                return Optional.of(composableIndexTemplate.getDataStreamTemplate().hasFailureStore());
+            }
+        }
+
+        // Could not locate a failure store via template
+        return Optional.empty();
+    }
+
     static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

         final BulkRequest bulkRequest;
```
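To make the resolution order concrete, here is a sketch (not part of the diff). The index names are hypothetical, the `metadata` instance is assumed to come from the cluster state or a test fixture, and it assumes these methods live on `TransportBulkAction`, as the hunk context suggests.

```java
package org.elasticsearch.action.bulk;

import org.elasticsearch.cluster.metadata.Metadata;

class FailureStoreResolutionSketch {
    // Assumes `metadata` describes a cluster where "logs-foobar" is an existing
    // data stream with a failure store enabled and "logs-new" only matches a
    // composable template so far; all names here are hypothetical.
    static void illustrate(Metadata metadata) {
        long now = System.currentTimeMillis();

        // Existing data stream: resolved from cluster metadata via its write index.
        boolean fromMetadata = TransportBulkAction.shouldStoreFailure("logs-foobar", metadata, now); // true

        // Not created yet: falls back to the matching v2 (composable) template,
        // the only place a data stream failure store can be configured.
        boolean fromTemplate = TransportBulkAction.shouldStoreFailure("logs-new", metadata, now);

        // Plain index or alias, not data-stream related: never redirected.
        boolean plain = TransportBulkAction.shouldStoreFailure("my-index", metadata, now); // false
    }
}
```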
```diff
@@ -1060,5 +1121,35 @@ synchronized void markItemAsFailed(int slot, Exception e) {
             BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e);
             itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure));
         }
+
+        public void markItemForFailureStore(int slot, String targetIndexName, Exception e) {
+            if (DataStream.isFailureStoreEnabled() == false) {
+                markItemAsFailed(slot, e);
+            } else {
+                IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
+                try {
+                    IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert();
+                    // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned
+                    // to the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest
+                    // it again.
+                    errorDocument.isPipelineResolved(true);
+                    errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    bulkRequest.requests.set(slot, errorDocument);
+                } catch (IOException ex) {
+                    // This is unlikely to happen because the conversion is so simple, but be defensive and report it in case
+                    // we need the info later.
+                    ex.addSuppressed(e);
+                    logger.debug(
+                        () -> "Encountered exception while attempting to redirect a failed ingest operation: index ["
+                            + targetIndexName
+                            + "], source: ["
+                            + indexRequest.source().utf8ToString()
+                            + "]",
+                        ex
+                    );
+                    markItemAsFailed(slot, ex);
+                }
+            }
+        }
     }
 }
```

Review thread on `markItemForFailureStore` (marked resolved):

> If you go with a separate listener, it would be really cool if it captured the …

> This might be tricky, or I'm not fully understanding the suggestion. The target index name in this case is the index that we were targeting at the start of an ingest pipeline execution. Each recursive call to … I'm going to take a crack at the listener refactor though. If it's a bit much I'll file a followup issue/PR for it.

> I attempted to unify all the composed logic in the service into one listener - it didn't go as cleanly as I had originally hoped. There are still a number of edge cases that will need to be solved, so I've shelved the refactor for now. Instead I'm going with a small change that lowers the repetition.