Redirect failed ingest node operations to a failure store when available #103481

Merged
39 commits merged into main from data-stream-ingest-failure-redirect on Feb 5, 2024
Commits (39, all authored by jbaiera)
b369c41  Add the original metadata to an ingest document (Nov 2, 2023)
b316177  Add additional handler for IngestService which allows for interceptin… (Nov 9, 2023)
b7b0e3c  Add logic for resolving if an index belongs to a data stream with a f… (Dec 4, 2023)
3dd68bb  Do not route failures unless they target a data stream (Dec 4, 2023)
2a47437  Convert errors into a failure document (Dec 13, 2023)
ee7cb89  Mark index request to be written to failure store (Dec 14, 2023)
300b001  Actually redirect failure doc (Dec 14, 2023)
d45ff24  Fix infinite loop when creating fresh index requests for failures in … (Dec 14, 2023)
f1782b1  Update docs/changelog/103481.yaml (Dec 14, 2023)
7cd959a  Refactor calls to ingest service to use new execute method (Jan 8, 2024)
947afd4  Ingest Service changes tested (Jan 9, 2024)
4a8e083  Add redirection test to TransportBulkActionIngestTests (Jan 10, 2024)
3648364  Add rest tests (Jan 10, 2024)
cccef0a  precommit (Jan 10, 2024)
59f250b  Comment cleanup (Jan 10, 2024)
0af09ee  Add javadoc to FailureStoreDocument, convert it to static utility (Jan 12, 2024)
4a5b8e6  Refactor IngestPipelinesExecutionResult to use static constants where… (Jan 12, 2024)
523ae5a  Replace kept with shouldKeep (Jan 12, 2024)
8e66e25  Add executeBulkRequest javadoc (Jan 16, 2024)
2f741b4  Simplify exception names (Jan 16, 2024)
786e0d5  Use cached timestamp in failure store detection. (Jan 16, 2024)
bfb4343  Add javadoc to bulk action methods (Jan 17, 2024)
c86f26f  Suppress the more niche exception (Jan 17, 2024)
1263146  Fully null safe index requests for marking items for failure store. (Jan 17, 2024)
1b82e25  Merge branch 'main' into data-stream-ingest-failure-redirect (Jan 29, 2024)
a2ab635  Harden the checks in IndexRequest (Jan 29, 2024)
13f311a  precommit (Jan 29, 2024)
b93cc93  Merge branch 'main' into data-stream-ingest-failure-redirect (Jan 30, 2024)
b78932c  Small doc updates related to double count bug. (Jan 30, 2024)
18617ef  Fix a mistake from the pr comment updates (Jan 30, 2024)
a680f5c  Persist the metadata in the ingest service instead of on ingest docum… (Jan 31, 2024)
46ec425  Revert adding the original metadata to an ingest document (Jan 31, 2024)
c60fd8a  Rework the exception handling logic into a consumer so that it isn't … (Feb 1, 2024)
5142491  Add dev time assertion in markItemForFailureStore (Feb 1, 2024)
1950dfc  Add unit tests for checking data streams for failure store compatibil… (Feb 2, 2024)
febae29  spotless (Feb 2, 2024)
8af1cf5  Merge branch 'main' into data-stream-ingest-failure-redirect (Feb 2, 2024)
01d4ea6  Re-apply changes to surprise refactoring (Feb 2, 2024)
c83fd50  spotless please (Feb 2, 2024)
5 changes: 5 additions & 0 deletions docs/changelog/103481.yaml
@@ -0,0 +1,5 @@
pr: 103481
summary: Redirect failed ingest node operations to a failure store when available
area: Data streams
type: feature
issues: []
@@ -0,0 +1,110 @@
---
teardown:
  - do:
      indices.delete_data_stream:
        name: logs-foobar
      ignore: 404

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
      ignore: 404

  - do:
      indices.delete_index_template:
        name: generic_logs_template
      ignore: 404

  - do:
      ingest.delete_pipeline:
        id: "failing_pipeline"
      ignore: 404

---
"Redirect ingest failure in data stream to failure store":
  - skip:
      version: " - 8.12.99"
      reason: "data stream failure stores only redirect ingest failures in 8.13+"
      features: [allowed_warnings, contains]

  - do:
      ingest.put_pipeline:
        id: "failing_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "fail" : {
                  "message" : "error_message"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      allowed_warnings:
        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
      indices.put_index_template:
        name: generic_logs_template
        body:
          index_patterns: logs-*
          data_stream:
            failure_store: true
          template:
            settings:
              number_of_shards: 1
              number_of_replicas: 1
              index:
                default_pipeline: "failing_pipeline"

  - do:
      index:
        index: logs-foobar
        refresh: true
        body:
          '@timestamp': '2020-12-12'
          foo: bar

  - do:
      indices.get_data_stream:
        name: logs-foobar
  - match: { data_streams.0.name: logs-foobar }
  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
  - length: { data_streams.0.indices: 1 }
  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
  - match: { data_streams.0.failure_store: true }
  - length: { data_streams.0.failure_indices: 1 }
  - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

  - do:
      search:
        index: logs-foobar
        body: { query: { match_all: {} } }
  - length: { hits.hits: 0 }

  - do:
      search:
        index: .fs-logs-foobar-*
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
  - exists: hits.hits.0._source.@timestamp
  - not_exists: hits.hits.0._source.foo
  - not_exists: hits.hits.0._source.document.id
  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
  - match: { hits.hits.0._source.document.source.foo: 'bar' }
  - match: { hits.hits.0._source.error.type: 'fail_processor_exception' }
  - match: { hits.hits.0._source.error.message: 'error_message' }
  - contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' }

  - do:
      indices.delete_data_stream:
        name: logs-foobar
  - is_true: acknowledged

  - do:
      indices.delete:
        index: .fs-logs-foobar-*
  - is_true: acknowledged
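For reference, a minimal sketch of producing that failure document shape programmatically with the FailureStoreDocument utility added in the next file. The index name, document values, and the IllegalStateException are illustrative assumptions, not values taken from this PR:

import org.elasticsearch.action.bulk.FailureStoreDocument;
import org.elasticsearch.action.index.IndexRequest;

import java.io.IOException;
import java.util.Map;

class FailureStoreDocumentSketch {
    public static void main(String[] args) throws IOException {
        // The request that failed during pipeline execution (illustrative values).
        IndexRequest failed = new IndexRequest("logs-foobar")
            .source(Map.of("@timestamp", "2020-12-12", "foo", "bar"));

        // Combine the failed request with its error. The redirected request keeps
        // the data stream as its target but is flagged to resolve to the failure
        // store rather than a backing index.
        IndexRequest redirect = FailureStoreDocument.transformFailedRequest(
            failed,
            new IllegalStateException("error_message"),
            "logs-foobar"
        );

        // The new source carries the structure asserted in the test above:
        // @timestamp, document.index, document.source.*, and
        // error.{type,message,stack_trace}.
        System.out.println(redirect.source().utf8ToString());
    }
}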
@@ -0,0 +1,111 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Transforms an indexing request using error information into a new index request to be stored in a data stream's failure store.
 */
public final class FailureStoreDocument {

    private FailureStoreDocument() {}

    /**
     * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
     * new {@link IndexRequest} that can be stored in a data stream's failure store.
     * @param source The original request that has failed to be ingested
     * @param exception The exception that was thrown that caused the request to fail to be ingested
     * @param targetIndexName The index that the request was targeting at time of failure
     * @return A new {@link IndexRequest} with a failure store compliant structure
     * @throws IOException If there is a problem when the document's new source is serialized
     */
    public static IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException {
        return transformFailedRequest(source, exception, targetIndexName, System::currentTimeMillis);
    }

    /**
     * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a
     * new {@link IndexRequest} that can be stored in a data stream's failure store.
     * @param source The original request that has failed to be ingested
     * @param exception The exception that was thrown that caused the request to fail to be ingested
     * @param targetIndexName The index that the request was targeting at time of failure
     * @param timeSupplier Supplies the value for the document's timestamp
     * @return A new {@link IndexRequest} with a failure store compliant structure
     * @throws IOException If there is a problem when the document's new source is serialized
     */
    public static IndexRequest transformFailedRequest(
        IndexRequest source,
        Exception exception,
        String targetIndexName,
        Supplier<Long> timeSupplier
    ) throws IOException {
        return new IndexRequest().index(targetIndexName)
            .source(createSource(source, exception, targetIndexName, timeSupplier))
            .opType(DocWriteRequest.OpType.CREATE)
            .setWriteToFailureStore(true);
    }

    private static XContentBuilder createSource(
        IndexRequest source,
        Exception exception,
        String targetIndexName,
        Supplier<Long> timeSupplier
    ) throws IOException {
        Objects.requireNonNull(source, "source must not be null");
        Objects.requireNonNull(exception, "exception must not be null");
        Objects.requireNonNull(targetIndexName, "targetIndexName must not be null");
        Objects.requireNonNull(timeSupplier, "timeSupplier must not be null");
        Throwable unwrapped = ExceptionsHelper.unwrapCause(exception);
        XContentBuilder builder = JsonXContent.contentBuilder();
        builder.startObject();
        {
            builder.timeField("@timestamp", timeSupplier.get());
            builder.startObject("document");
            {
                if (source.id() != null) {
                    builder.field("id", source.id());
                }
                if (source.routing() != null) {
                    builder.field("routing", source.routing());
                }
                builder.field("index", source.index());
                // Unmapped source field
                builder.startObject("source");
                {
                    builder.mapContents(source.sourceAsMap());
                }
                builder.endObject();
            }
            builder.endObject();
            builder.startObject("error");
            {
                builder.field("type", ElasticsearchException.getExceptionName(unwrapped));
                builder.field("message", unwrapped.getMessage());
                builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped));
                // Further fields not yet tracked (Need to expose via specific exceptions)
                // - pipeline
                // - pipeline_trace
                // - processor
            }
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }
}
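A hypothetical sketch of how a bulk-level failure handler could splice the redirected request back into a bulk request. The redirectSlot helper and its in-place replacement strategy are assumptions for illustration only; per the commit list, the PR actually wires redirection through the ingest service's exception-handling consumer and TransportBulkAction:

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.FailureStoreDocument;
import org.elasticsearch.action.index.IndexRequest;

import java.io.IOException;

class BulkRedirectSketch {
    // Hypothetical helper: replace the failed item at the given slot with its
    // failure store redirect, targeting the data stream the item was bound for.
    static void redirectSlot(BulkRequest bulk, int slot, Exception failure, String dataStreamName) {
        IndexRequest failed = (IndexRequest) bulk.requests().get(slot);
        try {
            bulk.requests().set(slot, FailureStoreDocument.transformFailedRequest(failed, failure, dataStreamName));
        } catch (IOException serializationFailure) {
            // If the failure document itself cannot be serialized, keep the original
            // item so the bulk response still reports the original error.
            failure.addSuppressed(serializationFailure);
        }
    }
}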