
Commit 9d3a645

Redirect failed ingest node operations to a failure store when available (#103481)
This PR updates the ingest service to detect whether a failed ingest document was bound for a data stream configured with a failure store and, in that event, to restore the document to its original state, transform it with its failure information, and redirect it to the failure store of the data stream it was originally targeting.
1 parent f879508 commit 9d3a645
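In outline, the change adds one decision point to the ingest failure path. Below is a minimal sketch of that decision in a hypothetical handler shape; only the two markItem* methods and the shouldStoreFailure guard are names from this commit (the real wiring lives in TransportBulkAction and IngestService, which are not fully shown in this diff):

// Hypothetical handler sketch; onIngestPipelineFailure and shouldStoreFailure's
// exact signature are assumptions, the modifier calls are from the diff below.
void onIngestPipelineFailure(int slot, String originalTargetIndex, Exception e, BulkRequestModifier modifier) {
    if (shouldStoreFailure(originalTargetIndex)) {
        // The original target resolves to a data stream with a failure store:
        // restore + transform the failed document and redirect it there.
        modifier.markItemForFailureStore(slot, originalTargetIndex, e);
    } else {
        // Otherwise, preserve the existing behavior and fail the slot.
        modifier.markItemAsFailed(slot, e);
    }
}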

File tree

12 files changed: +1120 -54 lines changed

docs/changelog/103481.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
pr: 103481
summary: Redirect failed ingest node operations to a failure store when available
area: Data streams
type: feature
issues: []
@@ -0,0 +1,110 @@
---
teardown:
  - do:
      indices.delete_data_stream:
        name: logs-foobar
        ignore: 404

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
        ignore: 404

  - do:
      indices.delete_index_template:
        name: generic_logs_template
        ignore: 404

  - do:
      ingest.delete_pipeline:
        id: "failing_pipeline"
        ignore: 404

---
"Redirect ingest failure in data stream to failure store":
  - skip:
      version: " - 8.12.99"
      reason: "data stream failure stores only redirect ingest failures in 8.13+"
      features: [allowed_warnings, contains]

  - do:
      ingest.put_pipeline:
        id: "failing_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "fail" : {
                  "message" : "error_message"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      allowed_warnings:
        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
      indices.put_index_template:
        name: generic_logs_template
        body:
          index_patterns: logs-*
          data_stream:
            failure_store: true
          template:
            settings:
              number_of_shards: 1
              number_of_replicas: 1
              index:
                default_pipeline: "failing_pipeline"

  - do:
      index:
        index: logs-foobar
        refresh: true
        body:
          '@timestamp': '2020-12-12'
          foo: bar

  - do:
      indices.get_data_stream:
        name: logs-foobar
  - match: { data_streams.0.name: logs-foobar }
  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
  - length: { data_streams.0.indices: 1 }
  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
  - match: { data_streams.0.failure_store: true }
  - length: { data_streams.0.failure_indices: 1 }
  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

  - do:
      search:
        index: logs-foobar
        body: { query: { match_all: {} } }
  - length: { hits.hits: 0 }

  - do:
      search:
        index: .fs-logs-foobar-*
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
  - exists: hits.hits.0._source.@timestamp
  - not_exists: hits.hits.0._source.foo
  - not_exists: hits.hits.0._source.document.id
  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
  - match: { hits.hits.0._source.document.source.foo: 'bar' }
  - match: { hits.hits.0._source.error.type: 'fail_processor_exception' }
  - match: { hits.hits.0._source.error.message: 'error_message' }
  - contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' }

  - do:
      indices.delete_data_stream:
        name: logs-foobar
  - is_true: acknowledged

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
  - is_true: acknowledged
server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

Lines changed: 103 additions & 0 deletions
@@ -8,16 +8,22 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SparseFixedBitSet;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.ingest.IngestService;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -30,8 +36,17 @@
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
+/**
+ * Manages mutations to a bulk request that arise from the application of ingest pipelines. The modifier acts as an iterator over the
+ * documents of a bulk request, keeping a record of all dropped and failed write requests in the overall bulk operation.
+ * Once all pipelines have been applied, the modifier is used to create a new bulk request that will be used for executing the
+ * remaining writes. When this final bulk operation is completed, the modifier is used to combine the results with those from the
+ * ingest service to create the final bulk response.
+ */
 final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
 
+    private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class);
+
     private static final String DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
 
     final BulkRequest bulkRequest;
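To make the class javadoc above concrete, here is a hedged sketch of the modifier's lifecycle. The harness is hypothetical: runIngestPipelines and the client.bulk call stand in for the real TransportBulkAction orchestration, which is outside this diff; the modifier methods themselves are real.

// Hypothetical lifecycle sketch, assuming a stand-in ingest call and client.
BulkRequestModifier modifier = new BulkRequestModifier(bulkRequest);
int slot = -1;
while (modifier.hasNext()) {
    DocWriteRequest<?> request = modifier.next();
    slot++;
    try {
        runIngestPipelines(request);          // stand-in for the ingest service
    } catch (Exception e) {
        modifier.markItemAsFailed(slot, e);   // records a failed item response
    }
}
// Rebuild the bulk request without failed/dropped slots, then combine the
// ingest-side results with the write results in the final response.
BulkRequest remaining = modifier.getBulkRequest();
client.bulk(remaining, modifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener));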
@@ -58,6 +73,13 @@ public boolean hasNext() {
         return (currentSlot + 1) < bulkRequest.requests().size();
     }
 
+    /**
+     * Creates a new bulk request containing all documents from the original bulk request that have not been marked as failed
+     * or dropped. Any failed or dropped documents are tracked as a side effect of this call so that they may be reflected in the
+     * final bulk response.
+     *
+     * @return A new bulk request without the write operations removed during any ingest pipeline executions.
+     */
     BulkRequest getBulkRequest() {
         if (itemResponses.isEmpty()) {
             return bulkRequest;
@@ -80,6 +102,15 @@ BulkRequest getBulkRequest() {
         }
     }
 
+    /**
+     * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+     * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+     * service with the results returned from running the remaining write operations.
+     *
+     * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
+     * @param actionListener The action listener that expects the final bulk response.
+     * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+     */
     ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
         if (itemResponses.isEmpty()) {
             return actionListener.map(
@@ -138,6 +169,11 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte
         }
     }
 
+    /**
+     * Mark the document at the given slot in the bulk request as having failed in the ingest service.
+     * @param slot the slot in the bulk request to mark as failed.
+     * @param e the failure encountered.
+     */
     synchronized void markItemAsFailed(int slot, Exception e) {
         final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
         final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
@@ -150,6 +186,10 @@ synchronized void markItemAsFailed(int slot, Exception e) {
         itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure));
     }
 
+    /**
+     * Mark the document at the given slot in the bulk request as having been dropped by the ingest service.
+     * @param slot the slot in the bulk request to mark as dropped.
+     */
     synchronized void markItemAsDropped(int slot) {
         final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
         final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
@@ -164,4 +204,67 @@ synchronized void markItemAsDropped(int slot) {
         );
         itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped));
     }
+
+    /**
+     * Mark the document at the given slot in the bulk request as having failed in the ingest service. The document will be redirected
+     * to a data stream's failure store.
+     * @param slot the slot in the bulk request to redirect.
+     * @param targetIndexName the index that the document was targeting at the time of failure.
+     * @param e the failure encountered.
+     */
+    public void markItemForFailureStore(int slot, String targetIndexName, Exception e) {
+        if (DataStream.isFailureStoreEnabled() == false) {
+            // Assert false for development, but if we somehow find ourselves here, default to failure logic.
+            assert false
+                : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! "
+                    + "This should be guarded against in TransportBulkAction#shouldStoreFailure()";
+            markItemAsFailed(slot, e);
+        } else {
+            // We get the index write request to find the source of the failed document
+            IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(bulkRequest.requests().get(slot));
+            if (indexRequest == null) {
+                // This is unlikely to ever happen, since only source-oriented operations (index, create, upsert) are considered
+                // for ingest, but if it does happen, attempt to trip an assertion. If running in production, be defensive: mark
+                // it failed as normal, and log the info for later debugging if needed.
+                assert false
+                    : "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. "
+                        + "type: ["
+                        + bulkRequest.requests().get(slot).getClass().getName()
+                        + "], index: ["
+                        + targetIndexName
+                        + "]";
+                markItemAsFailed(slot, e);
+                logger.debug(
+                    () -> "Attempted to redirect an invalid write operation after ingest failure - type: ["
+                        + bulkRequest.requests().get(slot).getClass().getName()
+                        + "], index: ["
+                        + targetIndexName
+                        + "]"
+                );
+            } else {
+                try {
+                    IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName);
+                    // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to
+                    // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again.
+                    errorDocument.isPipelineResolved(true);
+                    errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    bulkRequest.requests.set(slot, errorDocument);
+                } catch (IOException ioException) {
+                    // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it
+                    // if we need the info later.
+                    e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one.
+                    logger.debug(
+                        () -> "Encountered exception while attempting to redirect a failed ingest operation: index ["
+                            + targetIndexName
+                            + "], source: ["
+                            + indexRequest.source().utf8ToString()
+                            + "]",
+                        ioException
+                    );
+                    markItemAsFailed(slot, e);
+                }
+            }
+        }
+    }
 }
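The two failure paths above behave quite differently, which a short hedged sketch may make clearer (hypothetical harness; the constructor and both markItem* calls are from this file, the surrounding setup is assumed):

// Hypothetical contrast of the two failure paths after an ingest error.
BulkRequestModifier modifier = new BulkRequestModifier(bulkRequest);
Exception failure = new ElasticsearchException("error_message");

modifier.markItemAsFailed(0, failure);
// Slot 0 is now excluded from modifier.getBulkRequest() and will surface as a
// failed BulkItemResponse in the combined bulk response.

modifier.markItemForFailureStore(1, "logs-foobar", failure);
// Slot 1 stays in the bulk request, but its payload is replaced by the
// transformed error document bound for logs-foobar's failure store
// (op-type CREATE, with no-op pipelines so it is not re-ingested).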
Lines changed: 111 additions & 0 deletions

@@ -0,0 +1,111 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Transforms an indexing request using error information into a new index request to be stored in a data stream's failure store.
 */
public final class FailureStoreDocument {

    private FailureStoreDocument() {}

    /**
     * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
     * new {@link IndexRequest} that can be stored in a data stream's failure store.
     * @param source The original request that has failed to be ingested
     * @param exception The exception that was thrown that caused the request to fail to be ingested
     * @param targetIndexName The index that the request was targeting at time of failure
     * @return A new {@link IndexRequest} with a failure store compliant structure
     * @throws IOException If there is a problem when the document's new source is serialized
     */
    public static IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException {
        return transformFailedRequest(source, exception, targetIndexName, System::currentTimeMillis);
    }

    /**
     * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
     * new {@link IndexRequest} that can be stored in a data stream's failure store.
     * @param source The original request that has failed to be ingested
     * @param exception The exception that was thrown that caused the request to fail to be ingested
     * @param targetIndexName The index that the request was targeting at time of failure
     * @param timeSupplier Supplies the value for the document's timestamp
     * @return A new {@link IndexRequest} with a failure store compliant structure
     * @throws IOException If there is a problem when the document's new source is serialized
     */
    public static IndexRequest transformFailedRequest(
        IndexRequest source,
        Exception exception,
        String targetIndexName,
        Supplier<Long> timeSupplier
    ) throws IOException {
        return new IndexRequest().index(targetIndexName)
            .source(createSource(source, exception, targetIndexName, timeSupplier))
            .opType(DocWriteRequest.OpType.CREATE)
            .setWriteToFailureStore(true);
    }

    private static XContentBuilder createSource(
        IndexRequest source,
        Exception exception,
        String targetIndexName,
        Supplier<Long> timeSupplier
    ) throws IOException {
        Objects.requireNonNull(source, "source must not be null");
        Objects.requireNonNull(exception, "exception must not be null");
        Objects.requireNonNull(targetIndexName, "targetIndexName must not be null");
        Objects.requireNonNull(timeSupplier, "timeSupplier must not be null");
        Throwable unwrapped = ExceptionsHelper.unwrapCause(exception);
        XContentBuilder builder = JsonXContent.contentBuilder();
        builder.startObject();
        {
            builder.timeField("@timestamp", timeSupplier.get());
            builder.startObject("document");
            {
                if (source.id() != null) {
                    builder.field("id", source.id());
                }
                if (source.routing() != null) {
                    builder.field("routing", source.routing());
                }
                builder.field("index", source.index());
                // Unmapped source field
                builder.startObject("source");
                {
                    builder.mapContents(source.sourceAsMap());
                }
                builder.endObject();
            }
            builder.endObject();
            builder.startObject("error");
            {
                builder.field("type", ElasticsearchException.getExceptionName(unwrapped));
                builder.field("message", unwrapped.getMessage());
                builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped));
                // Further fields not yet tracked (need to expose via specific exceptions):
                // - pipeline
                // - pipeline_trace
                // - processor
            }
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }
}
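To make the transformation concrete, here is a hedged usage sketch of the helper above. The FailureStoreDocumentExample harness and the fixed () -> 0L clock are illustrative assumptions, not part of the commit; the printed source nests the original document under document.source and the error details under error, matching the assertions in the YAML test earlier.

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.FailureStoreDocument;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;

public class FailureStoreDocumentExample {
    public static void main(String[] args) throws IOException {
        // A document that (hypothetically) failed its ingest pipeline.
        IndexRequest failed = new IndexRequest("logs-foobar").id("1")
            .source("{\"foo\":\"bar\"}", XContentType.JSON);
        // Fixed clock so the resulting @timestamp is deterministic.
        IndexRequest redirected = FailureStoreDocument.transformFailedRequest(
            failed, new ElasticsearchException("error_message"), "logs-foobar", () -> 0L);
        // The redirected request always creates (never overwrites) and is flagged
        // for the failure store rather than the data stream's backing indices.
        assert redirected.opType() == DocWriteRequest.OpType.CREATE;
        System.out.println(redirected.source().utf8ToString());
    }
}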
