From b369c41a8bd067c5535c5ff9128202d042d06e49 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 2 Nov 2023 16:51:06 -0400 Subject: [PATCH 01/36] Add the original metadata to an ingest document --- .../elasticsearch/ingest/IngestDocument.java | 22 +++++++++++++++---- .../ingest/TestIngestDocument.java | 12 ++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 31d947d548ccf..9a9cd1f6fb762 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -52,6 +52,7 @@ public final class IngestDocument { public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: "; static final String TIMESTAMP = "timestamp"; + private final IngestDocMetadata originalMetadata; private final IngestCtxMap ctxMap; private final Map ingestMetadata; @@ -81,9 +82,11 @@ public final class IngestDocument { private boolean reroute = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { - this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + this.originalMetadata = new IngestDocMetadata(index, id, version, routing, versionType, now); + this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, now, source); this.ingestMetadata = new HashMap<>(); - this.ingestMetadata.put(TIMESTAMP, ctxMap.getMetadata().getNow()); + this.ingestMetadata.put(TIMESTAMP, now); this.templateModel = initializeTemplateModel(); // initialize the index history by putting the current index into it @@ -101,6 +104,7 @@ public IngestDocument(String index, String id, long version, String routing, Ver */ public IngestDocument(IngestDocument other) { this( + other.originalMetadata.clone(), new IngestCtxMap(deepCopyMap(ensureNoSelfReferences(other.ctxMap.getSource())), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); @@ -134,7 +138,9 @@ public IngestDocument(Map sourceAndMetadata, Map } } } - this.ctxMap = new IngestCtxMap(source, new IngestDocMetadata(metadata, IngestCtxMap.getTimestamp(ingestMetadata))); + ZonedDateTime timestamp = IngestCtxMap.getTimestamp(ingestMetadata); + this.originalMetadata = new IngestDocMetadata(metadata, timestamp); + this.ctxMap = new IngestCtxMap(source, new IngestDocMetadata(metadata, timestamp)); this.ingestMetadata = new HashMap<>(ingestMetadata); this.templateModel = initializeTemplateModel(); } @@ -142,7 +148,8 @@ public IngestDocument(Map sourceAndMetadata, Map /** * Constructor to create an IngestDocument from its constituent maps. 
*/ - IngestDocument(IngestCtxMap ctxMap, Map ingestMetadata) { + IngestDocument(IngestDocMetadata originalMetadata, IngestCtxMap ctxMap, Map ingestMetadata) { + this.originalMetadata = Objects.requireNonNull(originalMetadata); this.ctxMap = Objects.requireNonNull(ctxMap); this.ingestMetadata = Objects.requireNonNull(ingestMetadata); this.templateModel = initializeTemplateModel(); @@ -730,6 +737,13 @@ public org.elasticsearch.script.Metadata getMetadata() { return ctxMap.getMetadata(); } + /** + * Get the strongly typed metadata + */ + public org.elasticsearch.script.Metadata getOriginalMetadata() { + return originalMetadata; + } + /** * Get all source values in a Map */ diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java b/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java index 3998cf6db1aa5..603f6705b665a 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java @@ -45,7 +45,11 @@ public static IngestDocument ofIngestWithNullableVersion(Map sou metadata.put(key, source.remove(key)); } } - return new IngestDocument(new IngestCtxMap(source, TestIngestCtxMetadata.withNullableVersion(metadata)), ingestMetadata); + return new IngestDocument( + TestIngestCtxMetadata.withNullableVersion(metadata), + new IngestCtxMap(source, TestIngestCtxMetadata.withNullableVersion(metadata)), + ingestMetadata + ); } /** @@ -64,7 +68,11 @@ public static IngestDocument withDefaultVersion(Map sourceAndMet * can observe changes to the map directly. */ public static IngestDocument ofMetadataWithValidator(Map metadata, Map> properties) { - return new IngestDocument(new IngestCtxMap(new HashMap<>(), new TestIngestCtxMetadata(metadata, properties)), new HashMap<>()); + return new IngestDocument( + new TestIngestCtxMetadata(metadata, properties), + new IngestCtxMap(new HashMap<>(), new TestIngestCtxMetadata(metadata, properties)), + new HashMap<>() + ); } /** From b316177e64bc8bde4ba103750d20374dbb97ded2 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 9 Nov 2023 16:11:27 -0500 Subject: [PATCH 02/36] Add additional handler for IngestService which allows for intercepting failure store documents. 
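The new overload threads two extra hooks through ingest: a Predicate deciding, per index name, whether failures should be redirected to a failure store, and a TriConsumer receiving the slot, target index name, and exception when they should be. A minimal caller-side sketch of the wiring (the no-op handler bodies are illustrative placeholders, not part of this change; Names here is ThreadPool.Names):

    ingestService.executeBulkRequest(
        bulkRequest.numberOfActions(),
        bulkRequest.requests(),
        slot -> {},               // onDropped: a pipeline chose to drop this document
        indexName -> false,       // shouldStoreFailure: never redirect to a failure store
        (slot, index, e) -> {},   // onStoreFailure: redirect this slot to index's failure store
        (slot, e) -> {},          // onFailure: the document failed outright
        (thread, e) -> {},        // onCompletion
        Names.WRITE               // executor name
    );

Passing (s) -> false for shouldStoreFailure reproduces the old behavior; that is exactly how the pre-existing six-argument overload is bridged onto this one below, until the failure store is wired up end to end.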
--- .../elasticsearch/ingest/IngestService.java | 153 +++++++++++++----- 1 file changed, 110 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3a2a810dc61b5..c2eb9d465ba47 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; @@ -675,6 +676,41 @@ public void executeBulkRequest( final BiConsumer onFailure, final BiConsumer onCompletion, final String executorName + ) { + // FIXME: Remove in refactoring step + // Run ingest service as if failure store is disabled + executeBulkRequest( + numberOfActionRequests, + actionRequests, + onDropped, + (s) -> false, + (slot, targetIndex, e) -> onFailure.accept(slot, e), + onFailure, + onCompletion, + executorName + ); + } + + private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) {} + private static IngestPipelinesExecutionResult successResult() { + return new IngestPipelinesExecutionResult(true, true, null, null); + } + private static IngestPipelinesExecutionResult discardResult() { + return new IngestPipelinesExecutionResult(true, false, null, null); + } + private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { + return new IngestPipelinesExecutionResult(false, true, e, index); + } + + public void executeBulkRequest( + final int numberOfActionRequests, + final Iterable> actionRequests, + final IntConsumer onDropped, + final Predicate shouldStoreFailure, + final TriConsumer onStoreFailure, + final BiConsumer onFailure, + final BiConsumer onCompletion, + final String executorName ) { assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]"; @@ -708,14 +744,24 @@ protected void doRun() { totalMetrics.preIngest(); final int slot = i; final Releasable ref = refs.acquire(); + DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); + final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); // the document listener gives us three-way logic: a document can fail processing (1), or it can // be successfully processed. a successfully processed document can be kept (2) or dropped (3). 
- final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { + final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { @Override - public void onResponse(Boolean kept) { - assert kept != null; - if (kept == false) { - onDropped.accept(slot); + public void onResponse(IngestPipelinesExecutionResult result) { + assert result != null; + if (result.success) { + if (result.kept == false) { + onDropped.accept(slot); + } + } else { + // We were given a failure result in the onResponse method, so we must store the failure + // Recover the original document state, track a failed ingest, and pass it along + updateIndexRequestMetadata(indexRequest, ingestDocument.getOriginalMetadata()); + totalMetrics.ingestFailed(); + onStoreFailure.apply(slot, result.failedIndex, result.exception); } } @@ -731,11 +777,14 @@ public void onFailure(Exception e) { totalMetrics.postIngest(ingestTimeInNanos); ref.close(); }); - DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); - IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); - - executePipelines(pipelines, indexRequest, ingestDocument, documentListener); + executePipelines( + pipelines, + indexRequest, + ingestDocument, + shouldStoreFailure, + documentListener + ); indexRequest.setPipelinesHaveRun(); assert actionRequest.index() != null; @@ -825,7 +874,8 @@ private void executePipelines( final PipelineIterator pipelines, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener listener + final Predicate shouldStoreFailure, + final ActionListener listener ) { assert pipelines.hasNext(); PipelineSlot slot = pipelines.next(); @@ -835,13 +885,13 @@ private void executePipelines( // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet ingestDocument.resetReroute(); + final String originalIndex = indexRequest.indices()[0]; try { if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } indexRequest.addPipeline(pipelineId); - final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; @@ -855,12 +905,16 @@ private void executePipelines( ), e ); - listener.onFailure(e); + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(failAndStoreFor(originalIndex, e)); + } else { + listener.onFailure(e); + } return; // document failed! } if (keep == false) { - listener.onResponse(false); + listener.onResponse(discardResult()); return; // document dropped! } @@ -875,17 +929,20 @@ private void executePipelines( } catch (IllegalArgumentException ex) { // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. 
// In that case, we catch and wrap the exception, so we can include more details - listener.onFailure( - new IllegalArgumentException( - format( - "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - ex - ) + Exception documentContainsSelfReferenceException = new IllegalArgumentException( + format( + "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + ex ); + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(failAndStoreFor(originalIndex, documentContainsSelfReferenceException)); + } else { + listener.onFailure(documentContainsSelfReferenceException); + } return; // document failed! } @@ -895,17 +952,20 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { // final pipelines cannot change the target index (either directly or by way of a reroute) if (isFinalPipeline) { - listener.onFailure( - new IllegalStateException( - format( - "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", - pipelineId, - originalIndex, - newIndex, - indexRequest.id() - ) + Exception finalPipelineChangedIndexException = new IllegalStateException( + format( + "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", + pipelineId, + originalIndex, + newIndex, + indexRequest.id() ) ); + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(failAndStoreFor(originalIndex, finalPipelineChangedIndexException)); + } else { + listener.onFailure(finalPipelineChangedIndexException); + } return; // document failed! } @@ -914,16 +974,19 @@ private void executePipelines( if (cycle) { List indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); indexCycle.add(newIndex); - listener.onFailure( - new IllegalStateException( - format( - "index cycle detected while processing pipeline [%s] for document [%s]: %s", - pipelineId, - indexRequest.id(), - indexCycle - ) + Exception indexCycleDetectedException = new IllegalStateException( + format( + "index cycle detected while processing pipeline [%s] for document [%s]: %s", + pipelineId, + indexRequest.id(), + indexCycle ) ); + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(failAndStoreFor(originalIndex, indexCycleDetectedException)); + } else { + listener.onFailure(indexCycleDetectedException); + } return; // document failed! } @@ -941,12 +1004,12 @@ private void executePipelines( } if (newPipelines.hasNext()) { - executePipelines(newPipelines, indexRequest, ingestDocument, listener); + executePipelines(newPipelines, indexRequest, ingestDocument, shouldStoreFailure, listener); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); cacheRawTimestamp(indexRequest, ingestDocument); - listener.onResponse(true); // document succeeded! + listener.onResponse(successResult()); // document succeeded! } }); } catch (Exception e) { @@ -954,7 +1017,11 @@ private void executePipelines( () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), e ); - listener.onFailure(e); // document failed! + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(failAndStoreFor(originalIndex, e)); + } else { + listener.onFailure(e); // document failed! 
+ } } } From b7b0e3c5e8463a1cd1e27b36112fa57aa6a01440 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 4 Dec 2023 17:27:10 -0500 Subject: [PATCH 03/36] Add logic for resolving if an index belongs to a data stream with a failure store. --- .../action/bulk/TransportBulkAction.java | 86 ++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index ea0399d0b87fe..8c1d5bec45591 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -42,11 +42,13 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -78,6 +80,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.TimeUnit; @@ -336,7 +339,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, executorName, l); + processBulkIndexIngestRequest(task, bulkRequest, executorName, metadata, l); } else { ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l); } @@ -907,6 +910,7 @@ private void processBulkIndexIngestRequest( Task task, BulkRequest original, String executorName, + Metadata metadata, ActionListener listener ) { final long ingestStartTimeInNanos = System.nanoTime(); @@ -915,6 +919,8 @@ private void processBulkIndexIngestRequest( original.numberOfActions(), () -> bulkRequestModifier, bulkRequestModifier::markItemAsDropped, + (indexName) -> shouldStoreFailure(indexName, metadata, System.currentTimeMillis()), + bulkRequestModifier::markItemForFailureStore, bulkRequestModifier::markItemAsFailed, (originalThread, exception) -> { if (exception != null) { @@ -962,6 +968,72 @@ public boolean isForceExecution() { ); } + static boolean shouldStoreFailure( + String indexName, + Metadata metadata, + long epochMillis + ) { + return resolveFailureStoreFromMetadata(indexName, metadata, epochMillis) + .or(() -> resolveFailureStoreFromTemplate(indexName, metadata)) + .orElse(false); + } + + private static Optional resolveFailureStoreFromMetadata( + String indexName, + Metadata metadata, + long epochMillis + ) { + if (indexName == null) { + return Optional.empty(); + } + + // Get index abstraction, resolving date math if it exists + IndexAbstraction indexAbstraction = metadata.getIndicesLookup() + .get(IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis)); + if (indexAbstraction == null) { + return Optional.empty(); + } + 
+ // Resolve whatever the write index is for the abstraction, and check if it has a data stream associated with it + Index writeIndex = indexAbstraction.getWriteIndex(); + assert writeIndex != null : "Could not resolve write index for resource [" + indexName + "]"; + IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName()); + DataStream targetDataStream = writeAbstraction.getParentDataStream(); + + // We will store the failure in the failure store if the write target belongs to a data stream with a failure store, and if + // the write target is not itself part of the data stream's failure store. We will not be storing failures for operations done + // against the failure store at this time. + return Optional.of( + targetDataStream != null + && targetDataStream.isFailureStore() + && targetDataStream.getFailureIndices().contains(writeIndex) == false + ); + } + + private static Optional resolveFailureStoreFromTemplate( + String indexName, + Metadata metadata + ) { + if (indexName == null) { + return Optional.empty(); + } + + // Check to see if the index name matches any templates such that an index would have been attributed + // We don't check v1 templates at all because failure stores can only exist on data streams via a v2 template + String template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false); + if (template != null) { + // Check if this is a data stream template or if it is just a normal index. + ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(template); + if (composableIndexTemplate.getDataStreamTemplate() != null) { + // Check if the data stream has the failure store enabled + return Optional.of(composableIndexTemplate.getDataStreamTemplate().hasFailureStore()); + } + } + + // Could not locate a failure store via template + return Optional.empty(); + } + static final class BulkRequestModifier implements Iterator> { final BulkRequest bulkRequest; @@ -1060,5 +1132,17 @@ synchronized void markItemAsFailed(int slot, Exception e) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e); itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure)); } + + public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { + // Modify the request in the slot to point to the failure index for its data stream + + logger.warn( + "I would store the document in slot [{}] to the failure store for [{}] but I haven't figured out how to do that yet", + slot, + targetIndexName + ); + // For now, mark the item as failed + markItemAsFailed(slot, e); + } } } From 3dd68bbf402cb7b4ecefb0d53144c2514b5f43bb Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 4 Dec 2023 17:48:19 -0500 Subject: [PATCH 04/36] Do not route failures unless they target a data stream --- .../action/bulk/TransportBulkAction.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 8c1d5bec45591..083c590360644 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -990,24 +990,22 @@ private static Optional resolveFailureStoreFromMetadata( // Get index abstraction, resolving date math if it exists IndexAbstraction indexAbstraction =
metadata.getIndicesLookup() .get(IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis)); - if (indexAbstraction == null) { + + // We only store failures if the failure is being written to a data stream, + // not when directly writing to backing indices/failure stores + if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) { return Optional.empty(); } - // Resolve whatever the write index is for the abstraction, and check if it has a data stream associated with it + // Locate the write index for the abstraction, and check if it has a data stream associated with it. + // This handles alias resolution as well as data stream resolution. Index writeIndex = indexAbstraction.getWriteIndex(); assert writeIndex != null : "Could not resolve write index for resource [" + indexName + "]"; IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName()); DataStream targetDataStream = writeAbstraction.getParentDataStream(); - // We will store the failure in the failure store if the write target belongs to a data stream with a failure store, and if the - // the write target is not itself part of the data stream's failure store. We will not be storing failures for operations done - // against the failure store at this time. - return Optional.of( - targetDataStream != null - && targetDataStream.isFailureStore() - && targetDataStream.getFailureIndices().contains(writeIndex) == false - ); + // We will store the failure if the write target belongs to a data stream with a failure store. + return Optional.of(targetDataStream != null && targetDataStream.isFailureStore()); } private static Optional resolveFailureStoreFromTemplate( @@ -1135,12 +1133,17 @@ synchronized void markItemAsFailed(int slot, Exception e) { public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { // Modify the request in the slot to point to the failure index for its data stream - logger.warn( "I would store the document in slot [{}] to the failure store for [{}] but I haven't figured out how to do that yet", slot, targetIndexName ); + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + // We hit an error during preprocessing a request so we: + // 1) Need to convert the original document into a failure document + // 2) Set its destination to be the targetIndexName's failure store + // 3) Mark the index request as being sent to the failure store for the target index for correct routing later on + // For now, mark the item as failed markItemAsFailed(slot, e); } From 2a47437b430d0fe0c9767cc6ae30aaf81b956859 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 13 Dec 2023 15:14:54 -0500 Subject: [PATCH 05/36] Convert errors into a failure document --- .../action/bulk/FailureStoreDocument.java | 88 +++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 13 ++- .../bulk/FailureStoreDocumentTests.java | 70 +++++++++++++++ 3 files changed, 169 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java create mode 100644 server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java new file mode 100644 index 0000000000000..bec5234d0bfd6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -0,0 +1,88 @@ +/* + 
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +public class FailureStoreDocument { + + private final IndexRequest source; + private final Exception exception; + private final String targetIndexName; + private final Supplier timeSupplier; + + public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName) { + this(source, exception, targetIndexName, System::currentTimeMillis); + } + + public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName, Supplier timeSupplier) { + this.source = Objects.requireNonNull(source, "source must not be null"); + this.exception = Objects.requireNonNull(exception, "exception must not be null"); + this.targetIndexName = Objects.requireNonNull(targetIndexName, "targetIndexName must not be null"); + this.timeSupplier = Objects.requireNonNull(timeSupplier, "timeSupplier must not be null"); + } + + public IndexRequest convert() throws IOException { + // This is a problem - We want to target the targeted index name for creation, but we want the document to end up in its failure + // store. We could target the failure store directly, but if it does not exist, then we need the auto create logic to somehow pick + // up that the parent data stream needs to be created. + // One option is to make use of the eventual flag to perform an operation on the failure store. Ughh who would have thought the + // dependencies would be swapped like that...
+ return new IndexRequest() + .index(targetIndexName) + .source(createSource()); + } + + private XContentBuilder createSource() throws IOException { + Throwable unwrapped = ExceptionsHelper.unwrapCause(exception); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + { + builder.timeField("@timestamp", timeSupplier.get()); + builder.startObject("document"); + { + if (source.id() != null) { + builder.field("id", source.id()); + } + if (source.routing() != null) { + builder.field("routing", source.routing()); + } + builder.field("index", source.index()); + // Unmapped source field + builder.startObject("source"); + { + builder.mapContents(source.sourceAsMap()); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject("error"); + { + builder.field("type", ElasticsearchException.getExceptionName(unwrapped)); + builder.field("message", unwrapped.getMessage()); + builder.field("stack_trace", ExceptionsHelper.stackTrace(unwrapped)); + // Further fields not yet tracked (Need to expose via specific exceptions) + // - pipeline + // - pipeline_trace + // - processor + } + builder.endObject(); + } + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 083c590360644..7c40857053a3d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -73,6 +73,7 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -1144,8 +1145,16 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception // 2) Set its destination to be the targetIndexName's failure store // 3) Mark the index request as being sent to the failure store for the target index for correct routing later on - // For now, mark the item as failed - markItemAsFailed(slot, e); + try { + IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert(); + bulkRequest.requests.set(slot, errorDocument); + + // For now, mark the item as failed + markItemAsFailed(slot, e); + } catch (IOException ex) { + ex.addSuppressed(e); + markItemAsFailed(slot, ex); + } } } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java new file mode 100644 index 0000000000000..6605779eb3f45 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
*/ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.xcontent.ObjectPath; +import org.elasticsearch.xcontent.json.JsonXContent; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.startsWith; + +public class FailureStoreDocumentTests extends ESTestCase { + + public void testFailureStoreDocumentConversion() throws Exception { + IndexRequest source = new IndexRequest("original_index") + .routing("fake_routing") + .id("1") + .source(JsonXContent.contentBuilder() + .startObject() + .field("key", "value") + .endObject() + ); + + // The exception will be wrapped for the test to make sure the converter correctly unwraps it + Exception exception = new ElasticsearchException("Test exception please ignore"); + exception = new RemoteTransportException("Test exception wrapper, please ignore", exception); + + String targetIndexName = "rerouted_index"; + long testTime = 1702357200000L; // 2023-12-12T05:00:00.000Z + + IndexRequest convertedRequest = new FailureStoreDocument(source, exception, targetIndexName, () -> testTime).convert(); + + // Retargeting write + assertThat(convertedRequest.id(), is(nullValue())); + assertThat(convertedRequest.routing(), is(nullValue())); + assertThat(convertedRequest.index(), is(equalTo(targetIndexName))); + + // Original document content is no longer in same place + assertThat("Expected original document to be modified", convertedRequest.sourceAsMap().get("key"), is(nullValue())); + + // Assert document contents + assertThat(ObjectPath.eval("@timestamp", convertedRequest.sourceAsMap()), is(equalTo("2023-12-12T05:00:00.000Z"))); + + assertThat(ObjectPath.eval("document.id", convertedRequest.sourceAsMap()), is(equalTo("1"))); + assertThat(ObjectPath.eval("document.routing", convertedRequest.sourceAsMap()), is(equalTo("fake_routing"))); + assertThat(ObjectPath.eval("document.index", convertedRequest.sourceAsMap()), is(equalTo("original_index"))); + assertThat(ObjectPath.eval("document.source.key", convertedRequest.sourceAsMap()), is(equalTo("value"))); + + assertThat(ObjectPath.eval("error.type", convertedRequest.sourceAsMap()), is(equalTo("exception"))); + assertThat(ObjectPath.eval("error.message", convertedRequest.sourceAsMap()), is(equalTo("Test exception please ignore"))); + assertThat( + ObjectPath.eval("error.stack_trace", convertedRequest.sourceAsMap()), + startsWith( + "org.elasticsearch.ElasticsearchException: Test exception please ignore\n" + + "\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConversion" + ) + ); + } +} From ee7cb89a86bd68573eab621e351b8d402fd7c60e Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 14 Dec 2023 16:27:35 -0500 Subject: [PATCH 06/36] Mark index request to be written to failure store --- .../action/bulk/FailureStoreDocument.java | 3 +- .../action/index/IndexRequest.java | 29 ++++++++++++++++++- .../bulk/FailureStoreDocumentTests.java | 2 ++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java index bec5234d0bfd6..f560fd3b68ca2 100644 ---
a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -44,7 +44,8 @@ public IndexRequest convert() throws IOException { // dependencies would be swapped like that... return new IndexRequest() .index(targetIndexName) - .source(createSource()); + .source(createSource()) + .setWriteToFailureStore(true); } private XContentBuilder createSource() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 12f7c21cba8e1..66f972ac52654 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.internal.Requests; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRouting; @@ -108,6 +109,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isPipelineResolved; private boolean requireAlias; + + // Transient variable as it is not serialized. This will eventually be replaced with an official change to index options. + private boolean writeToFailureStore = false; /** * This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document */ @@ -807,7 +811,21 @@ public boolean isRequireAlias() { @Override public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) { - return ia.getWriteIndex(this, metadata); + if (DataStream.isFailureStoreEnabled() && writeToFailureStore) { + // TODO: Should this be a harder backstop than an assert statement? + assert ia.isDataStreamRelated() + : "Attempting to write a document to a failure store but the targeted index is not a data stream"; + // Resolve write index and get parent data stream to handle the case of dealing with an alias + String defaultWriteIndexName = ia.getWriteIndex().getName(); + DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream(); + // TODO: Should this be a harder backstop than an assert statement? 
+ assert dataStream.getFailureIndices().size() > 0 : "Attempting to write a document to a failure store but the target data stream does not have one enabled"; + return dataStream.getFailureIndices().get(dataStream.getFailureIndices().size() - 1); + } else { + // Resolve as normal + return ia.getWriteIndex(this, metadata); + } } @Override @@ -820,6 +838,15 @@ public IndexRequest setRequireAlias(boolean requireAlias) { return this; } + public boolean isWriteToFailureStore() { + return writeToFailureStore; + } + + public IndexRequest setWriteToFailureStore(boolean writeToFailureStore) { + this.writeToFailureStore = writeToFailureStore; + return this; + } + public IndexRequest setListExecutedPipelines(boolean listExecutedPipelines) { this.listExecutedPipelines = listExecutedPipelines; return this; } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java index 6605779eb3f45..3137cc58f2cd9 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java @@ -66,5 +66,7 @@ public void testFailureStoreDocumentConversion() throws Exception { "\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConversion" ) ); + + assertThat(convertedRequest.isWriteToFailureStore(), is(true)); } } From 300b0012831650a7f0cbf08d455615a19f1525fc Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 14 Dec 2023 16:28:54 -0500 Subject: [PATCH 07/36] Actually redirect failure doc --- .../action/bulk/TransportBulkAction.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 7c40857053a3d..afadc31029aa2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -974,7 +974,7 @@ static boolean shouldStoreFailure( Metadata metadata, long epochMillis ) { - return resolveFailureStoreFromMetadata(indexName, metadata, epochMillis) + return DataStream.isFailureStoreEnabled() && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis) .or(() -> resolveFailureStoreFromTemplate(indexName, metadata)) .orElse(false); } @@ -1133,27 +1133,27 @@ synchronized void markItemAsFailed(int slot, Exception e) { } public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { - // Modify the request in the slot to point to the failure index for its data stream - logger.warn( - "I would store the document in slot [{}] to the failure store for [{}] but I haven't figured out how to do that yet", - slot, - targetIndexName - ); - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); - // We hit an error during preprocessing a request so we: - // 1) Need to convert the original document into a failure document - // 2) Set its destination to be the targetIndexName's failure store - // 3) Mark the index request as being sent to the failure store for the target index for correct routing later on - - try { - IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert(); - bulkRequest.requests.set(slot, errorDocument); - - // For now, mark the item as failed + if
(DataStream.isFailureStoreEnabled() == false) { markItemAsFailed(slot, e); - } catch (IOException ex) { - ex.addSuppressed(e); - markItemAsFailed(slot, ex); + } else { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + try { + IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert(); + bulkRequest.requests.set(slot, errorDocument); + } catch (IOException ex) { + // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it if + // we need the info later. + ex.addSuppressed(e); + logger.debug( + () -> "Encountered exception while attempting to redirect a failed ingest operation: index [" + + targetIndexName + + "], source: [" + + indexRequest.source().utf8ToString() + + "]", + ex + ); + markItemAsFailed(slot, ex); + } } } } From d45ff24acb7fdd6d2985ea8a84df97e2396fe54c Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 14 Dec 2023 16:29:56 -0500 Subject: [PATCH 08/36] Fix infinite loop when creating fresh index requests for failures in the bulk request --- .../org/elasticsearch/action/bulk/FailureStoreDocument.java | 2 ++ .../org/elasticsearch/action/bulk/TransportBulkAction.java | 5 +++++ .../elasticsearch/action/bulk/FailureStoreDocumentTests.java | 2 ++ 3 files changed, 9 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java index f560fd3b68ca2..7c6b45df26627 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; @@ -45,6 +46,7 @@ public IndexRequest convert() throws IOException { return new IndexRequest() .index(targetIndexName) .source(createSource()) + .opType(DocWriteRequest.OpType.CREATE) .setWriteToFailureStore(true); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index afadc31029aa2..da2a09378336a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1139,6 +1139,11 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); try { IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert(); + // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to + // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again. 
+ errorDocument.isPipelineResolved(true); + errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME); + errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); bulkRequest.requests.set(slot, errorDocument); } catch (IOException ex) { // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it if diff --git a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java index 3137cc58f2cd9..c5da0f6f22249 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteTransportException; @@ -45,6 +46,7 @@ public void testFailureStoreDocumentConverstion() throws Exception { assertThat(convertedRequest.id(), is(nullValue())); assertThat(convertedRequest.routing(), is(nullValue())); assertThat(convertedRequest.index(), is(equalTo(targetIndexName))); + assertThat(convertedRequest.opType(), is(DocWriteRequest.OpType.CREATE)); // Original document content is no longer in same place assertThat("Expected original document to be modified", convertedRequest.sourceAsMap().get("key"), is(nullValue())); From f1782b1cf238b2d28ab5e332a8c64af0af429d7b Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 14 Dec 2023 16:43:52 -0500 Subject: [PATCH 09/36] Update docs/changelog/103481.yaml --- docs/changelog/103481.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/103481.yaml diff --git a/docs/changelog/103481.yaml b/docs/changelog/103481.yaml new file mode 100644 index 0000000000000..f7c7c0b6eecc9 --- /dev/null +++ b/docs/changelog/103481.yaml @@ -0,0 +1,5 @@ +pr: 103481 +summary: Redirect failed ingest node operations to a failure store when available +area: Data streams +type: feature +issues: [] From 7cd959ae0fa4e91419b91a7b1b95599883a9df7e Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 8 Jan 2024 15:21:23 -0500 Subject: [PATCH 10/36] Refactor calls to ingest service to use new execute method --- .../elasticsearch/ingest/IngestService.java | 22 --- .../bulk/TransportBulkActionIngestTests.java | 22 ++- .../ingest/IngestServiceTests.java | 177 ++++++++++++++++-- 3 files changed, 182 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index c2eb9d465ba47..7f973a7d15cf8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -669,28 +669,6 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } - public void executeBulkRequest( - final int numberOfActionRequests, - final Iterable> actionRequests, - final IntConsumer onDropped, - final BiConsumer onFailure, - final BiConsumer onCompletion, - final String executorName - ) { - // FIXME: Remove in refactoring step - // Run ingest service as if failure store is disabled - executeBulkRequest( - numberOfActionRequests, - actionRequests, - onDropped, - (s) -> false, - (slot, targetIndex, e) -> 
onFailure.accept(slot, e), - onFailure, - onCompletion, - executorName - ); - } - private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) {} private static IngestPipelinesExecutionResult successResult() { return new IngestPipelinesExecutionResult(true, true, null, null); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f30bceada65d9..7c4481efe6aa6 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -280,6 +280,8 @@ public void testIngestLocal() throws Exception { eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -322,6 +324,8 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -368,6 +372,8 @@ public void testIngestSystemLocal() throws Exception { eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.SYSTEM_WRITE) @@ -401,7 +407,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -441,7 +447,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -525,6 +531,8 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -573,6 +581,8 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -667,6 +677,8 @@ public void testFindDefaultPipelineFromTemplateMatch() { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -705,6 +717,8 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { eq(1), bulkDocsItr.capture(), any(), + any(), + 
any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -732,6 +746,8 @@ public void testIngestCallbackExceptionHandled() throws Exception { eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) @@ -769,6 +785,8 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { eq(1), bulkDocsItr.capture(), any(), + any(), + any(), failureHandler.capture(), completionHandler.capture(), eq(Names.WRITE) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index d345197d88a23..11231e8225fb5 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -208,7 +208,16 @@ public void testExecuteIndexPipelineDoesNotExist() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1111,6 +1120,8 @@ public String getType() { bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -1154,6 +1165,8 @@ public void testExecuteBulkPipelineDoesNotExist() { bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -1218,6 +1231,8 @@ public void close() { bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -1247,7 +1262,16 @@ public void testExecuteSuccess() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1280,7 +1304,16 @@ public void testDynamicTemplates() throws Exception { CountDownLatch latch = new CountDownLatch(1); final BiConsumer failureHandler = (v, e) -> { throw new AssertionError("must never fail", e); }; final BiConsumer completionHandler = (t, e) -> latch.countDown(); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting 
failures"), + failureHandler, + completionHandler, + Names.WRITE + ); latch.await(); assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz"))); } @@ -1301,7 +1334,16 @@ public void testExecuteEmptyPipeline() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1355,7 +1397,16 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1404,7 +1455,16 @@ public void testExecuteFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1453,7 +1513,16 @@ public void testExecuteSuccessWithOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1496,7 +1565,16 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, 
List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1554,6 +1632,8 @@ public void testBulkRequestExecutionWithFailures() throws Exception { numRequest, bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), requestItemErrorHandler, completionHandler, Names.WRITE @@ -1612,6 +1692,8 @@ public void testBulkRequestExecution() throws Exception { numRequest, bulkRequest.requests(), indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), requestItemErrorHandler, completionHandler, Names.WRITE @@ -1721,7 +1803,16 @@ public String execute() { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_id2"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + (integer, e) -> {}, + (thread, e) -> {}, + Names.WRITE + ); { final IngestStats ingestStats = ingestService.stats(); @@ -1792,7 +1883,16 @@ public void testStats() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_none"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2)); @@ -1809,7 +1909,16 @@ public void testStats() throws Exception { assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1831,7 +1940,16 @@ public void testStats() throws Exception { clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> 
{}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1854,7 +1972,16 @@ public void testStats() throws Exception { clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + completionHandler, + Names.WRITE + ); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1946,6 +2073,8 @@ public String getDescription() { bulkRequest.numberOfActions(), bulkRequest.requests(), dropHandler, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), failureHandler, completionHandler, Names.WRITE @@ -2030,7 +2159,16 @@ public void testCBORParsing() throws Exception { .setPipeline("_id") .setFinalPipeline("_none"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + (integer, e) -> {}, + (thread, e) -> {}, + Names.WRITE + ); } assertThat(reference.get(), is(instanceOf(byte[].class))); @@ -2101,7 +2239,16 @@ public void testSetsRawTimestamp() { bulkRequest.add(indexRequest6); bulkRequest.add(indexRequest7); bulkRequest.add(indexRequest8); - ingestService.executeBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.executeBulkRequest( + 8, + bulkRequest.requests(), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + (integer, e) -> {}, + (thread, e) -> {}, + Names.WRITE + ); assertThat(indexRequest1.getRawTimestamp(), nullValue()); assertThat(indexRequest2.getRawTimestamp(), nullValue()); From 947afd40ef73363e5f9fb95143c7838535afdb38 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 9 Jan 2024 13:17:39 -0500 Subject: [PATCH 11/36] Ingest Service changes tested --- .../ingest/IngestServiceTests.java | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 11231e8225fb5..26aa5b1e0454f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.bytes.BytesArray; 
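The org.elasticsearch.common.TriConsumer import added above is the three-argument callback type used for failure redirection throughout the updated call sites below. For reference, its shape is roughly the following (a sketch consistent with the redirectHandler.apply(slot, targetIndexName, exception) calls in this patch series; the actual interface may differ in minor details):

    @FunctionalInterface
    public interface TriConsumer<S, T, U> {
        // invoked in these tests as redirectHandler.apply(slot, targetIndexName, exception)
        void apply(S s, T t, U u);
    }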
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -91,6 +92,7 @@ import java.util.function.Consumer; import java.util.function.IntConsumer; import java.util.function.LongSupplier; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -1643,6 +1645,184 @@ public void testBulkRequestExecutionWithFailures() throws Exception { verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } + public void testExecuteFailureRedirection() throws Exception { + final CompoundProcessor processor = mockCompoundProcessor(); + IngestService ingestService = createWithProcessors( + Map.of( + "mock", + (factories, tag, description, config) -> processor, + "set", + (factories, tag, description, config) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail()) + ) + ); + PutPipelineRequest putRequest1 = new PutPipelineRequest( + "_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + XContentType.JSON + ); + // given that set -> fail() above, it's a failure if a document executes against this pipeline + PutPipelineRequest putRequest2 = new PutPipelineRequest( + "_id2", + new BytesArray("{\"processors\": [{\"set\" : {}}]}"), + XContentType.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = executePut(putRequest1, clusterState); + clusterState = executePut(putRequest2, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index").id("_id") + .source(Map.of()) + .setPipeline("_id1") + .setFinalPipeline("_id2"); + doThrow(new RuntimeException()).when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + final Predicate redirectCheck = (idx) -> indexRequest.index().equals(idx); + @SuppressWarnings("unchecked") + final TriConsumer redirectHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + redirectCheck, + redirectHandler, + failureHandler, + completionHandler, + Names.WRITE + ); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + verify(redirectHandler, times(1)).apply(eq(0), eq(indexRequest.index()), any(RuntimeException.class)); + verifyNoInteractions(failureHandler); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + } + + public void testExecuteFailureRedirectionWithNestedOnFailure() throws Exception { + final Processor processor = mock(Processor.class); + when(processor.isAsync()).thenReturn(true); + final Processor onFailureProcessor = mock(Processor.class); + when(onFailureProcessor.isAsync()).thenReturn(true); + final Processor onFailureOnFailureProcessor = mock(Processor.class); + when(onFailureOnFailureProcessor.isAsync()).thenReturn(true); + final List processors = List.of(onFailureProcessor); + final List onFailureProcessors = List.of(onFailureOnFailureProcessor); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(new CompoundProcessor(false, processors, 
onFailureProcessors)) + ); + IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> compoundProcessor)); + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + XContentType.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = executePut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index").id("_id") + .source(Map.of()) + .setPipeline("_id") + .setFinalPipeline("_none"); + doThrow(new RuntimeException()).when(onFailureOnFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + doThrow(new RuntimeException()).when(onFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + doThrow(new RuntimeException()).when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + final Predicate redirectPredicate = (idx) -> indexRequest.index().equals(idx); + @SuppressWarnings("unchecked") + final TriConsumer redirectHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest( + 1, + List.of(indexRequest), + indexReq -> {}, + redirectPredicate, + redirectHandler, + failureHandler, + completionHandler, + Names.WRITE + ); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); + verify(redirectHandler, times(1)).apply(eq(0), eq(indexRequest.index()), any(RuntimeException.class)); + verifyNoInteractions(failureHandler); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + } + + public void testBulkRequestExecutionWithRedirectedFailures() throws Exception { + BulkRequest bulkRequest = new BulkRequest(); + String pipelineId = "_id"; + + int numRequest = scaledRandomIntBetween(8, 64); + int numIndexRequests = 0; + for (int i = 0; i < numRequest; i++) { + DocWriteRequest request; + if (randomBoolean()) { + if (randomBoolean()) { + request = new DeleteRequest("_index", "_id"); + } else { + request = new UpdateRequest("_index", "_id"); + } + } else { + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + request = indexRequest; + numIndexRequests++; + } + bulkRequest.add(request); + } + + CompoundProcessor processor = mock(CompoundProcessor.class); + when(processor.isAsync()).thenReturn(true); + when(processor.getProcessors()).thenReturn(List.of(mock(Processor.class))); + Exception error = new RuntimeException(); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, error); + return null; + }).when(processor).execute(any(), any()); + IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + 
XContentType.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = executePut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + @SuppressWarnings("unchecked") + TriConsumer requestItemRedirectHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest( + numRequest, + bulkRequest.requests(), + indexReq -> {}, + (s) -> true, + requestItemRedirectHandler, + requestItemErrorHandler, + completionHandler, + Names.WRITE + ); + + verify(requestItemRedirectHandler, times(numIndexRequests)).apply(anyInt(), anyString(), argThat(e -> e.getCause().equals(error))); + verifyNoInteractions(requestItemErrorHandler); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + } + public void testBulkRequestExecution() throws Exception { BulkRequest bulkRequest = new BulkRequest(); String pipelineId = "_id"; From 4a8e08387ba71b762e02a7c1d60a046bbb8077cd Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 10 Jan 2024 12:41:37 -0500 Subject: [PATCH 12/36] Add redirection test to TransportBulkActionIngestTests --- .../bulk/TransportBulkActionIngestTests.java | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 7c4481efe6aa6..db4b792b6933d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.Nullable; @@ -54,13 +55,16 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Predicate; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; @@ -82,6 +86,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { */ private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline"; private static final String WITH_DEFAULT_PIPELINE_ALIAS = "alias_for_index_with_default_pipeline"; + private static final String WITH_FAILURE_STORE_ENABLED = "data-stream-failure-store-enabled"; private static final Settings SETTINGS = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); @@ -95,6 +100,10 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor + 
ArgumentCaptor> redirectPredicate; + @Captor + ArgumentCaptor> redirectHandler; + @Captor ArgumentCaptor> failureHandler; @Captor ArgumentCaptor> completionHandler; @@ -174,7 +183,7 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction> req = bulkDocsItr.getValue().iterator(); failureHandler.getValue().accept(0, exception); // have an exception for our one index request indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); + assertTrue(redirectPredicate.getValue().test(WITH_FAILURE_STORE_ENABLED + "-1")); // ensure redirects on failure store data stream + assertFalse(redirectPredicate.getValue().test(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices + assertFalse(redirectPredicate.getValue().test("index")); // no redirects for non-existent indices with no templates + redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2) + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyNoMoreInteractions(transportService); From 36483643f95bbddc457f3087c1750125d311fb54 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 10 Jan 2024 14:02:57 -0500 Subject: [PATCH 13/36] Add rest tests --- .../190_failure_store_redirection.yml | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml new file mode 100644 index 0000000000000..b9621977ff3aa --- /dev/null +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml @@ -0,0 +1,110 @@ +--- +teardown: + - do: + indices.delete_data_stream: + name: logs-foobar + ignore: 404 + + - do: + indices.delete: + index: .fs-logs-foobar-* + ignore: 404 + + - do: + indices.delete_index_template: + name: generic_logs_template + ignore: 404 + + - do: + ingest.delete_pipeline: + id: "failing_pipeline" + ignore: 404 + +--- +"Redirect ingest failure in data stream to failure store": + - skip: + version: " - 8.12.99" + reason: "data stream failure stores only redirect ingest failures in 8.13+" + features: [allowed_warnings, contains] + + - do: + ingest.put_pipeline: + id: "failing_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "fail" : { + "message" : "error_message" + } + } + ] + } + - match: { acknowledged: true } + + - do: + allowed_warnings: + - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" + indices.put_index_template: + name: generic_logs_template + body: + index_patterns: logs-* + data_stream: + failure_store: true + template: + settings: + number_of_shards: 1 + number_of_replicas: 1 + index: + default_pipeline: "failing_pipeline" + + - do: + index: + index: logs-foobar +
refresh: true + body: + '@timestamp': '2020-12-12' + foo: bar + + - do: + indices.get_data_stream: + name: logs-foobar + - match: { data_streams.0.name: logs-foobar } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - length: { data_streams.0.indices: 1 } + - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { data_streams.0.failure_store: true } + - length: { data_streams.0.failure_indices: 1 } + - match: { data_streams.0.failure_indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + + - do: + search: + index: logs-foobar + body: { query: { match_all: {} } } + - length: { hits.hits: 0 } + + - do: + search: + index: .fs-logs-foobar-* + - length: { hits.hits: 1 } + - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" } + - exists: hits.hits.0._source.@timestamp + - not_exists: hits.hits.0._source.foo + - not_exists: hits.hits.0._source.document.id + - match: { hits.hits.0._source.document.index: 'logs-foobar' } + - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' } + - match: { hits.hits.0._source.document.source.foo: 'bar' } + - match: { hits.hits.0._source.error.type: 'fail_processor_exception' } + - match: { hits.hits.0._source.error.message: 'error_message' } + - contains: { hits.hits.0._source.error.stack_trace: 'org.elasticsearch.ingest.common.FailProcessorException: error_message' } + + - do: + indices.delete_data_stream: + name: logs-foobar + - is_true: acknowledged + + - do: + indices.delete: + index: .fs-logs-foobar-* + - is_true: acknowledged From cccef0a34574f687041cb5b8d72e3ca484f5d15f Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 10 Jan 2024 14:29:19 -0500 Subject: [PATCH 14/36] precommit --- .../action/bulk/FailureStoreDocument.java | 3 +- .../action/bulk/TransportBulkAction.java | 24 ++----- .../elasticsearch/ingest/IngestService.java | 64 +++++++++---------- .../bulk/FailureStoreDocumentTests.java | 13 ++-- 4 files changed, 44 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java index 7c6b45df26627..1a2b8680ae111 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -43,8 +43,7 @@ public IndexRequest convert() throws IOException { // up that the parent data stream needs to be created. // One option is to make use of the eventual flag to perform an operation on the failure store. Ughh who would have thought the // dependencies would be swapped like that... 
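For orientation while reading the converter changes below: pieced together from the REST test assertions above, a document rerouted to the failure store appears to take roughly this shape (illustrative values borrowed from the test, not a normative schema):

    // Sketch of a failure store entry, reconstructed from the yaml assertions above.
    // {
    //   "@timestamp": ...,                         // time of the ingest failure
    //   "document": {
    //     "index": "logs-foobar",                  // index the request was targeting at failure time
    //     "source": {                              // the original, pre-pipeline source
    //       "@timestamp": "2020-12-12",
    //       "foo": "bar"
    //     }
    //   },
    //   "error": {
    //     "type": "fail_processor_exception",
    //     "message": "error_message",
    //     "stack_trace": "org.elasticsearch.ingest.common.FailProcessorException: error_message ..."
    //   }
    // }
    // A "document.id" field would presumably appear as well when the failed request carried an id;
    // the test indexes without one, hence its not_exists assertion on document.id.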
- return new IndexRequest() - .index(targetIndexName) + return new IndexRequest().index(targetIndexName) .source(createSource()) .opType(DocWriteRequest.OpType.CREATE) .setWriteToFailureStore(true); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index da2a09378336a..610c70d9a23b0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -969,21 +969,14 @@ public boolean isForceExecution() { ); } - static boolean shouldStoreFailure( - String indexName, - Metadata metadata, - long epochMillis - ) { - return DataStream.isFailureStoreEnabled() && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis) - .or(() -> resolveFailureStoreFromTemplate(indexName, metadata)) - .orElse(false); + static boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) { + return DataStream.isFailureStoreEnabled() + && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or( + () -> resolveFailureStoreFromTemplate(indexName, metadata) + ).orElse(false); } - private static Optional resolveFailureStoreFromMetadata( - String indexName, - Metadata metadata, - long epochMillis - ) { + private static Optional resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) { if (indexName == null) { return Optional.empty(); } @@ -1009,10 +1002,7 @@ private static Optional resolveFailureStoreFromMetadata( return Optional.of(targetDataStream != null && targetDataStream.isFailureStore()); } - private static Optional resolveFailureStoreFromTemplate( - String indexName, - Metadata metadata - ) { + private static Optional resolveFailureStoreFromTemplate(String indexName, Metadata metadata) { if (indexName == null) { return Optional.empty(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 7f973a7d15cf8..eaa360254288d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -670,12 +670,15 @@ void validatePipeline(Map ingestInfos, String pipelin } private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) {} + private static IngestPipelinesExecutionResult successResult() { return new IngestPipelinesExecutionResult(true, true, null, null); } + private static IngestPipelinesExecutionResult discardResult() { return new IngestPipelinesExecutionResult(true, false, null, null); } + private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { return new IngestPipelinesExecutionResult(false, true, e, index); } @@ -726,43 +729,40 @@ protected void doRun() { final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); // the document listener gives us three-way logic: a document can fail processing (1), or it can // be successfully processed. a successfully processed document can be kept (2) or dropped (3). 
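Restating that three-way comment as a condensed sketch (hypothetical method name; the real listener below additionally restores the original metadata and tracks ingest metrics):

    void onResult(int slot, IngestPipelinesExecutionResult result) {
        if (result.success() == false) {
            // (1) processing failed: hand the document off for failure store redirection
            onStoreFailure.apply(slot, result.failedIndex(), result.exception());
        } else if (result.kept()) {
            // (2) processed and kept: nothing extra to do, the write continues downstream
        } else {
            // (3) processed but dropped by a pipeline
            onDropped.accept(slot);
        }
    }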
- final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { - @Override - public void onResponse(IngestPipelinesExecutionResult result) { - assert result != null; - if (result.success) { - if (result.kept == false) { - onDropped.accept(slot); + final ActionListener documentListener = ActionListener.runAfter( + new ActionListener<>() { + @Override + public void onResponse(IngestPipelinesExecutionResult result) { + assert result != null; + if (result.success) { + if (result.kept == false) { + onDropped.accept(slot); + } + } else { + // We were given a failure result in the onResponse method, so we must store the failure + // Recover the original document state, track a failed ingest, and pass it along + updateIndexRequestMetadata(indexRequest, ingestDocument.getOriginalMetadata()); + totalMetrics.ingestFailed(); + onStoreFailure.apply(slot, result.failedIndex, result.exception); } - } else { - // We were given a failure result in the onResponse method, so we must store the failure - // Recover the original document state, track a failed ingest, and pass it along - updateIndexRequestMetadata(indexRequest, ingestDocument.getOriginalMetadata()); - totalMetrics.ingestFailed(); - onStoreFailure.apply(slot, result.failedIndex, result.exception); } - } - @Override - public void onFailure(Exception e) { - totalMetrics.ingestFailed(); - onFailure.accept(slot, e); + @Override + public void onFailure(Exception e) { + totalMetrics.ingestFailed(); + onFailure.accept(slot, e); + } + }, + () -> { + // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate + // that we're finished with this document + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); + ref.close(); } - }, () -> { - // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate - // that we're finished with this document - final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; - totalMetrics.postIngest(ingestTimeInNanos); - ref.close(); - }); - - executePipelines( - pipelines, - indexRequest, - ingestDocument, - shouldStoreFailure, - documentListener ); + + executePipelines(pipelines, indexRequest, ingestDocument, shouldStoreFailure, documentListener); indexRequest.setPipelinesHaveRun(); assert actionRequest.index() != null; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java index c5da0f6f22249..14bb58fa28ee9 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java @@ -24,14 +24,9 @@ public class FailureStoreDocumentTests extends ESTestCase { public void testFailureStoreDocumentConverstion() throws Exception { - IndexRequest source = new IndexRequest("original_index") - .routing("fake_routing") + IndexRequest source = new IndexRequest("original_index").routing("fake_routing") .id("1") - .source(JsonXContent.contentBuilder() - .startObject() - .field("key", "value") - .endObject() - ); + .source(JsonXContent.contentBuilder().startObject().field("key", "value").endObject()); // The exception will be wrapped for the test to make sure the converter correctly unwraps it Exception exception = new ElasticsearchException("Test exception please ignore"); @@ -64,8 +59,8 @@ public void 
testFailureStoreDocumentConverstion() throws Exception { assertThat( ObjectPath.eval("error.stack_trace", convertedRequest.sourceAsMap()), startsWith( - "org.elasticsearch.ElasticsearchException: Test exception please ignore\n" + - "\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConverstion" + "org.elasticsearch.ElasticsearchException: Test exception please ignore\n" + + "\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConverstion" ) ); From 59f250b0b07f67ae23e1c02c0b0f465fcd2e8f2c Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 10 Jan 2024 14:56:23 -0500 Subject: [PATCH 15/36] Comment cleanup --- .../org/elasticsearch/action/bulk/FailureStoreDocument.java | 5 ----- .../java/org/elasticsearch/action/index/IndexRequest.java | 5 ++++- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java index 1a2b8680ae111..e234c8962b66c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -38,11 +38,6 @@ public FailureStoreDocument(IndexRequest source, Exception exception, String tar } public IndexRequest convert() throws IOException { - // This is a problem - We want to target the targetted index name for creation, but we want the document to end up in its failure - // store. We could target the failure store directly, but if it does not exist, then we need the auto create logic to somehow pick - // up that the parent data stream needs to be created. - // One option is to make use of the eventual flag to perform an operation on the failure store. Ughh who would have thought the - // dependencies would be swapped like that... return new IndexRequest().index(targetIndexName) .source(createSource()) .opType(DocWriteRequest.OpType.CREATE) diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 66f972ac52654..f82c3232b319f 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -110,8 +110,11 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean requireAlias; - // Transient variable as it is not serialized. This will eventually be replaced with an official change to index options. + /** + * Transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. 
+ */ private boolean writeToFailureStore = false; + /** * This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document */ diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 9a9cd1f6fb762..e51dd2d159871 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -738,7 +738,7 @@ public org.elasticsearch.script.Metadata getMetadata() { } /** - * Get the strongly typed metadata + * Get the strongly typed metadata, unmodified, as it existed when the ingest document was first created */ public org.elasticsearch.script.Metadata getOriginalMetadata() { return originalMetadata; From 0af09ee7d75b8e6f4c4ddde52a3e53d7c6718166 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 12 Jan 2024 17:04:17 -0500 Subject: [PATCH 16/36] Add javadoc to FailureStoreDocument, convert it to static utility --- .../action/bulk/FailureStoreDocument.java | 60 +++++++++++++------ .../action/bulk/TransportBulkAction.java | 2 +- .../bulk/FailureStoreDocumentTests.java | 2 +- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java index e234c8962b66c..e0d6e8200e86d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java @@ -19,32 +19,58 @@ import java.util.Objects; import java.util.function.Supplier; -public class FailureStoreDocument { - - private final IndexRequest source; - private final Exception exception; - private final String targetIndexName; - private final Supplier timeSupplier; +/** + * Transforms an indexing request using error information into a new index request to be stored in a data stream's failure store. + */ +public final class FailureStoreDocument { - public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName) { - this(source, exception, targetIndexName, System::currentTimeMillis); - } + private FailureStoreDocument() {} - public FailureStoreDocument(IndexRequest source, Exception exception, String targetIndexName, Supplier timeSupplier) { - this.source = Objects.requireNonNull(source, "source must not be null"); - this.exception = Objects.requireNonNull(exception, "exception must not be null"); - this.targetIndexName = Objects.requireNonNull(targetIndexName, "targetIndexName must not be null"); - this.timeSupplier = Objects.requireNonNull(timeSupplier, "timeSupplier must not be null"); + /** + * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a + * new {@link IndexRequest} that can be stored in a data stream's failure store. 
+ * @param source The original request that has failed to be ingested + * @param exception The exception that was thrown that caused the request to fail to be ingested + * @param targetIndexName The index that the request was targeting at time of failure + * @return A new {@link IndexRequest} with a failure store compliant structure + * @throws IOException If there is a problem when the document's new source is serialized + */ + public static IndexRequest transformFailedRequest(IndexRequest source, Exception exception, String targetIndexName) throws IOException { + return transformFailedRequest(source, exception, targetIndexName, System::currentTimeMillis); } - public IndexRequest convert() throws IOException { + /** + * Combines an {@link IndexRequest} that has failed during the bulk process with the error thrown for that request. The result is a + * new {@link IndexRequest} that can be stored in a data stream's failure store. + * @param source The original request that has failed to be ingested + * @param exception The exception that was thrown that caused the request to fail to be ingested + * @param targetIndexName The index that the request was targeting at time of failure + * @param timeSupplier Supplies the value for the document's timestamp + * @return A new {@link IndexRequest} with a failure store compliant structure + * @throws IOException If there is a problem when the document's new source is serialized + */ + public static IndexRequest transformFailedRequest( + IndexRequest source, + Exception exception, + String targetIndexName, + Supplier timeSupplier + ) throws IOException { return new IndexRequest().index(targetIndexName) - .source(createSource()) + .source(createSource(source, exception, targetIndexName, timeSupplier)) .opType(DocWriteRequest.OpType.CREATE) .setWriteToFailureStore(true); } - private XContentBuilder createSource() throws IOException { + private static XContentBuilder createSource( + IndexRequest source, + Exception exception, + String targetIndexName, + Supplier timeSupplier + ) throws IOException { + Objects.requireNonNull(source, "source must not be null"); + Objects.requireNonNull(exception, "exception must not be null"); + Objects.requireNonNull(targetIndexName, "targetIndexName must not be null"); + Objects.requireNonNull(timeSupplier, "timeSupplier must not be null"); Throwable unwrapped = ExceptionsHelper.unwrapCause(exception); XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 610c70d9a23b0..11a63664717a4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1128,7 +1128,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception } else { IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); try { - IndexRequest errorDocument = new FailureStoreDocument(indexRequest, e, targetIndexName).convert(); + IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName); // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again. 
errorDocument.isPipelineResolved(true); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java index 14bb58fa28ee9..92fa67e9a6ffc 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java @@ -35,7 +35,7 @@ public void testFailureStoreDocumentConverstion() throws Exception { String targetIndexName = "rerouted_index"; long testTime = 1702357200000L; // 2023-12-12T05:00:00.000Z - IndexRequest convertedRequest = new FailureStoreDocument(source, exception, targetIndexName, () -> testTime).convert(); + IndexRequest convertedRequest = FailureStoreDocument.transformFailedRequest(source, exception, targetIndexName, () -> testTime); // Retargeting write assertThat(convertedRequest.id(), is(nullValue())); From 4a5b8e6860d237f8d6fe41dd1b0958cea5a1df1a Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 12 Jan 2024 17:10:53 -0500 Subject: [PATCH 17/36] Refactor IngestPipelinesExecutionResult to use static constants where possible. Pull the fail and store method into it for consistency's sake --- .../elasticsearch/ingest/IngestService.java | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index eaa360254288d..aea294c8c814c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -669,18 +669,12 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } - private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) {} - - private static IngestPipelinesExecutionResult successResult() { - return new IngestPipelinesExecutionResult(true, true, null, null); - } - - private static IngestPipelinesExecutionResult discardResult() { - return new IngestPipelinesExecutionResult(true, false, null, null); - } - - private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { - return new IngestPipelinesExecutionResult(false, true, e, index); + private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) { + private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); + private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null); + private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { + return new IngestPipelinesExecutionResult(false, true, e, index); + } } public void executeBulkRequest( @@ -884,7 +878,7 @@ private void executePipelines( e ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(failAndStoreFor(originalIndex, e)); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); } else { listener.onFailure(e); } @@ -892,7 +886,7 @@ private void executePipelines( } if (keep == false) { - listener.onResponse(discardResult()); + listener.onResponse(IngestPipelinesExecutionResult.DISCARD_RESULT); return; // document dropped!
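For quick reference, the constants and factory introduced above encode the following combinations of the record's fields (restated in comment form):

    // SUCCESSFUL_RESULT         -> success=true,  kept=true,  exception=null, failedIndex=null  (indexed normally)
    // DISCARD_RESULT            -> success=true,  kept=false, exception=null, failedIndex=null  (dropped on purpose)
    // failAndStoreFor(index, e) -> success=false, kept=true,  exception=e,    failedIndex=index (rerouted to the failure store)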
} @@ -917,7 +911,7 @@ private void executePipelines( ex ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(failAndStoreFor(originalIndex, documentContainsSelfReferenceException)); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, documentContainsSelfReferenceException)); } else { listener.onFailure(documentContainsSelfReferenceException); } @@ -940,7 +934,9 @@ private void executePipelines( ) ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(failAndStoreFor(originalIndex, finalPipelineChangedIndexException)); + listener.onResponse( + IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, finalPipelineChangedIndexException) + ); } else { listener.onFailure(finalPipelineChangedIndexException); } @@ -961,7 +957,7 @@ private void executePipelines( ) ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(failAndStoreFor(originalIndex, indexCycleDetectedException)); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, indexCycleDetectedException)); } else { listener.onFailure(indexCycleDetectedException); } @@ -987,7 +983,7 @@ private void executePipelines( // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); cacheRawTimestamp(indexRequest, ingestDocument); - listener.onResponse(successResult()); // document succeeded! + listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded! } }); } catch (Exception e) { @@ -996,7 +992,7 @@ private void executePipelines( e ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(failAndStoreFor(originalIndex, e)); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); } else { listener.onFailure(e); // document failed! 
} From 523ae5ab6cc25307d62c58eee53907a9b7f7c998 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 12 Jan 2024 17:12:34 -0500 Subject: [PATCH 18/36] Replace kept with shouldKeep --- .../src/main/java/org/elasticsearch/ingest/IngestService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index aea294c8c814c..9b6f49445e74c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -669,7 +669,7 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } - private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) { + private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) { private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null); private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { @@ -729,7 +729,7 @@ protected void doRun() { public void onResponse(IngestPipelinesExecutionResult result) { assert result != null; if (result.success) { - if (result.kept == false) { + if (result.shouldKeep == false) { onDropped.accept(slot); From 8e66e25d33922f9c648f755cf49baf07c958773e Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 16 Jan 2024 15:35:55 -0500 Subject: [PATCH 19/36] Add executeBulkRequest javadoc --- .../elasticsearch/ingest/IngestService.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9b6f49445e74c..c40752c09a6cc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -677,6 +677,26 @@ private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exce } } + /** + * Executes all applicable pipelines for a collection of documents. + * @param numberOfActionRequests The total number of requests to process. + * @param actionRequests The collection of requests to be processed. + * @param onDropped A callback executed when a document is dropped by a pipeline. + * Accepts the slot in the collection of requests that the document occupies. + * @param shouldStoreFailure A predicate executed on each ingest failure to determine if the + * failure should be stored somewhere. + * @param onStoreFailure A callback executed when a document fails ingest but the failure should + * be persisted elsewhere. Accepts the slot in the collection of requests + * that the document occupies, the index name that the request was targeting + * at the time of failure, and the exception that the document encountered. + * @param onFailure A callback executed when a document fails ingestion and does not need to be + * persisted. Accepts the slot in the collection of requests that the document + * occupies, and the exception that the document encountered. + * @param onCompletion A callback executed once all documents have been processed.
Accepts the thread + * that ingestion completed on or an exception in the event that the entire operation + * has failed. + * @param executorName Which executor the bulk request should be executed on. + */ public void executeBulkRequest( final int numberOfActionRequests, final Iterable> actionRequests, From 2f741b4e02a64f3184852433675d25d3a8d1abeb Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 16 Jan 2024 15:49:26 -0500 Subject: [PATCH 20/36] Simplify exception names --- .../elasticsearch/ingest/IngestService.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index c40752c09a6cc..7971435af87d6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -921,7 +921,7 @@ private void executePipelines( } catch (IllegalArgumentException ex) { // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. // In that case, we catch and wrap the exception, so we can include more details - Exception documentContainsSelfReferenceException = new IllegalArgumentException( + Exception wrappedException = new IllegalArgumentException( format( "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", pipelineId, @@ -931,9 +931,9 @@ private void executePipelines( ex ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, documentContainsSelfReferenceException)); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, wrappedException)); } else { - listener.onFailure(documentContainsSelfReferenceException); + listener.onFailure(wrappedException); } return; // document failed! } @@ -944,7 +944,7 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { // final pipelines cannot change the target index (either directly or by way of a reroute) if (isFinalPipeline) { - Exception finalPipelineChangedIndexException = new IllegalStateException( + Exception ex = new IllegalStateException( format( "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", pipelineId, @@ -955,10 +955,10 @@ private void executePipelines( ); if (shouldStoreFailure.test(originalIndex)) { listener.onResponse( - IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, finalPipelineChangedIndexException) + IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, ex) ); } else { - listener.onFailure(finalPipelineChangedIndexException); + listener.onFailure(ex); } return; // document failed! 
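Tying the javadoc above to the call sites updated throughout the tests, a caller wires the expanded signature roughly like this (condensed sketch; the helper names are hypothetical stand-ins for the real handlers):

    ingestService.executeBulkRequest(
        bulkRequest.numberOfActions(),
        bulkRequest.requests(),
        slot -> {},                                                              // onDropped
        indexName -> failureStoreEnabledFor(indexName),                          // shouldStoreFailure (hypothetical helper)
        (slot, targetIndex, e) -> redirectToFailureStore(slot, targetIndex, e),  // onStoreFailure (hypothetical helper)
        (slot, e) -> markFailed(slot, e),                                        // onFailure (hypothetical helper)
        (thread, e) -> onIngestDone(thread, e),                                  // onCompletion (hypothetical helper)
        Names.WRITE
    );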
} @@ -968,7 +968,7 @@ private void executePipelines( if (cycle) { List indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); indexCycle.add(newIndex); - Exception indexCycleDetectedException = new IllegalStateException( + Exception ex = new IllegalStateException( format( "index cycle detected while processing pipeline [%s] for document [%s]: %s", pipelineId, @@ -977,9 +977,9 @@ private void executePipelines( ) ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, indexCycleDetectedException)); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, ex)); } else { - listener.onFailure(indexCycleDetectedException); + listener.onFailure(ex); } return; // document failed! } From 786e0d5ff5bffa22298513baed425d27e0220386 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 16 Jan 2024 15:56:57 -0500 Subject: [PATCH 21/36] Use cached timestamp in failure store detection. --- .../java/org/elasticsearch/action/bulk/TransportBulkAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 11a63664717a4..d74a4e64df594 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -920,7 +920,7 @@ private void processBulkIndexIngestRequest( original.numberOfActions(), () -> bulkRequestModifier, bulkRequestModifier::markItemAsDropped, - (indexName) -> shouldStoreFailure(indexName, metadata, System.currentTimeMillis()), + (indexName) -> shouldStoreFailure(indexName, metadata, threadPool.absoluteTimeInMillis()), bulkRequestModifier::markItemForFailureStore, bulkRequestModifier::markItemAsFailed, (originalThread, exception) -> { From bfb4343d4c8f8ae8d3d903f2108af0b6dc2be605 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 17 Jan 2024 03:33:13 -0500 Subject: [PATCH 22/36] Add javadoc to bulk action methods --- .../action/bulk/TransportBulkAction.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index d74a4e64df594..043e4f26a4cf3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -969,6 +969,15 @@ public boolean isForceExecution() { ); } + /** + * Determines if an index name is associated with either an existing data stream or a template + * for one that has the failure store enabled. + * @param indexName The index name to check. + * @param metadata Cluster state metadata. + * @param epochMillis A timestamp to use when resolving date math in the index name. + * @return true if the given index name corresponds to a data stream with a failure store, + * or if it matches a template that has a data stream failure store enabled. 
+ */ static boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) { return DataStream.isFailureStoreEnabled() && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or( @@ -976,6 +985,13 @@ && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or( ).orElse(false); } + /** + * Determines if an index name is associated with an existing data stream that has a failure store enabled. + * @param indexName The index name to check. + * @param metadata Cluster state metadata. + * @param epochMillis A timestamp to use when resolving date math in the index name. + * @return true if the given index name corresponds to an existing data stream with a failure store enabled. + */ private static Optional resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) { if (indexName == null) { return Optional.empty(); @@ -1002,6 +1018,12 @@ private static Optional resolveFailureStoreFromMetadata(String indexNam return Optional.of(targetDataStream != null && targetDataStream.isFailureStore()); } + /** + * Determines if an index name is associated with an index template that has a data stream failure store enabled. + * @param indexName The index name to check. + * @param metadata Cluster state metadata. + * @return true if the given index name corresponds to an index template with a data stream failure store enabled. + */ private static Optional resolveFailureStoreFromTemplate(String indexName, Metadata metadata) { if (indexName == null) { return Optional.empty(); @@ -1023,6 +1045,13 @@ private static Optional resolveFailureStoreFromTemplate(String indexNam return Optional.empty(); } + /** + * Manages mutations to a bulk request that arise from the application of ingest pipelines. The modifier acts as an iterator over the + * documents of a bulk request, keeping a record of all dropped and failed write requests in the overall bulk operation. + * Once all pipelines have been applied, the modifier is used to create a new bulk request that will be used for executing the + * remaining writes. When this final bulk operation is completed, the modifier is used to combine the results with those from the + * ingest service to create the final bulk response. + */ static final class BulkRequestModifier implements Iterator> { final BulkRequest bulkRequest; @@ -1049,6 +1078,13 @@ public boolean hasNext() { return (currentSlot + 1) < bulkRequest.requests().size(); } + /** + * Creates a new bulk request containing all documents from the original bulk request that have not been marked as failed + * or dropped. Any failed or dropped documents are tracked as a side effect of this call so that they may be reflected in the + * final bulk response. + * + * @return A new bulk request without the write operations removed during any ingest pipeline executions. + */ BulkRequest getBulkRequest() { if (itemResponses.isEmpty()) { return bulkRequest; @@ -1071,6 +1107,15 @@ BulkRequest getBulkRequest() { } } + /** + * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the + * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest + * service with the results returned from running the remaining write operations. + * + * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result. + * @param actionListener The action listener that expects the final bulk response. 
+ * @return An action listener that combines ingest failure results with the results from writing the remaining documents. + */ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { if (itemResponses.isEmpty()) { return actionListener.map( @@ -1091,6 +1136,10 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, } } + /** + * Mark the document at the given slot in the bulk request as having been dropped by the ingest service. + * @param slot the slot in the bulk request to mark as dropped. + */ synchronized void markItemAsDropped(int slot) { IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); failedSlots.set(slot); @@ -1111,6 +1160,11 @@ synchronized void markItemAsDropped(int slot) { ); } + /** + * Mark the document at the given slot in the bulk request as having failed in the ingest service. + * @param slot the slot in the bulk request to mark as failed. + * @param e the failure encountered. + */ synchronized void markItemAsFailed(int slot, Exception e) { IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); // We hit a error during preprocessing a request, so we: @@ -1122,6 +1176,13 @@ synchronized void markItemAsFailed(int slot, Exception e) { itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure)); } + /** + * Mark the document at the given slot in the bulk request as having failed in the ingest service. The document will be redirected + * to a data stream's failure store. + * @param slot the slot in the bulk request to redirect. + * @param targetIndexName the index that the document was targetting at the time of failure. + * @param e the failure encountered. + */ public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { if (DataStream.isFailureStoreEnabled() == false) { markItemAsFailed(slot, e); From c86f26f561b1007c36228612d983c54a8e3f3385 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 17 Jan 2024 03:36:22 -0500 Subject: [PATCH 23/36] Suppress the more niche exception --- .../elasticsearch/action/bulk/TransportBulkAction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 043e4f26a4cf3..34e68d92a340e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1196,19 +1196,19 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME); errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); bulkRequest.requests.set(slot, errorDocument); - } catch (IOException ex) { + } catch (IOException ioException) { // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it if // we need the info later. - ex.addSuppressed(e); + e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one. 
logger.debug( () -> "Encountered exception while attempting to redirect a failed ingest operation: index [" + targetIndexName + "], source: [" + indexRequest.source().utf8ToString() + "]", - ex + ioException ); - markItemAsFailed(slot, ex); + markItemAsFailed(slot, e); } } } From 1263146b8cb8ce8baeec2627b06a82bc8ea45eb9 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 17 Jan 2024 13:58:40 -0500 Subject: [PATCH 24/36] Fully null safe index requests for marking items for failure store. --- .../action/bulk/TransportBulkAction.java | 59 +++++++++++++------ 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 34e68d92a340e..2785b0beb497a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1180,7 +1180,7 @@ synchronized void markItemAsFailed(int slot, Exception e) { * Mark the document at the given slot in the bulk request as having failed in the ingest service. The document will be redirected * to a data stream's failure store. * @param slot the slot in the bulk request to redirect. - * @param targetIndexName the index that the document was targetting at the time of failure. + * @param targetIndexName the index that the document was targeting at the time of failure. * @param e the failure encountered. */ public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { @@ -1188,27 +1188,48 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception markItemAsFailed(slot, e); } else { IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); - try { - IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName); - // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to - // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again. - errorDocument.isPipelineResolved(true); - errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME); - errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); - bulkRequest.requests.set(slot, errorDocument); - } catch (IOException ioException) { - // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it if - // we need the info later. - e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one. + if (indexRequest == null) { + // This is unlikely to happen ever since only index and update operations are considered for ingest, but if it does + // happen, attempt to trip an assertion. If running in production, be defensive: Mark it as failed as normal, and log + // the info for later debugging if needed. + assert false + : "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. 
" + + "type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + + targetIndexName + + "]"; + markItemAsFailed(slot, e); logger.debug( - () -> "Encountered exception while attempting to redirect a failed ingest operation: index [" + () -> "Attempted to redirect an invalid write operation after ingest failure - type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + targetIndexName - + "], source: [" - + indexRequest.source().utf8ToString() - + "]", - ioException + + "]" ); - markItemAsFailed(slot, e); + } else { + try { + IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName); + // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to + // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again. + errorDocument.isPipelineResolved(true); + errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME); + errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); + bulkRequest.requests.set(slot, errorDocument); + } catch (IOException ioException) { + // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it + // if we need the info later. + e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one. + logger.debug( + () -> "Encountered exception while attempting to redirect a failed ingest operation: index [" + + targetIndexName + + "], source: [" + + indexRequest.source().utf8ToString() + + "]", + ioException + ); + markItemAsFailed(slot, e); + } } } } From a2ab635a95c7c71d6e866a299cb34eb77c65ae50 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 29 Jan 2024 14:31:34 -0500 Subject: [PATCH 25/36] Harden the checks in IndexRequest --- .../action/index/IndexRequest.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 9f06bdb584272..4506f3d7101c4 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.index; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; @@ -828,15 +829,19 @@ public IndexRequest setRequireDataStream(boolean requireDataStream) { @Override public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) { if (DataStream.isFailureStoreEnabled() && writeToFailureStore) { - // TODO: Should this be a harder backstop than an assert statement? 
- assert ia.isDataStreamRelated() - : "Attempting to write a document to a failure store but the targeted index is not a data stream"; + if (ia.isDataStreamRelated() == false) { + throw new ElasticsearchException( + "Attempting to write a document to a failure store but the targeted index is not a data stream" + ); + } // Resolve write index and get parent data stream to handle the case of dealing with an alias String defaultWriteIndexName = ia.getWriteIndex().getName(); DataStream dataStream = metadata.getIndicesLookup().get(defaultWriteIndexName).getParentDataStream(); - // TODO: Should this be a harder backstop than an assert statement? - assert dataStream.getFailureIndices().size() > 0 - : "Attempting to write a document to a failure store but the target data stream does not have one enabled"; + if (dataStream.getFailureIndices().size() < 1) { + throw new ElasticsearchException( + "Attempting to write a document to a failure store but the target data stream does not have one enabled" + ); + } return dataStream.getFailureIndices().get(dataStream.getFailureIndices().size() - 1); } else { // Resolve as normal From 13f311aa860e64ba5c5ac7f2cdc22490a963478d Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 29 Jan 2024 15:57:01 -0500 Subject: [PATCH 26/36] precommit --- .../java/org/elasticsearch/action/index/IndexRequest.java | 1 - .../main/java/org/elasticsearch/ingest/IngestService.java | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 4506f3d7101c4..3f4eacbda64f7 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -112,7 +112,6 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean requireDataStream; - /** * Transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. */ diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 7971435af87d6..d35fc785e7901 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -670,6 +670,7 @@ void validatePipeline(Map ingestInfos, String pipelin } private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) { + private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { @@ -954,9 +955,7 @@ private void executePipelines( ) ); if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse( - IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, ex) - ); + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, ex)); } else { listener.onFailure(ex); } From b78932cc88812635829dca7b6aa7f172b99404a0 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 30 Jan 2024 12:18:44 -0500 Subject: [PATCH 27/36] Small doc updates related to double count bug. 
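The updated comments spell out why markItemForFailureStore has to tolerate a null IndexRequest: only source-oriented operations (index, create, upsert) ever reach ingest. A rough sketch of the lookup that produces the possibly-null request, simplified from TransportBulkAction#getIndexWriteRequest (the real upsert handling is more involved than shown here):

    static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteRequest) {
        if (docWriteRequest instanceof IndexRequest indexRequest) {
            return indexRequest;            // index and create ops are IndexRequests themselves
        } else if (docWriteRequest instanceof UpdateRequest updateRequest) {
            return updateRequest.doc();     // upserts carry a nested IndexRequest with the source
        }
        return null;                        // anything else has no document source to redirect
    }

Since only index and update operations are considered for ingest in the first place, the null branch is a defensive backstop (hence the assert) rather than an expected path.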
--- .../org/elasticsearch/action/bulk/TransportBulkAction.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index ebd8262f463b8..b378e2110d8e6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1269,11 +1269,12 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception if (DataStream.isFailureStoreEnabled() == false) { markItemAsFailed(slot, e); } else { + // We get the index write request to find the source of the failed document IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); if (indexRequest == null) { - // This is unlikely to happen ever since only index and update operations are considered for ingest, but if it does - // happen, attempt to trip an assertion. If running in production, be defensive: Mark it as failed as normal, and log - // the info for later debugging if needed. + // This is unlikely to happen ever since only source oriented operations (index, create, upsert) are considered for + // ingest, but if it does happen, attempt to trip an assertion. If running in production, be defensive: Mark it failed + // as normal, and log the info for later debugging if needed. assert false : "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. " + "type: [" From 18617efd7a25ca411a3ed8c44d3c560d141fd6a9 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 30 Jan 2024 12:31:25 -0500 Subject: [PATCH 28/36] Fix a mistake from the pr comment updates --- .../src/main/java/org/elasticsearch/ingest/IngestService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index d35fc785e7901..2f703d0caeffd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -672,7 +672,7 @@ void validatePipeline(Map ingestInfos, String pipelin private record IngestPipelinesExecutionResult(boolean success, boolean shouldKeep, Exception exception, String failedIndex) { private static final IngestPipelinesExecutionResult SUCCESSFUL_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); - private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, true, null, null); + private static final IngestPipelinesExecutionResult DISCARD_RESULT = new IngestPipelinesExecutionResult(true, false, null, null); private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) { return new IngestPipelinesExecutionResult(false, true, e, index); } From a680f5c55a59ed70ccc3e13af0c7ecac76fefb86 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 31 Jan 2024 16:10:42 -0500 Subject: [PATCH 29/36] Persist the metadata in the ingest service instead of on ingest document. 
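The pattern is snapshot-before-mutate: clone the document's metadata once, before any pipeline runs, and use the clone to rewind the index request if ingest fails and the document is redirected to the failure store. A minimal sketch of the idea, where getMetadata().clone() and updateIndexRequestMetadata come from the surrounding code but runPipelines and redirectToFailureStore are hypothetical stand-ins:

    // Capture the pre-ingest view of _index, _id, _routing, _version.
    final org.elasticsearch.script.Metadata original = ingestDocument.getMetadata().clone();
    try {
        runPipelines(ingestDocument);                       // may rewrite the document's coordinates
    } catch (Exception e) {
        updateIndexRequestMetadata(indexRequest, original); // restore the original coordinates
        redirectToFailureStore(indexRequest, e);            // then hand off to the failure store
    }

Keeping the snapshot in the service means IngestDocument itself carries no extra bookkeeping state, which is what lets the next patch revert the originalMetadata field entirely.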
--- .../src/main/java/org/elasticsearch/ingest/IngestService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2f703d0caeffd..6e7888dbf6b86 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -742,6 +742,7 @@ protected void doRun() { final Releasable ref = refs.acquire(); DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); + final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone(); // the document listener gives us three-way logic: a document can fail processing (1), or it can // be successfully processed. a successfully processed document can be kept (2) or dropped (3). final ActionListener documentListener = ActionListener.runAfter( @@ -756,7 +757,7 @@ public void onResponse(IngestPipelinesExecutionResult result) { } else { // We were given a failure result in the onResponse method, so we must store the failure // Recover the original document state, track a failed ingest, and pass it along - updateIndexRequestMetadata(indexRequest, ingestDocument.getOriginalMetadata()); + updateIndexRequestMetadata(indexRequest, originalDocumentMetadata); totalMetrics.ingestFailed(); onStoreFailure.apply(slot, result.failedIndex, result.exception); } From 46ec4255b9d711d565d3d3da7e6b4cdb8d1721b5 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 31 Jan 2024 16:11:47 -0500 Subject: [PATCH 30/36] Revert adding the original metadata to an ingest document --- .../elasticsearch/ingest/IngestDocument.java | 22 ++++--------------- .../ingest/TestIngestDocument.java | 12 ++-------- 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index e51dd2d159871..31d947d548ccf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -52,7 +52,6 @@ public final class IngestDocument { public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: "; static final String TIMESTAMP = "timestamp"; - private final IngestDocMetadata originalMetadata; private final IngestCtxMap ctxMap; private final Map ingestMetadata; @@ -82,11 +81,9 @@ public final class IngestDocument { private boolean reroute = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { - ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - this.originalMetadata = new IngestDocMetadata(index, id, version, routing, versionType, now); - this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, now, source); + this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); this.ingestMetadata = new HashMap<>(); - this.ingestMetadata.put(TIMESTAMP, now); + this.ingestMetadata.put(TIMESTAMP, ctxMap.getMetadata().getNow()); this.templateModel = initializeTemplateModel(); // initialize the index history by putting the current index into it @@ -104,7 +101,6 @@ public IngestDocument(String index, String id, long version, String routing, 
Ver */ public IngestDocument(IngestDocument other) { this( - other.originalMetadata.clone(), new IngestCtxMap(deepCopyMap(ensureNoSelfReferences(other.ctxMap.getSource())), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); @@ -138,9 +134,7 @@ public IngestDocument(Map sourceAndMetadata, Map } } } - ZonedDateTime timestamp = IngestCtxMap.getTimestamp(ingestMetadata); - this.originalMetadata = new IngestDocMetadata(metadata, timestamp); - this.ctxMap = new IngestCtxMap(source, new IngestDocMetadata(metadata, timestamp)); + this.ctxMap = new IngestCtxMap(source, new IngestDocMetadata(metadata, IngestCtxMap.getTimestamp(ingestMetadata))); this.ingestMetadata = new HashMap<>(ingestMetadata); this.templateModel = initializeTemplateModel(); } @@ -148,8 +142,7 @@ public IngestDocument(Map sourceAndMetadata, Map /** * Constructor to create an IngestDocument from its constituent maps. */ - IngestDocument(IngestDocMetadata originalMetadata, IngestCtxMap ctxMap, Map ingestMetadata) { - this.originalMetadata = Objects.requireNonNull(originalMetadata); + IngestDocument(IngestCtxMap ctxMap, Map ingestMetadata) { this.ctxMap = Objects.requireNonNull(ctxMap); this.ingestMetadata = Objects.requireNonNull(ingestMetadata); this.templateModel = initializeTemplateModel(); @@ -737,13 +730,6 @@ public org.elasticsearch.script.Metadata getMetadata() { return ctxMap.getMetadata(); } - /** - * Get the strongly typed metadata, unmodified, as it existed when the ingest document was first created - */ - public org.elasticsearch.script.Metadata getOriginalMetadata() { - return originalMetadata; - } - /** * Get all source values in a Map */ diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java b/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java index 603f6705b665a..3998cf6db1aa5 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java @@ -45,11 +45,7 @@ public static IngestDocument ofIngestWithNullableVersion(Map sou metadata.put(key, source.remove(key)); } } - return new IngestDocument( - TestIngestCtxMetadata.withNullableVersion(metadata), - new IngestCtxMap(source, TestIngestCtxMetadata.withNullableVersion(metadata)), - ingestMetadata - ); + return new IngestDocument(new IngestCtxMap(source, TestIngestCtxMetadata.withNullableVersion(metadata)), ingestMetadata); } /** @@ -68,11 +64,7 @@ public static IngestDocument withDefaultVersion(Map sourceAndMet * can observe changes to the map directly. */ public static IngestDocument ofMetadataWithValidator(Map metadata, Map> properties) { - return new IngestDocument( - new TestIngestCtxMetadata(metadata, properties), - new IngestCtxMap(new HashMap<>(), new TestIngestCtxMetadata(metadata, properties)), - new HashMap<>() - ); + return new IngestDocument(new IngestCtxMap(new HashMap<>(), new TestIngestCtxMetadata(metadata, properties)), new HashMap<>()); } /** From c60fd8a6c168731f79dc304e3258c3a8a6758f90 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 1 Feb 2024 00:46:44 -0500 Subject: [PATCH 31/36] Rework the exception handling logic into a consumer so that it isn't repeated. 
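Each failure site previously repeated the same store-or-fail branch; now the branch is captured once per document in a Consumer<Exception> and every site collapses to exceptionHandler.accept(e). Reduced to a self-contained toy (not the service code itself):

    import java.util.function.Consumer;
    import java.util.function.Predicate;

    class StoreOrFail {
        static void process(String originalIndex, Predicate<String> shouldStoreFailure) {
            // Defined once; closes over the original target index and the predicate.
            Consumer<Exception> exceptionHandler = e -> {
                if (shouldStoreFailure.test(originalIndex)) {
                    System.out.println("redirecting to failure store: " + e.getMessage());
                } else {
                    System.out.println("failing the document: " + e.getMessage());
                }
            };
            try {
                throw new IllegalStateException("simulated pipeline failure");
            } catch (Exception e) {
                exceptionHandler.accept(e); // every failure path funnels through one handler
            }
        }
    }

Note that the handler tests shouldStoreFailure against originalIndex, the target captured before any reroute, so a document that fails after being rerouted is still stored against the data stream it was originally sent to.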
--- .../elasticsearch/ingest/IngestService.java | 82 ++++++++----------- 1 file changed, 36 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 6e7888dbf6b86..1f82ebd786e98 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -880,6 +880,13 @@ private void executePipelines( // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet ingestDocument.resetReroute(); final String originalIndex = indexRequest.indices()[0]; + final Consumer exceptionHandler = (Exception e) -> { + if (shouldStoreFailure.test(originalIndex)) { + listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); + } else { + listener.onFailure(e); + } + }; try { if (pipeline == null) { @@ -899,11 +906,7 @@ private void executePipelines( ), e ); - if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); - } else { - listener.onFailure(e); - } + exceptionHandler.accept(e); return; // document failed! } @@ -923,20 +926,17 @@ private void executePipelines( } catch (IllegalArgumentException ex) { // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. // In that case, we catch and wrap the exception, so we can include more details - Exception wrappedException = new IllegalArgumentException( - format( - "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - ex + exceptionHandler.accept( + new IllegalArgumentException( + format( + "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + ex + ) ); - if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, wrappedException)); - } else { - listener.onFailure(wrappedException); - } return; // document failed! } @@ -946,20 +946,17 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { // final pipelines cannot change the target index (either directly or by way of a reroute) if (isFinalPipeline) { - Exception ex = new IllegalStateException( - format( - "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", - pipelineId, - originalIndex, - newIndex, - indexRequest.id() + exceptionHandler.accept( + new IllegalStateException( + format( + "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", + pipelineId, + originalIndex, + newIndex, + indexRequest.id() + ) ) ); - if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, ex)); - } else { - listener.onFailure(ex); - } return; // document failed! 
} @@ -968,19 +965,16 @@ private void executePipelines( if (cycle) { List indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); indexCycle.add(newIndex); - Exception ex = new IllegalStateException( - format( - "index cycle detected while processing pipeline [%s] for document [%s]: %s", - pipelineId, - indexRequest.id(), - indexCycle + exceptionHandler.accept( + new IllegalStateException( + format( + "index cycle detected while processing pipeline [%s] for document [%s]: %s", + pipelineId, + indexRequest.id(), + indexCycle + ) ) ); - if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, ex)); - } else { - listener.onFailure(ex); - } return; // document failed! } @@ -1011,11 +1005,7 @@ private void executePipelines( () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), e ); - if (shouldStoreFailure.test(originalIndex)) { - listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e)); - } else { - listener.onFailure(e); // document failed! - } + exceptionHandler.accept(e); // document failed } } From 5142491b17f256709bb0378958a80c32134ecd72 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 1 Feb 2024 17:52:39 -0500 Subject: [PATCH 32/36] Add dev time assertion in markItemForFailureStore --- .../org/elasticsearch/action/bulk/TransportBulkAction.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b378e2110d8e6..d11192206fb96 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1267,6 +1267,9 @@ synchronized void markItemAsDropped(int slot) { */ public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { if (DataStream.isFailureStoreEnabled() == false) { + // Assert false for development, but if we somehow find ourselves here, default to failure logic. + assert false : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! " + + "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; markItemAsFailed(slot, e); } else { // We get the index write request to find the source of the failed document From 1950dfc07ab424983f599d72eba5d339e6929096 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 2 Feb 2024 00:19:16 -0500 Subject: [PATCH 33/36] Add unit tests for checking data streams for failure store compatibility. 
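Both tests open with a JUnit assumption so they are skipped, not failed, when the failure store feature flag is off. The gating pattern (JUnit 4 / hamcrest style, matching the imports in the diff below):

    import static org.hamcrest.Matchers.is;
    import static org.junit.Assume.assumeThat;

    // A failed assumption marks the test as ignored rather than red,
    // so CI stays green when the feature flag is disabled.
    assumeThat(DataStream.isFailureStoreEnabled(), is(true));

The tests also thread a fixed testTime through shouldStoreFailure because backing and failure indices embed a timestamp in their generated names, and resolving those names in assertions needs a stable clock rather than repeated calls to System.currentTimeMillis().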
--- .../action/bulk/TransportBulkActionTests.java | 98 +++++++++++++++++++ .../metadata/DataStreamTestHelper.java | 34 ++++++- 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index c3a1747902893..6f3767892e7a4 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -21,7 +21,9 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexAbstraction.ConcreteIndex; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -52,6 +54,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -61,6 +64,7 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; public class TransportBulkActionTests extends ESTestCase { @@ -336,6 +340,100 @@ public void testRejectionAfterCreateIndexIsPropagated() throws Exception { } } + public void testResolveFailureStoreFromMetadata() throws Exception { + assumeThat(DataStream.isFailureStoreEnabled(), is(true)); + + String dataStreamWithFailureStore = "test-data-stream-failure-enabled"; + String dataStreamWithoutFailureStore = "test-data-stream-failure-disabled"; + long testTime = randomMillisUpToYear9999(); + + IndexMetadata backingIndex1 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithFailureStore, testTime).build(); + IndexMetadata backingIndex2 = DataStreamTestHelper.createFirstBackingIndex(dataStreamWithoutFailureStore, testTime).build(); + IndexMetadata failureStoreIndex1 = DataStreamTestHelper.createFirstFailureStore(dataStreamWithFailureStore, testTime).build(); + + Metadata metadata = Metadata.builder() + .dataStreams( + Map.of( + dataStreamWithFailureStore, + DataStreamTestHelper.newInstance( + dataStreamWithFailureStore, + List.of(backingIndex1.getIndex()), + 1L, + Map.of(), + false, + null, + List.of(failureStoreIndex1.getIndex()) + ), + dataStreamWithoutFailureStore, + DataStreamTestHelper.newInstance( + dataStreamWithoutFailureStore, + List.of(backingIndex2.getIndex()), + 1L, + Map.of(), + false, + null, + List.of() + ) + ), + Map.of() + ) + .indices( + Map.of( + backingIndex1.getIndex().getName(), + backingIndex1, + backingIndex2.getIndex().getName(), + backingIndex2, + failureStoreIndex1.getIndex().getName(), + failureStoreIndex1 + ) + ) + .build(); + + // Data stream with failure store should store failures + assertThat(TransportBulkAction.shouldStoreFailure(dataStreamWithFailureStore, metadata, testTime), is(true)); + // Data stream without failure store should not + assertThat(TransportBulkAction.shouldStoreFailure(dataStreamWithoutFailureStore, metadata, testTime), is(false)); + // An index should not be considered for failure storage + 
assertThat(TransportBulkAction.shouldStoreFailure(backingIndex1.getIndex().getName(), metadata, testTime), is(false)); + // even if that index is itself a failure store + assertThat(TransportBulkAction.shouldStoreFailure(failureStoreIndex1.getIndex().getName(), metadata, testTime), is(false)); + } + + public void testResolveFailureStoreFromTemplate() throws Exception { + assumeThat(DataStream.isFailureStoreEnabled(), is(true)); + + String dsTemplateWithFailureStore = "test-data-stream-failure-enabled"; + String dsTemplateWithoutFailureStore = "test-data-stream-failure-disabled"; + String indexTemplate = "test-index"; + long testTime = randomMillisUpToYear9999(); + + Metadata metadata = Metadata.builder() + .indexTemplates( + Map.of( + dsTemplateWithFailureStore, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dsTemplateWithFailureStore + "-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true)) + .build(), + dsTemplateWithoutFailureStore, + ComposableIndexTemplate.builder() + .indexPatterns(List.of(dsTemplateWithoutFailureStore + "-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, false)) + .build(), + indexTemplate, + ComposableIndexTemplate.builder().indexPatterns(List.of(indexTemplate + "-*")).build() + ) + ) + .build(); + + // Data stream with failure store should store failures + assertThat(TransportBulkAction.shouldStoreFailure(dsTemplateWithFailureStore + "-1", metadata, testTime), is(true)); + // Data stream without failure store should not + assertThat(TransportBulkAction.shouldStoreFailure(dsTemplateWithoutFailureStore + "-1", metadata, testTime), is(false)); + // An index template should not be considered for failure storage + assertThat(TransportBulkAction.shouldStoreFailure(indexTemplate + "-1", metadata, testTime), is(false)); + } + private BulkRequest buildBulkRequest(List indices) { BulkRequest request = new BulkRequest(); for (String index : indices) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index d0b30bff92f3e..3a47e0885f2d2 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -124,7 +124,20 @@ public static DataStream newInstance( @Nullable DataStreamLifecycle lifecycle, List failureStores ) { - return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, failureStores); + return new DataStream( + name, + indices, + generation, + metadata, + false, + replicated, + false, + false, + null, + lifecycle, + failureStores.size() > 0, + failureStores + ); } public static String getLegacyDefaultBackingIndexName( @@ -169,6 +182,25 @@ public static IndexMetadata.Builder createBackingIndex(String dataStreamName, in .numberOfReplicas(NUMBER_OF_REPLICAS); } + public static IndexMetadata.Builder createFirstFailureStore(String dataStreamName) { + return createFailureStore(dataStreamName, 1, System.currentTimeMillis()); + } + + public static IndexMetadata.Builder createFirstFailureStore(String dataStreamName, long epochMillis) { + return createFailureStore(dataStreamName, 1, epochMillis); + } + + public static IndexMetadata.Builder createFailureStore(String dataStreamName, int generation) { + return createFailureStore(dataStreamName, generation, 
System.currentTimeMillis()); + } + + public static IndexMetadata.Builder createFailureStore(String dataStreamName, int generation, long epochMillis) { + return IndexMetadata.builder(DataStream.getDefaultFailureStoreName(dataStreamName, generation, epochMillis)) + .settings(SETTINGS) + .numberOfShards(NUMBER_OF_SHARDS) + .numberOfReplicas(NUMBER_OF_REPLICAS); + } + public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) { return IndexMetadata.builder(index.getName()) .settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID())) From febae2902355cf29fa1c310e7e7c6b6782e21e6b Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 2 Feb 2024 00:24:20 -0500 Subject: [PATCH 34/36] spotless --- .../org/elasticsearch/action/bulk/TransportBulkAction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index d11192206fb96..5763f0a20a033 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1268,8 +1268,9 @@ synchronized void markItemAsDropped(int slot) { public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { if (DataStream.isFailureStoreEnabled() == false) { // Assert false for development, but if we somehow find ourselves here, default to failure logic. - assert false : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! " + - "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; + assert false + : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! 
" + + "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; markItemAsFailed(slot, e); } else { // We get the index write request to find the source of the failed document From 01d4ea630f4282f943312ed7d6df5ce03658e1e0 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 2 Feb 2024 01:02:18 -0500 Subject: [PATCH 35/36] Re-apply changes to surprise refactoring --- .../action/bulk/BulkRequestModifier.java | 109 +++++++++++++++++- 1 file changed, 106 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index e42ddd41b0b0a..dfeb229b05a8a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -8,16 +8,22 @@ package org.elasticsearch.action.bulk; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SparseFixedBitSet; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Assertions; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.ingest.IngestService; +import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -30,8 +36,17 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +/** + * Manages mutations to a bulk request that arise from the application of ingest pipelines. The modifier acts as an iterator over the + * documents of a bulk request, keeping a record of all dropped and failed write requests in the overall bulk operation. + * Once all pipelines have been applied, the modifier is used to create a new bulk request that will be used for executing the + * remaining writes. When this final bulk operation is completed, the modifier is used to combine the results with those from the + * ingest service to create the final bulk response. + */ final class BulkRequestModifier implements Iterator> { + private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class); + private static final String DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated"; final BulkRequest bulkRequest; @@ -58,6 +73,13 @@ public boolean hasNext() { return (currentSlot + 1) < bulkRequest.requests().size(); } + /** + * Creates a new bulk request containing all documents from the original bulk request that have not been marked as failed + * or dropped. Any failed or dropped documents are tracked as a side effect of this call so that they may be reflected in the + * final bulk response. + * + * @return A new bulk request without the write operations removed during any ingest pipeline executions. 
+ */ BulkRequest getBulkRequest() { if (itemResponses.isEmpty()) { return bulkRequest; @@ -80,6 +102,15 @@ BulkRequest getBulkRequest() { } } + /** + * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the + * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest + * service with the results returned from running the remaining write operations. + * + * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result. + * @param actionListener The action listener that expects the final bulk response. + * @return An action listener that combines ingest failure results with the results from writing the remaining documents. + */ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { if (itemResponses.isEmpty()) { return actionListener.map( @@ -122,9 +153,9 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte .collect(Collectors.toSet()); assert Sets.haveEmptyIntersection(failedIds, responseIds) : "bulk item response slots cannot have failed and been processed in the subsequent bulk request, failed ids: " - + failedIds - + ", response ids: " - + responseIds; + + failedIds + + ", response ids: " + + responseIds; // check for the correct number of responses final int expectedResponseCount = bulkRequest.requests.size(); @@ -138,6 +169,11 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte } } + /** + * Mark the document at the given slot in the bulk request as having failed in the ingest service. + * @param slot the slot in the bulk request to mark as failed. + * @param e the failure encountered. + */ synchronized void markItemAsFailed(int slot, Exception e) { final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); @@ -150,6 +186,10 @@ synchronized void markItemAsFailed(int slot, Exception e) { itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure)); } + /** + * Mark the document at the given slot in the bulk request as having been dropped by the ingest service. + * @param slot the slot in the bulk request to mark as dropped. + */ synchronized void markItemAsDropped(int slot) { final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); @@ -164,4 +204,67 @@ synchronized void markItemAsDropped(int slot) { ); itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped)); } + + /** + * Mark the document at the given slot in the bulk request as having failed in the ingest service. The document will be redirected + * to a data stream's failure store. + * @param slot the slot in the bulk request to redirect. + * @param targetIndexName the index that the document was targeting at the time of failure. + * @param e the failure encountered. + */ + public void markItemForFailureStore(int slot, String targetIndexName, Exception e) { + if (DataStream.isFailureStoreEnabled() == false) { + // Assert false for development, but if we somehow find ourselves here, default to failure logic. + assert false + : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! 
" + + "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; + markItemAsFailed(slot, e); + } else { + // We get the index write request to find the source of the failed document + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(bulkRequest.requests().get(slot)); + if (indexRequest == null) { + // This is unlikely to happen ever since only source oriented operations (index, create, upsert) are considered for + // ingest, but if it does happen, attempt to trip an assertion. If running in production, be defensive: Mark it failed + // as normal, and log the info for later debugging if needed. + assert false + : "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. " + + "type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + + targetIndexName + + "]"; + markItemAsFailed(slot, e); + logger.debug( + () -> "Attempted to redirect an invalid write operation after ingest failure - type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + + targetIndexName + + "]" + ); + } else { + try { + IndexRequest errorDocument = FailureStoreDocument.transformFailedRequest(indexRequest, e, targetIndexName); + // This is a fresh index request! We need to do some preprocessing on it. If we do not, when this is returned to + // the bulk action, the action will see that it hasn't been processed by ingest yet and attempt to ingest it again. + errorDocument.isPipelineResolved(true); + errorDocument.setPipeline(IngestService.NOOP_PIPELINE_NAME); + errorDocument.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); + bulkRequest.requests.set(slot, errorDocument); + } catch (IOException ioException) { + // This is unlikely to happen because the conversion is so simple, but be defensive and attempt to report about it + // if we need the info later. + e.addSuppressed(ioException); // Prefer to return the original exception to the end user instead of this new one. 
+ logger.debug( + () -> "Encountered exception while attempting to redirect a failed ingest operation: index [" + + targetIndexName + + "], source: [" + + indexRequest.source().utf8ToString() + + "]", + ioException + ); + markItemAsFailed(slot, e); + } + } + } + } } From c83fd50a6f221dfa4fe37bc0162cb3829e884cae Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 2 Feb 2024 01:03:21 -0500 Subject: [PATCH 36/36] spotless please --- .../action/bulk/BulkRequestModifier.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index dfeb229b05a8a..5e630bf9cdef5 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -153,9 +153,9 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte .collect(Collectors.toSet()); assert Sets.haveEmptyIntersection(failedIds, responseIds) : "bulk item response slots cannot have failed and been processed in the subsequent bulk request, failed ids: " - + failedIds - + ", response ids: " - + responseIds; + + failedIds + + ", response ids: " + + responseIds; // check for the correct number of responses final int expectedResponseCount = bulkRequest.requests.size(); @@ -217,7 +217,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception // Assert false for development, but if we somehow find ourselves here, default to failure logic. assert false : "Attempting to route a failed write request type to a failure store but the failure store is not enabled! " - + "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; + + "This should be guarded against in TransportBulkAction#shouldStoreFailure()"; markItemAsFailed(slot, e); } else { // We get the index write request to find the source of the failed document @@ -228,11 +228,11 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception // as normal, and log the info for later debugging if needed. assert false : "Attempting to mark invalid write request type for failure store. Only IndexRequest or UpdateRequest allowed. " - + "type: [" - + bulkRequest.requests().get(slot).getClass().getName() - + "], index: [" - + targetIndexName - + "]"; + + "type: [" + + bulkRequest.requests().get(slot).getClass().getName() + + "], index: [" + + targetIndexName + + "]"; markItemAsFailed(slot, e); logger.debug( () -> "Attempted to redirect an invalid write operation after ingest failure - type: ["