
Redirect failed ingest node operations to a failure store when available #103481


Merged
39 commits merged on Feb 5, 2024
Changes from all commits
Commits
b369c41
Add the original metadata to an ingest document
jbaiera Nov 2, 2023
b316177
Add additional handler for IngestService which allows for interceptin…
jbaiera Nov 9, 2023
b7b0e3c
Add logic for resolving if an index belongs to a data stream with a f…
jbaiera Dec 4, 2023
3dd68bb
Do not route failures unless they target a data stream
jbaiera Dec 4, 2023
2a47437
Convert errors into a failure document
jbaiera Dec 13, 2023
ee7cb89
Mark index request to be written to failure store
jbaiera Dec 14, 2023
300b001
Actually redirect failure doc
jbaiera Dec 14, 2023
d45ff24
Fix infinite loop when creating fresh index requests for failures in …
jbaiera Dec 14, 2023
f1782b1
Update docs/changelog/103481.yaml
jbaiera Dec 14, 2023
7cd959a
Refactor calls to ingest service to use new execute method
jbaiera Jan 8, 2024
947afd4
Ingest Service changes tested
jbaiera Jan 9, 2024
4a8e083
Add redirection test to TransportBulkActionIngestTests
jbaiera Jan 10, 2024
3648364
Add rest tests
jbaiera Jan 10, 2024
cccef0a
precommit
jbaiera Jan 10, 2024
59f250b
Comment cleanup
jbaiera Jan 10, 2024
0af09ee
Add javadoc to FailureStoreDocument, convert it to static utility
jbaiera Jan 12, 2024
4a5b8e6
Refactor IngestPipelinesExecutionResult to use static constants where…
jbaiera Jan 12, 2024
523ae5a
Replace kept with shouldKeep
jbaiera Jan 12, 2024
8e66e25
Add executeBulkRequest javadoc
jbaiera Jan 16, 2024
2f741b4
Simplify exception names
jbaiera Jan 16, 2024
786e0d5
Use cached timestamp in failure store detection.
jbaiera Jan 16, 2024
bfb4343
Add javadoc to bulk action methods
jbaiera Jan 17, 2024
c86f26f
Suppress the more niche exception
jbaiera Jan 17, 2024
1263146
Fully null safe index requests for marking items for failure store.
jbaiera Jan 17, 2024
1b82e25
Merge branch 'main' into data-stream-ingest-failure-redirect
jbaiera Jan 29, 2024
a2ab635
Harden the checks in IndexRequest
jbaiera Jan 29, 2024
13f311a
precommit
jbaiera Jan 29, 2024
b93cc93
Merge branch 'main' into data-stream-ingest-failure-redirect
jbaiera Jan 30, 2024
b78932c
Small doc updates related to double count bug.
jbaiera Jan 30, 2024
18617ef
Fix a mistake from the pr comment updates
jbaiera Jan 30, 2024
a680f5c
Persist the metadata in the ingest service instead of on ingest docum…
jbaiera Jan 31, 2024
46ec425
Revert adding the original metadata to an ingest document
jbaiera Jan 31, 2024
c60fd8a
Rework the exception handling logic into a consumer so that it isn't …
jbaiera Feb 1, 2024
5142491
Add dev time assertion in markItemForFailureStore
jbaiera Feb 1, 2024
1950dfc
Add unit tests for checking data streams for failure store compatibil…
jbaiera Feb 2, 2024
febae29
spotless
jbaiera Feb 2, 2024
8af1cf5
Merge branch 'main' into data-stream-ingest-failure-redirect
jbaiera Feb 2, 2024
01d4ea6
Re-apply changes to surprise refactoring
jbaiera Feb 2, 2024
c83fd50
spotless please
jbaiera Feb 2, 2024
5 changes: 5 additions & 0 deletions docs/changelog/103481.yaml
@@ -0,0 +1,5 @@
pr: 103481
summary: Redirect failed ingest node operations to a failure store when available
area: Data streams
type: feature
issues: []
@@ -0,0 +1,110 @@
---
teardown:
- do:
indices.delete_data_stream:
name: logs-foobar
ignore: 404

- do:
indices.delete:
index: .fs-logs-foobar-*
ignore: 404

- do:
indices.delete_index_template:
name: generic_logs_template
ignore: 404

- do:
ingest.delete_pipeline:
id: "failing_pipeline"
ignore: 404

---
"Redirect ingest failure in data stream to failure store":
- skip:
version: " - 8.12.99"
reason: "data stream failure stores only redirect ingest failures in 8.13+"
features: [allowed_warnings, contains]

- do:
ingest.put_pipeline:
id: "failing_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"message" : "error_message"
}
}
]
}
- match: { acknowledged: true }

- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
indices.put_index_template:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "failing_pipeline"

- do:
index:
index: logs-foobar
refresh: true
body:
'@timestamp': '2020-12-12'
foo: bar

- do:
indices.get_data_stream:
name: logs-foobar
- match: { data_streams.0.name: logs-foobar }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { data_streams.0.failure_store: true }
- length: { data_streams.0.failure_indices: 1 }
- match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
search:
index: logs-foobar
body: { query: { match_all: {} } }
- length: { hits.hits: 0 }

- do:
search:
index: .fs-logs-foobar-*
- length: { hits.hits: 1 }
- match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- exists: hits.hits.0._source.@timestamp
- not_exists: hits.hits.0._source.foo
- not_exists: hits.hits.0._source.document.id
- match: { hits.hits.0._source.document.index: 'logs-foobar' }
- match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
- match: { hits.hits.0._source.document.source.foo: 'bar' }
- match: { hits.hits.0._source.error.type: 'fail_processor_exception' }
- match: { hits.hits.0._source.error.message: 'error_message' }
- contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' }

- do:
indices.delete_data_stream:
name: logs-foobar
- is_true: acknowledged

- do:
indices.delete:
index: .fs-logs-foobar-*
- is_true: acknowledged
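
For callers driving the bulk API directly rather than the YAML test above, one way to tell a redirected document apart from an ordinary failure is the index name reported for the item in the bulk response. The Java sketch below is illustrative only: it assumes a redirected item surfaces as a successful item whose concrete index is a failure store backing index (".fs-" prefix); the helper name and printouts are not part of this change.

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;

// Sketch: classify the items of a bulk response against a data stream that has a failure store.
// Assumption: a document redirected by the ingest failure path appears as a successful item
// whose concrete index carries the failure store prefix (".fs-").
static void reportItems(BulkResponse response) {
    for (BulkItemResponse item : response.getItems()) {
        if (item.isFailed()) {
            // Ordinary failure: the document was neither indexed nor redirected.
            System.out.println("failed: " + item.getFailureMessage());
        } else if (item.getIndex() != null && item.getIndex().startsWith(".fs-")) {
            // The ingest failure was captured and stored in the failure store.
            System.out.println("redirected to failure store: " + item.getIndex());
        } else {
            System.out.println("indexed into: " + item.getIndex());
        }
    }
}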
@@ -8,16 +8,22 @@

package org.elasticsearch.action.bulk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ingest.IngestService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -30,8 +36,17 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* Manages mutations to a bulk request that arise from the application of ingest pipelines. The modifier acts as an iterator over the
* documents of a bulk request, keeping a record of all dropped and failed write requests in the overall bulk operation.
* Once all pipelines have been applied, the modifier is used to create a new bulk request that will be used for executing the
* remaining writes. When this final bulk operation is completed, the modifier is used to combine the results with those from the
* ingest service to create the final bulk response.
*/
final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class);

private static final String DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";

final BulkRequest bulkRequest;
@@ -58,6 +73,13 @@ public boolean hasNext() {
return (currentSlot + 1) < bulkRequest.requests().size();
}

/**
* Creates a new bulk request containing all documents from the original bulk request that have not been marked as failed
* or dropped. Any failed or dropped documents are tracked as a side effect of this call so that they may be reflected in the
* final bulk response.
*
* @return A new bulk request that excludes the write operations removed during ingest pipeline execution.
*/
BulkRequest getBulkRequest() {
if (itemResponses.isEmpty()) {
return bulkRequest;
@@ -80,6 +102,15 @@ BulkRequest getBulkRequest() {
}
}

/**
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
* service with the results returned from running the remaining write operations.
*
* @param ingestTookInMillis Time elapsed during ingestion, to be included in the final bulk response.
* @param actionListener The action listener that expects the final bulk response.
* @return An action listener that combines ingest failure results with the results from writing the remaining documents.
*/
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return actionListener.map(
@@ -138,6 +169,11 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte
}
}

/**
* Mark the document at the given slot in the bulk request as having failed in the ingest service.
* @param slot the slot in the bulk request to mark as failed.
* @param e the failure encountered.
*/
synchronized void markItemAsFailed(int slot, Exception e) {
final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
@@ -150,6 +186,10 @@ synchronized void markItemAsFailed(int slot, Exception e) {
itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure));
}

/**
* Mark the document at the given slot in the bulk request as having been dropped by the ingest service.
* @param slot the slot in the bulk request to mark as dropped.
*/
synchronized void markItemAsDropped(int slot) {
final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
@@ -164,4 +204,67 @@ synchronized void markItemAsDropped(int slot) {
);
itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped));
}

/**
* Mark the document at the given slot in the bulk request as having failed in the ingest service. The document will be redirected
* to a data stream's failure store.
* @param slot the slot in the bulk request to redirect.
* @param targetIndexName the index that the document was targeting at the time of failure.
* @param e the failure encountered.
*/
public void markItemForFailureStore(int slot, String targetIndexName, Exception e) {
if (DataStream.isFailureStoreEnabled() == false) {
// Assert false for development, but if we somehow find ourselves here, default to failure logic.
assert false
: "Attempting to route a failed write request type to a failure store but the failure store is not enabled! "
+ "This should be guarded against in TransportBulkAction#shouldStoreFailure()";
markItemAsFailed(slot, e);
} else {
// We get the index write request to find the source of the failed document
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(bulkRequest.requests().get(slot));
if (indexRequest == null) {
// This is unlikely to ever happen, since only source-oriented operations (index, create, upsert) are considered for
// ingest, but if it does happen, attempt to trip an assertion. If running in production, be defensive: mark it failed
// as normal, and log the info for later debugging if needed.
assert false
: "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. "
+ "type: ["
+ bulkRequest.requests().get(slot).getClass().getName()
+ "], index: ["
+ targetIndexName
+ "]";
markItemAsFailed(slot, e);
logger.debug(
() -> "Attempted to redirect an invalid write operation after ingest failure - type: ["
+ bulkRequest.requests().get(slot).getClass().getName()
+ "], index: ["
+ targetIndexName
+ "]"
);
} else {
try {
IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName);
// This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to
// the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again.
errorDocument.isPipelineResolved(true);
errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME);
errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
bulkRequest.requests.set(slot, errorDocument);
} catch (IOException ioException) {
// This is unlikely to happen because the conversion is so simple, but be defensive and log the details in case
// the info is needed later.
e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one.
logger.debug(
() -> "Encountered exception while attempting to redirect a failed ingest operation: index ["
+ targetIndexName
+ "], source: ["
+ indexRequest.source().utf8ToString()
+ "]",
ioException
);
markItemAsFailed(slot, e);
}
}
}
}
}
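
Taken together, the modifier's lifecycle is: iterate the original requests while the pipelines run, record per-slot outcomes (dropped, failed, or redirected to the failure store), then build the trimmed bulk request and wrap the listener so the final response still covers every original slot. The Java sketch below illustrates that flow only; the constructor signature and the direct calls to the mark* methods are assumptions made for illustration — in the real code path they are wired up as callbacks between TransportBulkAction and the ingest service.

// Minimal sketch of the modifier lifecycle (assumed constructor and direct calls, for illustration only).
static ActionListener<BulkResponse> sketchIngestFlow(
    BulkRequest originalBulkRequest,
    long ingestTookInMillis,
    ActionListener<BulkResponse> originalListener
) {
    BulkRequestModifier modifier = new BulkRequestModifier(originalBulkRequest);

    // While pipelines execute, per-slot outcomes are reported back to the modifier.
    modifier.markItemAsDropped(0);                                                    // dropped by a pipeline
    modifier.markItemAsFailed(1, new RuntimeException("boom"));                       // failed, no failure store
    modifier.markItemForFailureStore(2, "logs-foobar", new RuntimeException("boom")); // redirected

    // Once ingest is done, build the request for the remaining writes (handed to the shard-level
    // bulk execution) and a listener that folds the ingest outcomes back into the final response.
    BulkRequest remaining = modifier.getBulkRequest();
    return modifier.wrapActionListenerIfNeeded(ingestTookInMillis, originalListener);
}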
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Transforms a failed indexing request, together with the error that caused it to fail, into a new index request to be stored in a
* data stream's failure store.
*/
public final class FailureStoreDocument {

private FailureStoreDocument() {}

/**
* Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
* new {@link IndexRequest} that can be stored in a data stream's failure store.
* @param source The original request that has failed to be ingested
* @param exception The exception that was thrown, causing the request to fail to be ingested
* @param targetIndexName The index that the request was targeting at time of failure
* @return A new {@link IndexRequest} with a failure store compliant structure
* @throws IOException If there is a problem when the document's new source is serialized
*/
public static IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException {
return transformFailedRequest(source, exception, targetIndexName, System::currentTimeMillis);
}

/**
* Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
* new {@link IndexRequest} that can be stored in a data stream's failure store.
* @param source The original request that has failed to be ingested
* @param exception The exception that was thrown, causing the request to fail to be ingested
* @param targetIndexName The index that the request was targeting at time of failure
* @param timeSupplier Supplies the value for the document's timestamp
* @return A new {@link IndexRequest} with a failure store compliant structure
* @throws IOException If there is a problem when the document's new source is serialized
*/
public static IndexRequest transformFailedRequest(
IndexRequest source,
Exception exception,
String targetIndexName,
Supplier<Long> timeSupplier
) throws IOException {
return new IndexRequest().index(targetIndexName)
.source(createSource(source, exception, targetIndexName, timeSupplier))
.opType(DocWriteRequest.OpType.CREATE)
.setWriteToFailureStore(true);
}

private static XContentBuilder createSource(
IndexRequest source,
Exception exception,
String targetIndexName,
Supplier<Long> timeSupplier
) throws IOException {
Objects.requireNonNull(source, "source must not be null");
Objects.requireNonNull(exception, "exception must not be null");
Objects.requireNonNull(targetIndexName, "targetIndexName must not be null");
Objects.requireNonNull(timeSupplier, "timeSupplier must not be null");
Throwable unwrapped = ExceptionsHelper.unwrapCause(exception);
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
{
builder.timeField("@timestamp", timeSupplier.get());
builder.startObject("document");
{
if (source.id() != null) {
builder.field("id", source.id());
}
if (source.routing() != null) {
builder.field("routing", source.routing());
}
builder.field("index", source.index());
// Unmapped source field
builder.startObject("source");
{
builder.mapContents(source.sourceAsMap());
}
builder.endObject();
}
builder.endObject();
builder.startObject("error");
{
builder.field("type", ElasticsearchException.getExceptionName(unwrapped));
builder.field("message", unwrapped.getMessage());
builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped));
// Further fields not yet tracked (Need to expose via specific exceptions)
// - pipeline
// - pipeline_trace
// - processor
}
builder.endObject();
}
builder.endObject();
return builder;
}
}
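
As a quick usage sketch for the utility above: given the failed request and its exception, transformFailedRequest produces a CREATE request flagged for the failure store, whose source nests the original document under "document.source" and the unwrapped error under "error". The sample values below are illustrative; the snippet assumes java.util.Map is imported and that the declared IOException from source serialization is handled by the caller.

// Sketch: turning a failed write into a failure store document (illustrative values only).
IndexRequest failed = new IndexRequest("logs-foobar").id("1")
    .source(Map.of("@timestamp", "2020-12-12", "foo", "bar"));
Exception error = new IllegalArgumentException("error_message");

IndexRequest redirect = FailureStoreDocument.transformFailedRequest(failed, error, "logs-foobar");

// The resulting request targets "logs-foobar" with op type CREATE, is marked with
// setWriteToFailureStore(true), and carries a source of roughly this shape:
// {
//   "@timestamp": <time of failure>,
//   "document": {
//     "id": "1",
//     "index": "logs-foobar",
//     "source": { "@timestamp": "2020-12-12", "foo": "bar" }
//   },
//   "error": {
//     "type": "illegal_argument_exception",
//     "message": "error_message",
//     "stack_trace": "..."
//   }
// }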