diff --git a/docs/changelog/103481.yaml b/docs/changelog/103481.yaml new file mode 100644 index 0000000000000..f7c7c0b6eecc9 --- /dev/null +++ b/docs/changelog/103481.yaml @@ -0,0 +1,5 @@ +pr: 103481 +summary: Redirect failed ingest node operations to a failure store when available +area: Data streams +type: feature +issues: [] diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml new file mode 100644 index 0000000000000..b9621977ff3aa --- /dev/null +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml @@ -0,0 +1,110 @@ +--- +teardown: + - do: + indices.delete_data_stream: + name: logs-foobar + ignore: 404 + + - do: + indices.delete: + index: .fs-logs-foobar-* + ignore: 404 + + - do: + indices.delete_index_template: + name: generic_logs_template + ignore: 404 + + - do: + ingest.delete_pipeline: + id: "failing_pipeline" + ignore: 404 + +--- +"Redirect ingest failure in data stream to failure store": + - skip: + version: " - 8.12.99" + reason: "data stream failure stores only redirect ingest failures in 8.13+" + features: [allowed_warnings, contains] + + - do: + ingest.put_pipeline: + id: "failing_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "fail" : { + "message" : "error_message" + } + } + ] + } + - match: { acknowledged: true } + + - do: + allowed_warnings: + - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" + indices.put_index_template: + name: generic_logs_template + body: + index_patterns: logs-* + data_stream: + failure_store: true + template: + settings: + number_of_shards: 1 + number_of_replicas: 1 + index: + default_pipeline: "failing_pipeline" + + - do: + index: + index: logs-foobar + refresh: true + body: + '@timestamp': '2020-12-12' + foo: bar + + - do: + indices.get_data_stream: + name: logs-foobar + - match: { data_streams.0.name: logs-foobar } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - length: { data_streams.0.indices: 1 } + - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { data_streams.0.failure_store: true } + - length: { data_streams.0.failure_indices: 1 } + - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + + - do: + search: + index: logs-foobar + body: { query: { match_all: {} } } + - length: { hits.hits: 0 } + + - do: + search: + index: .fs-logs-foobar-* + - length: { hits.hits: 1 } + - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" } + - exists: hits.hits.0._source.@timestamp + - not_exists: hits.hits.0._source.foo + - not_exists: hits.hits.0._source.document.id + - match: { hits.hits.0._source.document.index: 'logs-foobar' } + - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' } + - match: { hits.hits.0._source.document.source.foo: 'bar' } + - match: { hits.hits.0._source.error.type: 'fail_processor_exception' } + - match: { hits.hits.0._source.error.message: 'error_message' } + - contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: 
error_message' } + + - do: + indices.delete_data_stream: + name: logs-foobar + - is_true: acknowledged + + - do: + indices.delete: + index: .fs-logs-foobar-* + - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index e42ddd41b0b0a..5e630bf9cdef5 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -8,16 +8,22 @@ package org.elasticsearch.action.bulk; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SparseFixedBitSet; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Assertions; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.ingest.IngestService; +import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -30,8 +36,17 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +/** + * Manages mutations to a bulk request that arise from the application of ingest pipelines. The modifier acts as an iterator over the + * documents of a bulk request, keeping a record of all dropped and failed write requests in the overall bulk operation. + * Once all pipelines have been applied, the modifier is used to create a new bulk request that will be used for executing the + * remaining writes. When this final bulk operation is completed, the modifier is used to combine the results with those from the + * ingest service to create the final bulk response. + */ final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> { + private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class); + private static final String DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated"; final BulkRequest bulkRequest; @@ -58,6 +73,13 @@ public boolean hasNext() { return (currentSlot + 1) < bulkRequest.requests().size(); } + /** + * Creates a new bulk request containing all documents from the original bulk request that have not been marked as failed + * or dropped. Any failed or dropped documents are tracked as a side effect of this call so that they may be reflected in the + * final bulk response. + * + * @return A new bulk request that contains only the write operations which were not dropped or failed during ingest pipeline execution. + */ BulkRequest getBulkRequest() { if (itemResponses.isEmpty()) { return bulkRequest; @@ -80,6 +102,15 @@ BulkRequest getBulkRequest() { } } + /** + * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the + * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest + * service with the results returned from running the remaining write operations. + * + * @param ingestTookInMillis Time elapsed during ingestion, to be included in the final bulk response.
+ * @param actionListener The action listener that expects the final bulk response. + * @return An action listener that combines ingest failure results with the results from writing the remaining documents. + */ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) { if (itemResponses.isEmpty()) { return actionListener.map( @@ -138,6 +169,11 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte } } + /** + * Mark the document at the given slot in the bulk request as having failed in the ingest service. + * @param slot the slot in the bulk request to mark as failed. + * @param e the failure encountered. + */ synchronized void markItemAsFailed(int slot, Exception e) { final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot); final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); @@ -150,6 +186,10 @@ synchronized void markItemAsFailed(int slot, Exception e) { itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure)); } + /** + * Mark the document at the given slot in the bulk request as having been dropped by the ingest service. + * @param slot the slot in the bulk request to mark as dropped. + */ synchronized void markItemAsDropped(int slot) { final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot); final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); @@ -164,4 +204,67 @@ synchronized void markItemAsDropped(int slot) { ); itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped)); } + + /** + * Mark the document at the given slot in the bulk request as having failed in the ingest service. The document will be redirected + * to a data stream's failure store. + * @param slot the slot in the bulk request to redirect. + * @param targetIndexName the index that the document was targeting at the time of failure. + * @param e the failure encountered. + */ + public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { + if (DataStream.isFailureStoreEnabled() == false) { + // Assert false for development, but if we somehow find ourselves here, default to failure logic. + assert false + : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! " + + "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; + markItemAsFailed(slot, e); + } else { + // We get the index write request to find the source of the failed document + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(bulkRequest.requests().get(slot)); + if (indexRequest == null) { + // This is unlikely to ever happen, since only source-oriented operations (index, create, upsert) are considered for + // ingest, but if it does happen, attempt to trip an assertion. If running in production, be defensive: Mark it failed + // as normal, and log the info for later debugging if needed. + assert false + : "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. 
" + + "type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + + targetIndexName + + "]"; + markItemAsFailed(slot, e); + logger.debug( + () -> "Attempted to redirect an invalid write operation after ingest failure - type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + + targetIndexName + + "]" + ); + } else { + try { + IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName); + // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to + // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again. + errorDocument.isPipelineResolved(true); + errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME); + errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); + bulkRequest.requests.set(slot, errorDocument); + } catch (IOException ioException) { + // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it + // if we need the info later. + e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one. + logger.debug( + () -> "Encountered exception while attempting to redirect a failed ingest operation: index [" + + targetIndexName + + "], source: [" + + indexRequest.source().utf8ToString() + + "]", + ioException + ); + markItemAsFailed(slot, e); + } + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java new file mode 100644 index 0000000000000..e0d6e8200e86d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -0,0 +1,111 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Transforms an indexing request using error information into a new index request to be stored in a data stream's failure store. + */ +public final class FailureStoreDocument { + + private FailureStoreDocument() {} + + /** + * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a + * new {@link IndexRequest} that can be stored in a data stream's failure store. 
+ * @param source The original request that has failed to be ingested + * @param exception The exception that was thrown that caused the request to fail to be ingested + * @param targetIndexName The index that the request was targeting at time of failure + * @return A new {@link IndexRequest} with a failure store compliant structure + * @throws IOException If there is a problem when the document's new source is serialized + */ + public static IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException { + return transformFailedRequest(source, exception, targetIndexName, System::currentTimeMillis); + } + + /** + * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a + * new {@link IndexRequest} that can be stored in a data stream's failure store. + * @param source The original request that has failed to be ingested + * @param exception The exception that was thrown that caused the request to fail to be ingested + * @param targetIndexName The index that the request was targeting at time of failure + * @param timeSupplier Supplies the value for the document's timestamp + * @return A new {@link IndexRequest} with a failure store compliant structure + * @throws IOException If there is a problem when the document's new source is serialized + */ + public static IndexRequest transformFailedRequest( + IndexRequest source, + Exception exception, + String targetIndexName, + Supplier<Long> timeSupplier + ) throws IOException { + return new IndexRequest().index(targetIndexName) + .source(createSource(source, exception, targetIndexName, timeSupplier)) + .opType(DocWriteRequest.OpType.CREATE) + .setWriteToFailureStore(true); + } + + private static XContentBuilder createSource( + IndexRequest source, + Exception exception, + String targetIndexName, + Supplier<Long> timeSupplier + ) throws IOException { + Objects.requireNonNull(source, "source must not be null"); + Objects.requireNonNull(exception, "exception must not be null"); + Objects.requireNonNull(targetIndexName, "targetIndexName must not be null"); + Objects.requireNonNull(timeSupplier, "timeSupplier must not be null"); + Throwable unwrapped = ExceptionsHelper.unwrapCause(exception); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + { + builder.timeField("@timestamp", timeSupplier.get()); + builder.startObject("document"); + { + if (source.id() != null) { + builder.field("id", source.id()); + } + if (source.routing() != null) { + builder.field("routing", source.routing()); + } + builder.field("index", source.index()); + // Unmapped source field + builder.startObject("source"); + { + builder.mapContents(source.sourceAsMap()); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject("error"); + { + builder.field("type", ElasticsearchException.getExceptionName(unwrapped)); + builder.field("message", unwrapped.getMessage()); + builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped)); + // Further fields not yet tracked (Need to expose via specific exceptions) + // - pipeline + // - pipeline_trace + // - processor + } + builder.endObject(); + } + builder.endObject(); + return builder; + } +}
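For orientation, here is a minimal sketch of how this helper is driven and what it emits. The variable names and field values below are illustrative only; the real call site is BulkRequestModifier#markItemForFailureStore, and createSource above defines the authoritative field set.

// Hypothetical caller; `failedRequest` and `exception` come from the failed ingest attempt.
IndexRequest redirected = FailureStoreDocument.transformFailedRequest(failedRequest, exception, "logs-foobar");
// `redirected` is an OpType.CREATE request flagged via setWriteToFailureStore(true), and its source
// wraps the original document rather than replacing it, roughly:
// { "@timestamp": "2023-12-12T05:00:00.000Z",
//   "document": { "id": "1", "routing": "r1", "index": "logs-foobar", "source": { "foo": "bar" } },
//   "error": { "type": "fail_processor_exception", "message": "error_message", "stack_trace": "..." } }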
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index e33f3c71e0076..2f12008501487 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -36,10 +36,12 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; @@ -48,6 +50,7 @@ import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; @@ -62,6 +65,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.TimeUnit; @@ -316,7 +320,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, executorName, l); + processBulkIndexIngestRequest(task, bulkRequest, executorName, metadata, l); } else { ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l); } @@ -624,6 +628,7 @@ private void processBulkIndexIngestRequest( Task task, BulkRequest original, String executorName, + Metadata metadata, ActionListener<BulkResponse> listener ) { final long ingestStartTimeInNanos = System.nanoTime(); @@ -632,6 +637,8 @@ private void processBulkIndexIngestRequest( original.numberOfActions(), () -> bulkRequestModifier, bulkRequestModifier::markItemAsDropped, + (indexName) -> shouldStoreFailure(indexName, metadata, threadPool.absoluteTimeInMillis()), + bulkRequestModifier::markItemForFailureStore, bulkRequestModifier::markItemAsFailed, (originalThread, exception) -> { if (exception != null) { @@ -679,4 +686,79 @@ public boolean isForceExecution() { ); } + /** + * Determines if an index name is associated with either an existing data stream or a template + * for one that has the failure store enabled. + * @param indexName The index name to check. + * @param metadata Cluster state metadata. + * @param epochMillis A timestamp to use when resolving date math in the index name. + * @return true if the given index name corresponds to a data stream with a failure store, + * or if it matches a template that has a data stream failure store enabled. + */ + static boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) { + return DataStream.isFailureStoreEnabled() + && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or( + () -> resolveFailureStoreFromTemplate(indexName, metadata) + ).orElse(false); + } + + /** + * Determines if an index name is associated with an existing data stream that has a failure store enabled. + * @param indexName The index name to check. + * @param metadata Cluster state metadata. 
+ * @param epochMillis A timestamp to use when resolving date math in the index name. + * @return true if the given index name corresponds to an existing data stream with a failure store enabled. + */ + private static Optional<Boolean> resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) { + if (indexName == null) { + return Optional.empty(); + } + + // Get index abstraction, resolving date math if it exists + IndexAbstraction indexAbstraction = metadata.getIndicesLookup() + .get(IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis)); + + // We only store failures if the failure is being written to a data stream, + // not when directly writing to backing indices/failure stores + if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) { + return Optional.empty(); + } + + // Locate the write index for the abstraction, and check if it has a data stream associated with it. + // This handles alias resolution as well as data stream resolution. + Index writeIndex = indexAbstraction.getWriteIndex(); + assert writeIndex != null : "Could not resolve write index for resource [" + indexName + "]"; + IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName()); + DataStream targetDataStream = writeAbstraction.getParentDataStream(); + + // We will store the failure if the write target belongs to a data stream with a failure store. + return Optional.of(targetDataStream != null && targetDataStream.isFailureStore()); + } + + /** + * Determines if an index name is associated with an index template that has a data stream failure store enabled. + * @param indexName The index name to check. + * @param metadata Cluster state metadata. + * @return true if the given index name corresponds to an index template with a data stream failure store enabled. + */ + private static Optional<Boolean> resolveFailureStoreFromTemplate(String indexName, Metadata metadata) { + if (indexName == null) { + return Optional.empty(); + } + + // Check to see if the index name matches any templates such that an index would have been attributed + // We don't check v1 templates at all because failure stores can only exist on data streams via a v2 template + String template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false); + if (template != null) { + // Check if this is a data stream template or if it is just a normal index. 
+ ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(template); + if (composableIndexTemplate.getDataStreamTemplate() != null) { + // Check if the data stream has the failure store enabled + return Optional.of(composableIndexTemplate.getDataStreamTemplate().hasFailureStore()); + } + } + + // Could not locate a failure store via template + return Optional.empty(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index b1ad328abda92..13ae065844318 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.index; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; @@ -19,6 +20,7 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.internal.Requests; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRouting; @@ -110,6 +112,11 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean requireDataStream; + /** + * Transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. + */ + private boolean writeToFailureStore = false; + /** * This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document */ @@ -821,7 +828,25 @@ public IndexRequest setRequireDataStream(boolean requireDataStream) { @Override public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) { - return ia.getWriteIndex(this, metadata); + if (DataStream.isFailureStoreEnabled() && writeToFailureStore) { + if (ia.isDataStreamRelated() == false) { + throw new ElasticsearchException( + "Attempting to write a document to a failure store but the targeted index is not a data stream" + ); + } + // Resolve write index and get parent data stream to handle the case of dealing with an alias + String defaultWriteIndexName = ia.getWriteIndex().getName(); + DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream(); + if (dataStream.getFailureIndices().size() < 1) { + throw new ElasticsearchException( + "Attempting to write a document to a failure store but the target data stream does not have one enabled" + ); + } + return dataStream.getFailureIndices().get(dataStream.getFailureIndices().size() - 1); + } else { + // Resolve as normal + return ia.getWriteIndex(this, metadata); + } } @Override @@ -834,6 +859,15 @@ public IndexRequest setRequireAlias(boolean requireAlias) { return this; } + public boolean isWriteToFailureStore() { + return writeToFailureStore; + } + + public IndexRequest setWriteToFailureStore(boolean writeToFailureStore) { + this.writeToFailureStore = writeToFailureStore; + return this; + } + public IndexRequest setListExecutedPipelines(boolean listExecutedPipelines) { this.listExecutedPipelines = listExecutedPipelines; return this; diff --git 
a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3a2a810dc61b5..1f82ebd786e98 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; @@ -668,10 +669,41 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } + private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) { + + private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); + private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null); + private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { + return new IngestPipelinesExecutionResult(false, true, e, index); + } + } + + /** + * Executes all applicable pipelines for a collection of documents. + * @param numberOfActionRequests The total number of requests to process. + * @param actionRequests The collection of requests to be processed. + * @param onDropped A callback executed when a document is dropped by a pipeline. + * Accepts the slot in the collection of requests that the document occupies. + * @param shouldStoreFailure A predicate executed on each ingest failure to determine if the + * failure should be stored somewhere. + * @param onStoreFailure A callback executed when a document fails ingest but the failure should + * be persisted elsewhere. Accepts the slot in the collection of requests + * that the document occupies, the index name that the request was targeting + * at the time of failure, and the exception that the document encountered. + * @param onFailure A callback executed when a document fails ingestion and does not need to be + * persisted. Accepts the slot in the collection of requests that the document + * occupies, and the exception that the document encountered. + * @param onCompletion A callback executed once all documents have been processed. Accepts the thread + * that ingestion completed on or an exception in the event that the entire operation + * has failed. + * @param executorName Which executor the bulk request should be executed on. 
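+ *
+ * For reference, the wiring from TransportBulkAction#processBulkIndexIngestRequest looks roughly like the
+ * following sketch (the completion handler body is elided here):
+ * <pre>
+ * ingestService.executeBulkRequest(
+ *     original.numberOfActions(),
+ *     () -> bulkRequestModifier,
+ *     bulkRequestModifier::markItemAsDropped,
+ *     (indexName) -> shouldStoreFailure(indexName, metadata, threadPool.absoluteTimeInMillis()),
+ *     bulkRequestModifier::markItemForFailureStore,
+ *     bulkRequestModifier::markItemAsFailed,
+ *     (originalThread, exception) -> { ... },
+ *     executorName
+ * );
+ * </pre>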
+ */ public void executeBulkRequest( final int numberOfActionRequests, final Iterable<DocWriteRequest<?>> actionRequests, final IntConsumer onDropped, + final Predicate<String> shouldStoreFailure, + final TriConsumer<Integer, String, Exception> onStoreFailure, final BiConsumer<Integer, Exception> onFailure, final BiConsumer<Thread, Exception> onCompletion, final String executorName @@ -708,34 +740,45 @@ protected void doRun() { totalMetrics.preIngest(); final int slot = i; final Releasable ref = refs.acquire(); + DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); + final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); + final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone(); // the document listener gives us three-way logic: a document can fail processing (1), or it can // be successfully processed. a successfully processed document can be kept (2) or dropped (3). - final ActionListener<Boolean> documentListener = ActionListener.runAfter(new ActionListener<>() { - @Override - public void onResponse(Boolean kept) { - assert kept != null; - if (kept == false) { - onDropped.accept(slot); + final ActionListener<IngestPipelinesExecutionResult> documentListener = ActionListener.runAfter( + new ActionListener<>() { + @Override + public void onResponse(IngestPipelinesExecutionResult result) { + assert result != null; + if (result.success) { + if (result.shouldKeep == false) { + onDropped.accept(slot); + } + } else { + // We were given a failure result in the onResponse method, so we must store the failure + // Recover the original document state, track a failed ingest, and pass it along + updateIndexRequestMetadata(indexRequest, originalDocumentMetadata); + totalMetrics.ingestFailed(); + onStoreFailure.apply(slot, result.failedIndex, result.exception); + } } - } - @Override - public void onFailure(Exception e) { - totalMetrics.ingestFailed(); - onFailure.accept(slot, e); + @Override + public void onFailure(Exception e) { + totalMetrics.ingestFailed(); + onFailure.accept(slot, e); + } + }, + () -> { + // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate + // that we're finished with this document + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); + ref.close(); } - }, () -> { - // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate - // that we're finished with this document - final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; - totalMetrics.postIngest(ingestTimeInNanos); - ref.close(); - }); - DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); - - IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); + ); - executePipelines(pipelines, indexRequest, ingestDocument, documentListener); + executePipelines(pipelines, indexRequest, ingestDocument, shouldStoreFailure, documentListener); indexRequest.setPipelinesHaveRun(); assert actionRequest.index() != null; @@ -825,7 +868,8 @@ private void executePipelines( final PipelineIterator pipelines, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener<Boolean> listener + final Predicate<String> shouldStoreFailure, + final ActionListener<IngestPipelinesExecutionResult> listener ) { assert pipelines.hasNext(); PipelineSlot slot = pipelines.next(); @@ -835,13 +879,20 @@ private void executePipelines( // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet 
ingestDocument.resetReroute(); + final String originalIndex = indexRequest.indices()[0]; + final Consumer<Exception> exceptionHandler = (Exception e) -> { + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); + } else { + listener.onFailure(e); + } + }; try { if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } indexRequest.addPipeline(pipelineId); - final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; @@ -855,12 +906,12 @@ private void executePipelines( ), e ); - listener.onFailure(e); + exceptionHandler.accept(e); return; // document failed! } if (keep == false) { - listener.onResponse(false); + listener.onResponse(IngestPipelinesExecutionResult.DISCARD_RESULT); return; // document dropped! } @@ -875,7 +926,7 @@ private void executePipelines( } catch (IllegalArgumentException ex) { // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. // In that case, we catch and wrap the exception, so we can include more details - listener.onFailure( + exceptionHandler.accept( new IllegalArgumentException( format( "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", @@ -895,7 +946,7 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { // final pipelines cannot change the target index (either directly or by way of a reroute) if (isFinalPipeline) { - listener.onFailure( + exceptionHandler.accept( new IllegalStateException( format( "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", @@ -914,7 +965,7 @@ private void executePipelines( if (cycle) { List<String> indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); indexCycle.add(newIndex); - listener.onFailure( + exceptionHandler.accept( new IllegalStateException( format( "index cycle detected while processing pipeline [%s] for document [%s]: %s", @@ -941,12 +992,12 @@ private void executePipelines( } if (newPipelines.hasNext()) { - executePipelines(newPipelines, indexRequest, ingestDocument, listener); + executePipelines(newPipelines, indexRequest, ingestDocument, shouldStoreFailure, listener); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); cacheRawTimestamp(indexRequest, ingestDocument); - listener.onResponse(true); // document succeeded! + listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded! } }); } catch (Exception e) { @@ -954,7 +1005,7 @@ private void executePipelines( () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), e ); - listener.onFailure(e); // document failed! + exceptionHandler.accept(e); // document failed } }
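Taken together, the failure redirection control flow introduced above is roughly the following (a simplified pseudocode sketch, not code from the patch):

// On a pipeline failure for a document whose original target is `index`:
//   if (shouldStoreFailure.test(index))
//       listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(index, e));
//       // -> TransportBulkAction hands the slot to BulkRequestModifier.markItemForFailureStore(slot, index, e),
//       //    which swaps in FailureStoreDocument.transformFailedRequest(...) as the slot's new write request
//   else
//       listener.onFailure(e);
//       // -> BulkRequestModifier.markItemAsFailed(slot, e), reported as a plain bulk item failure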
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java new file mode 100644 index 0000000000000..92fa67e9a6ffc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.xcontent.ObjectPath; +import org.elasticsearch.xcontent.json.JsonXContent; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.startsWith; + +public class FailureStoreDocumentTests extends ESTestCase { + + public void testFailureStoreDocumentConversion() throws Exception { + IndexRequest source = new IndexRequest("original_index").routing("fake_routing") + .id("1") + .source(JsonXContent.contentBuilder().startObject().field("key", "value").endObject()); + + // The exception will be wrapped for the test to make sure the converter correctly unwraps it + Exception exception = new ElasticsearchException("Test exception please ignore"); + exception = new RemoteTransportException("Test exception wrapper, please ignore", exception); + + String targetIndexName = "rerouted_index"; + long testTime = 1702357200000L; // 2023-12-12T05:00:00.000Z + + IndexRequest convertedRequest = FailureStoreDocument.transformFailedRequest(source, exception, targetIndexName, () -> testTime); + + // Retargeting write + assertThat(convertedRequest.id(), is(nullValue())); + assertThat(convertedRequest.routing(), is(nullValue())); + assertThat(convertedRequest.index(), is(equalTo(targetIndexName))); + assertThat(convertedRequest.opType(), is(DocWriteRequest.OpType.CREATE)); + + // Original document content is no longer in the same place + assertThat("Expected original document to be modified", convertedRequest.sourceAsMap().get("key"), is(nullValue())); + + // Assert document contents + assertThat(ObjectPath.eval("@timestamp", convertedRequest.sourceAsMap()), is(equalTo("2023-12-12T05:00:00.000Z"))); + + assertThat(ObjectPath.eval("document.id", convertedRequest.sourceAsMap()), is(equalTo("1"))); + assertThat(ObjectPath.eval("document.routing", convertedRequest.sourceAsMap()), is(equalTo("fake_routing"))); + assertThat(ObjectPath.eval("document.index", convertedRequest.sourceAsMap()), is(equalTo("original_index"))); + assertThat(ObjectPath.eval("document.source.key", convertedRequest.sourceAsMap()), is(equalTo("value"))); + + assertThat(ObjectPath.eval("error.type", convertedRequest.sourceAsMap()), is(equalTo("exception"))); + assertThat(ObjectPath.eval("error.message", convertedRequest.sourceAsMap()), is(equalTo("Test exception please ignore"))); + assertThat( + ObjectPath.eval("error.stack_trace", convertedRequest.sourceAsMap()), + startsWith( + "org.elasticsearch.ElasticsearchException: Test exception please ignore\n" + + "\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConversion" + ) + ); + + assertThat(convertedRequest.isWriteToFailureStore(), is(true)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 188adf396435f..564cf74697194 100644 --- 
a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; @@ -54,13 +55,16 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Predicate; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; @@ -82,6 +86,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { */ private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline"; private static final String WITH_DEFAULT_PIPELINE_ALIAS = "alias_for_index_with_default_pipeline"; + private static final String WITH_FAILURE_STORE_ENABLED = "data-stream-failure-store-enabled"; private static final Settings SETTINGS = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); @@ -95,6 +100,10 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor + ArgumentCaptor<Predicate<String>> redirectPredicate; + @Captor + ArgumentCaptor<TriConsumer<Integer, String, Exception>> redirectHandler; + @Captor ArgumentCaptor<BiConsumer<Integer, Exception>> failureHandler; @Captor ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler; @@ -174,7 +183,7 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction> req = bulkDocsItr.getValue().iterator(); failureHandler.getValue().accept(0, exception); // have an exception for our one index request indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + assertTrue(redirectPredicate.getValue().test(WITH_FAILURE_STORE_ENABLED + "-1")); // ensure redirects on failure store data stream + assertFalse(redirectPredicate.getValue().test(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices + assertFalse(redirectPredicate.getValue().test("index")); // no redirects for non-existent indices with no templates + redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2) + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); @@ -322,6 +350,8 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -368,6 +398,8 @@ public void testIngestSystemLocal() throws Exception { eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), 
eq(Names.SYSTEM_WRITE) @@ -401,7 +433,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -441,7 +473,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -525,6 +557,8 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -573,6 +607,8 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -667,6 +703,8 @@ public void testFindDefaultPipelineFromTemplateMatch() { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -705,6 +743,8 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -732,6 +772,8 @@ public void testIngestCallbackExceptionHandled() throws Exception { eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -769,6 +811,8 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index c3a1747902893..6f3767892e7a4 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -21,7 +21,9 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexAbstraction; import 
org.elasticsearch.cluster.metadata.IndexAbstraction.ConcreteIndex; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -52,6 +54,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -61,6 +64,7 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; public class TransportBulkActionTests extends ESTestCase { @@ -336,6 +340,100 @@ public void testRejectionAfterCreateIndexIsPropagated() throws Exception { } } + public void testResolveFailureStoreFromMetadata() throws Exception { + assumeThat(DataStream.isFailureStoreEnabled(), is(true)); + + String dataStreamWithFailureStore = "test-data-stream-failure-enabled"; + String dataStreamWithoutFailureStore = "test-data-stream-failure-disabled"; + long testTime = randomMillisUpToYear9999(); + + IndexMetadata backingIndex1 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithFailureStore, testTime).build(); + IndexMetadata backingIndex2 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithoutFailureStore, testTime).build(); + IndexMetadata failureStoreIndex1 = DataStreamTestHelper.createFirstFailureStore(dataStreamWithFailureStore, testTime).build(); + + Metadata metadata = Metadata.builder() + .dataStreams( + Map.of( + dataStreamWithFailureStore, + DataStreamTestHelper.newInstance( + dataStreamWithFailureStore, + List.of(backingIndex1.getIndex()), + 1L, + Map.of(), + false, + null, + List.of(failureStoreIndex1.getIndex()) + ), + dataStreamWithoutFailureStore, + DataStreamTestHelper.newInstance( + dataStreamWithoutFailureStore, + List.of(backingIndex2.getIndex()), + 1L, + Map.of(), + false, + null, + List.of() + ) + ), + Map.of() + ) + .indices( + Map.of( + backingIndex1.getIndex().getName(), + backingIndex1, + backingIndex2.getIndex().getName(), + backingIndex2, + failureStoreIndex1.getIndex().getName(), + failureStoreIndex1 + ) + ) + .build(); + + // Data stream with failure store should store failures + assertThat(TransportBulkAction.shouldStoreFailure(dataStreamWithFailureStore, metadata, testTime), is(true)); + // Data stream without failure store should not + assertThat(TransportBulkAction.shouldStoreFailure(dataStreamWithoutFailureStore, metadata, testTime), is(false)); + // An index should not be considered for failure storage + assertThat(TransportBulkAction.shouldStoreFailure(backingIndex1.getIndex().getName(), metadata, testTime), is(false)); + // even if that index is itself a failure store + assertThat(TransportBulkAction.shouldStoreFailure(failureStoreIndex1.getIndex().getName(), metadata, testTime), is(false)); + } + + public void testResolveFailureStoreFromTemplate() throws Exception { + assumeThat(DataStream.isFailureStoreEnabled(), is(true)); + + String dsTemplateWithFailureStore = "test-data-stream-failure-enabled"; + String dsTemplateWithoutFailureStore = "test-data-stream-failure-disabled"; + String indexTemplate = "test-index"; + long testTime = randomMillisUpToYear9999(); + + Metadata metadata = Metadata.builder() + .indexTemplates( + Map.of( + dsTemplateWithFailureStore, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dsTemplateWithFailureStore + "-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true)) + .build(), + dsTemplateWithoutFailureStore, + 
ComposableIndexTemplate.builder() + .indexPatterns(List.of(dsTemplateWithoutFailureStore + "-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, false)) + .build(), + indexTemplate, + ComposableIndexTemplate.builder().indexPatterns(List.of(indexTemplate + "-*")).build() + ) + ) + .build(); + + // Data stream with failure store should store failures + assertThat(TransportBulkAction.shouldStoreFailure(dsTemplateWithFailureStore + "-1", metadata, testTime), is(true)); + // Data stream without failure store should not + assertThat(TransportBulkAction.shouldStoreFailure(dsTemplateWithoutFailureStore + "-1", metadata, testTime), is(false)); + // An index template should not be considered for failure storage + assertThat(TransportBulkAction.shouldStoreFailure(indexTemplate + "-1", metadata, testTime), is(false)); + } + private BulkRequest buildBulkRequest(List indices) { BulkRequest request = new BulkRequest(); for (String index : indices) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index d345197d88a23..26aa5b1e0454f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -91,6 +92,7 @@ import java.util.function.Consumer; import java.util.function.IntConsumer; import java.util.function.LongSupplier; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -208,7 +210,16 @@ public void testExecuteIndexPipelineDoesNotExist() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1111,6 +1122,8 @@ public String getType() { bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -1154,6 +1167,8 @@ public void testExecuteBulkPipelineDoesNotExist() { bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -1218,6 +1233,8 @@ public void close() { bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -1247,7 +1264,16 @@ public void testExecuteSuccess() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer 
completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1280,7 +1306,16 @@ public void testDynamicTemplates() throws Exception { CountDownLatch latch = new CountDownLatch(1); final BiConsumer failureHandler = (v, e) -> { throw new AssertionError("must never fail", e); }; final BiConsumer completionHandler = (t, e) -> latch.countDown(); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); latch.await(); assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz"))); } @@ -1301,7 +1336,16 @@ public void testExecuteEmptyPipeline() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1355,7 +1399,16 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1404,7 +1457,16 @@ public void testExecuteFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); 
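// Note: with the always-false redirect predicate supplied above, ingest failures still flow to the plain failure handler rather than to a failure store.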
@@ -1111,6 +1122,8 @@ public String getType() {
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
             failureHandler,
             completionHandler,
             Names.WRITE
@@ -1154,6 +1167,8 @@ public void testExecuteBulkPipelineDoesNotExist() {
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
             failureHandler,
             completionHandler,
             Names.WRITE
@@ -1218,6 +1233,8 @@ public void close() {
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
             failureHandler,
             completionHandler,
             Names.WRITE
@@ -1247,7 +1264,16 @@ public void testExecuteSuccess() {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
@@ -1280,7 +1306,16 @@ public void testDynamicTemplates() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         final BiConsumer<Integer, Exception> failureHandler = (v, e) -> { throw new AssertionError("must never fail", e); };
         final BiConsumer<Thread, Exception> completionHandler = (t, e) -> latch.countDown();
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         latch.await();
         assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz")));
     }
@@ -1301,7 +1336,16 @@ public void testExecuteEmptyPipeline() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
@@ -1355,7 +1399,16 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         verify(processor).execute(any(), any());
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1404,7 +1457,16 @@ public void testExecuteFailure() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
         verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1453,7 +1515,16 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
@@ -1496,7 +1567,16 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
         verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1554,6 +1634,8 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
             numRequest,
             bulkRequest.requests(),
             indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
             requestItemErrorHandler,
             completionHandler,
             Names.WRITE
@@ -1563,6 +1645,184 @@
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
 
+    public void testExecuteFailureRedirection() throws Exception {
+        final CompoundProcessor processor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Map.of(
+                "mock",
+                (factories, tag, description, config) -> processor,
+                "set",
+                (factories, tag, description, config) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail())
+            )
+        );
+        PutPipelineRequest putRequest1 = new PutPipelineRequest(
+            "_id1",
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            XContentType.JSON
+        );
+        // given that set -> fail() above, it's a failure if a document executes against this pipeline
+        PutPipelineRequest putRequest2 = new PutPipelineRequest(
+            "_id2",
+            new BytesArray("{\"processors\": [{\"set\" : {}}]}"),
+            XContentType.JSON
+        );
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+        ClusterState previousClusterState = clusterState;
+        clusterState = executePut(putRequest1, clusterState);
+        clusterState = executePut(putRequest2, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+        final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
+            .source(Map.of())
+            .setPipeline("_id1")
+            .setFinalPipeline("_id2");
+        doThrow(new RuntimeException()).when(processor)
+            .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+        final Predicate<String> redirectCheck = (idx) -> indexRequest.index().equals(idx);
+        @SuppressWarnings("unchecked")
+        final TriConsumer<Integer, String, Exception> redirectHandler = mock(TriConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            redirectCheck,
+            redirectHandler,
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
+        verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+        verify(redirectHandler, times(1)).apply(eq(0), eq(indexRequest.index()), any(RuntimeException.class));
+        verifyNoInteractions(failureHandler);
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+    }
+
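The verifications just above spell out the redirect contract: when the predicate accepts the target index, the failed item goes to the redirect handler with its slot, original index, and exception, and the plain failure handler is never touched. A hedged sketch of that per-item dispatch follows, reusing the TriConsumer mirror from the earlier sketch; the class and method names are illustrative, not the actual IngestService internals:

    import java.util.function.BiConsumer;
    import java.util.function.Predicate;

    final class FailureDispatch {
        static void onIngestFailure(
            int slot,
            String targetIndex,
            Exception e,
            Predicate<String> shouldRedirect,
            TriConsumer<Integer, String, Exception> onRedirect,
            BiConsumer<Integer, Exception> onFailure
        ) {
            if (shouldRedirect.test(targetIndex)) {
                // The document is rerouted toward the data stream's failure store.
                onRedirect.apply(slot, targetIndex, e);
            } else {
                // Previous behavior: the item simply fails in the bulk response.
                onFailure.accept(slot, e);
            }
        }
    }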
+    public void testExecuteFailureRedirectionWithNestedOnFailure() throws Exception {
+        final Processor processor = mock(Processor.class);
+        when(processor.isAsync()).thenReturn(true);
+        final Processor onFailureProcessor = mock(Processor.class);
+        when(onFailureProcessor.isAsync()).thenReturn(true);
+        final Processor onFailureOnFailureProcessor = mock(Processor.class);
+        when(onFailureOnFailureProcessor.isAsync()).thenReturn(true);
+        final List<Processor> processors = List.of(onFailureProcessor);
+        final List<Processor> onFailureProcessors = List.of(onFailureOnFailureProcessor);
+        final CompoundProcessor compoundProcessor = new CompoundProcessor(
+            false,
+            List.of(processor),
+            List.of(new CompoundProcessor(false, processors, onFailureProcessors))
+        );
+        IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> compoundProcessor));
+        PutPipelineRequest putRequest = new PutPipelineRequest(
+            "_id",
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            XContentType.JSON
+        );
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+        ClusterState previousClusterState = clusterState;
+        clusterState = executePut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+        final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
+            .source(Map.of())
+            .setPipeline("_id")
+            .setFinalPipeline("_none");
+        doThrow(new RuntimeException()).when(onFailureOnFailureProcessor)
+            .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+        doThrow(new RuntimeException()).when(onFailureProcessor)
+            .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+        doThrow(new RuntimeException()).when(processor)
+            .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+        final Predicate<String> redirectPredicate = (idx) -> indexRequest.index().equals(idx);
+        @SuppressWarnings("unchecked")
+        final TriConsumer<Integer, String, Exception> redirectHandler = mock(TriConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            redirectPredicate,
+            redirectHandler,
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
+        verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
+        verify(redirectHandler, times(1)).apply(eq(0), eq(indexRequest.index()), any(RuntimeException.class));
+        verifyNoInteractions(failureHandler);
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+    }
+
+    public void testBulkRequestExecutionWithRedirectedFailures() throws Exception {
+        BulkRequest bulkRequest = new BulkRequest();
+        String pipelineId = "_id";
+
+        int numRequest = scaledRandomIntBetween(8, 64);
+        int numIndexRequests = 0;
+        for (int i = 0; i < numRequest; i++) {
+            DocWriteRequest<?> request;
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    request = new DeleteRequest("_index", "_id");
+                } else {
+                    request = new UpdateRequest("_index", "_id");
+                }
+            } else {
+                IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none");
+                indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
+                request = indexRequest;
+                numIndexRequests++;
+            }
+            bulkRequest.add(request);
+        }
+
+        CompoundProcessor processor = mock(CompoundProcessor.class);
+        when(processor.isAsync()).thenReturn(true);
+        when(processor.getProcessors()).thenReturn(List.of(mock(Processor.class)));
+        Exception error = new RuntimeException();
+        doAnswer(args -> {
+            @SuppressWarnings("unchecked")
+            BiConsumer<IngestDocument, Exception> handler = (BiConsumer<IngestDocument, Exception>) args.getArguments()[1];
+            handler.accept(null, error);
+            return null;
+        }).when(processor).execute(any(), any());
+        IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));
+        PutPipelineRequest putRequest = new PutPipelineRequest(
+            "_id",
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            XContentType.JSON
+        );
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+        ClusterState previousClusterState = clusterState;
+        clusterState = executePut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+
+        @SuppressWarnings("unchecked")
+        TriConsumer<Integer, String, Exception> requestItemRedirectHandler = mock(TriConsumer.class);
+        @SuppressWarnings("unchecked")
+        BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            numRequest,
+            bulkRequest.requests(),
+            indexReq -> {},
+            (s) -> true,
+            requestItemRedirectHandler,
+            requestItemErrorHandler,
+            completionHandler,
+            Names.WRITE
+        );
+
+        verify(requestItemRedirectHandler, times(numIndexRequests)).apply(anyInt(), anyString(), argThat(e -> e.getCause().equals(error)));
+        verifyNoInteractions(requestItemErrorHandler);
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+    }
+
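Worth noting in the test above: the exception the redirect handler receives is a wrapper around the original processor error, which is why the verification matches on getCause() rather than on the exception itself. A minimal illustration; the wrapper type used by the real ingest code is not shown in this patch, so a plain RuntimeException stands in for it:

    public class RedirectedExceptionCause {
        public static void main(String[] args) {
            Exception error = new RuntimeException("processor failed");
            // Assumed wrapper: the ingest layer hands the redirect handler a wrapping
            // exception, so assertions target the cause, not the top-level exception.
            Exception delivered = new RuntimeException("pipeline execution failed", error);
            System.out.println(delivered.getCause().equals(error)); // true
        }
    }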
     public void testBulkRequestExecution() throws Exception {
         BulkRequest bulkRequest = new BulkRequest();
         String pipelineId = "_id";
@@ -1612,6 +1872,8 @@ public void testBulkRequestExecution() throws Exception {
             numRequest,
             bulkRequest.requests(),
             indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
             requestItemErrorHandler,
             completionHandler,
             Names.WRITE
@@ -1721,7 +1983,16 @@ public String execute() {
         final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
         indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            (integer, e) -> {},
+            (thread, e) -> {},
+            Names.WRITE
+        );
         {
             final IngestStats ingestStats = ingestService.stats();
@@ -1792,7 +2063,16 @@ public void testStats() throws Exception {
         final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1").setFinalPipeline("_none");
         indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         final IngestStats afterFirstRequestStats = ingestService.stats();
         assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2));
@@ -1809,7 +2089,16 @@
         assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
         indexRequest.setPipeline("_id2");
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         final IngestStats afterSecondRequestStats = ingestService.stats();
         assertThat(afterSecondRequestStats.pipelineStats().size(), equalTo(2));
         // total
@@ -1831,7 +2120,16 @@
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         indexRequest.setPipeline("_id1");
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         final IngestStats afterThirdRequestStats = ingestService.stats();
         assertThat(afterThirdRequestStats.pipelineStats().size(), equalTo(2));
         // total
@@ -1854,7 +2152,16 @@
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         indexRequest.setPipeline("_id1");
-        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            Names.WRITE
+        );
         final IngestStats afterForthRequestStats = ingestService.stats();
         assertThat(afterForthRequestStats.pipelineStats().size(), equalTo(2));
         // total
@@ -1946,6 +2253,8 @@ public String getDescription() {
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             dropHandler,
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
             failureHandler,
             completionHandler,
             Names.WRITE
@@ -2030,7 +2339,16 @@ public void testCBORParsing() throws Exception {
                 .setPipeline("_id")
                 .setFinalPipeline("_none");
-            ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+            ingestService.executeBulkRequest(
+                1,
+                List.of(indexRequest),
+                indexReq -> {},
+                (s) -> false,
+                (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+                (integer, e) -> {},
+                (thread, e) -> {},
+                Names.WRITE
+            );
         }
 
         assertThat(reference.get(), is(instanceOf(byte[].class)));
@@ -2101,7 +2419,16 @@ public void testSetsRawTimestamp() {
         bulkRequest.add(indexRequest6);
         bulkRequest.add(indexRequest7);
         bulkRequest.add(indexRequest8);
-        ingestService.executeBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+        ingestService.executeBulkRequest(
+            8,
+            bulkRequest.requests(),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            (integer, e) -> {},
+            (thread, e) -> {},
+            Names.WRITE
+        );
 
         assertThat(indexRequest1.getRawTimestamp(), nullValue());
         assertThat(indexRequest2.getRawTimestamp(), nullValue());
diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
index d0b30bff92f3e..3a47e0885f2d2 100644
--- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
+++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
@@ -124,7 +124,20 @@ public static DataStream newInstance(
         @Nullable DataStreamLifecycle lifecycle,
         List<Index> failureStores
     ) {
-        return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, failureStores);
+        return new DataStream(
+            name,
+            indices,
+            generation,
+            metadata,
+            false,
+            replicated,
+            false,
+            false,
+            null,
+            lifecycle,
+            failureStores.size() > 0,
+            failureStores
+        );
     }
 
     public static String getLegacyDefaultBackingIndexName(
@@ -169,6 +182,25 @@ public static IndexMetadata.Builder createBackingIndex(String dataStreamName, in
             .numberOfReplicas(NUMBER_OF_REPLICAS);
     }
 
+    public static IndexMetadata.Builder createFirstFailureStore(String dataStreamName) {
+        return createFailureStore(dataStreamName, 1, System.currentTimeMillis());
+    }
+
+    public static IndexMetadata.Builder createFirstFailureStore(String dataStreamName, long epochMillis) {
+        return createFailureStore(dataStreamName, 1, epochMillis);
+    }
+
+    public static IndexMetadata.Builder createFailureStore(String dataStreamName, int generation) {
+        return createFailureStore(dataStreamName, generation, System.currentTimeMillis());
+    }
+
+    public static IndexMetadata.Builder createFailureStore(String dataStreamName, int generation, long epochMillis) {
+        return IndexMetadata.builder(DataStream.getDefaultFailureStoreName(dataStreamName, generation, epochMillis))
+            .settings(SETTINGS)
+            .numberOfShards(NUMBER_OF_SHARDS)
+            .numberOfReplicas(NUMBER_OF_REPLICAS);
+    }
+
     public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) {
         return IndexMetadata.builder(index.getName())
             .settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID()))
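The createFailureStore helpers above derive index names from DataStream.getDefaultFailureStoreName, and the YAML test earlier in this patch matches the result against the pattern .fs-<data-stream>-<yyyy.MM.dd>-<generation>. A hedged, self-contained sketch of that naming convention follows; the date pattern, UTC zone, and six-digit zero padding are inferred from the regexes in this patch rather than taken from the implementation:

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.util.Locale;

    final class FailureStoreNaming {
        private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT)
            .withZone(ZoneOffset.UTC);

        // e.g. defaultFailureStoreName("logs-foobar", 1, 0L) -> ".fs-logs-foobar-1970.01.01-000001"
        static String defaultFailureStoreName(String dataStreamName, long generation, long epochMillis) {
            String date = DATE_FORMAT.format(Instant.ofEpochMilli(epochMillis));
            return String.format(Locale.ROOT, ".fs-%s-%s-%06d", dataStreamName, date, generation);
        }
    }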